In [0]:
# import section
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
# Obtain the Spark session named 'orders'; getOrCreate returns an already-active
# session if there is one (presumably the notebook's own session on Databricks).
spark = (
    SparkSession.builder
    .appName('orders')
    .getOrCreate()
)


In [0]:
%run /Workspace/apache-spark/databricks-project-fmcg-sports/utils/utilities

In [0]:
# Notebook parameters (Databricks widgets) and the storage paths derived from them.
#   catlog     - catalog name (default 'fmcg')
#   datasource - data source name; also the sub-folder under the bucket (default 'orders')
# NOTE(review): `s3_bucket` is not defined in this notebook; presumably it is
# provided by the %run utilities notebook — confirm.
dbutils.widgets.text('catlog', 'fmcg', 'catlog')
dbutils.widgets.text('datasource', 'orders', 'datasource')

catlog, datasource = (dbutils.widgets.get(name) for name in ('catlog', 'datasource'))

# <bucket>/<datasource>/landing receives new files; processed/ holds archived ones.
base_path = f'{s3_bucket}/{datasource}'
landing_path, processed_path = (f'{base_path}/{stage}' for stage in ('landing', 'processed'))

In [0]:
# Load every CSV currently in the landing folder and tag each row with
# ingestion metadata:
#   load_timestamp - when this read was executed
#   file_name      - source file name, from the file-source `_metadata` column
#   file_size      - source file size, from `_metadata`
#   file_date      - the yyyy_MM_dd stamp embedded in the file name, cast to a date
#                    (NOTE(review): an empty match casts to null without ANSI mode;
#                    under ANSI mode the cast may raise instead — confirm)
#
# `file_date` is derived from `_metadata.file_name`, not from the new `file_name`
# column: `withColumns` resolves all of its expressions against the input
# DataFrame, so a column added in the same call is not visible to its siblings.
#
# No try/except here. Printing and swallowing the error only deferred the
# failure to a confusing NameError on `orders_bronze` in the next cell.
file_date_from_name = f.regexp_replace(
    f.regexp_extract(f.col('_metadata.file_name'), r"(\d{4}_\d{2}_\d{2})", 1),
    '_',
    '-',
).cast('date')

orders_bronze = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(f'{landing_path}/*.csv')
    .withColumns({
        'load_timestamp': f.current_timestamp(),
        'file_name': f.col('_metadata.file_name'),
        'file_size': f.col('_metadata.file_size'),
        'file_date': file_date_from_name,
    })
)
orders_bronze.limit(10).display()

In [0]:
# Persist this run's landing files as the bronze Delta table. `overwrite`
# replaces the previous contents, so the table holds only the files read above.
# NOTE(review): 'enableChangeDataFeed' is passed as a plain writer option; Delta
# documents the table property as `delta.enableChangeDataFeed` — confirm CDF is
# actually enabled on this table.
bronze_table = 'fmcg.bronze.orders_bronze'
(
    orders_bronze.write.mode('overwrite')
    .format('delta')
    .option('enableChangeDataFeed', 'true')
    .saveAsTable(bronze_table)
)

# Use the fully qualified name. A bare `orders_bronze` resolves against the
# session's current catalog/schema, which need not be where the table was written.
spark.sql(f'select count(*) from {bronze_table}').display()

In [0]:
# Archive the ingested files by moving them from landing to processed, so the
# next run does not load them again.
# Only files recorded in the bronze table are moved. Listing landing and moving
# everything would also archive a file that arrived after the bronze read,
# and that file would never be ingested.
ingested_file_names = {
    row.file_name
    for row in spark.read.table('fmcg.bronze.orders_bronze').select('file_name').distinct().collect()
}
files_to_move = [
    file for file in dbutils.fs.ls(landing_path)
    if file.name in ingested_file_names
]

if not files_to_move:
    print('no files to move')
else:
    for file in files_to_move:
        # third argument True = recurse, as in the original call
        dbutils.fs.mv(file.path, f"{processed_path}/{file.name}", True)


### Silver transformation

In [0]:
# Silver starts from the bronze table; rows without an order quantity are dropped.
orders_silver = (
    spark.read.table('fmcg.bronze.orders_bronze')
    .where(f.col('order_qty').isNotNull())
)

orders_silver.display()

In [0]:
# Data-quality readout before cleaning. Both counts used to be bare, non-final
# expressions, so each cost a Spark job and its result was never displayed.
print('rows with null order_qty:',
      orders_silver.filter(f.col('order_qty').isNull()).count())
print('rows with non-numeric product_id:',
      orders_silver.filter(f.col('product_id').cast('double').isNull()).count())

# Standardise two columns:
#  * product_id: purely numeric IDs are kept (as strings); anything else,
#    including null, becomes the placeholder '999999'.
#  * order_placement_date: strip a leading "Weekday, " prefix, then try each
#    layout in `order_date_formats` in order. The first one that parses wins;
#    rows matching none become null.
# NOTE(review): 'dd/MM/yyyy' is tried before 'MM/dd/yyyy', so an ambiguous value
# such as 03/04/2025 is read as 3 April — confirm that is the source convention.
weekday_stripped_date = f.regexp_replace(
    f.col('order_placement_date'), r"^[A-Za-z]+,\s*", ""
)
order_date_formats = [
    'MMMM dd, yyyy',
    'dd/MM/yyyy',
    'dd-MM-yyyy',
    'MM/dd/yyyy',
    'yyyy-MM-dd',
    'yyyy/MM/dd',
]

orders_silver = orders_silver.withColumns({
    # both branches are strings, so the result is already string-typed
    'product_id': f.when(
        f.col('product_id').rlike(r"^[0-9]+$"),
        f.col('product_id').cast('string'),
    ).otherwise('999999'),
    'order_placement_date': f.coalesce(
        *[f.try_to_date(weekday_stripped_date, fmt) for fmt in order_date_formats]
    ),
})
orders_silver.display()


In [0]:
# De-duplicate order lines on their business key. One key list drives both the
# check and dropDuplicates, so the two cannot drift apart.
order_key_cols = ['order_id', 'customer_id', 'product_id', 'order_placement_date', 'order_qty']


def count_duplicate_keys(df):
    """Return how many distinct ``order_key_cols`` combinations occur on more than one row of ``df``.

    This counts duplicated keys, not surplus rows.
    """
    return df.groupBy(*order_key_cols).count().filter(f.col('count') > 1).count()


# This was previously a non-final bare expression, so its value was never shown.
# The hard-coded "3616" in the old comment was a stale figure from an earlier run.
print('duplicate order keys before de-duplication:', count_duplicate_keys(orders_silver))

# One row survives per key. Which one is not specified, and columns outside the
# key (e.g. file_name, load_timestamp) may differ between the copies.
orders_silver = orders_silver.dropDuplicates(order_key_cols)

# Re-check; expected to be 0.
count_duplicate_keys(orders_silver)

In [0]:
# Earliest and latest parsed order_placement_date in the cleaned silver data.
orders_silver.select(
    f.min('order_placement_date').alias('min_date'),
    f.max('order_placement_date').alias('max_date'),
).show()

In [0]:
# Silver product table; the join below takes product_key from it, matched on product_id.
product_silver = spark.table('fmcg.silver.product')
product_silver.limit(4).display()

In [0]:
# Attach product_key to each order by product_id. The inner join keeps only
# orders whose product_id exists in product_silver.
# NOTE(review): that also applies to the '999999' placeholder IDs assigned
# earlier — they are dropped here unless the product table has such a row; confirm.
joined_table = (
    orders_silver
    .join(product_silver, on='product_id', how='inner')
    .select(orders_silver['*'], product_silver['product_key'])
)
joined_table.count()

In [0]:
# Create the silver orders table on the first run; afterwards, upsert into it.
# NOTE(review): `silver_schema` is not defined in this notebook; presumably it
# comes from the %run utilities notebook — confirm.
silver_table = f'{catlog}.{silver_schema}.{datasource}'

# Match on the full order-line key using null-safe `<=>`. With plain `=`, a row
# whose key has a null part (e.g. an order_placement_date that no format could
# parse) never matches its existing copy and would be re-inserted on every run.
# NOTE(review): the merge fails if several source rows match one target row,
# e.g. when product_silver has more than one row per product_id.
silver_merge_condition = ' AND '.join(
    f't.{col} <=> s.{col}'
    for col in ('order_id', 'order_placement_date', 'customer_id', 'product_id', 'order_qty')
)

if not spark.catalog.tableExists(silver_table):
    (
        joined_table.write.mode('overwrite')
        .format('Delta')
        .option('enableChangeDataFeed', 'true')
        .option('mergeSchema', 'true')
        .saveAsTable(silver_table)
    )
    print('Table created for first time')
else:
    targetTable = DeltaTable.forName(spark, silver_table)
    (
        targetTable.alias('t')
        .merge(joined_table.alias('s'), silver_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print('database has been upserted successfull')

## Gold Transformation

In [0]:
# Gold source: the silver orders table, with order_placement_date exposed as `date`.
# The table name uses the same catalog/schema/datasource variables as the silver
# write, rather than a hard-coded 'fmcg.silver.orders', so the two cells cannot
# point at different tables when the widgets change.
orders_gold = spark.read.table(f'{catlog}.{silver_schema}.{datasource}').select(
    'order_id',
    f.col('order_placement_date').alias('date'),
    'customer_id',
    'product_id',
    'order_qty',
    'product_key',
)

In [0]:
# Create the gold sp_<datasource> table on the first run; afterwards, upsert into it.
# NOTE(review): `gold_schema` is presumably defined by the %run utilities notebook — confirm.
gold_table = f'{catlog}.{gold_schema}.sp_{datasource}'

# Null-safe `<=>` on every key column. With `=`, rows whose key has a null part
# (e.g. a null date) never match and are inserted again on every run.
gold_merge_condition = ' AND '.join(
    f't.{col} <=> s.{col}'
    for col in ('order_id', 'date', 'customer_id', 'product_id', 'order_qty')
)

if not spark.catalog.tableExists(gold_table):
    (
        orders_gold.write.mode('overwrite')
        .format('Delta')
        .option('enableChangeDataFeed', 'true')
        .option('mergeSchema', 'true')
        .saveAsTable(gold_table)
    )
    print(f'Data writtern to {gold_table}')
else:
    targetTable = DeltaTable.forName(spark, gold_table)
    (
        targetTable.alias('t')
        .merge(source=orders_gold.alias('s'), condition=gold_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print(f'Data updated in {gold_table}')

In [0]:
# Roll the gold sp_<datasource> order lines up to one row per
# month x customer x product, using the parent fact table's column names
# (customer_code, product_code, sold_quantity). `date` becomes the first day
# of its month.
# The whole table is aggregated, so each row carries the full monthly total,
# not just this run's increment.
# The source is read through the same variables the gold write uses, instead of
# the hard-coded 'fmcg.gold.sp_orders'.
child_orders = (
    spark.read.table(f'{catlog}.{gold_schema}.sp_{datasource}')
    .select('date', 'customer_id', 'product_id', 'order_qty')
    .withColumn('date', f.trunc(f.col('date'), 'Month'))
    .groupBy('date', 'customer_id', 'product_id')
    .agg(f.sum('order_qty').alias('sold_quantity'))
    .withColumnsRenamed({
        'product_id': 'product_code',
        'customer_id': 'customer_code',
    })
)

child_orders.limit(5).display()

In [0]:
# Upsert the monthly child aggregates into the parent fact_orders table.
# child_orders has no order_id: it was aggregated away, so the old
# `t.order_id = s.order_id` made the merge fail analysis. The match is now on the
# aggregation grain only (month, customer, product), null-safe so rows with a
# null part are matched rather than re-inserted.
# whenMatchedUpdateAll overwrites the matched row with the source values, so
# sold_quantity is replaced by the recomputed monthly total, not added to it.
# NOTE(review): UpdateAll/InsertAll assume fact_orders has the same columns as
# child_orders (date, customer_code, product_code, sold_quantity) — confirm.
parent_orders = DeltaTable.forName(spark, f'{catlog}.{gold_schema}.fact_orders')

(
    parent_orders.alias('t')
    .merge(
        source=child_orders.alias('s'),
        condition=(
            "t.date <=> s.date"
            " AND t.customer_code <=> s.customer_code"
            " AND t.product_code <=> s.product_code"
        ),
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Spot-check: show five rows of the gold sp_orders table.
SELECT *
FROM fmcg.gold.sp_orders
LIMIT 5