In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SalesDataBI") \
    .master("local[1]") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# Loading the Data

## About Dataset from kaggle

Data holds the basic information about sales data. The company have sales agencies / resellers and branches and the data file holds only the branch/reseller information in the customer field.

Notes:
Please note that, for the privacy issues, the customerID, SKU and documentID is processed with LabelEncoder. So, each customer or each product has a unique ID in the data file.

Note 2: Data holds for more than 3 years, prices of the products can get raised by the time.

DocumentID : ID of the transaction. A transaction might hold multiple records for the same customer at the same date with multiple products (SKU). DocumentID might be useful for combining the transactions and detecting the items sold together.
Date : Date of transaction / sell. In the date time format.
SKU : Item / Product code. The unique code for each item sold.
Price : Sales price for the transaction. The price for the product for the customer for the date.
Discount : discount amount for the transaction.
Customer : Unique customer id for each customer. For the data set, customer can be a reseller or a branch of the company.
Quantity : Number of items sold in the transaction.
Version 2 Updates:
Newer version of the data set has more rows until 2023 January. Also some of the column names are updated for the naming standards in other Kaggle notebooks.

InvoiceID : ID of the transaction. A transaction might hold multiple records for the same customer at the same date with multiple products (SKU). DocumentID might be useful for combining the transactions and detecting the items sold together.
Date : Date of transaction / sell. In the date time format.
ProductID : Item / Product code. The unique code for each item sold.
TotalSales : Sales price for the transaction. If you want to get unit_price , divide TotalSales column to Quantity column
Discount : Discount amount for the transaction.
CustomerID : Unique customer id for each customer. For the data set, customer can be a reseller or a branch of the company.
Quantity : Number of items sold in the transaction.

In [10]:
sales_df1 = spark.read.option("header", "true").csv("dataset/sales1.csv")

In [11]:
sales_df1.show(5)

+---+----------+----------+----+------+--------+--------+--------+
|_c0|DocumentID|      Date| SKU| Price|Discount|Customer|Quantity|
+---+----------+----------+----+------+--------+--------+--------+
|  0|       716|2019-09-23|1039|381.78|67.37254|       1|     1.0|
|  1|       716|2019-09-23| 853|593.22| 0.00034|       1|     1.0|
|  2|       716|2019-09-23| 862|423.73|-0.00119|       1|     1.0|
|  3|       716|2019-09-23| 868| 201.7|35.58814|       1|     1.0|
|  4|       716|2019-09-23|2313|345.76|61.01966|       1|     1.0|
+---+----------+----------+----+------+--------+--------+--------+
only showing top 5 rows



24/11/06 03:58:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , DocumentID, Date, SKU, Price, Discount, Customer, Quantity
 Schema: _c0, DocumentID, Date, SKU, Price, Discount, Customer, Quantity
Expected: _c0 but found: 
CSV file: file:///home/ayoub/BI2Bigdata/dataset/sales1.csv


In [12]:
sales_df1 = sales_df1.withColumnRenamed("DocumentID", "InvoiceID") \
                   .withColumnRenamed("SKU", "ProductID") \
                   .withColumnRenamed("Price", "TotalSales") \
                   .withColumnRenamed("Customer", "CustomerID")

In [13]:
sales_df1.show(5)

+---+---------+----------+---------+----------+--------+----------+--------+
|_c0|InvoiceID|      Date|ProductID|TotalSales|Discount|CustomerID|Quantity|
+---+---------+----------+---------+----------+--------+----------+--------+
|  0|      716|2019-09-23|     1039|    381.78|67.37254|         1|     1.0|
|  1|      716|2019-09-23|      853|    593.22| 0.00034|         1|     1.0|
|  2|      716|2019-09-23|      862|    423.73|-0.00119|         1|     1.0|
|  3|      716|2019-09-23|      868|     201.7|35.58814|         1|     1.0|
|  4|      716|2019-09-23|     2313|    345.76|61.01966|         1|     1.0|
+---+---------+----------+---------+----------+--------+----------+--------+
only showing top 5 rows



24/11/06 03:58:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , DocumentID, Date, SKU, Price, Discount, Customer, Quantity
 Schema: _c0, DocumentID, Date, SKU, Price, Discount, Customer, Quantity
Expected: _c0 but found: 
CSV file: file:///home/ayoub/BI2Bigdata/dataset/sales1.csv


In [14]:
sales_df2 = spark.read.option("header", "true").csv("dataset/sales2.csv")

In [15]:
sales_df = sales_df1.union(sales_df2)
sales_df.show(5)

+---+---------+----------+---------+----------+--------+----------+--------+
|_c0|InvoiceID|      Date|ProductID|TotalSales|Discount|CustomerID|Quantity|
+---+---------+----------+---------+----------+--------+----------+--------+
|  0|      716|2019-09-23|     1039|    381.78|67.37254|         1|     1.0|
|  1|      716|2019-09-23|      853|    593.22| 0.00034|         1|     1.0|
|  2|      716|2019-09-23|      862|    423.73|-0.00119|         1|     1.0|
|  3|      716|2019-09-23|      868|     201.7|35.58814|         1|     1.0|
|  4|      716|2019-09-23|     2313|    345.76|61.01966|         1|     1.0|
+---+---------+----------+---------+----------+--------+----------+--------+
only showing top 5 rows



24/11/06 03:59:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , DocumentID, Date, SKU, Price, Discount, Customer, Quantity
 Schema: _c0, DocumentID, Date, SKU, Price, Discount, Customer, Quantity
Expected: _c0 but found: 
CSV file: file:///home/ayoub/BI2Bigdata/dataset/sales1.csv


In [17]:
sales_df = sales_df.drop("_c0")

In [18]:
sales_df.write.format("delta").mode("overwrite").save("delta/sales_data")

                                                                                

# Slowly Changing Dimensions (SCD)

## Customer Dimension Table (SCD Type 2)

In [21]:
from pyspark.sql import functions as F

# Load the customer dimension table (now that it exists)
customer_dim = spark.read.format("delta").load("delta/customer_dimension")

# Load the new customer data (simulating with sales data)
customer_data = sales_df.select("CustomerID", "Customer").distinct()

# Merge operation for SCD Type 2: Update existing records and insert new ones
customer_data.alias("new") \
    .join(customer_dim.alias("old"), "new.CustomerID = old.CustomerID", "left_outer") \
    .withColumn("is_match", F.when(F.col("old.CustomerID").isNull(), F.lit(False)).otherwise(F.lit(True))) \
    .withColumn("valid_to", F.when(F.col("is_match") == False, F.current_date()).otherwise(F.lit(None))) \
    .withColumn("is_active", F.when(F.col("is_match") == False, F.lit(True)).otherwise(F.lit(False))) \
    .withColumn("valid_from", F.when(F.col("is_match") == False, F.current_date()).otherwise(F.lit(None))) \
    .select("new.CustomerID", "new.Customer", "valid_from", "valid_to", "is_active") \
    .createOrReplaceTempView("customer_updates")


customer_dim_df = spark.read.format("delta").load("delta/customer_dimension")

customer_dim_df.createOrReplaceTempView("customer_dim_table")


spark.sql("""
MERGE INTO customer_dim_table AS old
USING customer_updates AS new
ON old.CustomerID = new.CustomerID
WHEN MATCHED AND old.Customer != new.Customer AND old.is_active = TRUE
    THEN UPDATE SET old.valid_to = current_date(), old.is_active = FALSE
WHEN NOT MATCHED
    THEN INSERT (CustomerID, Customer, valid_from, valid_to, is_active)
         VALUES (new.CustomerID, new.Customer, new.valid_from, new.valid_to, new.is_active)
""")


AnalysisException: `delta/customer_dimension` is not a Delta table.;