In [51]:
import pyspark
import os
from pyspark.sql.functions import col, lit, when, concat, expr, date_format, year, month, dayofmonth, dayofweek, hour, minute, second, quarter, weekofyear
from dotenv import load_dotenv
from delta import * 
from pyspark.sql.types import DecimalType

In [52]:
load_dotenv('.././.env')
access_key = os.getenv("API_KEY")
secret_key = os.getenv("SECRET_ACCESS_KEY")
print(access_key)

AKIAZQ3DTLU6MHKI5HLC


In [53]:
conf = (
    pyspark.conf.SparkConf()
    .setAppName("MY_APP")
    .set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .set("spark.hadoop.fs.s3a.access.key", access_key)
    .set("spark.hadoop.fs.s3a.secret.key", secret_key)
    .set("spark.hadoop.fs.s3a.region", "ap-southeast-2")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
    .set("spark.sql.shuffle.partitions", "4")
    .setMaster(
        "local[*]"
    )  # replace the * with your desired number of cores. * for use all.
)

extra_packages = [
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "org.apache.hadoop:hadoop-common:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]

builder = pyspark.sql.SparkSession.builder.appName("MyApp").config(conf=conf)
spark = configure_spark_with_delta_pip(
    builder, extra_packages=extra_packages
).getOrCreate()

In [54]:
df = spark.read.format("delta").load("s3a://vdt2025/menu_silver")

In [None]:
dim_dish_df = df.select(
    col("dish_id").alias("dish_id"),
    col("dish_name"),
    col("description"),
    col("menus_appeared"),
    col("times_appeared"),
    col("first_appeared").cast("short"),
    col("last_appeared").cast("short"),
    col("lowest_price").cast("decimal(10,3)"),
    col("highest_price").cast("decimal(10,3)")
).distinct()

In [None]:
dim_menu_df = df.select(
    col("menu_id"),
    col("name").alias("menu_name"),
    col("sponsor"),
    col("event"),
    col("venue"),
    col("place"),
    col("physical_description"),
    col("occasion"),
    col("notes"),
    col("call_number"),
    col("keywords"),
    col("language"),
    col("location"),
    col("location_type"),
    col("currency"),
    col("currency_symbol"),
    col("status"),
    col("page_count").cast("short"),
    col("dish_count").cast("short")
).distinct()

In [None]:
dim_menu_page_df = df.select(
    col("menu_page_id"),
    col("menu_id"), 
    col("page_number").cast("short"),
    col("image_id"),
    col("full_height").cast("short"),
    col("full_width").cast("short"),
    col("uuid")
).distinct()

In [55]:
dim_date_df = df.select(col("date")).distinct().withColumn("full_date", col("date")) \
    .withColumn("date_key", date_format(col("date"), "yyyyMMdd").cast("integer")) \
    .withColumn("year", year(col("date")).cast("short")) \
    .withColumn("month", month(col("date")).cast("short")) \
    .withColumn("month_name", date_format(col("date"), "MMMM")) \
    .withColumn("day_of_month", dayofmonth(col("date")).cast("short")) \
    .withColumn("day_of_week", dayofweek(col("date")).cast("short")) \
    .withColumn("day_name", date_format(col("date"), "EEEE")) \
    .withColumn("week_of_year", weekofyear(col("date")).cast("short")) \
    .withColumn("quarter", quarter(col("date")).cast("short")) \
    .withColumn("is_weekend", (dayofweek(col("date")) == 1) | (dayofweek(col("date")) == 7)) \
    .select("date_key", "full_date", "year", "month", "month_name", "day_of_month", "day_of_week", "day_name", "week_of_year", "quarter", "is_weekend")


In [56]:
fact_menu_item_performance_df = df.select(
    col("id").alias("menu_item_id").cast("integer"), 
    col("menu_page_id").cast("integer"),
    col("menu_id").cast("integer"),
    col("dish_id").cast("integer"),
    date_format(col("date"), "yyyyMMdd").alias("date_id").cast("integer"),
    col("created_at").cast("timestamp").alias("created_at_datetime"),
    col("updated_at").cast("timestamp").alias("updated_at_datetime"),
    col("price").cast("decimal(10,3)"),
    col("high_price").cast("decimal(10,3)"),
    col("xpos").cast("double"),
    col("ypos").cast("double")
)

In [None]:
dim_dish_df = dim_dish_df.write.format("delta").mode("overwrite").save("s3a://vdt2025/menu_gold/dim_dish")

In [None]:
dim_menu_df = dim_menu_df.write.format("delta").mode("overwrite").save("s3a://vdt2025/menu_gold/dim_menu")

In [None]:
dim_menu_page_df = dim_menu_page_df.write.format("delta").mode("overwrite").save("s3a://vdt2025/menu_gold/dim_menu_page")

In [57]:
dim_date_df = dim_date_df.write.format("delta").mode("overwrite").save("s3a://vdt2025/menu_gold/dim_date")

                                                                                

In [58]:
fact_menu_item_performance_df = fact_menu_item_performance_df.write.format("delta").mode("overwrite").save("s3a://vdt2025/menu_gold/fact_menu_item_performance")

25/06/18 11:26:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                