In [0]:
import logging
from pyspark.sql.functions import current_timestamp, row_number, col
from pyspark.sql import Window

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DatabaseInitializer:
    def __init__(self, spark):
        self.spark = spark

    def init_databases(self):
        try:
            self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
            self.spark.sql("CREATE DATABASE IF NOT EXISTS silver")
            self.spark.sql("CREATE DATABASE IF NOT EXISTS gold")
            logging.info("Databases initialized successfully.")
        except Exception as e:
            logging.error(f"Error initializing databases: {e}")

    def get_config(self):
        return [
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/media_customer_reviews",
            "tbl_name": "media_customer_reviews",
            "primary_keys": ["new_id"],
            "type": "dim"
        },
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/media_gold_reviews_chunked",
            "tbl_name": "media_gold_reviews_chunked",
            "primary_keys": ["franchiseID", "chunk_id"],
            "type": "fact"
        },
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/sales_customers",
            "tbl_name": "sales_customers",
            "primary_keys": ["customerID"],
            "type": "dim"
        },
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/sales_franchises",
            "tbl_name": "sales_franchises",
            "primary_keys": ["franchiseID"],
            "type": "dim"
        },
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/sales_suppliers",
            "tbl_name": "sales_suppliers",
            "primary_keys": ["supplierID"],
            "type": "dim"
        },
        {
            "file_path": "dbfs:/FileStore/shared_uploads/saiprasadpadhy@gmail.com/sales_transactions",
            "tbl_name": "sales_transactions",
            "primary_keys": ["transactionID"],
            "type": "fact"
        }
    ]


class BronzeLayerLoader:
    def __init__(self, spark):
        self.spark = spark

    def load(self, db: str, parquet_path: str, table_name: str):
        try:
            table_exists = self.spark.catalog.tableExists(f"{db}.{table_name}")
            df = self.spark.read.parquet(parquet_path).withColumn('inserted_at', current_timestamp())
            mode = "append" if table_exists else "overwrite"
            df.write.format("delta").mode(mode).saveAsTable(f"{db}.{table_name}")
            logging.info(f"Loaded data into bronze table: {db}.{table_name}")
        except Exception as e:
            logging.error(f"Failed to load data into bronze table {db}.{table_name}: {e}")
    
    def load_with_autoloader(self, db: str, parquet_path: str, table_name: str):
        try:
            table_exists = self.spark.catalog.tableExists(f"{db}.{table_name}")
            df = self.spark.readStream \
                .format("parquet") \
                .option("cloudFiles.format", "parquet") \
                .option("cloudFiles.schemaLocation", f"/tmp/schema/{db}/{table_name}")\
                .load(parquet_path) \
                .withColumn("inserted_at", current_timestamp())

            checkpoint_path = f"/tmp/checkpoints/{db}_{table_name}"

            query = df.writeStream \
                .format("delta") \
                .option("checkpointLocation", checkpoint_path) \
                .outputMode("append") \
                .trigger(once=True) \
                .toTable(f"{db}.{table_name}")

            query.awaitTermination()
            logging.info(f"Auto Loader loaded data into bronze table: {db}.{table_name}")
        except Exception as e:
            logging.error(f"Auto Loader failed for bronze table {db}.{table_name}: {e}")


    def cleanup(self, table_name: str):
        table_path = f"/user/hive/warehouse/{table_name}"
        try:
            dbutils.fs.rm(f"dbfs:{table_path}", True)
            logging.info(f"Removed files for table: {table_name}")
        except Exception as e:
            logging.warning(f"Files not found or failed to delete for {table_name}: {e}")
        try:
            self.spark.sql(f"DROP TABLE {table_name}")
            logging.info(f"Dropped table: {table_name}")
        except Exception as e:
            logging.warning(f"Table not found or failed to drop: {table_name}: {e}")


class DataTransformer:
    def __init__(self, spark):
        self.spark = spark

    def scd1_merge(self, src_db: str, dest_db: str, tbl_name: str, primary_keys: list):
        try:   
            window_spec = Window.partitionBy(primary_keys).orderBy(col("inserted_at").desc())
            df_bronze = (
                self.spark.table(f"{src_db}.{tbl_name}")
                .withColumn("row_num", row_number().over(window_spec))
                .filter("row_num = 1")
                .drop("row_num")
                .withColumn("updated_at", current_timestamp())
                .drop("inserted_at")
            )

            df_bronze.createOrReplaceTempView("bronze_tmp")
            table_exists = self.spark.catalog.tableExists(f"{dest_db}.{tbl_name}")

            if not table_exists:
                df_bronze.write.format("delta").mode("overwrite").saveAsTable(f"{dest_db}.{tbl_name}")
                logging.info(f"Created new silver table: {tbl_name}")
                return

            pk_conditions = " AND ".join([f"target.{col} = source.{col}" for col in primary_keys])
            merge_sql = f"""
                MERGE INTO {dest_db}.{tbl_name} AS target
                USING bronze_tmp AS source
                ON {pk_conditions}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """
            self.spark.sql(merge_sql)
            logging.info(f"SCD Type 1 merge completed: {src_db}.{tbl_name} -> {dest_db}.{tbl_name}")
        except Exception as e:
            logging.error(f"Error during SCD1 merge for {tbl_name}: {e}")

    def load_to_gold(self, src_db: str, dest_db: str, tbl_name: str, tbl_type: str):
        try:
            df = self.spark.table(f"{src_db}.{tbl_name}")
            df.write.format("delta").mode("overwrite").saveAsTable(f"{dest_db}.{tbl_type}_{tbl_name}")
            logging.info(f"Loaded data into gold table: {dest_db}.{tbl_type}_{tbl_name}")
        except Exception as e:
            logging.error(f"Failed to load gold table {tbl_name}: {e}")


In [0]:
initializer = DatabaseInitializer(spark)
initializer.init_databases()
config = initializer.get_config()

bronze_loader = BronzeLayerLoader(spark)
transformer = DataTransformer(spark)

for entry in config:
    bronze_loader.load_with_autoloader("bronze", entry["file_path"], entry["tbl_name"])
    transformer.scd1_merge("bronze", "silver", entry["tbl_name"], entry["primary_keys"])
    transformer.load_to_gold("silver", "gold", entry["tbl_name"], entry["type"])


2025-05-20 16:07:51,211 - INFO - Received command c on object id p0
2025-05-20 16:07:51,285 - INFO - Databases initialized successfully.
2025-05-20 16:07:51,450 - ERROR - Auto Loader failed for bronze table bronze.media_customer_reviews: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
2025-05-20 16:07:59,099 - INFO - SCD Type 1 merge completed: bronze.media_customer_reviews -> silver.media_customer_reviews
2025-05-20 16:08:03,790 - INFO - Loaded data into gold table: gold.dim_media_customer_reviews
2025-05-20 16:08:04,028 - ERROR - Auto Loader failed for bronze table bronze.media_gold_reviews_chunked: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to

## Most sold products to identify the top-selling items

In [0]:
df = spark.table("gold.fact_sales_transactions")
result_df = df.groupBy("product") \
              .sum("quantity") \
              .withColumnRenamed("sum(quantity)", "no_of_products_sold") \
              .orderBy("no_of_products_sold", ascending=False)
result_df.show()

2025-05-20 10:35:17,062 - INFO - Received command c on object id p0


+--------------------+-------------------+
|             product|no_of_products_sold|
+--------------------+-------------------+
|  Golden Gate Ginger|               3865|
|     Outback Oatmeal|               3733|
|Austin Almond Bis...|               3716|
|       Tokyo Tidbits|               3662|
|         Pearly Pies|               3595|
|       Orchard Oasis|               3586|
+--------------------+-------------------+



## Suppliers provide ingredients to the most franchises

In [0]:
df = spark.table("gold.dim_sales_franchises")
result_df = df.groupBy("supplierID") \
              .count() \
              .withColumnRenamed("count", "no_of_franchises") \
              .orderBy("no_of_franchises", ascending=False)
result_df.show()

+----------+----------------+
|supplierID|no_of_franchises|
+----------+----------------+
|   4000022|               1|
|   4000034|               1|
|   4000021|               1|
|   4000005|               1|
|   4000003|               1|
|   4000044|               1|
|   4000004|               1|
|   4000037|               1|
|   4000039|               1|
|   4000047|               1|
|   4000045|               1|
|   4000031|               1|
|   4000009|               1|
|   4000015|               1|
|   4000019|               1|
|   4000013|               1|
|   4000026|               1|
|   4000018|               1|
|   4000028|               1|
|   4000032|               1|
+----------+----------------+
only showing top 20 rows



## Total sales per month

In [0]:
from pyspark.sql.functions import month, sum, col, to_date, expr

df = spark.table("gold.fact_sales_transactions")

df = df.withColumn("totalPrice", col("totalPrice").cast("double"))
df = df.withColumn("sales_month", expr("extract(month from dateTime)"))

result_df = df.groupBy("sales_month") \
              .agg(sum("totalPrice").alias("sales_amount")) \
              .orderBy("sales_amount", ascending=False)
result_df.show()

2025-05-20 10:36:47,478 - INFO - Received command c on object id p0


+-----------+------------+
|sales_month|sales_amount|
+-----------+------------+
|          5|     66471.0|
+-----------+------------+

