In [0]:
%run ../includes/configurations

In [0]:
# Echo the bronze root path as a sanity check.
# NOTE(review): presumably defined by the ../includes/configurations notebook run above — confirm.
bronze_folder_path

Out[126]: '/mnt/clvprojectadls/bronze'

In [0]:
# Echo the silver root path as a sanity check.
# NOTE(review): presumably defined by the ../includes/configurations notebook run above — confirm.
silver_folder_path

Out[127]: '/mnt/clvprojectadls/silver'

In [0]:
# Notebook parameter: the file date selects which dated bronze sub-folder is read
# and is stamped onto every row as `file_date`. Defaults to 2024-11-01.
file_date_param = "p_file_date"
dbutils.widgets.text(file_date_param, "2024-11-01")
v_file_date = dbutils.widgets.get(file_date_param)

# This notebook reads the transaction_data file from the bronze layer and loads it into the silver layer

# Specify the schema

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType,DateType

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# Explicit schema applied when reading transaction_data.csv (no schema inference).
# CustomerId is declared non-nullable; the remaining columns are nullable.
transaction_data_schema = (
    StructType()
    .add("CustomerId", StringType(), False)
    .add("TransactionDate", DateType(), True)
    .add("TransactionAmount", DoubleType(), True)
    .add("ItemPurchased", StringType(), True)
)

# Read the transaction_data file

In [0]:
# The CSV lives in the bronze sub-folder named after the file date parameter.
transaction_data_path = f"{bronze_folder_path}/{v_file_date}/transaction_data.csv"

# Read with the first line treated as a header and the explicit schema applied.
transaction_data_df = (
    spark.read
    .option("header", True)
    .schema(transaction_data_schema)
    .csv(transaction_data_path)
)

In [0]:
# Preview the raw rows as read from the CSV, before any renaming.
display(transaction_data_df)

CustomerId,TransactionDate,TransactionAmount,ItemPurchased
CUST021,2023-09-28,6.61,Salad
CUST021,2023-01-30,13.17,Pastry
CUST021,2023-02-10,23.8,Coffee
CUST021,2023-08-31,11.75,Salad
CUST021,2023-10-09,24.82,Coffee
CUST021,2023-04-06,16.94,Salad
CUST021,2023-05-03,10.19,Coffee
CUST021,2023-10-10,12.86,Salad
CUST021,2023-08-07,29.25,Coffee
CUST021,2023-10-26,20.64,Pastry


# Rename the columns to snake_case and add the file_date column


In [0]:
# Project the source columns under their snake_case names (same order as the
# input) and append the file date parameter as a literal `file_date` column.
transaction_data_renamed_df = transaction_data_df.select(
    transaction_data_df["CustomerId"].alias("customer_id"),
    transaction_data_df["TransactionDate"].alias("transaction_date"),
    transaction_data_df["TransactionAmount"].alias("transaction_amount"),
    transaction_data_df["ItemPurchased"].alias("item_purchased"),
    lit(v_file_date).alias("file_date"),
)


In [0]:
# Preview the renamed columns together with the added file_date column.
display(transaction_data_renamed_df)

customer_id,transaction_date,transaction_amount,item_purchased,file_date
CUST021,2023-09-28,6.61,Salad,2024-11-10
CUST021,2023-01-30,13.17,Pastry,2024-11-10
CUST021,2023-02-10,23.8,Coffee,2024-11-10
CUST021,2023-08-31,11.75,Salad,2024-11-10
CUST021,2023-10-09,24.82,Coffee,2024-11-10
CUST021,2023-04-06,16.94,Salad,2024-11-10
CUST021,2023-05-03,10.19,Coffee,2024-11-10
CUST021,2023-10-10,12.86,Salad,2024-11-10
CUST021,2023-08-07,29.25,Coffee,2024-11-10
CUST021,2023-10-26,20.64,Pastry,2024-11-10


# Add a column called ingestion date

In [0]:
# Append an `ingestion_date` column holding the value of current_timestamp().
transaction_data_final_df = transaction_data_renamed_df.withColumn(
    "ingestion_date",
    current_timestamp(),
)

In [0]:
# Preview the final frame that will be written to the silver table.
display(transaction_data_final_df )

customer_id,transaction_date,transaction_amount,item_purchased,file_date,ingestion_date
CUST021,2023-09-28,6.61,Salad,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-01-30,13.17,Pastry,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-02-10,23.8,Coffee,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-08-31,11.75,Salad,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-10-09,24.82,Coffee,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-04-06,16.94,Salad,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-05-03,10.19,Coffee,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-10-10,12.86,Salad,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-08-07,29.25,Coffee,2024-11-10,2024-11-13T21:42:14.928+0000
CUST021,2023-10-26,20.64,Pastry,2024-11-10,2024-11-13T21:42:14.928+0000


# Upsert the data into the silver Delta table (clv_silver.transaction_data)

In [0]:
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")

from delta.tables import DeltaTable

# Target table in the silver layer; created on the first run, merged into afterwards.
silver_table_name = "clv_silver.transaction_data"

# A customer has many transactions, so customer_id alone does not identify a row.
# Matching on it only makes every source row for a customer match every target
# row for that customer; Delta then either rejects the merge (multiple source
# rows matched the same target row) or overwrites rows with unrelated data.
# The file carries no transaction id, so the full set of business columns is
# used as the key. `<=>` is null-safe equality: the three nullable columns must
# compare equal when both sides are NULL, otherwise re-loading a file would
# re-insert those rows.
# NOTE(review): exact duplicate rows inside one source file will still make the
# merge fail once they exist in the target — decide whether to de-duplicate upstream.
merge_condition = """
    tgt.customer_id = src.customer_id
    AND tgt.transaction_date <=> src.transaction_date
    AND tgt.transaction_amount <=> src.transaction_amount
    AND tgt.item_purchased <=> src.item_purchased
"""

# Check if the table exists in the catalog (public API instead of the private JVM handle)
if spark.catalog.tableExists(silver_table_name):
    # Load the existing Delta table by name, i.e. the same way it was created below,
    # rather than through a hard-coded mount path that may not be its location.
    deltaTable = DeltaTable.forName(spark, silver_table_name)

    # Upsert: refresh rows that already exist, insert the ones that do not.
    (
        deltaTable.alias("tgt")
        .merge(transaction_data_final_df.alias("src"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First load: write the DataFrame as a new Delta table partitioned by customer_id.
    # NOTE(review): customer_id looks high-cardinality for a partition column — confirm this is intended.
    (
        transaction_data_final_df.write
        .mode('overwrite')
        .partitionBy('customer_id')
        .format('delta')
        .saveAsTable(silver_table_name)
    )

In [0]:
%sql
-- Inspect the contents of the silver table after the load, ordered by customer.
SELECT *
FROM clv_silver.transaction_data
ORDER BY customer_id

customer_id,transaction_date,transaction_amount,item_purchased,file_date,ingestion_date
CUST001,2024-11-01,20.5,coffee,2024-11-01,2024-11-13T21:41:44.975+0000
CUST002,2024-11-05,30.75,pastry,2024-11-01,2024-11-13T21:41:44.975+0000
CUST003,2023-04-28,17.36,Sandwich,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-05-15,20.32,Coffee,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-11-02,12.99,Coffee,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-07-15,10.78,Coffee,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-08-28,22.66,Sandwich,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-10-02,19.6,Coffee,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-11-07,9.98,Coffee,2024-11-05,2024-11-13T21:42:00.498+0000
CUST003,2023-04-28,6.9,Salad,2024-11-05,2024-11-13T21:42:00.498+0000
