In [0]:
import logging

from pyspark.sql.functions import current_timestamp

def init():
    """Create the bronze/silver/gold databases and build the ingestion config.

    Returns:
        list[dict]: one entry per source table with the keys
        ``file_path`` (location of the source Parquet data),
        ``tbl_name`` (table name), ``primary_keys`` (list of key columns)
        and ``type`` (``"dim"`` or ``"fact"``).
    """
    for layer in ("bronze", "silver", "gold"):
        spark.sql(f"create database if not exists {layer}")

    # Every source file sits in the same upload folder and is named after its table.
    upload_dir = "/FileStore/shared_uploads/lakshmisravani208@gmail.com"

    # (table name, primary-key columns, table type)
    table_specs = [
        ("media_customer_reviews", ["new_id"], "dim"),
        ("media_gold_reviews_chunked", ["franchiseID", "chunk_id"], "fact"),
        ("sales_customers", ["customerID"], "dim"),
        ("sales_franchises", ["franchiseID"], "dim"),
        ("sales_suppliers", ["supplierID"], "dim"),
        ("sales_transactions", ["transactionID"], "fact"),
    ]

    config = [
        {
            "file_path": f"{upload_dir}/{name}.parquet",
            "tbl_name": name,
            "primary_keys": keys,
            "type": kind,
        }
        for name, keys, kind in table_specs
    ]
    print("Initialized DB & created config")
    return config


def load_to_bronze_table(db: str, parquet_path: str, table_name: str):
    """Batch-load a Parquet source into ``{db}.{table_name}`` as a Delta table.

    Args:
        db: Destination database name.
        parquet_path: Location of the Parquet data to read.
        table_name: Destination table name inside ``db``.

    Every row is stamped with an ``inserted_at`` load timestamp. The rows are
    appended when the table already exists; otherwise the table is created
    with an overwrite write.
    """
    target = f"{db}.{table_name}"

    # The write mode is decided from the catalog state before the source is read.
    write_mode = "append" if spark.catalog.tableExists(target) else "overwrite"

    stamped = spark.read.parquet(parquet_path).withColumn('inserted_at', current_timestamp())
    stamped.write.format("delta").mode(write_mode).saveAsTable(target)
        
def load_with_autoloader(db: str, parquet_path: str, table_name: str):
    """Ingest Parquet files into ``{db}.{table_name}`` with a one-shot Auto Loader stream.

    Args:
        db: Destination database name.
        parquet_path: Location Auto Loader reads the Parquet files from.
        table_name: Destination table name inside ``db``.

    The stream adds an ``inserted_at`` load timestamp, appends to the Delta
    table and is awaited before returning. Failures are logged (with the
    traceback) and not re-raised, so the function always returns ``None``.
    """
    target = f"{db}.{table_name}"
    try:
        # Auto Loader is selected with the "cloudFiles" source; the underlying
        # file format goes in the "cloudFiles.format" option.
        stream_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaLocation", f"/tmp/schema/{db}/{table_name}")
            .load(parquet_path)
            .withColumn("inserted_at", current_timestamp())
        )

        # NOTE(review): the checkpoint and schema locations live under /tmp;
        # confirm that location persists between runs in this workspace.
        checkpoint_path = f"/tmp/checkpoints/{db}_{table_name}"

        query = (
            stream_df.writeStream
            .format("delta")
            .option("checkpointLocation", checkpoint_path)
            .outputMode("append")
            .trigger(once=True)
            .toTable(target)
        )

        query.awaitTermination()
        logging.info(f"Auto Loader loaded data into bronze table: {db}.{table_name}")
    except Exception as e:
        logging.error(
            f"Auto Loader failed for bronze table {db}.{table_name}: {e}",
            exc_info=True,
        )

def cleanup(table_name):
    """Remove a table's warehouse files and drop the table.

    Args:
        table_name: Name of the table to remove. It is used both for the
            warehouse path and in the ``DROP TABLE`` statement.

    Both steps are best-effort: a failure in either one is reported with
    ``print`` and does not stop the other step or raise to the caller.
    """
    # NOTE(review): for a database-qualified name such as "bronze.tbl" this
    # builds ".../warehouse/bronze.tbl"; confirm this matches where the
    # metastore actually stores the table files.
    table_path = f"/user/hive/warehouse/{table_name}"
    try:
        dbutils.fs.rm(f"dbfs:{table_path}", True)
    except Exception:
        print(f"{table_name} files Not found")
    try:
        spark.sql(f"DROP TABLE {table_name}")
    except Exception:
        print(f"{table_name} Not found")

def scd1_merge_bronze_to_silver_sql(src_db: str, dest_db: str, tbl_name: str, primary_keys: list):
    """Upsert ``{src_db}.{tbl_name}`` into ``{dest_db}.{tbl_name}`` (SCD Type 1).

    Args:
        src_db: Database holding the source (bronze) table.
        dest_db: Database holding the destination (silver) table.
        tbl_name: Table name, identical in both databases.
        primary_keys: Column names that identify a row; used for
            de-duplication and as the MERGE join condition.

    Raises:
        ValueError: if ``primary_keys`` is empty, because no MERGE condition
            can be built from it.

    The source is reduced to one row per key, ``inserted_at`` is replaced by
    a fresh ``updated_at`` timestamp, and the result is either written as a
    new Delta table (first run) or merged into the existing one.
    """
    if not primary_keys:
        raise ValueError("primary_keys must contain at least one column name")

    df_bronze = spark.table(f"{src_db}.{tbl_name}")

    if "inserted_at" in df_bronze.columns:
        # The bronze loaders append a new snapshot on every run, so a key can
        # appear once per load. Keep the most recently inserted row; a plain
        # dropDuplicates would keep an arbitrary one and could push stale
        # values into silver.
        partition_cols = ", ".join(primary_keys)
        df_bronze = (
            df_bronze
            .selectExpr(
                "*",
                f"row_number() OVER (PARTITION BY {partition_cols} "
                f"ORDER BY inserted_at DESC) AS _scd1_rn",
            )
            .where("_scd1_rn = 1")
            .drop("_scd1_rn", "inserted_at")
        )
    else:
        # Without a load timestamp there is nothing to order by.
        df_bronze = df_bronze.dropDuplicates(subset=primary_keys)

    df_bronze = df_bronze.withColumn("updated_at", current_timestamp())
    df_bronze.createOrReplaceTempView("bronze_tmp")
    table_exist = spark.catalog.tableExists(f"{dest_db}.{tbl_name}")

    if not table_exist:
        df_bronze.write.format("delta").mode("overwrite").saveAsTable(f"{dest_db}.{tbl_name}")
        print(f"Created new silver table: {tbl_name}")
        return

    pk_conditions = " AND ".join([f"target.{col} = source.{col}" for col in primary_keys])
    merge_sql = f"""
        MERGE INTO {dest_db}.{tbl_name} AS target
        USING bronze_tmp AS source
        ON {pk_conditions}
        WHEN MATCHED THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
    """
    spark.sql(merge_sql)
    print(f"SCD Type 1 merge completed: {src_db}.{tbl_name} -> {dest_db}.{tbl_name}")

def load_to_gold_table(src_db: str,dest_db: str, tbl_name: str, tbl_type: str):
    """Copy ``{src_db}.{tbl_name}`` into ``{dest_db}.{tbl_type}_{tbl_name}``.

    Args:
        src_db: Database to read the table from.
        dest_db: Database to write the copy into.
        tbl_name: Source table name.
        tbl_type: Prefix for the destination table name (the notebook config
            uses ``"dim"`` and ``"fact"``).

    The destination is written as a Delta table in overwrite mode, so each
    call replaces whatever the destination table held before.
    """
    source_name = f"{src_db}.{tbl_name}"
    gold_name = f"{dest_db}.{tbl_type}_{tbl_name}"

    gold_writer = spark.table(source_name).write.format("delta").mode("overwrite")
    gold_writer.saveAsTable(gold_name)



In [0]:
# Build the config in this cell so it also works on a fresh kernel:
# init() creates the databases (if missing) and returns the table list.
config = init()

# Run each configured table through bronze -> silver -> gold.
for entry in config:
    load_with_autoloader("bronze", entry["file_path"], entry["tbl_name"])
    scd1_merge_bronze_to_silver_sql("bronze", "silver", entry["tbl_name"], entry["primary_keys"])
    load_to_gold_table("silver", "gold", entry["tbl_name"], entry["type"])

## Most sold products to identify the top-selling items

In [0]:
# Total quantity sold per product, best sellers first.
df = spark.table("gold.fact_sales_transactions")
result_df = (
    df.groupBy("product")
    .sum("quantity")
    # groupBy().sum() names its output column "sum(quantity)"; give it a readable name.
    .withColumnRenamed("sum(quantity)", "no_of_products_sold")
    .orderBy("no_of_products_sold", ascending=False)
)
result_df.show()

+--------------------+-------------------+
|             product|no_of_products_sold|
+--------------------+-------------------+
|  Golden Gate Ginger|               3865|
|     Outback Oatmeal|               3733|
|Austin Almond Bis...|               3716|
|       Tokyo Tidbits|               3662|
|         Pearly Pies|               3595|
|       Orchard Oasis|               3586|
+--------------------+-------------------+



## Suppliers that provide ingredients to the most franchises

In [0]:
# Number of franchise rows per supplier, largest first.
df = spark.table("gold.dim_sales_franchises")
result_df = (
    df.groupBy("supplierID")
    .count()
    .withColumnRenamed("count", "no_of_franchises")
    .orderBy("no_of_franchises", ascending=False)
)
result_df.show()

+----------+----------------+
|supplierID|no_of_franchises|
+----------+----------------+
|   4000022|               1|
|   4000034|               1|
|   4000021|               1|
|   4000005|               1|
|   4000003|               1|
|   4000044|               1|
|   4000004|               1|
|   4000037|               1|
|   4000039|               1|
|   4000047|               1|
|   4000045|               1|
|   4000031|               1|
|   4000009|               1|
|   4000015|               1|
|   4000019|               1|
|   4000013|               1|
|   4000026|               1|
|   4000018|               1|
|   4000028|               1|
|   4000032|               1|
+----------+----------------+
only showing top 20 rows



## Total sales per month

In [0]:
from pyspark.sql.functions import month, sum, col, to_date, expr

# Cast the price to double and derive the month number before aggregating.
# NOTE(review): grouping on the month number alone merges the same month of
# different years; confirm the data covers a single year.
df = (
    spark.table("gold.fact_sales_transactions")
    .withColumn("totalPrice", col("totalPrice").cast("double"))
    .withColumn("sales_month", expr("extract(month from dateTime)"))
)

# `sum` is pyspark.sql.functions.sum (imported at the top of this cell), not the builtin.
result_df = (
    df.groupBy("sales_month")
    .agg(sum("totalPrice").alias("sales_amount"))
    .orderBy("sales_amount", ascending=False)
)
result_df.show()

+-----------+------------+
|sales_month|sales_amount|
+-----------+------------+
|          5|     66471.0|
+-----------+------------+

