In [14]:
import kagglehub
import os
import glob
import logging

from spark import spark

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# Extract

In [22]:
# Fetch the Kaggle dataset; the returned location is treated as a directory below.
path = kagglehub.dataset_download(
    "mustafakeser4/looker-ecommerce-bigquery-dataset"
)

# Collect every CSV file that sits directly inside that directory.
csv_pattern = os.path.join(path, '*.csv')
csv_files = glob.glob(csv_pattern)

def _nullable_struct(*columns):
    """Build a StructType from (column name, type class) pairs.

    Each type class is instantiated once per column and every field is
    declared nullable.
    """
    return StructType(
        [StructField(col_name, col_type(), True) for col_name, col_type in columns]
    )


# Explicit schema per table, keyed by the table name (the CSV file name
# without its extension is used to look these up).
schemas = {
    "products": _nullable_struct(
        ("id", StringType),
        ("cost", DoubleType),
        ("category", StringType),
        ("name", StringType),
        ("brand", StringType),
        ("retail_price", DoubleType),
        ("department", StringType),
        ("sku", StringType),
        ("distribution_center_id", IntegerType),
    ),
    "orders": _nullable_struct(
        ("order_id", StringType),
        ("user_id", StringType),
        ("status", StringType),
        ("gender", StringType),
        ("created_at", TimestampType),
        ("returned_at", TimestampType),
        ("shipped_at", TimestampType),
        ("delivered_at", TimestampType),
        ("num_of_item", IntegerType),
    ),
    "inventory_items": _nullable_struct(
        ("id", StringType),
        ("product_id", StringType),
        ("created_at", TimestampType),
        ("sold_at", TimestampType),
        ("cost", DoubleType),
        ("product_category", StringType),
        ("product_name", StringType),
        ("product_brand", StringType),
        ("product_retail_price", DoubleType),
        ("product_department", StringType),
        ("product_sku", StringType),
        ("product_distribution_center_id", IntegerType),
    ),
    "users": _nullable_struct(
        ("id", StringType),
        ("first_name", StringType),
        ("last_name", StringType),
        ("email", StringType),
        ("age", IntegerType),
        ("gender", StringType),
        ("state", StringType),
        ("street_address", StringType),
        ("postal_code", StringType),
        ("city", StringType),
        ("country", StringType),
        ("latitude", DoubleType),
        ("longitude", DoubleType),
        ("traffic_source", StringType),
        ("created_at", TimestampType),
    ),
    "distribution_centers": _nullable_struct(
        ("id", StringType),
        ("name", StringType),
        ("latitude", DoubleType),
        ("longitude", DoubleType),
    ),
    "events": _nullable_struct(
        ("id", StringType),
        ("user_id", StringType),
        ("sequence_number", IntegerType),
        ("session_id", StringType),
        ("created_at", TimestampType),
        ("ip_address", StringType),
        ("city", StringType),
        ("state", StringType),
        ("postal_code", StringType),
        ("browser", StringType),
        ("traffic_source", StringType),
        ("uri", StringType),
        ("event_type", StringType),
    ),
    "order_items": _nullable_struct(
        ("id", StringType),
        ("order_id", StringType),
        ("user_id", StringType),
        ("product_id", StringType),
        ("inventory_item_id", StringType),
        ("status", StringType),
        ("created_at", TimestampType),
        ("shipped_at", TimestampType),
        ("delivered_at", TimestampType),
        ("returned_at", TimestampType),
        ("sale_price", DoubleType),
    ),
}




# Convert each downloaded CSV into a Parquet dataset under ../data/.
for file in csv_files:
    # Table name = file name up to the first dot (e.g. "orders.csv" -> "orders").
    file_name = os.path.basename(file).split('.')[0]

    # A CSV without a declared schema used to raise a bare KeyError part-way
    # through the loop, leaving only some tables written. Report it and move
    # on so the remaining tables are still converted.
    table_schema = schemas.get(file_name)
    if table_schema is None:
        logging.warning("No schema defined for '%s'; skipping %s", file_name, file)
        continue

    # The first CSV line is a header row; column types come from the explicit schema.
    df = spark.read.option("header", "true").csv(file, schema=table_schema)

    # Re-running the cell replaces any previously written output for this table.
    file_parquet = f"{file_name}.parquet"
    df.write.mode("overwrite").parquet(f"../data/{file_parquet}")

                                                                                

# Transform (aggregation)

In [29]:
# Preview the order_items table written by the extract step.
file_name = 'order_items'
file_parquet = f"{file_name}.parquet"

# Parquet files carry their own schema. The previous "header" option and
# `schema=` keyword were silently ignored by the Parquet reader, so they are
# dropped here; the resulting DataFrame is unchanged.
df = spark.read.parquet(f"../data/{file_parquet}")

print(file_name)
df.show(2)        # first two rows only
df.printSchema()
print()

order_items
+------+--------+-------+----------+-----------------+---------+-------------------+-------------------+-------------------+-----------+------------------+
|    id|order_id|user_id|product_id|inventory_item_id|   status|         created_at|         shipped_at|       delivered_at|returned_at|        sale_price|
+------+--------+-------+----------+-----------------+---------+-------------------+-------------------+-------------------+-----------+------------------+
|152013|  104663|  83582|     14235|           410368|Cancelled|2023-05-07 15:08:40|               NULL|               NULL|       NULL|0.0199999995529651|
| 40993|   28204|  22551|     14235|           110590| Complete|2023-03-14 12:47:21|2023-03-16 07:57:00|2023-03-18 10:08:00|       NULL|0.0199999995529651|
+------+--------+-------+----------+-----------------+---------+-------------------+-------------------+-------------------+-----------+------------------+
only showing top 2 rows

root
 |-- id: string (nulla

In [30]:
# Preview the orders table written by the extract step.
file_name = 'orders'
file_parquet = f"{file_name}.parquet"

# Parquet files carry their own schema. The previous "header" option and
# `schema=` keyword were silently ignored by the Parquet reader, so they are
# dropped here; the resulting DataFrame is unchanged.
df = spark.read.parquet(f"../data/{file_parquet}")

print(file_name)
df.show(2)        # first two rows only
df.printSchema()
print()

orders
+--------+-------+---------+------+-------------------+-----------+----------+------------+-----------+
|order_id|user_id|   status|gender|         created_at|returned_at|shipped_at|delivered_at|num_of_item|
+--------+-------+---------+------+-------------------+-----------+----------+------------+-----------+
|       8|      5|Cancelled|     F|2022-10-20 19:03:00|       NULL|      NULL|        NULL|          3|
|      60|     44|Cancelled|     F|2023-01-20 11:12:00|       NULL|      NULL|        NULL|          1|
+--------+-------+---------+------+-------------------+-----------+----------+------------+-----------+
only showing top 2 rows

root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- returned_at: timestamp (nullable = true)
 |-- shipped_at: timestamp (nullable = true)
 |-- delivered_at: timestamp (nullable = true)
 