In [0]:
from pyspark.sql.functions import col, avg, count, sum, mean, stddev, isnan, isnull
from pyspark.sql import SparkSession

In [0]:
# Read the transaction records from the catalog table into a DataFrame.
TRANSACTIONS_TABLE = "transaction"
transactions_df = spark.table(TRANSACTIONS_TABLE)

# Print the leading rows as a quick check of the columns and values.
transactions_df.show()

+-------------+-----------+----------+------------+--------+-----------------+
|TransactionID|Customer ID|Product ID|Total Amount| Channel|MarketingCampaign|
+-------------+-----------+----------+------------+--------+-----------------+
|            1|        101|       501|         500|     Web|       Campaign A|
|            2|        102|       502|         300|  Mobile|       Campaign B|
|            3|        103|       501|         700|In-Store|       Campaign A|
+-------------+-----------+----------+------------+--------+-----------------+



In [0]:
# Read the product table (name, category and price keyed by ProductID).
PRODUCTS_TABLE = "products"
products_df = spark.table(PRODUCTS_TABLE)

# Print the leading rows as a quick check of the columns and values.
products_df.show()

+---------+------------+---------------+-----+
|ProductID|Product Name|       Category|Price|
+---------+------------+---------------+-----+
|      501|  Smartphone|    Electronics| 1000|
|      502|     Shampoo|  Personal Care|  300|
|      503|     Blender|Home Appliances|  500|
|      504|  Headphones|    Electronics|  800|
+---------+------------+---------------+-----+



In [0]:
# Attach product details to every transaction. The key is spelled differently
# on each side ("Product ID" vs "ProductID"), so the match is written as an
# explicit equality condition instead of a shared column name.
product_key_match = transactions_df["`Product ID`"] == products_df["ProductID"]
joined_df = transactions_df.join(products_df, product_key_match, "inner")

# Both key columns hold the same value after the join; keep only the
# transaction-side "Product ID".
combined_df = joined_df.drop(products_df["ProductID"])

# Preview the joined rows.
combined_df.show()


+-------------+-----------+----------+------------+--------+-----------------+------------+-------------+-----+
|TransactionID|Customer ID|Product ID|Total Amount| Channel|MarketingCampaign|Product Name|     Category|Price|
+-------------+-----------+----------+------------+--------+-----------------+------------+-------------+-----+
|            3|        103|       501|         700|In-Store|       Campaign A|  Smartphone|  Electronics| 1000|
|            2|        102|       502|         300|  Mobile|       Campaign B|     Shampoo|Personal Care|  300|
|            1|        101|       501|         500|     Web|       Campaign A|  Smartphone|  Electronics| 1000|
+-------------+-----------+----------+------------+--------+-----------------+------------+-------------+-----+



In [0]:
from pyspark.sql.functions import avg, sum

# Treat the product's Price as the value of each transaction.
# NOTE(review): the transaction's own "Total Amount" column is not used here —
# confirm that Price is the intended measure of order value.
enriched_df = combined_df.withColumn("OrderValue", combined_df.Price)

# Mean OrderValue per customer.
mean_order_value = avg("OrderValue").alias("AvgOrderValue")
avg_order_value = enriched_df.groupBy("Customer ID").agg(mean_order_value)
display(avg_order_value)

Customer ID,AvgOrderValue
101,1000.0
102,300.0
103,1000.0


In [0]:
# Most popular products: number of transactions per product, busiest first.
from pyspark.sql.functions import count

orders_per_product = combined_df.groupBy("Product ID", "Product Name").agg(
    count("*").alias("TotalOrders")
)
popular_products = orders_per_product.orderBy("TotalOrders", ascending=False)
popular_products.show()

+----------+------------+-----------+
|Product ID|Product Name|TotalOrders|
+----------+------------+-----------+
|       501|  Smartphone|          2|
|       502|     Shampoo|          1|
+----------+------------+-----------+



In [0]:
# Most popular categories: number of transactions per category, busiest first.
orders_per_category = combined_df.groupBy("Category").agg(
    count("*").alias("TotalOrders")
)
popular_categories = orders_per_category.orderBy("TotalOrders", ascending=False)
popular_categories.show()

+-------------+-----------+
|     Category|TotalOrders|
+-------------+-----------+
|  Electronics|          2|
|Personal Care|          1|
+-------------+-----------+



In [0]:
from pyspark.sql.functions import col
import re

# Column names must not contain spaces or special characters before the table
# is saved, e.g. "Customer ID" becomes "Customer_ID".
def clean_column_names(df):
    """Return ``df`` with column names reduced to letters, digits and underscores.

    Every character outside ``[a-zA-Z0-9_]`` in a column name is replaced by
    ``_``. Column order and contents are left as they are.

    Args:
        df: A DataFrame exposing ``columns`` and ``toDF(*names)``.

    Returns:
        ``df`` itself when no name needs changing, otherwise a new DataFrame
        with the same columns under their cleaned names.

    Raises:
        ValueError: If two or more columns would end up with the same cleaned
            name (for example ``"a b"`` and ``"a_b"``). Failing here gives a
            clear message instead of a duplicate-column error at write time.
    """
    old_names = list(df.columns)
    new_names = [re.sub(r'[^a-zA-Z0-9_]', '_', name) for name in old_names]

    # Nothing to rename (also covers a DataFrame with no columns).
    if new_names == old_names:
        return df

    # Distinct source names can collapse onto one cleaned name; report them all.
    seen = set()
    clashes = set()
    for name in new_names:
        if name in seen:
            clashes.add(name)
        seen.add(name)
    if clashes:
        raise ValueError(
            "Cleaning column names produces duplicates: " + ", ".join(sorted(clashes))
        )

    # Rename all columns positionally in one step rather than one rename per column.
    return df.toDF(*new_names)

# Rewrite the column names of the joined data so they are safe to store.
clean_df = clean_column_names(combined_df)

# Persist the result as the Delta table "combined_transaction_data",
# replacing whatever that table held before.
(
    clean_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("combined_transaction_data")
)

In [0]:
# Run Delta's OPTIMIZE command on the saved table. The call is left as the
# cell's final expression so the returned DataFrame is displayed.
optimize_statement = "OPTIMIZE combined_transaction_data"
spark.sql(optimize_statement)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# Data-quality check: count missing values in every column of the joined data.

from pyspark.sql.types import NumericType
from pyspark.sql.functions import col, when, count, isnan, isnull

# Only columns with a numeric data type are additionally tested with isnan().
numeric_cols = [
    field.name
    for field in combined_df.schema.fields
    if isinstance(field.dataType, NumericType)
]

# One aggregate per column: the number of rows where the value is NULL
# (or NaN, for numeric columns).
missing_value_counts = []
for column_name in combined_df.columns:
    is_missing = isnull(column_name)
    if column_name in numeric_cols:
        is_missing = is_missing | isnan(column_name)
    # when() yields a non-null value only for rows flagged as missing, and
    # count() counts just those non-null values.
    missing_value_counts.append(
        count(when(is_missing, column_name)).alias(column_name)
    )

null_counts = combined_df.select(missing_value_counts)

null_counts.show()

+-------------+-----------+----------+------------+-------+-----------------+------------+--------+-----+
|TransactionID|Customer ID|Product ID|Total Amount|Channel|MarketingCampaign|Product Name|Category|Price|
+-------------+-----------+----------+------------+-------+-----------------+------------+--------+-----+
|            0|          0|         0|           0|      0|                0|           0|       0|    0|
+-------------+-----------+----------+------------+-------+-----------------+------------+--------+-----+

