# Introduction
Link Google Slide `https://docs.google.com/presentation/d/1Vtk3Qc7Lo-hX6Qb5ctC4Qkkn2HUeJvJ0VpmygEM5t_4/edit?usp=sharing`

CODA-RMT-002 - Daniswara Eka Saputra

Membuat Data Warehouse untuk kebutuhan analisis perusahaan terkait dengan penjualan perusahaan ecommerce 'The Look'. Dataset yang digunakan berasal dari Google BigQuery database "thelook_ecommerce".

## Getting Started

In [1]:
import pyspark
from pyspark.sql.functions import avg, max, min, sum, col, when, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder \
    .appName("WriteToPostgres") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()
spark

## Data Transformation and Data Cleaning

# Fact Table

## Order Table

In [3]:
fact_order = spark.read.csv('Fact_Order.csv', header=True, inferSchema=True)
fact_order.show()

+--------+-------+------------------+-----------+
|order_id|user_id|transaction_amount|num_of_item|
+--------+-------+------------------+-----------+
|  104671|  83488|0.0199999995529651|          1|
|   75102|  59812|0.4900000095367431|          1|
|   95464|  76145|0.4900000095367431|          1|
|   44227|  35291|0.4900000095367431|          1|
|   29481|  23526|0.4900000095367431|          1|
|   83889|  66888|0.4900000095367431|          1|
|   47466|  37845|               1.5|          1|
|    3418|   2733|               1.5|          1|
|   22090|  17520|               1.5|          1|
|  115183|  91834|               1.5|          1|
|   79624|  63441|               1.5|          1|
|  110920|  88445|               1.5|          1|
|   62914|  50114|1.5099999904632568|          1|
|   87795|  70019|1.5099999904632568|          1|
|     212|    169|1.5099999904632568|          1|
|    9503|   7548|1.5099999904632568|          1|
|  106627|  85019|1.5099999904632568|          1|


Menampilkan skema data dari tabel fact_order

In [4]:
fact_order.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- transaction_amount: double (nullable = true)
 |-- num_of_item: integer (nullable = true)



Menampilkan statistik data dari tabel fact_order

In [5]:
fact_order.describe().show()

+-------+------------------+------------------+------------------+------------------+
|summary|          order_id|           user_id|transaction_amount|       num_of_item|
+-------+------------------+------------------+------------------+------------------+
|  count|            125431|            125431|            125431|            125431|
|   mean|           62716.0| 49994.80973603017|  86.7678536465046|1.4523921518603853|
| stddev|36208.955144641426|28877.211338771685| 93.94987146188836|0.8089699810984198|
|    min|                 1|                 1|0.0199999995529651|                 1|
|    max|            125431|             99998|1696.9900016784668|                 4|
+-------+------------------+------------------+------------------+------------------+



Melihat null value dari tabel

In [6]:
fact_order.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in fact_order.columns)).show()

+--------+-------+------------------+-----------+
|order_id|user_id|transaction_amount|num_of_item|
+--------+-------+------------------+-----------+
|       0|      0|                 0|          0|
+--------+-------+------------------+-----------+



Bisa dilihat dari semua kolom tidak terdapat null value

Mengecek data duplicates yang ada pada tabel

In [7]:
fact_order.exceptAll(fact_order.dropDuplicates()).show()

+--------+-------+------------------+-----------+
|order_id|user_id|transaction_amount|num_of_item|
+--------+-------+------------------+-----------+
+--------+-------+------------------+-----------+



Dari semua kolom tidak ada data yang duplicates

# Dimension Table

## Users table

In [8]:
dim_users = spark.read.csv('dim_users.csv', header=True, inferSchema=True)
dim_users.show()

+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+--------+-------+------------+------------+--------------+
|   id|first_name|last_name|               email|age|gender|state|      street_address|postal_code|    city|country|    latitude|   longitude|traffic_source|
+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+--------+-------+------------+------------+--------------+
|35166|   Michael|   Stokes|michaelstokes@exa...| 61|     M| Acre|53410 Wilson Poin...|  69980-000|    null| Brasil|-8.065346116|-72.87094866|        Search|
|94007|      Paul|  Cochran|paulcochran@examp...| 65|     M| Acre|   72561 Nathan Lake|  69980-000|    null| Brasil|-8.065346116|-72.87094866|       Organic|
|40471|   Breanna| Thompson|breannathompson@e...| 51|     F| Acre|8287 Fisher Shoal...|  69980-000|    null| Brasil|-8.065346116|-72.87094866|        Search|
|93377|    Thomas|   Moreno|thomasmoreno@exam...| 39

Menampilkan skema data dari tabel dim_users

In [9]:
dim_users.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- street_address: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- traffic_source: string (nullable = true)



Menampilkan statistik data dari tabel dim_users

In [10]:
dim_users.describe().show()

+-------+------------------+----------+---------+--------------------+------------------+------+-------------+--------------------+------------------+-----------+-------------+------------------+------------------+--------------+
|summary|                id|first_name|last_name|               email|               age|gender|        state|      street_address|       postal_code|       city|      country|          latitude|         longitude|traffic_source|
+-------+------------------+----------+---------+--------------------+------------------+------+-------------+--------------------+------------------+-----------+-------------+------------------+------------------+--------------+
|  count|            100000|    100000|   100000|              100000|            100000|100000|       100000|              100000|            100000|     100000|       100000|            100000|            100000|        100000|
|   mean|           50000.5|      NULL|     NULL|                NULL|          

Mengecek null value di semua kolom

In [11]:
dim_users.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dim_users.columns)).show()

+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
| id|first_name|last_name|email|age|gender|state|street_address|postal_code|city|country|latitude|longitude|traffic_source|
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
|  0|         0|        0|    0|  0|     0|    0|             0|          0|   0|      0|       0|        0|             0|
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+



Dilihat dari hasil semua kolom tidak ada null value, akan tetapi dilihat dari data diatas banyak terdapat null pada kolom city. 

Disini memfilter 'null' string value yang ada pada kolom city, ternyata memang banyak data dari yang berisikan string 'null' ini.

In [12]:
dim_users.filter(dim_users.city == 'null').show(5)

+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+----+-------+------------+------------+--------------+
|   id|first_name|last_name|               email|age|gender|state|      street_address|postal_code|city|country|    latitude|   longitude|traffic_source|
+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+----+-------+------------+------------+--------------+
|35166|   Michael|   Stokes|michaelstokes@exa...| 61|     M| Acre|53410 Wilson Poin...|  69980-000|null| Brasil|-8.065346116|-72.87094866|        Search|
|94007|      Paul|  Cochran|paulcochran@examp...| 65|     M| Acre|   72561 Nathan Lake|  69980-000|null| Brasil|-8.065346116|-72.87094866|       Organic|
|40471|   Breanna| Thompson|breannathompson@e...| 51|     F| Acre|8287 Fisher Shoal...|  69980-000|null| Brasil|-8.065346116|-72.87094866|        Search|
|93377|    Thomas|   Moreno|thomasmoreno@exam...| 39|     M| Acre|     5183 

Disini mengganti 'null' (string) dengan null value (none)

In [13]:
# Replace 'null' (string) with null (None)
dim_users_cleaned = dim_users.withColumn("city", when(col("city") == "null", None).otherwise(col("city")))

dim_users_cleaned.show()


+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+--------+-------+------------+------------+--------------+
|   id|first_name|last_name|               email|age|gender|state|      street_address|postal_code|    city|country|    latitude|   longitude|traffic_source|
+-----+----------+---------+--------------------+---+------+-----+--------------------+-----------+--------+-------+------------+------------+--------------+
|35166|   Michael|   Stokes|michaelstokes@exa...| 61|     M| Acre|53410 Wilson Poin...|  69980-000|    NULL| Brasil|-8.065346116|-72.87094866|        Search|
|94007|      Paul|  Cochran|paulcochran@examp...| 65|     M| Acre|   72561 Nathan Lake|  69980-000|    NULL| Brasil|-8.065346116|-72.87094866|       Organic|
|40471|   Breanna| Thompson|breannathompson@e...| 51|     F| Acre|8287 Fisher Shoal...|  69980-000|    NULL| Brasil|-8.065346116|-72.87094866|        Search|
|93377|    Thomas|   Moreno|thomasmoreno@exam...| 39

Melihat kembali null value yang ada pada tabel

In [14]:
dim_users_cleaned.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dim_users_cleaned.columns)).show()

+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
| id|first_name|last_name|email|age|gender|state|street_address|postal_code|city|country|latitude|longitude|traffic_source|
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
|  0|         0|        0|    0|  0|     0|    0|             0|          0| 984|      0|       0|        0|             0|
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+



Bisa dilihat bahwa sekarang null value di kolom city ada 984

Mengecek duplicates data yang ada pada tabel dim_uesers

In [15]:
dim_users_cleaned.exceptAll(dim_users_cleaned.dropDuplicates()).show()

+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
| id|first_name|last_name|email|age|gender|state|street_address|postal_code|city|country|latitude|longitude|traffic_source|
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+
+---+----------+---------+-----+---+------+-----+--------------+-----------+----+-------+--------+---------+--------------+



Dari hasil diatas tidak ada data yang duplicates.

Di script dibawah ini bertujuan untuk menggabungkan kolom 'first_name' dan 'last_name' agar menjadi 1 kolom yang bernama 'name'

In [16]:

# Combine first_name and last_name into full_name
dim_users_cleaned = dim_users_cleaned.withColumn("name", concat_ws(" ", dim_users_cleaned["first_name"], dim_users_cleaned["last_name"]))

# Drop the original first_name and last_name columns if no longer needed
dim_users_cleaned = dim_users_cleaned.drop("first_name", "last_name")

dim_users_cleaned.show(5)


+-----+--------------------+---+------+-----+--------------------+-----------+----+-------+------------+------------+--------------+----------------+
|   id|               email|age|gender|state|      street_address|postal_code|city|country|    latitude|   longitude|traffic_source|            name|
+-----+--------------------+---+------+-----+--------------------+-----------+----+-------+------------+------------+--------------+----------------+
|35166|michaelstokes@exa...| 61|     M| Acre|53410 Wilson Poin...|  69980-000|NULL| Brasil|-8.065346116|-72.87094866|        Search|  Michael Stokes|
|94007|paulcochran@examp...| 65|     M| Acre|   72561 Nathan Lake|  69980-000|NULL| Brasil|-8.065346116|-72.87094866|       Organic|    Paul Cochran|
|40471|breannathompson@e...| 51|     F| Acre|8287 Fisher Shoal...|  69980-000|NULL| Brasil|-8.065346116|-72.87094866|        Search|Breanna Thompson|
|93377|thomasmoreno@exam...| 39|     M| Acre|     5183 Julia Port|  69980-000|NULL| Brasil|-8.065346

Setelah dilihat hasilnya bisa dilihat kolom 'first_name' dan 'last_name' sudah tidak ada dan menjadi 1 kolom baru yaitu kolom 'name'

In [17]:
dim_users_cleaned.printSchema()

root
 |-- id: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- street_address: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- name: string (nullable = false)



## Product Table

In [18]:
dim_product = spark.read.csv('dim_product.csv', header=True, inferSchema=True)
dim_product.show()

+-----+------------------+-----------+--------------------+-----+------------------+----------+--------------------+
|   id|              cost|   category|                name|brand|      retail_price|department|                 sku|
+-----+------------------+-----------+--------------------+-----+------------------+----------+--------------------+
|13842| 2.518749990849756|Accessories|Low Profile Dyed ...|   MG|              6.25|     Women|EBD58B8A3F1D72F42...|
|13928|2.3383499148894105|Accessories|Low Profile Dyed ...|   MG| 5.949999809265137|     Women|2EAC42424D12436BD...|
|14115| 4.879559879379869|Accessories|Enzyme Regular So...|   MG|10.989999771118164|     Women|EE364229B2791D1EF...|
|14157| 4.648769887297898|Accessories|Enzyme Regular So...|   MG|10.989999771118164|     Women|00BD13095D06C20B1...|
|14273| 6.507929886473045|Accessories|Washed Canvas Ivy...|   MG|15.989999771118164|     Women|F531DC20FDE20B7AD...|
|15674|3.1062499998370185|       Plus|Low Profile Dyed ...|   MG

Melihat skema data dari tabel dim_product

In [19]:
dim_product.printSchema()

root
 |-- id: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- category: string (nullable = true)
 |-- name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- retail_price: double (nullable = true)
 |-- department: string (nullable = true)
 |-- sku: string (nullable = true)



Melihat statistik data dari tabel dim_product

In [20]:
dim_product.describe().show()

+-------+-----------------+------------------+-----------+--------------------+---------+------------------+----------+--------------------+
|summary|               id|              cost|   category|                name|    brand|      retail_price|department|                 sku|
+-------+-----------------+------------------+-----------+--------------------+---------+------------------+----------+--------------------+
|  count|            29120|             29120|      29120|               29118|    29096|             29120|     29120|               29120|
|   mean|          14560.5|   28.481774240437|       NULL|                NULL| Infinity|59.220163865731955|      NULL|                NULL|
| stddev|8406.364255729115|30.624681214377695|       NULL|                NULL|      NaN|  65.8889266957586|      NULL|                NULL|
|    min|                1|0.0082999997779726|Accessories|!iT Jeans Junior'...|!it Jeans|0.0199999995529651|       Men|00003E3B9E5336685...|
|    max|    

Melihat nilai null value di semua kolom

In [21]:
dim_product.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dim_product.columns)).show()

+---+----+--------+----+-----+------------+----------+---+
| id|cost|category|name|brand|retail_price|department|sku|
+---+----+--------+----+-----+------------+----------+---+
|  0|   0|       0|   2|   24|           0|         0|  0|
+---+----+--------+----+-----+------------+----------+---+



Dari hasil diatas terdapat null value pada kolom 'name' yaitu 2, dan kolom brand yaitu 24.

Disini membuang nilai null yang ada pada kolom 'name' karena menurut saya jika pada kolom tersebut terdapat nilai null maka tidak perlu digunakan jadi dibuang saja.

In [22]:
dim_product_cleaned = dim_product.filter(dim_product.name.isNotNull())

In [23]:
dim_product_cleaned.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dim_product_cleaned.columns)).show()

+---+----+--------+----+-----+------------+----------+---+
| id|cost|category|name|brand|retail_price|department|sku|
+---+----+--------+----+-----+------------+----------+---+
|  0|   0|       0|   0|   24|           0|         0|  0|
+---+----+--------+----+-----+------------+----------+---+



Disini sudah terlihat bahwa kolom name sudah tidak ada nilai null, pada kolom brand nilai null tidak dibersihkan karena memang pada data bisa saja tidak ada nama brandnya jadi nilai null tersebut tidak dibersihkan.

Mengecek duplicates data 

In [24]:
dim_product_cleaned.exceptAll(dim_product_cleaned.dropDuplicates()).show()

+---+----+--------+----+-----+------------+----------+---+
| id|cost|category|name|brand|retail_price|department|sku|
+---+----+--------+----+-----+------------+----------+---+
+---+----+--------+----+-----+------------+----------+---+



Dari hasil diatas tidak ada data yang duplicates.

## Order Item Table

In [25]:
dim_order_item = spark.read.csv('dim_order_item.csv', header=True, inferSchema=True)
dim_order_item.show()

+------+--------+----------+
|    id|order_id|product_id|
+------+--------+----------+
|125384|   86236|     14235|
|151956|  104671|     14235|
| 11373|    7846|     14235|
| 76647|   52643|     14159|
|  4644|    3242|     14159|
|108287|   74360|     14159|
|109376|   75102|     14159|
|145345|  100046|     14159|
|138687|   95464|     14159|
| 64424|   44227|     14159|
| 42842|   29481|     14159|
|122008|   83889|     14159|
| 90561|   62178|     28700|
| 24568|   16934|     28700|
| 52004|   35738|     14202|
| 66708|   45773|     14202|
| 69179|   47466|     28700|
|116588|   80159|     28700|
|164235|  113127|     28700|
|  4888|    3418|     14202|
+------+--------+----------+
only showing top 20 rows



Melihat skema data pada table dim_order_item

In [26]:
dim_order_item.printSchema()

root
 |-- id: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)



Melihat statistik data yang ada pada table dim_order_item

In [27]:
dim_order_item.describe().show()

+-------+------------------+------------------+------------------+
|summary|                id|          order_id|        product_id|
+-------+------------------+------------------+------------------+
|  count|            182175|            182175|            182175|
|   mean|           91088.0|62655.830207218336|15274.241597365171|
| stddev|52589.536982179416|36211.444583372606| 8410.335743637832|
|    min|                 1|                 1|                 1|
|    max|            182175|            125431|             29120|
+-------+------------------+------------------+------------------+



Mengecek null value yang ada pada tabel

In [28]:
dim_order_item.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dim_order_item.columns)).show()

+---+--------+----------+
| id|order_id|product_id|
+---+--------+----------+
|  0|       0|         0|
+---+--------+----------+



Dapat dilihat bahwa semua kolom tidak memiliki null value.

Mengecek apakah ada data yang duplicates

In [29]:
dim_order_item.exceptAll(dim_order_item.dropDuplicates()).show()

+---+--------+----------+
| id|order_id|product_id|
+---+--------+----------+
+---+--------+----------+



Dapat dilihat bahwa tidak ada data yang duplicates.

# Data Load to data warehouse

In [32]:
# PostgreSQL JDBC Connection
postgres_url = "jdbc:postgresql://host.docker.internal:5432/datawarehouse_ddl_daniswara"
postgres_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

#Write DataFrame to PostgreSQL
fact_order.write \
    .jdbc(url=postgres_url, table="orders", mode="overwrite", properties=postgres_properties)
dim_users_cleaned.write \
    .jdbc(url=postgres_url, table="users", mode="overwrite", properties=postgres_properties)
dim_product_cleaned.write \
    .jdbc(url=postgres_url, table="products", mode="overwrite", properties=postgres_properties)
dim_order_item.write \
    .jdbc(url=postgres_url, table="order_items", mode="overwrite", properties=postgres_properties)
