In [0]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SplitCSVFiles").getOrCreate()

sales_df = spark.read.csv("/Volumes/mini_project_12/mini_/data/sales.csv", header=True, inferSchema=True)

output_path = "/dbfs/mnt/raw/sales_split"
sales_df.repartition(10).write.csv(output_path, header=True, mode="overwrite")

print("CSV files split and stored in:", output_path)


from pyspark.sql.functions import current_timestamp, input_file_name
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

schema = StructType([
    StructField("CUST_ID", StringType(), True),
    StructField("AMOUNT_SOLD", IntegerType(), True),
    StructField("source_file", StringType(), True),
    StructField("loaded_ts", TimestampType(), True)
])


spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")

def load_to_bronze_layer(file_path, table_name):
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    df = df.withColumn("source_file", input_file_name()) \
           .withColumn("loaded_ts", current_timestamp())

    df.write.format("delta").mode("append").partitionBy("loaded_ts").saveAsTable(f"bronze.{table_name}")

load_to_bronze_layer("/Volumes/mini_project_12/mini_/data/sales.csv", "sales_stg")

data_files = [
    "/Volumes/mini_project_12/mini_/data/costs.csv",
    "/Volumes/mini_project_12/mini_/data/customers.csv",
    "/Volumes/mini_project_12/mini_/data/promotions.csv",
    "/Volumes/mini_project_12/mini_/data/supplementary_demographics.csv",
    "/Volumes/mini_project_12/mini_/data/times.csv"
]

data_table_names = [
    "cost_stg",
    "customer_stg",
    "promotions_stg",
    "supplementary_demographics_stg",
    "times_stg"
]

for file_path, table_name in zip(data_files, data_table_names):
    load_to_bronze_layer(file_path, table_name)

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

def create_silver_table(source_table, target_table, primary_key):
    df = spark.sql(f"SELECT * FROM bronze.{source_table}")

    window_spec = Window.partitionBy(primary_key).orderBy("loaded_ts")
    df = df.withColumn("rank", row_number().over(window_spec))
    df = df.filter("rank = 1").drop("rank")
    df.write.format("delta").mode("overwrite").saveAsTable(f"silver.{target_table}")

create_silver_table("customer_stg", "customers_edw", "CUST_ID")

edw_mappings = [
    ("sales_stg", "sales_edw", "PROD_ID"),
    ("promotions_stg", "promotions_edw", "PROMO_ID"),
    ("supplementary_demographics_stg", "supplementary_demographics_edw", "CUST_ID"),
    ("times_stg", "times_edw", "TIME_ID"),
    ("cost_stg", "cost_edw", "PROD_ID")
]

for source, target, key in edw_mappings:
    create_silver_table(source, target, key)
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

def create_high_priority_view():
    query = """
    SELECT 
        CONCAT(c.CUST_FIRST_NAME, ' ', c.CUST_LAST_NAME) AS full_name,
        c.CUST_EMAIL AS email,
        c.CUST_ID,
        SUM(s.AMOUNT_SOLD) AS total_amount
    FROM silver.customers_edw AS c
    JOIN silver.sales_edw AS s
    ON c.CUST_ID = s.CUST_ID
    GROUP BY c.CUST_FIRST_NAME, c.CUST_LAST_NAME, c.CUST_EMAIL, c.CUST_ID
    HAVING SUM(s.AMOUNT_SOLD) > 1500
    """
    
    high_priority_df = spark.sql(query)

    spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

    high_priority_df.write.format("delta").mode("overwrite").saveAsTable("gold.high_priority_customer")

create_high_priority_view()


CSV files split and stored in: /dbfs/mnt/raw/sales_split
