In [1]:
from pyspark.sql import functions as F

StatementMeta(, 05b6b04c-cb82-4345-affd-f78de230a4e1, 3, Finished, Available, Finished)

In [3]:
# Populate dim_geolocation
#
# Source: silver table `olist_geolocation_cleaned`.
# Target: Delta table `GoldLakehouse.dim_geolocation`, fully replaced on each run.

silver_geo = spark.table("olist_geolocation_cleaned")

# Location attributes are passed through unchanged; `coordinates` is exposed
# under its Gold column name `geolocation_coordinates`.
geo_columns = [
    F.col("geolocation_zip_code_prefix"),
    F.col("geolocation_city"),
    F.col("geolocation_state"),
    F.col("coordinates").alias("geolocation_coordinates"),
]
gold_geo = silver_geo.select(*geo_columns)

# Preview a few rows without truncating the coordinate arrays.
# NOTE(review): the preview recorded with this notebook shows the same zip
# prefix on several rows, so the table looks per-coordinate rather than one row
# per prefix -- confirm before joining other tables on the prefix.
gold_geo.show(5, truncate=False)

# Overwrite the Gold table with the new snapshot.
geo_writer = gold_geo.write.format("delta").mode("overwrite")
geo_writer.saveAsTable("GoldLakehouse.dim_geolocation")



StatementMeta(, 63f0f727-6186-40d5-9d5b-39ca947bd422, 5, Finished, Available, Finished)

+---------------------------+----------------+-----------------+-----------------------------------------+
|geolocation_zip_code_prefix|geolocation_city|geolocation_state|geolocation_coordinates                  |
+---------------------------+----------------+-----------------+-----------------------------------------+
|38470                      |Grupiara        |MG               |[-18.49613532574363, -47.7213801592461]  |
|38470                      |Grupiara        |MG               |[-18.498736131595933, -47.72273981353448]|
|38470                      |Grupiara        |MG               |[-18.50020798897437, -47.72361850109356] |
|38470                      |Grupiara        |MG               |[-18.49811866095442, -47.72317360025173] |
|38470                      |Grupiara        |MG               |[-18.495609757400818, -47.72280882202118]|
+---------------------------+----------------+-----------------+-----------------------------------------+
only showing top 5 rows



In [6]:
# Populate dim_customers
#
# Copies the customer attributes from the silver table
# `olist_customers_cleaned` into the Delta table `GoldLakehouse.dim_customers`.

# Read silver customers table
silver_customers = spark.table("olist_customers_cleaned")

# Select relevant columns (matching Gold schema)
gold_customers = silver_customers.select(
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state"
)

# Replace both data and schema in a single overwrite. This is used instead of
# DROP TABLE followed by a write so that a failed run never leaves the Gold
# table missing, and so the write matches the other Gold tables in this notebook.
(
    gold_customers.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_customers")
)

StatementMeta(, 1de9d9d5-7c21-48d4-81a8-889e6ecda60a, 8, Finished, Available, Finished)

In [5]:
# Populate dim_sellers
#
# Copies the seller attributes from the silver table `olist_sellers_cleaned`
# into the Delta table `GoldLakehouse.dim_sellers`.

# Read silver sellers table
silver_sellers = spark.table("olist_sellers_cleaned")

# Select relevant columns (matching Gold schema)
gold_sellers = silver_sellers.select(
    "seller_id",
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state"
)

# Replace both data and schema in a single overwrite. This is used instead of
# DROP TABLE followed by a write so that a failed run never leaves the Gold
# table missing, and so the write matches the other Gold tables in this notebook.
(
    gold_sellers.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_sellers")
)

StatementMeta(, 1de9d9d5-7c21-48d4-81a8-889e6ecda60a, 7, Finished, Available, Finished)

In [15]:
# Populate dim_products
#
# Source: silver table `olist_products_cleaned`.
# Target: Delta table `GoldLakehouse.dim_products` (data and schema replaced).

silver_products = spark.table("olist_products_cleaned")

# Gold type for each numeric product attribute; every column keeps its name.
product_casts = {
    "product_name_length": "INT",
    "product_description_length": "INT",
    "product_photos_qty": "INT",
    "product_weight_g": "FLOAT",
    "product_length_cm": "FLOAT",
    "product_height_cm": "FLOAT",
    "product_width_cm": "FLOAT",
}
typed_columns = [
    F.col(column_name).cast(gold_type).alias(column_name)
    for column_name, gold_type in product_casts.items()
]

# Identifier and category pass through unchanged, followed by the typed columns.
gold_products = silver_products.select(
    "product_id",
    "product_category_name_english",
    *typed_columns,
)

# Overwrite GoldLakehouse.dim_products, allowing the schema to change.
products_writer = (
    gold_products.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
products_writer.saveAsTable("GoldLakehouse.dim_products")

StatementMeta(, 11348a84-0b46-49b9-b1fa-c994499d2d4a, 17, Finished, Available, Finished)

In [3]:
# Populate dim_dates

from pyspark.sql.types import DateType
import datetime

StatementMeta(, 05b6b04c-cb82-4345-affd-f78de230a4e1, 5, Finished, Available, Finished)

In [4]:
# Generate the calendar dimension: one row per day from start_date to
# end_date, both inclusive.
start_date = datetime.date(2016, 9, 4)  # earliest order date in Olist
end_date = datetime.date(2018, 10, 18)    # latest order date + 1

# "+ 1" makes end_date itself part of the range.
date_df = spark.createDataFrame(
    [(start_date + datetime.timedelta(days=i),) for i in range((end_date - start_date).days + 1)],
    ["date_id"]
)

# Derive the calendar attributes from date_id; day_of_week is the full day
# name produced by the "EEEE" date pattern.
dim_dates = date_df.withColumn("day", F.dayofmonth("date_id")) \
                   .withColumn("month", F.month("date_id")) \
                   .withColumn("year", F.year("date_id")) \
                   .withColumn("quarter", F.quarter("date_id")) \
                   .withColumn("day_of_week", F.date_format("date_id", "EEEE")) \
                   .orderBy("date_id")

# Replace both data and schema in a single overwrite. This is used instead of
# DROP TABLE followed by a write so that a failed run never leaves the Gold
# table missing, and so the write matches the other Gold tables in this notebook.
(
    dim_dates.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_dates")
)

StatementMeta(, 05b6b04c-cb82-4345-affd-f78de230a4e1, 6, Finished, Available, Finished)

In [19]:
# Populate fact_order_items
#
# One row per order item, enriched with the purchase date of its order
# (date_id) so the fact can be joined to the date dimension.

# Read silver items and orders tables
silver_items = spark.table("olist_items_cleaned")
silver_orders = spark.table("olist_orders_cleaned")

# Only the purchase timestamp is needed from orders. Narrowing the frame first
# keeps unrelated order columns out of the join result.
order_dates = silver_orders.select("order_id", "order_purchase_timestamp")

# Joining on the column name yields a single `order_id` column, so the select
# below does not depend on an implicit table qualifier to pick between two
# same-named columns.
gold_order_items = silver_items.join(order_dates, "order_id", "inner").select(
    F.col("order_item_id").cast("INT").alias("order_item_id"),
    F.col("order_id"),
    F.col("product_id"),
    F.col("seller_id"),
    F.col("shipping_limit_date").cast("timestamp").alias("shipping_limit_date"),
    F.col("price").cast("FLOAT").alias("price"),
    F.col("freight_value").cast("FLOAT").alias("freight_value"),
    F.col("order_purchase_timestamp").cast("date").alias("date_id")
)

# Overwrite GoldLakehouse.fact_order_items
gold_order_items.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("GoldLakehouse.fact_order_items")


StatementMeta(, 11348a84-0b46-49b9-b1fa-c994499d2d4a, 21, Finished, Available, Finished)

In [4]:
# Populate fact_orders
#
# One row per order: order attributes from the silver orders table, item-level
# totals (price and freight) and a single review score.

# Read silver tables
silver_orders = spark.table("olist_orders_cleaned")
silver_items = spark.table("olist_items_cleaned")
silver_reviews = spark.table("olist_reviews_cleaned")

# Order-level totals from the item lines, computed in one aggregation pass.
# Orders without items get NULL totals through the left join below.
order_totals = silver_items.groupBy("order_id").agg(
    F.sum(F.col("price")).cast("FLOAT").alias("total_order_value"),
    F.sum(F.col("freight_value")).cast("FLOAT").alias("total_freight_value")
)

# Reduce reviews to one row per order before joining. Joining the raw review
# rows would emit an order once per review and inflate every downstream sum
# over fact_orders. The score of the most recently answered review is kept:
# structs compare field by field, so the max struct is the one with the latest
# timestamp (ties fall back to the higher score).
review_scores = (
    silver_reviews
    .groupBy("order_id")
    .agg(
        F.max(
            F.struct(
                F.col("review_answer_timestamp").cast("timestamp").alias("answered_at"),
                F.col("review_score").cast("INT").alias("review_score")
            )
        ).alias("latest_review")
    )
    .select(
        "order_id",
        F.col("latest_review").getField("review_score").alias("review_score")
    )
)

# Join all together
gold_orders = silver_orders \
    .join(order_totals, "order_id", "left") \
    .join(review_scores, "order_id", "left") \
    .select(
        F.col("order_id"),
        F.col("customer_id"),
        F.col("order_status"),
        F.col("order_purchase_timestamp").cast("timestamp").alias("order_purchase_timestamp"),
        F.col("order_approved_at").cast("timestamp").alias("order_approved_at"),
        F.col("order_delivered_carrier_date").cast("timestamp").alias("order_delivered_carrier_date"),
        F.col("order_delivered_customer_date").cast("timestamp").alias("order_delivered_customer_date"),
        F.col("order_estimated_delivery_date").cast("timestamp").alias("order_estimated_delivery_date"),
        F.col("total_order_value"),
        F.col("total_freight_value"),
        F.col("review_score"),
        F.col("order_purchase_timestamp").cast("date").alias("date_id")
    )

# Overwrite GoldLakehouse.fact_orders. `overwriteSchema` replaces data and
# schema in one step, so no separate DROP TABLE is needed.
gold_orders.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("GoldLakehouse.fact_orders")

StatementMeta(, 1de9d9d5-7c21-48d4-81a8-889e6ecda60a, 6, Finished, Available, Finished)

In [22]:
# Populate fact_payments
#
# One row per payment record, enriched with the customer and the purchase
# date (date_id) of the order it belongs to.

silver_payments = spark.table("olist_payments_cleaned")
silver_orders = spark.table("olist_orders_cleaned")

# Order attributes needed on the payment fact. `to_date` already returns a
# date column, so no further cast is applied.
purchase_date = F.to_date("order_purchase_timestamp").alias("date_id")
orders_info = silver_orders.select("order_id", "customer_id", purchase_date)

# Left join: payments whose order is not found keep NULL customer_id/date_id.
payments_with_orders = silver_payments.join(orders_info, "order_id", "left")

gold_payments = payments_with_orders.select(
    "order_id",
    "customer_id",
    F.col("payment_sequential").cast("INT").alias("payment_sequential"),
    F.col("payment_type"),
    F.col("payment_installments").cast("INT").alias("payment_installments"),
    F.col("payment_value").cast("FLOAT").alias("payment_value"),
    F.col("date_id"),
)

# Overwrite GoldLakehouse.fact_payments, allowing the schema to change.
payments_writer = (
    gold_payments.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
payments_writer.saveAsTable("GoldLakehouse.fact_payments")


StatementMeta(, 11348a84-0b46-49b9-b1fa-c994499d2d4a, 24, Finished, Available, Finished)

In [10]:
# Populate fact_reviews
#
# One row per review, enriched with the status and customer of the reviewed
# order. Uses the notebook-wide `F` namespace for all column functions.

# Read silver tables
silver_reviews = spark.table("olist_reviews_cleaned")
silver_orders = spark.table("olist_orders_cleaned")

# Extract order_status and customer_id from orders table
orders_info = silver_orders.select(
    "order_id",
    "order_status",
    "customer_id"
)

# Join reviews with orders
gold_reviews = silver_reviews \
    .join(orders_info, "order_id", "left") \
    .select(
        "review_id",
        "order_id",
        "customer_id",
        F.col("review_score").cast("INT").alias("review_score"),
        "order_status",
        "review_comment_title",
        # NULL messages become empty strings; the original author noted that
        # NULLs otherwise showed up as "Invalid Date".
        F.coalesce(F.col("review_comment_message").cast("string"), F.lit("")).alias("review_comment_message"),
        F.col("review_creation_date").cast("date").alias("review_creation_date"),
        F.col("review_answer_timestamp").cast("timestamp").alias("review_answer_timestamp")
    )

gold_reviews.show()

# Overwrite GoldLakehouse.fact_reviews
gold_reviews.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("GoldLakehouse.fact_reviews")

StatementMeta(, abe0e860-5eee-447f-872d-2a6a0cd4842d, 12, Finished, Available, Finished)

+--------------------+--------------------+--------------------+------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|         customer_id|review_score|order_status|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+--------------------+------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7c847e0e0207fe7d1...|8a3a8f42d50b55a79...|778aed33f729f0922...|           5|   delivered|                NULL|                      |          2017-09-16|    2017-09-18 17:56:38|
|02d08166f29d0b5f2...|08ba140da4dd8ad47...|abe3fe8adfe5af3ee...|           5|   delivered|                NULL|                      |          2018-04-28|    2018-05-01 10:07:11|
|693730cd575735179...|df23e811bccb321eb...|412ef0bba96a560e2...|           5|   delivered|          

In [21]:
# Populate dim_br_states_2016

from pyspark.sql import functions as F

# Inputs: state reference data, per-state income, and the 2016 population.
br_states = spark.table("br_states")
br_income = spark.table("br_income")
state_pop_2016 = spark.table("state_pop_2016")

# Step 1: state attributes joined to the 2016 population on the state code.
state_attrs = br_states.select(F.col('code'), F.col('name'), F.col('area_sqkm'))
population_2016 = state_pop_2016.select(F.col('code'), F.col('population'))
joined_df1 = state_attrs.join(population_2016, on='code', how='inner')

# Step 2: attach the 2016 resident income, matching state name to
# br_income.state; the duplicate `state` key is dropped afterwards.
income_2016 = br_income.select(F.col('state'), F.col('resident_income_2016'))
joined_df2 = joined_df1.join(
    income_2016,
    joined_df1.name == br_income.state,
    how='inner',
).drop(F.col('state'))

# Step 3: derived measures.
#   pop_density_2016 = population / area_sqkm
#   market_potential = population * resident_income_2016
density_2016 = F.col('population') / F.col('area_sqkm')
potential_2016 = F.col('population') * F.col('resident_income_2016')
dim_br_states_2016 = (
    joined_df2
    .withColumn('pop_density_2016', density_2016)
    .withColumn('market_potential', potential_2016)
)

# Overwrite GoldLakehouse.dim_br_states_2016, allowing the schema to change.
(
    dim_br_states_2016.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_br_states_2016")
)


StatementMeta(, 44c59933-fbe3-4d3e-9cef-fa5e5e2ba998, 23, Finished, Available, Finished)

In [22]:
# Populate dim_br_states_2017

from pyspark.sql import functions as F

# Inputs: state reference data, per-state income, and the 2017 population.
br_states = spark.table("br_states")
br_income = spark.table("br_income")
state_pop_2017 = spark.table("state_pop_2017")

# Step 1: state attributes joined to the 2017 population on the state code.
state_attrs = br_states.select(F.col('code'), F.col('name'), F.col('area_sqkm'))
population_2017 = state_pop_2017.select(F.col('code'), F.col('population'))
joined_df1 = state_attrs.join(population_2017, on='code', how='inner')

# Step 2: attach the 2017 resident income, matching state name to
# br_income.state; the duplicate `state` key is dropped afterwards.
income_2017 = br_income.select(F.col('state'), F.col('resident_income_2017'))
joined_df2 = joined_df1.join(
    income_2017,
    joined_df1.name == br_income.state,
    how='inner',
).drop(F.col('state'))

# Step 3: derived measures.
#   pop_density_2017 = population / area_sqkm
#   market_potential = population * resident_income_2017
density_2017 = F.col('population') / F.col('area_sqkm')
potential_2017 = F.col('population') * F.col('resident_income_2017')
dim_br_states_2017 = (
    joined_df2
    .withColumn('pop_density_2017', density_2017)
    .withColumn('market_potential', potential_2017)
)

# Overwrite GoldLakehouse.dim_br_states_2017, allowing the schema to change.
(
    dim_br_states_2017.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_br_states_2017")
)


StatementMeta(, 44c59933-fbe3-4d3e-9cef-fa5e5e2ba998, 24, Finished, Available, Finished)

In [23]:
# Populate dim_br_states_2018

from pyspark.sql import functions as F

# Inputs: state reference data, per-state income, and the 2018 population.
br_states = spark.table("br_states")
br_income = spark.table("br_income")
state_pop_2018 = spark.table("state_pop_2018")

# Step 1: state attributes joined to the 2018 population on the state code.
state_attrs = br_states.select(F.col('code'), F.col('name'), F.col('area_sqkm'))
population_2018 = state_pop_2018.select(F.col('code'), F.col('population'))
joined_df1 = state_attrs.join(population_2018, on='code', how='inner')

# Step 2: attach the 2018 resident income, matching state name to
# br_income.state; the duplicate `state` key is dropped afterwards.
income_2018 = br_income.select(F.col('state'), F.col('resident_income_2018'))
joined_df2 = joined_df1.join(
    income_2018,
    joined_df1.name == br_income.state,
    how='inner',
).drop(F.col('state'))

# Step 3: derived measures.
#   pop_density_2018 = population / area_sqkm
#   market_potential = population * resident_income_2018
density_2018 = F.col('population') / F.col('area_sqkm')
potential_2018 = F.col('population') * F.col('resident_income_2018')
dim_br_states_2018 = (
    joined_df2
    .withColumn('pop_density_2018', density_2018)
    .withColumn('market_potential', potential_2018)
)

# Overwrite GoldLakehouse.dim_br_states_2018, allowing the schema to change.
(
    dim_br_states_2018.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("GoldLakehouse.dim_br_states_2018")
)


StatementMeta(, 44c59933-fbe3-4d3e-9cef-fa5e5e2ba998, 25, Finished, Available, Finished)

In [8]:
from pyspark.sql import functions as F

# Populate fact_sales
#
# Order value per order, with the customer's unique id and state attached
# from the customer dimension.
# NOTE(review): these reads use three-part names (`GoldLakehouse.dbo.<table>`)
# while the notebook's writes use two-part names (`GoldLakehouse.<table>`) --
# confirm both forms resolve to the same tables.

fact_orders = spark.table("GoldLakehouse.dbo.fact_orders")
dim_customers = spark.table("GoldLakehouse.dbo.dim_customers")

# Left join keeps every order, even when its customer is not in the dimension.
orders_with_customers = fact_orders.join(dim_customers, "customer_id", "left")

sales_columns = [
    F.col("order_purchase_timestamp").cast("date").alias("date_id"),
    F.col("order_id"),
    F.col("total_order_value"),
    F.col("customer_id"),
    F.col("customer_unique_id"),
    F.col("customer_state"),
]
fact_sales = orders_with_customers.select(*sales_columns)

# Overwrite GoldLakehouse.fact_sales, allowing the schema to change.
sales_writer = (
    fact_sales.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
sales_writer.saveAsTable("GoldLakehouse.fact_sales")


StatementMeta(, 3a9d6a18-7c3b-436b-94f2-2c05e06c6944, 10, Finished, Available, Finished)