In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, dayofweek, col, when, rand, avg, sum as _sum

In [16]:
spark = SparkSession.builder.appName("Sunday Sales Analysis").getOrCreate()

In [71]:
dim_products = spark.read.csv("data/adv/dim_product.csv", header=True)
fact_internet_sales = spark.read.csv("data/adv/fact_internet_sales.csv", header=True)
dim_date = spark.read.csv("data/adv/dim_date.csv", header=True)
dim_customer = spark.read.csv("data/adv/dim_customer.csv", header=True)
dim_currency = spark.read.csv("data/adv/dim_currency.csv", header=True)

In [79]:
# Join with dim_products
fact_sales = fact_internet_sales.join(dim_products, on="ProductKey", how="inner")

# Join with dim_date
fact_sales = fact_sales.join(dim_date, fact_sales["OrderDateKey"] == dim_date["DateKey"], how="inner")

# Join with dim_customer
fact_sales = fact_sales.join(dim_customer, on="CustomerKey", how="inner")

# Join with dim_currency
fact_sales = fact_sales.join(dim_currency, on="CurrencyKey", how="inner")

# Verify schema after joins
fact_sales.printSchema()

root
 |-- CurrencyKey: string (nullable = true)
 |-- CustomerKey: string (nullable = true)
 |-- ProductKey: string (nullable = true)
 |-- OrderDateKey: string (nullable = true)
 |-- OrderQuantity: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- ExtendedAmount: string (nullable = true)
 |-- UnitPriceDiscountPct: string (nullable = true)
 |-- DiscountAmount: string (nullable = true)
 |-- ProductStandardCost: string (nullable = true)
 |-- TotalProductCost: string (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- TaxAmt: string (nullable = true)
 |-- Freight: string (nullable = true)
 |-- ProductAlternateKey: string (nullable = true)
 |-- EnglishProductName: string (nullable = true)
 |-- SpanishProductName: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- ListPrice: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: string (nullable = true)
 |-- Status: string (n

In [103]:
# Task1 Filtering
from pyspark.sql.functions import col

# Filter sales based on the given criteria
task1_filtered_sales = fact_sales.filter(
    (col("EnglishDayNameOfWeek") == "Sunday") &  # Filter for Sunday
    (col("Color") == "Silver") &  # Filter for Silver color
    (col("Size").isNotNull()) &  # Ensure Size is not null
    (col("Size") != "") &  # Ensure Size is not empty
    (col("Weight").cast("double") > 10) &  # Ensure Weight is greater than 10
    (col("YearlyIncome").cast("double") > 100000.0) &  # Ensure YearlyIncome > 100,000
    (col("TotalChildren").cast("int") > 1)  # Ensure more than 1 child
)

# Show the filtered sales data
task1_filtered_sales.show(truncate=False)




+-----------+-----------+----------+------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+-------------------+-----------------------+--------------------------+------+---------+----+------+-----------------+-------+--------+--------------------+---------------+--------------------+--------------------+----------------+---------------+----------------+----------------+----------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+---------+----------+---------+---------+----------+-------------+------+------+-----------------------------+------------+-------------+----------------------+---------------------+-------------------+--------------------+--------------------+
|CurrencyKey|CustomerKey|ProductKey|OrderDateKey|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|Sa

In [None]:
# Task 2 Aggregation
from pyspark.sql.functions import col, sum, avg

# Perform aggregation
aggregated_sales = task1_filtered_sales.groupBy(
    "CustomerKey", "FirstName"
).agg(
    sum("TaxAmt").alias("TotalTaxAmt"),  
    avg("SalesAmount").alias("AverageSalesAmount"),  
    avg("TotalProductCost").alias("AverageTotalProductCost") 
)

# Show the aggregated result
aggregated_sales.show(truncate=False)


+-----------+---------+------------------+------------------+-----------------------+
|CustomerKey|FirstName|TotalTaxAmt       |AverageSalesAmount|AverageTotalProductCost|
+-----------+---------+------------------+------------------+-----------------------+
|12188      |Ashley   |185.5992          |2319.99           |1265.6195              |
|18131      |Ann      |185.5992          |2319.99           |1265.6195              |
|11456      |Jon      |271.9992          |3399.99           |1912.1544              |
|13581      |Albert   |185.5992          |2319.99           |1265.6195              |
|18139      |Jennifer |185.5992          |2319.99           |1265.6195              |
|11249      |Cindy    |271.9992          |3399.99           |1912.1544              |
|26107      |Alexis   |271.9992          |3399.99           |1912.1544              |
|11242      |Larry    |45.1992           |564.99            |308.2179               |
|12710      |Dalton   |185.5992          |2319.99     

In [105]:
# Task 3 Sorting

aggregated_sales_sorted = aggregated_sales.orderBy("FirstName", ascending=True)

aggregated_sales_sorted.show(truncate=False)

+-----------+---------+------------------+------------------+-----------------------+
|CustomerKey|FirstName|TotalTaxAmt       |AverageSalesAmount|AverageTotalProductCost|
+-----------+---------+------------------+------------------+-----------------------+
|13581      |Albert   |185.5992          |2319.99           |1265.6195              |
|26107      |Alexis   |271.9992          |3399.99           |1912.1544              |
|11244      |Alexis   |185.5992          |2319.99           |1265.6195              |
|16623      |Andres   |185.5992          |2319.99           |1265.6195              |
|18131      |Ann      |185.5992          |2319.99           |1265.6195              |
|11240      |Anne     |457.59839999999997|2859.99           |1588.88695             |
|12188      |Ashley   |185.5992          |2319.99           |1265.6195              |
|17536      |Benjamin |185.5992          |2319.99           |1265.6195              |
|15100      |Brenda   |185.5992          |2319.99     

In [106]:
# Task 4 Data presentation

final_result = aggregated_sales_sorted.drop("CustomerKey")

final_result.show(truncate=False)

+---------+------------------+------------------+-----------------------+
|FirstName|TotalTaxAmt       |AverageSalesAmount|AverageTotalProductCost|
+---------+------------------+------------------+-----------------------+
|Albert   |185.5992          |2319.99           |1265.6195              |
|Alexis   |271.9992          |3399.99           |1912.1544              |
|Alexis   |185.5992          |2319.99           |1265.6195              |
|Andres   |185.5992          |2319.99           |1265.6195              |
|Ann      |185.5992          |2319.99           |1265.6195              |
|Anne     |457.59839999999997|2859.99           |1588.88695             |
|Ashley   |185.5992          |2319.99           |1265.6195              |
|Benjamin |185.5992          |2319.99           |1265.6195              |
|Brenda   |185.5992          |2319.99           |1265.6195              |
|Cindy    |271.9992          |3399.99           |1912.1544              |
|Colleen  |185.5992          |2319.99 