In [5]:
from pyspark.sql.types import *
 
# Create the schema for the table
orderSchema = StructType([
    StructField("Transaction_ID", StringType()),
    StructField("Customer_ID", StringType()),
    StructField("Name", StringType()),
    StructField("Email", StringType()),
    StructField("Phone", StringType()),
    StructField("Address", StringType()),
    StructField("City", StringType()),
    StructField("State", StringType()),
    StructField("Zipcode", StringType()),
    StructField("Country", StringType()),
    StructField("Age", IntegerType()),
    StructField("Gender", StringType()),
    StructField("Income", StringType()),
    StructField("Customer_Segment", StringType()),
    StructField("Date", StringType()),
    StructField("Year", IntegerType()),
    StructField("Month", StringType()),
    StructField("Time", StringType()),
    StructField("Total_Purchases", IntegerType()),
    StructField("Amount", FloatType()),
    StructField("Total_Amount", FloatType()),
    StructField("Product_Category", StringType()),
    StructField("Product_Brand", StringType()),
    StructField("Product_Type", StringType()),
    StructField("Feedback", StringType()),
    StructField("Shipping_Method", StringType()),
    StructField("Payment_Method", StringType()),
    StructField("Order_Status", StringType()),
    StructField("Ratings", FloatType()),
    StructField("products", StringType())
])
 
# Import all files from bronze folder of lakehouse
df = spark.read.format("csv").option("header", "true").schema(orderSchema).load("Files/Bronze/retail_data_cleaned.csv")
#.option("header", "true")

# Display the first 10 rows of the dataframe to preview your data
display(df.head(10))

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0da93c38-e78e-4ddd-b12c-6f99bc92fba5)

In [6]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 8, Finished, Available, Finished)

In [7]:
from pyspark.sql import functions as F

# Convert the Date column to a DateType using the format "MM/dd/yyyy"
df = df.withColumn("Date", F.to_date("Date", "MM/dd/yyyy"))

# Extract Year, Month, and Day as integers
df = df.withColumn("Year", F.year("Date")) \
       .withColumn("Month", F.month("Date")) \
       .withColumn("Day", F.dayofmonth("Date"))

# Show the transformed columns
df.select("Date", "Year", "Month", "Day").show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 9, Finished, Available, Finished)

+----------+----+-----+---+
|      Date|Year|Month|Day|
+----------+----+-----+---+
|2023-09-18|2023|    9| 18|
|2023-12-31|2023|   12| 31|
|2023-04-26|2023|    4| 26|
|2023-05-08|2023|    5|  8|
|2024-01-10|2024|    1| 10|
|2023-09-21|2023|    9| 21|
|2023-06-26|2023|    6| 26|
|2023-03-24|2023|    3| 24|
|2024-01-06|2024|    1|  6|
|2023-10-04|2023|   10|  4|
|2023-07-20|2023|    7| 20|
|2023-06-21|2023|    6| 21|
|2024-01-02|2024|    1|  2|
|2023-05-07|2023|    5|  7|
|2023-11-18|2023|   11| 18|
|2023-06-15|2023|    6| 15|
|2023-07-01|2023|    7|  1|
|2023-04-14|2023|    4| 14|
|2024-02-07|2024|    2|  7|
|2023-10-24|2023|   10| 24|
+----------+----+-----+---+
only showing top 20 rows



In [8]:
display(df.head(10))

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8e9430ac-46a2-4f58-b6b6-82cceae48d33)

In [9]:
# List of numeric columns to replace nulls with 0
numeric_columns = ["Age","Total_Purchases", "Amount", "Total_Amount", "Ratings"]

# Replace nulls in specific columns with 0
df = df.fillna({col: 0 for col in numeric_columns})

# Show the result
df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 11, Finished, Available, Finished)

+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+-----+-----------+---------------+------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+------------------+---+
|Transaction_ID|Customer_ID|               Name|              Email|     Phone|             Address|      City|          State|Zipcode|  Country|Age|Gender|Income|Customer_Segment|      Date|Year|Month|       Time|Total_Purchases|Amount|Total_Amount|Product_Category|Product_Brand|Product_Type| Feedback|Shipping_Method|Payment_Method|Order_Status|Ratings|          products|Day|
+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+-----+-----------+---------------+------+---

In [10]:
# List of string columns (assuming string columns are those with non-numeric data)
string_columns = ["Transaction_ID", "Customer_ID", "Name", "Email", "Phone", "Address", "City", "State", "Zipcode", "Country", "Gender", "Income", "Customer_Segment", "Product_Category", "Product_Brand", "Product_Type", "Feedback", "Shipping_Method", "Payment_Method", "Order_Status", "products"]

# Replace null values with "NA" in the string columns
df = df.fillna("NA", subset=string_columns)

# Show the result
df.show()

#display(df_filled_string.head(100))

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 12, Finished, Available, Finished)

+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+-----+-----------+---------------+------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+------------------+---+
|Transaction_ID|Customer_ID|               Name|              Email|     Phone|             Address|      City|          State|Zipcode|  Country|Age|Gender|Income|Customer_Segment|      Date|Year|Month|       Time|Total_Purchases|Amount|Total_Amount|Product_Category|Product_Brand|Product_Type| Feedback|Shipping_Method|Payment_Method|Order_Status|Ratings|          products|Day|
+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+-----+-----------+---------------+------+---

In [11]:
display(df.head(100))

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 76e6d4fc-63d1-494b-8426-95d49461e8fb)

In [12]:
from pyspark.sql import functions as F

# Select the necessary columns to form the fact table
fact_columns = [
    "Transaction_ID",  # Fact key (primary key)
    "Customer_ID",  # Foreign key to customer dimension
    "Date",  # Foreign key to date dimension
    "Year",  # Year of transaction (could link to a Date Dimension)
    "Month",  # Month of transaction (could link to Date Dimension)
    "Time",  # Timestamp of transaction
    "Total_Purchases",  # Fact measure (quantity of items purchased)
    "Amount",  # Fact measure (price of each item purchased)
    "Total_Amount",  # Fact measure (total monetary value of the transaction)
    "Ratings",  # Fact measure (customer rating)
    "Order_Status",  # Dimension (status of the order)
    "Shipping_Method"  # Dimension (method of shipping)
]

# Create the fact table DataFrame
fact_table_df = df.select(fact_columns)

# Show the updated fact table
fact_table_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 14, Finished, Available, Finished)

+--------------+-----------+----------+----+-----+-----------+---------------+------+------------+-------+------------+---------------+
|Transaction_ID|Customer_ID|      Date|Year|Month|       Time|Total_Purchases|Amount|Total_Amount|Ratings|Order_Status|Shipping_Method|
+--------------+-----------+----------+----+-----+-----------+---------------+------+------------+-------+------------+---------------+
|       8691788|      37249|2023-09-18|2023|    9|10:03:55 PM|              3|108.03|      324.09|    5.0|     Shipped|       Same-Day|
|       2174773|      69749|2023-12-31|2023|   12| 8:42:04 AM|              2|403.35|      806.71|    4.0|  Processing|       Standard|
|       6679610|      30192|2023-04-26|2023|    4| 4:06:29 AM|              3|354.48|     1063.43|    2.0|  Processing|       Same-Day|
|       7232460|      62101|2023-05-08|2023|    5| 2:55:17 PM|              7|352.41|     2466.85|    4.0|  Processing|       Standard|
|       4983775|      27901|2024-01-10|2024|    

In [13]:
from pyspark.sql import functions as F

# Assuming you have a DataFrame 'df' with customer data (like the columns you provided)
# Select the columns that will form the customer dimension table
customer_dimension_columns = [
    "Customer_ID",  # Primary Key for Customer
    "Name",  # Customer Name
    "Email",  # Customer Email
    "Phone",  # Customer Phone
    "Address",  # Customer Address
    "City",  # Customer City
    "State",  # Customer State
    "ZipCode",  # Customer Zip Code
    "Country",  # Customer Country
    "Age",  # Customer Age
    "Gender",  # Customer Gender
    "Income",  # Customer Income
    "Customer_Segment"  # Customer Segment
]

# Create the customer dimension DataFrame
customer_dimension_df = df.select(customer_dimension_columns)

# Show the customer dimension table
customer_dimension_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 15, Finished, Available, Finished)

+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+
|Customer_ID|               Name|              Email|     Phone|             Address|      City|          State|ZipCode|  Country|Age|Gender|Income|Customer_Segment|
+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+
|      37249|Michelle Harrington|  Ebony39@gmail.com|1414786801|   3959 Amanda Burgs|  Dortmund|         Berlin|  77985|  Germany| 21|  Male|   Low|         Regular|
|      69749|        Kelsey Hill|   Mark36@gmail.com|6852899987|  82072 Dawn Centers|Nottingham|        England|  99071|       UK| 19|Female|   Low|         Premium|
|      30192|       Scott Jensen|  Shane85@gmail.com|8362160449|   4133 Young Canyon|   Geelong|New South Wales|  75929|Australia| 48|  Male|   Low|         Regular|
|   

In [14]:
from pyspark.sql import functions as F

# Select the columns that will form the product dimension table
product_dimension_columns = [
    "Product_Category",  # Product Category
    "Product_Brand",     # Product Brand
    "Product_Type"       # Product Type
]

# Create the product dimension DataFrame
product_dimension_df = df.select(product_dimension_columns)

# Show the product dimension table
product_dimension_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 16, Finished, Available, Finished)

+----------------+-------------+------------+
|Product_Category|Product_Brand|Product_Type|
+----------------+-------------+------------+
|        Clothing|         Nike|      Shorts|
|     Electronics|      Samsung|      Tablet|
|           Books|Penguin Books|  Children's|
|      Home Decor|   Home Depot|       Tools|
|         Grocery|       Nestle|   Chocolate|
|     Electronics|        Apple|      Tablet|
|     Electronics|      Samsung|  Television|
|        Clothing|         Zara|       Shirt|
|         Grocery|       Nestle|   Chocolate|
|      Home Decor|   Home Depot| Decorations|
|      Home Decor|   Home Depot|       Tools|
|           Books| Random House| Non-Fiction|
|         Grocery|    Coca-Cola|       Water|
|         Grocery|       Nestle|      Snacks|
|        Clothing|       Adidas|     T-shirt|
|           Books| Random House|  Literature|
|         Grocery|        Pepsi|       Water|
|     Electronics|        Apple|      Tablet|
|         Grocery|    Coca-Cola|  

In [15]:
from pyspark.sql import functions as F

# Select the columns needed for the Date Dimension table
date_dimension_df = df.select("Date").distinct()  # Get distinct dates

# Add Year and Month columns by extracting them from the Date column
date_dimension_df = (
    date_dimension_df
    .withColumn("Year", F.year("Date"))            # Extract Year
    .withColumn("Month", F.month("Date"))          # Extract Month as a number
)

# Show the Date Dimension table
date_dimension_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 17, Finished, Available, Finished)

+----------+----+-----+
|      Date|Year|Month|
+----------+----+-----+
|2023-07-15|2023|    7|
|2023-06-22|2023|    6|
|2023-05-22|2023|    5|
|2023-11-08|2023|   11|
|2024-02-05|2024|    2|
|2023-09-14|2023|    9|
|2023-11-22|2023|   11|
|2023-06-18|2023|    6|
|2023-09-19|2023|    9|
|2024-01-07|2024|    1|
|2023-06-23|2023|    6|
|2023-12-10|2023|   12|
|2023-11-29|2023|   11|
|2023-03-12|2023|    3|
|2024-01-11|2024|    1|
|2023-03-24|2023|    3|
|2023-11-25|2023|   11|
|2023-09-27|2023|    9|
|2023-07-29|2023|    7|
|2023-11-17|2023|   11|
+----------+----+-----+
only showing top 20 rows



In [16]:
# Total Sales per City
total_sales_per_city = df.groupBy("City").agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_per_city.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 18, Finished, Available, Finished)

+------------+--------------------+
|        City|         Total_Sales|
+------------+--------------------+
|     Hanover|  3217664.1471529007|
|    Winnipeg|  3237655.1894454956|
|     Phoenix|  1283073.6380472183|
|      Cairns|   2999906.320347786|
|     Kelowna|   2966322.839579582|
|    Brighton|   3119716.031478882|
|       Omaha|  1204551.5090970993|
|     Bendigo|  3158467.9312705994|
|    Canberra|  3019667.7514133453|
|      Ottawa|  3066374.6722688675|
|   Edinburgh|  3189332.8599157333|
|      Dallas|  1275347.9528608322|
|  Manchester|   3125469.078642845|
|     Oakland|  1236003.3692913055|
|    Adelaide|   3027242.350548744|
|   Frankfurt|1.3939496106239319E7|
|        Hull|  3008566.3879699707|
| San Antonio|  1303029.2303295135|
|     Raleigh|  1219777.0294971466|
|Philadelphia|  1163018.7595739365|
+------------+--------------------+
only showing top 20 rows



In [17]:
from pyspark.sql import functions as F

# Join the Fact Table with Customer Dimension on Customer_ID
sales_per_city_df = (
    fact_table_df
    .join(customer_dimension_df, fact_table_df["Customer_ID"] == customer_dimension_df["Customer_ID"], "inner")
    .groupBy("City")  # Group by City
    .agg(F.sum("Total_Amount").alias("Total_Sales"))  # Sum the Total_Amount for each city
    .orderBy(F.desc("Total_Sales"))  # Optional: order by Total Sales in descending order
)

# Show the Total Sales per City
sales_per_city_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 19, Finished, Available, Finished)

+-------------+--------------------+
|         City|         Total_Sales|
+-------------+--------------------+
|      Chicago|1.3737880056760788E8|
|   Portsmouth|1.3084496701404572E8|
|San Francisco| 7.831910012208462E7|
|    Frankfurt|6.4345960100746155E7|
|       Boston| 6.089540047289848E7|
|     New York| 3.453197202598953E7|
|   Fort Worth| 3.435502646147728E7|
|       London| 2.909871083112812E7|
|      Bendigo|1.6395549232676506E7|
|   St. John's|1.6052678513139725E7|
|     Montreal| 1.603814836448288E7|
|      Toronto|1.5990113467778206E7|
|      Hamburg|1.5873236430983543E7|
|    Wuppertal|1.5789033344211578E7|
|       Hobart|1.5703529328454018E7|
|     Winnipeg|1.5659483437058449E7|
|    Edinburgh|1.5623729536513329E7|
|    Stuttgart|1.5598946472307205E7|
|         Bonn| 1.558685455853653E7|
|   Birmingham| 1.549805568235588E7|
+-------------+--------------------+
only showing top 20 rows



In [18]:
# Total Sales per Quarter per Year
total_sales_per_quarter_year = df.withColumn("Quarter", F.quarter("Date")) \
                                 .groupBy("Year", "Quarter") \
                                 .agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_per_quarter_year.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 20, Finished, Available, Finished)

+----+-------+--------------------+
|Year|Quarter|         Total_Sales|
+----+-------+--------------------+
|NULL|   NULL|   456533.8999977112|
|2023|      3|1.0390090862190628E8|
|2023|      2| 1.022145871706562E8|
|2023|      4|1.0296125076342487E8|
|2024|      1| 6.780420450732231E7|
|2023|      1|3.5221034315784454E7|
+----+-------+--------------------+



In [19]:
# Total Products Sold per City
total_products_sold_per_city = df.groupBy("City").agg(F.sum("Total_Purchases").alias("Total_Products_Sold"))
total_products_sold_per_city.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 21, Finished, Available, Finished)

+------------+-------------------+
|        City|Total_Products_Sold|
+------------+-------------------+
|     Hanover|              12387|
|    Winnipeg|              12800|
|     Phoenix|               4830|
|      Cairns|              11841|
|     Kelowna|              12008|
|    Brighton|              12265|
|       Omaha|               4630|
|     Bendigo|              12393|
|    Canberra|              11729|
|      Ottawa|              11900|
|   Edinburgh|              12541|
|      Dallas|               4760|
|  Manchester|              12322|
|     Oakland|               4886|
|    Adelaide|              11989|
|   Frankfurt|              54918|
|        Hull|              11904|
| San Antonio|               5157|
|     Raleigh|               4864|
|Philadelphia|               4483|
+------------+-------------------+
only showing top 20 rows



In [20]:
# Total Products Sold per City
total_products_sold_per_city = df.groupBy("City").agg(F.sum("Total_Purchases").alias("Total_Products_Sold"))
total_products_sold_per_city.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 22, Finished, Available, Finished)

+------------+-------------------+
|        City|Total_Products_Sold|
+------------+-------------------+
|     Hanover|              12387|
|    Winnipeg|              12800|
|     Phoenix|               4830|
|      Cairns|              11841|
|     Kelowna|              12008|
|    Brighton|              12265|
|       Omaha|               4630|
|     Bendigo|              12393|
|    Canberra|              11729|
|      Ottawa|              11900|
|   Edinburgh|              12541|
|      Dallas|               4760|
|  Manchester|              12322|
|     Oakland|               4886|
|    Adelaide|              11989|
|   Frankfurt|              54918|
|        Hull|              11904|
| San Antonio|               5157|
|     Raleigh|               4864|
|Philadelphia|               4483|
+------------+-------------------+
only showing top 20 rows



In [21]:
# Top Selling Products (by total sales amount)
top_selling_products = df.groupBy("Product_Type") \
                         .agg(F.sum("Total_Amount").alias("Total_Sales")) \
                         .orderBy(F.desc("Total_Sales"))
top_selling_products.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 23, Finished, Available, Finished)

+--------------------+--------------------+
|        Product_Type|         Total_Sales|
+--------------------+--------------------+
|               Water| 3.353849335254383E7|
|          Smartphone| 2.536094257862568E7|
|         Non-Fiction| 2.476621533374214E7|
|             Fiction| 2.475079412027359E7|
|             T-shirt|1.6680339656702042E7|
|               Shoes|1.6674895459059715E7|
|          Television| 1.666194597730732E7|
|         Decorations|1.6603689931912422E7|
|               Juice|1.6547322534795761E7|
|              Tablet|1.6515159204510689E7|
|          Soft Drink| 1.632686539283371E7|
|           Furniture|1.6294088592823982E7|
|              Fridge|1.0078253674794197E7|
|Mitsubishi 1.5 To...|   9188886.088431358|
|             Kitchen|   8532991.749515533|
|            Thriller|   8525723.348668098|
|              Shorts|   8437446.982055664|
|              Coffee|   8437236.902762413|
|          Headphones|   8401900.721751213|
|               Jeans|   8378990

In [22]:
# Total Products Sold per City (again for clarity as it was listed twice)
total_products_sold_per_city = df.groupBy("City").agg(F.sum("Total_Purchases").alias("Total_Products_Sold"))
total_products_sold_per_city.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 24, Finished, Available, Finished)

+------------+-------------------+
|        City|Total_Products_Sold|
+------------+-------------------+
|     Hanover|              12387|
|    Winnipeg|              12800|
|     Phoenix|               4830|
|      Cairns|              11841|
|     Kelowna|              12008|
|    Brighton|              12265|
|       Omaha|               4630|
|     Bendigo|              12393|
|    Canberra|              11729|
|      Ottawa|              11900|
|   Edinburgh|              12541|
|      Dallas|               4760|
|  Manchester|              12322|
|     Oakland|               4886|
|    Adelaide|              11989|
|   Frankfurt|              54918|
|        Hull|              11904|
| San Antonio|               5157|
|     Raleigh|               4864|
|Philadelphia|               4483|
+------------+-------------------+
only showing top 20 rows



In [23]:
# Total Sales by Sales Channel (assuming "Shipping_Method" as the sales channel)
total_sales_by_sales_channel = df.groupBy("Shipping_Method") \
                                 .agg(F.sum("Total_Amount").alias("Total_Sales"))
total_sales_by_sales_channel.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 25, Finished, Available, Finished)

+---------------+--------------------+
|Shipping_Method|         Total_Sales|
+---------------+--------------------+
|             NA|   507516.7108974457|
|        Express|  1.39923322083539E8|
|       Standard|1.2988981098116302E8|
|       Same-Day|1.4223786950349236E8|
+---------------+--------------------+



In [24]:
from pyspark.sql import functions as F

# Count total customers in each segment
customer_segmentation_df = (
    customer_dimension_df
    .groupBy("Customer_Segment")  # Group by Customer Segment
    .agg(F.count("Customer_ID").alias("Total_Customers"))  # Count of customers in each segment
    .orderBy(F.desc("Total_Customers"))  # Optional: sort by count in descending order
)

# Show Customer Segmentation
customer_segmentation_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 26, Finished, Available, Finished)

+----------------+---------------+
|Customer_Segment|Total_Customers|
+----------------+---------------+
|         Regular|         146219|
|             New|          91185|
|         Premium|          64387|
|              NA|            215|
+----------------+---------------+



In [25]:
# Calculate the Average Order Value (AOV)
average_order_value_df = (
    fact_table_df
    .agg(
        (F.sum("Total_Amount") / F.countDistinct("Transaction_ID")).alias("Average_Order_Value")
    )
)

# Show Average Order Value
average_order_value_df.show()

StatementMeta(, dea4c8f7-ef0e-455b-9bcb-64ede6169c9e, 27, Finished, Available, Finished)

+-------------------+
|Average_Order_Value|
+-------------------+
|  1401.058606132852|
+-------------------+

