Bronze Layer

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("e-commerce_app").getOrCreate()

customer_df = spark.read.csv("/Volumes/workspace/ecommerce_sales/bronze_layer/olist_customers_dataset.csv", header="true", inferSchema="true")
order_df = spark.read.csv("/Volumes/workspace/ecommerce_sales/bronze_layer/olist_orders_dataset.csv", header="true", inferSchema="true")
order_items_df = spark.read.csv("/Volumes/workspace/ecommerce_sales/bronze_layer/olist_order_items_dataset.csv", header="true", inferSchema="true")
products_df = spark.read.csv("/Volumes/workspace/ecommerce_sales/bronze_layer/olist_products_dataset.csv", header="true", inferSchema="true")
products_name_df = spark.read.csv('/Volumes/workspace/ecommerce_sales/bronze_layer/product_category_name_translation.csv', header='true', inferSchema='true')
cities_df = spark.read.csv('/Volumes/workspace/ecommerce_sales/bronze_layer/BRAZIL_CITIES.csv', header='true', inferSchema='true')

customer_df.show(5)
order_df.show(5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows
+--------------------+---------

Silver layer

In [0]:
from pyspark.sql.functions import date_format, col

cities_df = cities_df.withColumnRenamed('STATE', 'customer_state')

customer_df = (
    customer_df
    .na.drop()
    .dropDuplicates(['customer_id'])
    .join(cities_df, 'customer_state', 'left')
)

orders_df = (
    order_df
    .filter(order_df.order_status == 'delivered')
    .withColumn('purchased_date', date_format(col('order_purchase_timestamp'), 'yyyy-MM-dd'))
    .withColumn('delivery_date', date_format(col('order_delivered_customer_date'), 'yyyy-MM-dd'))
    .drop(
        "order_status",
        "order_purchase_timestamp",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date"
    )
)

order_items_df = (
    order_items_df
    .drop('seller_id', 'shipping_limit_date')
    .filter(order_items_df.price > 0)
)

products_df = (
    products_df
    .na.drop()
    .drop(
        'product_name_lenght',
        'product_description_lenght',
        'product_photos_qty',
        "product_weight_g",
        "product_length_cm",
        "product_height_cm",
        "product_width_cm"
    )
    .join(products_name_df, 'product_category_name', 'left')
    .drop('product_category_name')
    .withColumnRenamed('product_category_name_english', 'product_category_name')
)

customer_df.write.format('delta').mode('overwrite').save('/Volumes/workspace/ecommerce_sales/silver_layer/customer_clean')
orders_df.write.format('delta').mode('overwrite').save('/Volumes/workspace/ecommerce_sales/silver_layer/orders_clean')
order_items_df.write.format('delta').mode('overwrite').save('/Volumes/workspace/ecommerce_sales/silver_layer/order_items_clean')
products_df.write.format('delta').mode('overwrite').save('/Volumes/workspace/ecommerce_sales/silver_layer/products_clean')

Gold Layer

In [0]:
customer_clean_df = spark.read.format('delta').load('/Volumes/workspace/ecommerce_sales/silver_layer/customer_clean')
orders_clean_df = spark.read.format('delta').load('/Volumes/workspace/ecommerce_sales/silver_layer/orders_clean')
order_items_clean_df = spark.read.format('delta').load('/Volumes/workspace/ecommerce_sales/silver_layer/order_items_clean')
products_clean_df = spark.read.format('delta').load('/Volumes/workspace/ecommerce_sales/silver_layer/products_clean')
# display(customer_clean_df)
# display(orders_clean_df)
# display(order_items_clean_df)
# display(products_clean_df)

In [0]:
merged_df = order_items_clean_df.join(orders_clean_df, "order_id", "inner") \
    .join(products_clean_df, "product_id", "inner") \
    .join(customer_clean_df, "customer_id", "inner")

merged_df.write.format('delta').mode('overwrite').save('/Volumes/workspace/ecommerce_sales/gold_layer/sales_fact')

merged_df.write.mode('overwrite').saveAsTable('workspace.ecommerce_sales.sales_fact')

display(merged_df)

customer_id,product_id,order_id,order_item_id,price,freight_value,purchased_date,delivery_date,product_category_name,customer_state,customer_unique_id,customer_zip_code_prefix,customer_city,CITY,LAT,LONG
369ec69ef76c47e51980b88a5740dc1b,afeeea6271148ee1bb15173b8187c431,0438fc3e115633e11a93c878a1591016,1,249.0,15.22,2018-08-11,2018-08-20,telephony,BA,1acfddcc366d45e07b84eedc46282cdc,48990,andorinha,Xique-Xique,-10.82497442,-42.725508
6ecb390e8056057a25a1b49c37206958,568ba441f7f464dc73d23a91cf8ca671,0098dbda25722a3f019fe252a0cd10b3,1,143.54,21.49,2017-09-19,2017-10-02,bed_bath_table,GO,8c81d87b1a876ac44f968c7f112501f5,73760,sao joao d'alianca,Vila Propício,-15.45463482,-48.88242208
5f8529d3f9d41718127f8ed44d471c6e,64eb8b21706b8e2c733ba16bf02a1537,14f38e83e237f237348b97dd9f228d40,1,40.5,13.08,2018-01-31,2018-02-16,health_beauty,SP,d55e594accb5b8cd7e3c26cdd854f5bd,2878,sao paulo,Zacarias,-21.05011043,-50.05573952
d3d5a2955341122b1b68164dcbbc948e,b9754f9e21b6e6960b37594d4387fb1c,0904fb2c91a18099a0b7315f1ac9501b,1,385.0,21.23,2018-02-13,2018-03-17,perfumery,DF,925d26d2589c983d6e2c33ca7da030d0,72210,brasilia,Brasília,-15.79408736,-47.88790548
58dbd0b2d70206bf40e62cd34e84d795,ac6c3623068f30de03045865e4e10089,00042b26cf59d7ce69dfabb4e55b4fd9,1,199.9,18.14,2017-02-04,2017-03-01,garden_tools,SP,64b576fb70d441e8f1b2d7d446e483c5,13226,varzea paulista,Zacarias,-21.05011043,-50.05573952
d316653e26343ebd09a65316574afd7b,9e8762176dba8eada7ed0259e69c3423,0d193d461bbcef442c5e6786a16d5bca,1,29.9,11.85,2017-11-20,2017-11-27,telephony,SP,980097869b622b2054accc551ed1c86f,5335,sao paulo,Zacarias,-21.05011043,-50.05573952
c0971253b7225c85c559bfa40dae5303,3a05a947f28cf46d789adedc107d7595,0a82501b7148667eaff8dc8811a58f06,1,119.9,21.68,2017-10-20,2017-10-27,sports_leisure,SP,bbfdfbd3af995f06a0da9d30e99fae15,5005,sao paulo,Zacarias,-21.05011043,-50.05573952
acb6cc6ec99086b8b53c858a0731c189,33bd5538a16d23f0a39da1c9bfd20980,00a77f9cc25b05b2004752593d7b8888,1,177.9,18.5,2017-06-22,2017-07-07,home_confort,RJ,87dff19086a1aa321cf06195ab40631c,22785,rio de janeiro,Volta Redonda,-22.5099676,-44.09352188
71cd7e5e11ae80ed42c6c57ff51a347d,0ce1e35bdc6af7e582e7ef9363e29b5b,012bea13f9a355c983efb59a3f23583a,1,21.99,15.1,2017-11-24,2017-12-15,telephony,RS,26f8790baee110479bb7cb362e16134a,95795,sao vendelino,Xangri-Lá,-29.80537819,-50.03908142
fb6895f16f2b933e56af615dbcecf0aa,9c0f79b9daf5436386477b4e17b94389,02d273267575a29864f552b2de598580,1,150.0,9.67,2018-05-28,2018-06-13,furniture_decor,DF,6d06d8848091c6055d728ba3ec366536,71967,brasilia,Brasília,-15.79408736,-47.88790548


In [0]:
from pyspark.sql.functions import sum as _sum, countDistinct

#Revenue by product category
revenue_by_category = (
    merged_df.groupBy('product_category_name') \
    .agg(_sum('price').alias('total_revenue')) \
    .orderBy('total_revenue', ascending=False)
)

revenue_by_category.write.mode('overwrite').saveAsTable('workspace.ecommerce_sales.revenue_by_category')

# Orders by city
orders_by_city = merged_df.groupBy('customer_state') \
        .agg(countDistinct('order_id').alias('num_orders')) \
        .orderBy('num_orders', ascending = False)

display(revenue_by_category)

product_category_name,total_revenue
office_furniture,437271.2999999775
telephony,216672.1199999832
sports_leisure,186535.1999999936
bed_bath_table,164117.33999999246
garden_tools,128935.49999999849
housewares,77280.0
furniture_decor,27912.100000000715
health_beauty,26122.5
home_confort,16366.799999999976
watches_gifts,12122.0
