# DATA  WAREHOUSE FOR SALES ANALYSIS

Program ini dibuat untuk menyusun Data Warehouse untuk kebutuhan analisis perusahaan terkait dengan penjualan.

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder\
    .appName("WriteToPostgres")\
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")\
    .getOrCreate()

# Create Dataframe from Each Table

## Fact Transaction (fact_transaction)

In [2]:
fact_transaction = spark.read.csv('fact_transaction.csv', header=True, inferSchema=True)
fact_transaction.show()

+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|transaction_id|order_id|user_id|product_id| date_id|        sale_price|              cost|              profit|transaction_status|is_order_created|is_shipped|is_delivered|is_returned|num_of_item|
+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|        177152|  122160|  98069|     14235|20250412|0.0199999995529651|0.0082999997779726|0.011699999774992502|         Cancelled|            true|     false|       false|      false|          1|
|         28849|   19870|  15771|     14235|20240529|0.0199999995529651|0.0082999997779726|0.011699999774992502|          Complete|            true|      true|        true|      false|          4|
|         27428

## Dimension Product (dim_product)

In [3]:
dim_product = spark.read.csv('dim_product.csv', header=True, inferSchema=True)
dim_product.show()

+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|product_key|                name|brand|   category|department|      retail_price|              cost|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|      13842|Low Profile Dyed ...|   MG|Accessories|     Women|              6.25| 2.518749990849756|
|      13928|Low Profile Dyed ...|   MG|Accessories|     Women| 5.949999809265137|2.3383499148894105|
|      14115|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.879559879379869|
|      14157|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.648769887297898|
|      14273|Washed Canvas Ivy...|   MG|Accessories|     Women|15.989999771118164| 6.507929886473045|
|      15674|Low Profile Dyed ...|   MG|       Plus|     Women|              6.25|3.1062499998370185|
|      15816|Low Profile Dyed ...|   MG|       Plus|     Women| 5.949999809265137|

## Dimension User (dim_user)

In [4]:
dim_user = spark.read.csv('dim_user.csv', header=True, inferSchema=True)
dim_user.show()

+--------+----------------+---+------+-----+--------+-------+--------------+
|user_key|   customer_name|age|gender|state|    city|country|traffic_source|
+--------+----------------+---+------+-----+--------+-------+--------------+
|   25075|Clifford Johnson| 36|     M| Acre|    null| Brasil|        Search|
|   97806|    Angela Lopez| 50|     F| Acre|    null| Brasil|        Search|
|   41816|    Susan Kelley| 55|     F| Acre|    null| Brasil|        Search|
|   82351|Jacqueline Zhang| 62|     F| Acre|    null| Brasil|        Search|
|   70916|    Marie Parker| 66|     F| Acre|    null| Brasil|        Search|
|   37563|      Daisy Rios| 18|     F| Acre|    null| Brasil|       Organic|
|   85698|   Robert Wright| 45|     M| Acre|    null| Brasil|        Search|
|   90954| Jennifer Rivera| 23|     F| Acre|    null| Brasil|        Search|
|   29803| Allison Wheeler| 51|     F| Acre|    null| Brasil|       Organic|
|   42991|      Sean Combs| 26|     M| Acre|    null| Brasil|        Search|

## Dimension Date (dim_date)

In [5]:
dim_date = spark.read.csv('dim_date.csv', header=True, inferSchema=True)
dim_date.show()

+--------+----------+----+-------+-----+----+---+----------+---------+----------+
|date_key| full_date|year|quarter|month|week|day|month_name| day_name|is_weekend|
+--------+----------+----+-------+-----+----+---+----------+---------+----------+
|20190110|2019-01-10|2019|      1|    1|   1| 10|   January| Thursday|     false|
|20190111|2019-01-11|2019|      1|    1|   1| 11|   January|   Friday|     false|
|20190117|2019-01-17|2019|      1|    1|   2| 17|   January| Thursday|     false|
|20190118|2019-01-18|2019|      1|    1|   2| 18|   January|   Friday|     false|
|20190122|2019-01-22|2019|      1|    1|   3| 22|   January|  Tuesday|     false|
|20190123|2019-01-23|2019|      1|    1|   3| 23|   January|Wednesday|     false|
|20190124|2019-01-24|2019|      1|    1|   3| 24|   January| Thursday|     false|
|20190127|2019-01-27|2019|      1|    1|   4| 27|   January|   Sunday|      true|
|20190128|2019-01-28|2019|      1|    1|   4| 28|   January|   Monday|     false|
|20190129|2019-0

# Simple Data Exploration

In [6]:
fact_transaction.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- date_id: integer (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- cost: double (nullable = true)
 |-- profit: double (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- is_order_created: boolean (nullable = true)
 |-- is_shipped: boolean (nullable = true)
 |-- is_delivered: boolean (nullable = true)
 |-- is_returned: boolean (nullable = true)
 |-- num_of_item: integer (nullable = true)



In [7]:
fact_transaction.describe().show()

+-------+-----------------+------------------+-----------------+------------------+-------------------+------------------+------------------+--------------------+------------------+------------------+
|summary|   transaction_id|          order_id|          user_id|        product_id|            date_id|        sale_price|              cost|              profit|transaction_status|       num_of_item|
+-------+-----------------+------------------+-----------------+------------------+-------------------+------------------+------------------+--------------------+------------------+------------------+
|  count|           180753|            180753|           180753|            180753|             180753|            180753|            180753|              180753|            180753|            180753|
|   mean|          90377.0| 62346.73523537645|49965.45678356652|15247.829800888505|2.023395831183438E7| 59.56965505978409|28.638876275239355|  30.930778784544685|              NULL| 1.900488511947

In [8]:
dim_product.printSchema()

root
 |-- product_key: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: string (nullable = true)
 |-- department: string (nullable = true)
 |-- retail_price: double (nullable = true)
 |-- cost: double (nullable = true)



In [9]:
dim_product.describe().show()

+-------+-----------------+--------------------+---------+-----------+----------+------------------+------------------+
|summary|      product_key|                name|    brand|   category|department|      retail_price|              cost|
+-------+-----------------+--------------------+---------+-----------+----------+------------------+------------------+
|  count|            29120|               29118|    29096|      29120|     29120|             29120|             29120|
|   mean|          14560.5|                NULL| Infinity|       NULL|      NULL|59.220163865731955| 28.48177424043702|
| stddev|8406.364255729119|                NULL|      NaN|       NULL|      NULL| 65.88892669575863|30.624681214377663|
|    min|                1|!iT Jeans Junior'...|!it Jeans|Accessories|       Men|0.0199999995529651|0.0082999997779726|
|    max|            29120|Â Â Exclusive Haw...|wear ease|  Underwear|     Women|             999.0| 557.1510021798313|
+-------+-----------------+-------------

In [10]:
dim_user.printSchema()

root
 |-- user_key: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- traffic_source: string (nullable = true)



In [11]:
dim_user.describe().show()

+-------+-----------------+-------------+-----------------+------+-------------+-----------+-------------+--------------+
|summary|         user_key|customer_name|              age|gender|        state|       city|      country|traffic_source|
+-------+-----------------+-------------+-----------------+------+-------------+-----------+-------------+--------------+
|  count|           100000|       100000|           100000|100000|       100000|     100000|       100000|        100000|
|   mean|          50000.5|         NULL|         41.01956|  NULL|         NULL|       NULL|         NULL|          NULL|
| stddev|28867.65779668779|         NULL|17.07791480123204|  NULL|         NULL|       NULL|         NULL|          NULL|
|    min|                1|  Aaron Adams|               12|     F|         Acre|A Ramallosa|    Australia|       Display|
|    max|           100000| Zoe Williams|               70|     M|Île-de-France|      海 名|United States|        Search|
+-------+-----------------

In [12]:
dim_date.printSchema()

root
 |-- date_key: integer (nullable = true)
 |-- full_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month_name: string (nullable = true)
 |-- day_name: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)



In [13]:
dim_date.describe().show()

+-------+--------------------+------------------+------------------+------------------+------------------+------------------+----------+---------+
|summary|            date_key|              year|           quarter|             month|              week|               day|month_name| day_name|
+-------+--------------------+------------------+------------------+------------------+------------------+------------------+----------+---------+
|  count|                2286|              2286|              2286|              2286|              2286|              2286|      2286|     2286|
|   mean|2.0218042123797026E7| 2021.738845144357| 2.462379702537183| 6.379702537182852|25.537620297462816|15.702099737532809|      NULL|     NULL|
| stddev|   18229.85144329581|1.8277092298728972|1.1207735400039358|3.4550200180380863| 15.09877420002332| 8.796561142317465|      NULL|     NULL|
|    min|            20190110|              2019|                 1|                 1|                 0|            

# Data Cleaning and Transformation

## Data Type Format Review
Based on the analysis, the data types of all attributes in each DataFrame are already appropriate and correctly assigned. Therefore, no data type conversion or modification is necessary. The current format is suitable for further data processing and analysis.

## Handling Missing Value

In [14]:
# Function Checking Missing Values

from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.types import DoubleType, FloatType

def check_missing_values(df, df_name):
    print(f"Missing values in '{df_name}':")

    exprs = []
    for c in df.columns:
        dtype = dict(df.dtypes)[c]
        if dtype in ['double', 'float']:
            exprs.append(count(when(col(c).isNull() | isnan(col(c)), c)).alias(c))
        else:
            exprs.append(count(when(col(c).isNull(), c)).alias(c))
    
    df.select(exprs).show()

In [15]:
check_missing_values(fact_transaction, "fact_transaction")
check_missing_values(dim_product, "dim_product")
check_missing_values(dim_user, "dim_user")
check_missing_values(dim_date, "dim_date")

Missing values in 'fact_transaction':
+--------------+--------+-------+----------+-------+----------+----+------+------------------+----------------+----------+------------+-----------+-----------+
|transaction_id|order_id|user_id|product_id|date_id|sale_price|cost|profit|transaction_status|is_order_created|is_shipped|is_delivered|is_returned|num_of_item|
+--------------+--------+-------+----------+-------+----------+----+------+------------------+----------------+----------+------------+-----------+-----------+
|             0|       0|      0|         0|      0|         0|   0|     0|                 0|               0|         0|           0|          0|          0|
+--------------+--------+-------+----------+-------+----------+----+------+------------------+----------------+----------+------------+-----------+-----------+

Missing values in 'dim_product':
+-----------+----+-----+--------+----------+------------+----+
|product_key|name|brand|category|department|retail_price|cost|
+-

Explanation
1. **fact_transaction**: In the fact_transaction, there are no missing values. The data can be directly used for further analysis or transformation without risk of data quality issues related to incompleteness.

2. **dim_product**: In the dim_product, there are missing values that require proper handling such as imputation or drop columns. To determine the percentage of missing data in the products DataFrame and decide the appropriate handling method (such as imputation or drop column), we can calculate the percentage of missing values for each column. This will help us understand the severity of missing data and guide us in deciding whether to impute values, drop columns, or perform other data preprocessing steps.

3. **dim_user**:In the dim_user, there are no missing values. The data can be directly used for further analysis or transformation without risk of data quality issues related to incompleteness.

4. **dim_date**: In the dim_date, there are no missing values. The data can be directly used for further analysis or transformation without risk of data quality issues related to incompleteness.

In [16]:
# Show missing values as percentage
total_rows = dim_product.count()
missing_percentage = dim_product.select([
    (count(when(col(c).isNull(), c)) / total_rows * 100).alias(c)
    for c in dim_product.columns])
missing_percentage.show()

+-----------+--------------------+-------------------+--------+----------+------------+----+
|product_key|                name|              brand|category|department|retail_price|cost|
+-----------+--------------------+-------------------+--------+----------+------------+----+
|        0.0|0.006868131868131868|0.08241758241758242|     0.0|       0.0|         0.0| 0.0|
+-----------+--------------------+-------------------+--------+----------+------------+----+



Based on the calculation of the percentage of missing values in each column of the products table, it is found that several columns contain missing values. However, all columns have a missing value percentage of less than 30%. This indicates that the amount of missing data is still within an acceptable range and not too significant. Therefore, imputation (filling in the missing values) can be performed instead of dropping the columns.

In [17]:
# Impute missing values in 'brand'
dim_product_imputed = dim_product.withColumn(
    'brand', when(col('brand').isNull(), 'non-brand').otherwise(col('brand'))
)

# Impute missing values in 'product_name'
dim_product_imputed = dim_product_imputed.withColumn(
    'name', when(col('name').isNull(), 'unknown').otherwise(col('name'))
)

dim_product_imputed.show()

+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|product_key|                name|brand|   category|department|      retail_price|              cost|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|      13842|Low Profile Dyed ...|   MG|Accessories|     Women|              6.25| 2.518749990849756|
|      13928|Low Profile Dyed ...|   MG|Accessories|     Women| 5.949999809265137|2.3383499148894105|
|      14115|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.879559879379869|
|      14157|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.648769887297898|
|      14273|Washed Canvas Ivy...|   MG|Accessories|     Women|15.989999771118164| 6.507929886473045|
|      15674|Low Profile Dyed ...|   MG|       Plus|     Women|              6.25|3.1062499998370185|
|      15816|Low Profile Dyed ...|   MG|       Plus|     Women| 5.949999809265137|

In [18]:
# Show missing values as percentage after imputation
total_rows = dim_product_imputed.count()
missing_percentage = dim_product_imputed.select([
    (count(when(col(c).isNull(), c)) / total_rows * 100).alias(c)
    for c in dim_product_imputed.columns])
missing_percentage.show()

+-----------+----+-----+--------+----------+------------+----+
|product_key|name|brand|category|department|retail_price|cost|
+-----------+----+-----+--------+----------+------------+----+
|        0.0| 0.0|  0.0|     0.0|       0.0|         0.0| 0.0|
+-----------+----+-----+--------+----------+------------+----+



After performing imputation, the missing values in the brand and product_name columns have been successfully handled. Specifically, all null values in the brand column were replaced with "non-brand", while missing entries in the product_name column were filled with "unknown". As a result, there are no more missing values in the dataset, and the products table is now complete and ready for further analysis without the risk of data quality issues due to null values.

## Handling Duplicates Row

In [19]:
fact_transaction.show(5)

+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|transaction_id|order_id|user_id|product_id| date_id|        sale_price|              cost|              profit|transaction_status|is_order_created|is_shipped|is_delivered|is_returned|num_of_item|
+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|        177152|  122160|  98069|     14235|20250412|0.0199999995529651|0.0082999997779726|0.011699999774992502|         Cancelled|            true|     false|       false|      false|          1|
|         28849|   19870|  15771|     14235|20240529|0.0199999995529651|0.0082999997779726|0.011699999774992502|          Complete|            true|      true|        true|      false|          4|
|         27428

In [20]:
# Checking is there any duplicates row for fact_transaction
fact_transaction_duplicate_rows = fact_transaction.groupBy(fact_transaction.columns).count().filter("count > 1")
fact_transaction_duplicate_count = fact_transaction_duplicate_rows.count()

if fact_transaction_duplicate_count > 0:
    print(f"Found {fact_transaction_duplicate_count} duplicate rows. Dropping duplicates...")
    fact_transaction_clean = fact_transaction.dropDuplicates()
else:
    print("No duplicate rows found.")
    fact_transaction_clean = fact_transaction

# The result
print("After Checking and Dropping Duplicate Rows:")
fact_transaction_clean.show(5)

No duplicate rows found.
After Checking and Dropping Duplicate Rows:
+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|transaction_id|order_id|user_id|product_id| date_id|        sale_price|              cost|              profit|transaction_status|is_order_created|is_shipped|is_delivered|is_returned|num_of_item|
+--------------+--------+-------+----------+--------+------------------+------------------+--------------------+------------------+----------------+----------+------------+-----------+-----------+
|        177152|  122160|  98069|     14235|20250412|0.0199999995529651|0.0082999997779726|0.011699999774992502|         Cancelled|            true|     false|       false|      false|          1|
|         28849|   19870|  15771|     14235|20240529|0.0199999995529651|0.0082999997779726|0.011699999774992502|          Complete|            

In [21]:
dim_product_imputed.show(5)

+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|product_key|                name|brand|   category|department|      retail_price|              cost|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|      13842|Low Profile Dyed ...|   MG|Accessories|     Women|              6.25| 2.518749990849756|
|      13928|Low Profile Dyed ...|   MG|Accessories|     Women| 5.949999809265137|2.3383499148894105|
|      14115|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.879559879379869|
|      14157|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.648769887297898|
|      14273|Washed Canvas Ivy...|   MG|Accessories|     Women|15.989999771118164| 6.507929886473045|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
only showing top 5 rows



In [22]:
# Checking is there any duplicates row for dim_product
dim_product_duplicate_rows = dim_product_imputed.groupBy(dim_product_imputed.columns).count().filter("count > 1")
dim_product_duplicate_count = dim_product_duplicate_rows.count()

if dim_product_duplicate_count > 0:
    print(f"Found {dim_product_duplicate_count} duplicate rows. Dropping duplicates...")
    dim_product_clean = dim_product_imputed.dropDuplicates()
else:
    print("No duplicate rows found.")
    dim_product_clean = dim_product_imputed

# The result
print("After Checking and Dropping Duplicate Rows:")
dim_product_clean.show(5)

No duplicate rows found.
After Checking and Dropping Duplicate Rows:
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|product_key|                name|brand|   category|department|      retail_price|              cost|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
|      13842|Low Profile Dyed ...|   MG|Accessories|     Women|              6.25| 2.518749990849756|
|      13928|Low Profile Dyed ...|   MG|Accessories|     Women| 5.949999809265137|2.3383499148894105|
|      14115|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.879559879379869|
|      14157|Enzyme Regular So...|   MG|Accessories|     Women|10.989999771118164| 4.648769887297898|
|      14273|Washed Canvas Ivy...|   MG|Accessories|     Women|15.989999771118164| 6.507929886473045|
+-----------+--------------------+-----+-----------+----------+------------------+------------------+
only showing 

In [23]:
dim_user.show(5)

+--------+----------------+---+------+-----+----+-------+--------------+
|user_key|   customer_name|age|gender|state|city|country|traffic_source|
+--------+----------------+---+------+-----+----+-------+--------------+
|   25075|Clifford Johnson| 36|     M| Acre|null| Brasil|        Search|
|   97806|    Angela Lopez| 50|     F| Acre|null| Brasil|        Search|
|   41816|    Susan Kelley| 55|     F| Acre|null| Brasil|        Search|
|   82351|Jacqueline Zhang| 62|     F| Acre|null| Brasil|        Search|
|   70916|    Marie Parker| 66|     F| Acre|null| Brasil|        Search|
+--------+----------------+---+------+-----+----+-------+--------------+
only showing top 5 rows



In [24]:
# Checking is there any duplicates row for products
dim_user_duplicate_rows = dim_user.groupBy(dim_user.columns).count().filter("count > 1")
dim_user_duplicate_count = dim_user_duplicate_rows.count()

if dim_user_duplicate_count > 0:
    print(f"Found {dim_user_duplicate_count} duplicate rows. Dropping duplicates...")
    dim_user_clean = dim_user.dropDuplicates()
else:
    print("No duplicate rows found.")
    dim_user_clean = dim_user

# The result
print("After Checking and Dropping Duplicate Rows:")
dim_user_clean.show(5)

No duplicate rows found.
After Checking and Dropping Duplicate Rows:
+--------+----------------+---+------+-----+----+-------+--------------+
|user_key|   customer_name|age|gender|state|city|country|traffic_source|
+--------+----------------+---+------+-----+----+-------+--------------+
|   25075|Clifford Johnson| 36|     M| Acre|null| Brasil|        Search|
|   97806|    Angela Lopez| 50|     F| Acre|null| Brasil|        Search|
|   41816|    Susan Kelley| 55|     F| Acre|null| Brasil|        Search|
|   82351|Jacqueline Zhang| 62|     F| Acre|null| Brasil|        Search|
|   70916|    Marie Parker| 66|     F| Acre|null| Brasil|        Search|
+--------+----------------+---+------+-----+----+-------+--------------+
only showing top 5 rows



In [25]:
dim_date.show(5)

+--------+----------+----+-------+-----+----+---+----------+--------+----------+
|date_key| full_date|year|quarter|month|week|day|month_name|day_name|is_weekend|
+--------+----------+----+-------+-----+----+---+----------+--------+----------+
|20190110|2019-01-10|2019|      1|    1|   1| 10|   January|Thursday|     false|
|20190111|2019-01-11|2019|      1|    1|   1| 11|   January|  Friday|     false|
|20190117|2019-01-17|2019|      1|    1|   2| 17|   January|Thursday|     false|
|20190118|2019-01-18|2019|      1|    1|   2| 18|   January|  Friday|     false|
|20190122|2019-01-22|2019|      1|    1|   3| 22|   January| Tuesday|     false|
+--------+----------+----+-------+-----+----+---+----------+--------+----------+
only showing top 5 rows



In [26]:
# Checking is there any duplicates row for dim_date
dim_date_duplicate_rows = dim_date.groupBy(dim_date.columns).count().filter("count > 1")
dim_date_duplicate_count = dim_date_duplicate_rows.count()

if dim_date_duplicate_count > 0:
    print(f"Found {dim_date_duplicate_count} duplicate rows. Dropping duplicates...")
    dim_date_clean = dim_date.dropDuplicates()
else:
    print("No duplicate rows found.")
    dim_date_clean = dim_date

# The result
print("After Checking and Dropping Duplicate Rows:")
dim_date.show(5)

No duplicate rows found.
After Checking and Dropping Duplicate Rows:
+--------+----------+----+-------+-----+----+---+----------+--------+----------+
|date_key| full_date|year|quarter|month|week|day|month_name|day_name|is_weekend|
+--------+----------+----+-------+-----+----+---+----------+--------+----------+
|20190110|2019-01-10|2019|      1|    1|   1| 10|   January|Thursday|     false|
|20190111|2019-01-11|2019|      1|    1|   1| 11|   January|  Friday|     false|
|20190117|2019-01-17|2019|      1|    1|   2| 17|   January|Thursday|     false|
|20190118|2019-01-18|2019|      1|    1|   2| 18|   January|  Friday|     false|
|20190122|2019-01-22|2019|      1|    1|   3| 22|   January| Tuesday|     false|
+--------+----------+----+-------+-----+----+---+----------+--------+----------+
only showing top 5 rows



After performing duplicate checks across all DataFrames, it was confirmed that there are no duplicate records present. Each row in the datasets is unique, and therefore, no further action is required regarding duplicate removal. The data is clean and ready for the next steps in the analysis process.

# Transform Data to CSV file

In [27]:
fact_transaction_clean.write.mode("overwrite").csv('fact_transaction_clean.csv', header=True)

In [28]:
dim_product_clean.write.mode("overwrite").csv('dim_product_clean.csv', header=True)

In [29]:
dim_user_clean.write.mode("overwrite").csv('dim_user_clean.csv', header=True)

In [30]:
dim_date_clean.write.mode("overwrite").csv('dim_date_clean.csv', header=True)

# Create Connection

In [33]:
# PostgreSQL JDBC Connestion
postgres_url = "jdbc:postgresql://host.docker.internal:5432/gc6"
postgres_properties = {
    "user": "postgres",
    "password": "Rahmanda123",
    "driver": "org.postgresql.Driver"
}

## dim_user

In [34]:
data_dim_user = spark.read.csv('dim_user.csv', header=True, inferSchema=True)
data_dim_user = data_dim_user.drop("_c0")

In [35]:
data_dim_user.write.jdbc(url=postgres_url, table="dim_user", mode="append", properties=postgres_properties)

## dim_product

In [36]:
data_dim_product = spark.read.csv('dim_product.csv', header=True, inferSchema=True)
data_dim_product = data_dim_product.drop("_c0")

In [37]:
data_dim_product.write.jdbc(url=postgres_url, table="dim_product", mode="append", properties=postgres_properties)

## dim_date

In [38]:
data_dim_date = spark.read.csv('dim_date.csv', header=True, inferSchema=True)
data_dim_date = data_dim_date.drop("_c0")

In [39]:
data_dim_date.write.jdbc(url=postgres_url, table="dim_date", mode="append", properties=postgres_properties)

## fact_transaction

In [40]:
data_fact_transaction = spark.read.csv('fact_transaction_clean.csv', header=True, inferSchema=True)
data_fact_transaction = data_fact_transaction.drop("_c0")

In [41]:
data_fact_transaction.write.jdbc(url=postgres_url, table="fact_transaction", mode="append", properties=postgres_properties)

In [42]:
print("Jumlah data dim_user:", data_dim_user.count())
print("Jumlah data dim_product:", data_dim_product.count())
print("Jumlah data dim_date:", data_dim_date.count())
print("Jumlah data fact_transaction:", data_fact_transaction.count())

Jumlah data dim_user: 100000
Jumlah data dim_product: 29120
Jumlah data dim_date: 2286
Jumlah data fact_transaction: 180753


# Presentation Link

https://www.canva.com/design/DAGmrxYZ9OQ/cAulTq2ruKqaUUOVb1XkhA/edit?utm_content=DAGmrxYZ9OQ&utm_campaign=designshare&utm_medium=link2&utm_source=sharebutton