In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
%run /Workspace/Users/eshivee@gmail.com/Atlikon_SportsBar_Data_Pipeline/Set_Up/utilities

In [0]:
# Echo the schema names this notebook relies on (presumably defined by the
# utilities notebook run above - verify). gold_schema is included because
# gold_table is built from it further down, so a missing definition fails here.
bronze_schema, silver_schema, gold_schema

In [0]:
# Notebook parameters: register both widgets first, then read their current values.
dbutils.widgets.text('catalog', 'sportats', 'Catalog')
dbutils.widgets.text('data_source', 'orders', 'Data Source')
catalog = dbutils.widgets.get('catalog')
data_source = dbutils.widgets.get('data_source')

# S3 locations for the selected data source: a landing/ and a processed/ folder
# under one base path.
basepath = f's3://sportsbarsa/{data_source}'
landing_path = f'{basepath}/landing'
processed_path = f'{basepath}/processed'

print(f'basepath: {basepath}')
print(f'landing_path: {landing_path}')
print(f'processed_path: {processed_path}')

# Fully qualified table names for each layer (catalog.schema.table).
gold_table = f"{catalog}.{gold_schema}.sb_fact_{data_source}"
silver_table = f"{catalog}.{silver_schema}.{data_source}"
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"

In [0]:
# Read every CSV in the landing folder (header row, inferred column types) and
# tag each row with a read timestamp plus the name and size of its source file.
landing_reader = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
)
df = (
    landing_reader.load(f'{landing_path}/*.csv')
    .withColumn('read_timestamp', current_timestamp())
    .select('*', '_metadata.file_name', '_metadata.file_size')
)

df.display()
# Incremental data has now been uploaded into the landing folder.
df.count()

In [0]:
# Append the landed batch to the bronze table and print the row count before
# and after, so the size of the increment is visible.
# The counts read bronze_table (built from the widgets) rather than a hard-coded
# 'sportats.bronze.orders', so they always describe the table being written.
if spark.catalog.tableExists(bronze_table):
    df_old = spark.read.table(bronze_table)
    print('total_rows_before append', df_old.count())
else:
    # First run: saveAsTable below creates the table, so there is nothing to count.
    print('total_rows_before append', 0)


df.write.format('delta')\
    .mode('append')\
    .option('delta.enableChangeDataFeed', 'true')\
    .saveAsTable(bronze_table)


df_new = spark.read.table(bronze_table)
print('total_rows_after append', df_new.count())

In [0]:
# Overwrite the bronze staging table (stg_<data_source>) with the current batch only.
stg_bronze_table = f'{catalog}.{bronze_schema}.stg_{data_source}'
(
    df.write
    .format('delta')
    .mode('overwrite')
    .option('delta.enableChangeDataFeed', 'true')
    .saveAsTable(stg_bronze_table)
)

In [0]:
# Move the ingested files from landing/ to processed/.
# Only names ending in '.csv' are moved, mirroring the '*.csv' glob used to load
# the batch; any other entry in landing/ was never read, so it is left in place
# instead of being marked as processed.
# NOTE(review): a CSV that arrives between the read and this move would still be
# archived without being ingested - confirm whether that window matters here.
files = dbutils.fs.ls(landing_path)
for file in files:
    if file.name.endswith('.csv'):
        dbutils.fs.mv(file.path, f'{processed_path}/{file.name}', True)

**SILVER LAYER STAGING ORDERS**

In [0]:
# Load the bronze staging table for this data source and preview four rows.
stg_bronze_name = f'{catalog}.{bronze_schema}.stg_{data_source}'
df_bronze = spark.read.table(stg_bronze_name)
df_bronze.limit(4).display()

In [0]:
# Remove rows without an order quantity and print the row count on either side
# of the filter. The first label previously read 'rows_with_null_qty' although
# the value printed is the total row count before filtering.
print('rows_before_removing_nulls:', df_bronze.count())

df_silver = df_bronze.filter(col('order_qty').isNotNull())

print('rows_after_removing_nulls:', df_silver.count())

In [0]:
# Customer ids made up only of digits are kept as they are; any other value is
# replaced by the placeholder '999999'. The resulting column is cast to string.
digits_only = col("customer_id").rlike('^[0-9]+$')
cleaned_customer_id = when(digits_only, col("customer_id")).otherwise('999999')
df_silver = df_silver.withColumn('customer_id', cleaned_customer_id.cast('string'))


In [0]:
# Remove a leading run of letters followed by a comma and optional whitespace
# (presumably a weekday name such as "Tuesday, " - verify against the data).
leading_word_pattern = r"^[A-Za-z]+,\s*"
date_without_prefix = regexp_replace(col("order_placement_date"), leading_word_pattern, "")
df_silver = df_silver.withColumn('order_placement_date', date_without_prefix)

df_silver.display()

In [0]:
# Convert order_placement_date to a date by trying each known layout in order;
# coalesce keeps the first attempt that is not null.
date_formats = ['yyyy/MM/dd', 'dd-MM-yyyy', 'dd/MM/yyyy', 'MMMM dd, yyyy']
parse_attempts = [try_to_date('order_placement_date', fmt) for fmt in date_formats]
df_silver = df_silver.withColumn('order_placement_date', coalesce(*parse_attempts))

In [0]:
# Show the current number of rows in df_silver.
df_silver.count()

In [0]:
# Keep a single row for each distinct combination of the columns below, then
# show the resulting row count.
dedup_columns = ['order_id', 'order_placement_date', 'customer_id', 'product_id', 'order_qty']
df_silver = df_silver.dropDuplicates(dedup_columns)

df_silver.count()

In [0]:
# Store product_id as a string column, then print the schema.
product_id_as_text = col('product_id').cast('string')
df_silver = df_silver.withColumn('product_id', product_id_as_text)

df_silver.printSchema()

In [0]:
# Attach product_code to each order by joining on product_id against the silver
# products table. The table name is built from the catalog widget and
# silver_schema instead of the hard-coded 'sportats.silver.products', so the
# lookup follows the same catalog as every other table in this notebook.
# NOTE(review): the inner join drops orders whose product_id has no match in
# products - confirm that is intended.
df_products = spark.read.table(f'{catalog}.{silver_schema}.products')
df_joined = df_silver.join(
    df_products.select('product_id', 'product_code'),
    on='product_id',
    how='inner',
)

In [0]:
# Preview ten rows of the orders joined with their product codes.
df_joined.limit(10).display()

In [0]:
# Upsert the cleaned batch into the silver table: df_joined (derived from the
# bronze staging data) is the merge source and the silver table is the target.
if not spark.catalog.tableExists(silver_table):
    # The silver table does not exist yet: create it from this batch.
    (
        df_joined.write
        .format('delta')
        .mode('overwrite')
        .option('delta.enableChangeDataFeed', 'true')
        .option('mergeSchema', 'true')
        .saveAsTable(silver_table)
    )
else:
    # A target row matches a source row when date, order, product code and
    # customer all agree; matches are updated, everything else is inserted.
    silver_match = "trg.order_placement_date = src.order_placement_date AND\
                trg.order_id = src.order_id AND\
                trg.product_code = src.product_code AND\
                trg.customer_id = src.customer_id"
    silver_target = DeltaTable.forName(spark, silver_table).alias('trg')
    (
        silver_target
        .merge(df_joined.alias('src'), silver_match)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Overwrite the silver staging table (stg_<data_source>) with the current batch only.
stg_silver_table = f'{catalog}.{silver_schema}.stg_{data_source}'
(
    df_joined.write
    .mode('overwrite')
    .format('delta')
    .option('delta.enableChangeDataFeed', 'true')
    .saveAsTable(stg_silver_table)
)

**GOLD LAYER ORDERS STAGING**

In [0]:
# Reload the batch from the silver staging table and display it.
stg_silver_name = f'{catalog}.{silver_schema}.stg_{data_source}'
df_silver = spark.read.table(stg_silver_name)

df_silver.display()

In [0]:
# Shape the batch for the gold fact table: keep the order, product and quantity
# columns and rename three of them to the gold column names.
gold_columns = [
    col('order_id'),
    col('order_placement_date').alias('date'),
    col('customer_id').alias('customer_code'),
    col('product_code'),
    col("product_id"),
    col('order_qty').alias('sold_quantity'),
]
df_gold = df_silver.select(*gold_columns)
df_gold.limit(5).display()

In [0]:
# Upsert the gold-shaped batch into the gold fact table, creating the table the
# first time it is needed.
if not spark.catalog.tableExists(gold_table):
    (
        df_gold.write
        .format('delta')
        .mode('overwrite')
        .option('delta.enableChangeDataFeed', 'true')
        .option('mergeSchema', 'true')
        .saveAsTable(gold_table)
    )
else:
    # Rows match on date, order, product code and customer code; matches are
    # updated and unmatched source rows are inserted.
    gold_match = "trg.date = src.date AND\
                                                        trg.order_id = src.order_id AND\
                                                        trg.product_code = src.product_code AND\
                                                        trg.customer_code = src.customer_code"
    gold_target = DeltaTable.forName(spark, gold_table).alias('trg')
    (
        gold_target
        .merge(df_gold.alias('src'), gold_match)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
# Display the gold-shaped batch that was merged into the gold fact table.
df_gold.display()

In [0]:
# Work out which calendar months the staged batch touches: truncate each
# order_placement_date to the first day of its month and keep the distinct
# values. The result is also registered as temp view df_child_inc_v.
stg_silver_source = f'{catalog}.{silver_schema}.stg_{data_source}'
df_child = spark.read.table(stg_silver_source)

month_start = trunc('order_placement_date', 'MM').alias('start_month')
df_child_inc = df_child.select(month_start).distinct()
df_child_inc.display()
df_child_inc.createOrReplaceTempView('df_child_inc_v')

In [0]:
%sql
-- Preview the month-start values registered in the temp view.
SELECT *
FROM df_child_inc_v

In [0]:
# Display the current contents of the gold fact table for this data source.
gold_fact_name = f'{catalog}.{gold_schema}.sb_fact_{data_source}'
spark.read.table(gold_fact_name).display()

In [0]:
# Re-read the gold fact table and replace every date with the first day of its
# month, so rows can be grouped per month later on.
monthly_table = spark.read.table(f'{catalog}.{gold_schema}.sb_fact_{data_source}')

monthly_table = monthly_table.withColumn('date', trunc('date', 'MM'))
monthly_table.display()
# Keep only the columns the monthly aggregation needs. The projection used to be
# computed and thrown away; assigning it makes it take effect.
monthly_table = monthly_table.select('date', "product_code", "customer_code", "sold_quantity")
# NOTE(review): spot check with a hard-coded month (2025-12-01) - update or
# remove when running for other periods.
monthly_table.filter(col('date') == '2025-12-01').display()
monthly_table.count()

In [0]:
# Keep only fact rows whose (month-truncated) date equals one of the month
# starts in df_child_inc. This returns every fact row of each affected month,
# not just the rows added by the current batch.
in_affected_month = monthly_table['date'] == df_child_inc['start_month']
monthly_table = monthly_table.join(df_child_inc, in_affected_month)
print('Total Rows:', monthly_table.count())

In [0]:
# List the distinct date values that remain in monthly_table after the join.
monthly_table.select('date').distinct().display()

In [0]:
# Total sold quantity for each date / product_code / customer_code combination.
# `sum` here is the Spark aggregate brought in by the star import of
# pyspark.sql.functions, not Python's built-in sum.
monthly_keys = ['date', "product_code", 'customer_code']
total_sold = sum('sold_quantity').alias('sold_quantity')
df_sum_monthly = monthly_table.groupBy(*monthly_keys).agg(total_sold)

In [0]:
# Show the number of aggregated rows in df_sum_monthly.
df_sum_monthly.count()

In [0]:
# Upsert the monthly totals into the parent fact table (<catalog>.<gold_schema>.fact_orders).
# Rows match on date, product_code and customer_code; matched rows are updated
# with the recomputed total and unmatched rows are inserted.
# Fix: the customer condition previously compared trg.customer_code with
# src.product_code, so rows were matched on the wrong column.
# NOTE(review): DeltaTable.forName raises if fact_orders does not exist yet -
# confirm the table is created elsewhere.
gold_parent_delta = DeltaTable.forName(spark, f'{catalog}.{gold_schema}.fact_orders')
(
    gold_parent_delta.alias('trg')
    .merge(
        df_sum_monthly.alias('src'),
        'trg.date = src.date AND '
        'trg.product_code = src.product_code AND '
        'trg.customer_code = src.customer_code',
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Drop the staging tables; they are not needed again after this run.
-- IF EXISTS keeps this cell re-runnable when a staging table is already gone.
-- NOTE(review): the names are hard-coded and only match the default widget
-- values (catalog 'sportats', data_source 'orders').

DROP TABLE IF EXISTS sportats.bronze.stg_orders;
DROP TABLE IF EXISTS sportats.silver.stg_orders;
