# Question 2: Sales Data Transformation and Aggregation

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, date_format, expr, desc
from pyspark.sql.types import DecimalType
import sqlite3

In [3]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("RetailDataAnalysis") \
    .getOrCreate()

# Load the dataset into a PySpark DataFrame
file_path = 'data.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.printSchema()

# Show the first few rows of the dataset
df.show()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|  

## Data Cleaning

In [4]:
# Find the columns with missing values
columns_with_missing = [column for column in df.columns if df.filter(col(column).isNull()).count() > 0]
columns_with_missing

['Description', 'CustomerID']

In [5]:
# Filter rows where Description is null
df_null_description = df.filter(col('Description').isNull())

# Show the resulting rows
df_null_description.show(df_null_description.count(),truncate=False)

# I decided to keep these rows instead of dropping them to reserve the 
# information (Invoice #, Stockcode, InvoiceDate and etc.). 
# Because the UnitPrice for these rows are 0, 
# keeping these rows does not affect analysis of the top10 products either.

+---------+------------+-----------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode   |Description|Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+------------+-----------+--------+----------------+---------+----------+--------------+
|536414   |22139       |null       |56      |12/1/2010 11:52 |0.0      |null      |United Kingdom|
|536545   |21134       |null       |1       |12/1/2010 14:32 |0.0      |null      |United Kingdom|
|536546   |22145       |null       |1       |12/1/2010 14:33 |0.0      |null      |United Kingdom|
|536547   |37509       |null       |1       |12/1/2010 14:33 |0.0      |null      |United Kingdom|
|536549   |85226A      |null       |1       |12/1/2010 14:34 |0.0      |null      |United Kingdom|
|536550   |85044       |null       |1       |12/1/2010 14:34 |0.0      |null      |United Kingdom|
|536552   |20950       |null       |1       |12/1/2010 14:34 |0.0      |null      |United Kingdom|
|536553   

In [6]:
# Filter rows where CustomerID is null
df_null_customerID = df.filter(col('CustomerID').isNull())

# Show the resulting rows
df_null_customerID.show(truncate=False)

+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate    |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+---------------+---------+----------+--------------+
|536414   |22139    |null                               |56      |12/1/2010 11:52|0.0      |null      |United Kingdom|
|536544   |21773    |DECORATIVE ROSE BATHROOM BOTTLE    |1       |12/1/2010 14:32|2.51     |null      |United Kingdom|
|536544   |21774    |DECORATIVE CATS BATHROOM BOTTLE    |2       |12/1/2010 14:32|2.51     |null      |United Kingdom|
|536544   |21786    |POLKADOT RAIN HAT                  |4       |12/1/2010 14:32|0.85     |null      |United Kingdom|
|536544   |21787    |RAIN PONCHO RETROSPOT              |2       |12/1/2010 14:32|1.66     |null      |United Kingdom|
|536544   |21790    |VINTAGE SNAP CARDS         

In [7]:
# Fillna using a dummy 999999 for CustomerID rows with missing values
df_cleaned = df.fillna(999999, subset=['CustomerID'])

In [8]:
# Ensure 'InvoiceDate' is in DateTime format
df_cleaned = df_cleaned.withColumn('InvoiceDate', expr("to_timestamp(InvoiceDate, 'MM/dd/yyyy HH:mm')"))

# Ensure correct data types for these attributes
df_cleaned = df_cleaned.withColumn('Quantity', col('Quantity').cast('int')) \
                       .withColumn('UnitPrice', col('UnitPrice').cast('float')) \
                       .withColumn('CustomerID', col('CustomerID').cast('int'))

In [9]:
# Filter rows where UnitPrice is below 0
df_negative_unit_price = df_cleaned.filter(col('UnitPrice') < 0)

# Show the resulting rows
df_negative_unit_price.show(truncate=False)

# The Description tells the reason for the negative values, thus I decided to leave them as-is.

+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description    |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+
|A563186  |B        |Adjust bad debt|1       |2011-08-12 14:51:00|-11062.06|999999    |United Kingdom|
|A563187  |B        |Adjust bad debt|1       |2011-08-12 14:52:00|-11062.06|999999    |United Kingdom|
+---------+---------+---------------+--------+-------------------+---------+----------+--------------+



In [10]:
# Filter rows where Quantity is below 0
df_negative_quantity = df_cleaned.filter(col('Quantity') < 0)

# Show the resulting rows
df_negative_quantity.show(truncate=False)

# The negative quantities could be return orders, thus I decided to leave them as-is.

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|C536379  |D        |Discount                           |-1      |2010-12-01 09:41:00|27.5     |14527     |United Kingdom|
|C536383  |35004C   |SET OF 3 COLOURED  FLYING DUCKS    |-1      |2010-12-01 09:49:00|4.65     |15311     |United Kingdom|
|C536391  |22556    |PLASTERS IN TIN CIRCUS PARADE      |-12     |2010-12-01 10:24:00|1.65     |17548     |United Kingdom|
|C536391  |21984    |PACK OF 12 PINK PAISLEY TISSUES    |-24     |2010-12-01 10:24:00|0.29     |17548     |United Kingdom|
|C536391  |21983    |PACK OF 12 BLUE PAISLEY TISSUES    |-24     |2010-12-01 10:24:00|0.29     |17548     |United Kingdom|
|C536391  |21980

## Calculate total sales and the number of transactions per day

In [11]:
# Calculate total sales (Quantity * UnitPrice)
df_sales = df_cleaned.withColumn('TotalSales', col('Quantity') * col('UnitPrice'))

# Round to 2 decimal places and cast to DecimalType
df_sales = df_sales.withColumn(
    'TotalSales',
    col('TotalSales').cast(DecimalType(10, 2)))

# Group by date and calculate total sales and number of transactions
df_daily_sales = df_sales.groupBy(date_format(col('InvoiceDate'), 'yyyy-MM-dd').alias('Date')) \
                         .agg(sum('TotalSales').alias('TotalSales'), count('InvoiceNo').alias('Transactions')) \
                         .orderBy('Date')

# Show the daily sales and transaction count
df_daily_sales.show()

+----------+----------+------------+
|      Date|TotalSales|Transactions|
+----------+----------+------------+
|2010-12-01|  58635.56|        3108|
|2010-12-02|  46207.28|        2109|
|2010-12-03|  45620.46|        2202|
|2010-12-05|  31383.95|        2725|
|2010-12-06|  53860.18|        3878|
|2010-12-07|  45059.05|        2963|
|2010-12-08|  44189.84|        2647|
|2010-12-09|  52532.13|        2891|
|2010-12-10|  57404.91|        2758|
|2010-12-12|  17240.92|        1451|
|2010-12-13|  35379.34|        2283|
|2010-12-14|  42843.29|        2087|
|2010-12-15|  29443.69|        1349|
|2010-12-16|  48334.35|        1790|
|2010-12-17|  43534.19|        3115|
|2010-12-19|   7517.31|         522|
|2010-12-20|  24741.75|        1763|
|2010-12-21|  47097.94|        1586|
|2010-12-22|   6134.57|         291|
|2010-12-23|  11796.31|         963|
+----------+----------+------------+
only showing top 20 rows



## Identify the top 10 products with the highest sales

In [12]:
# Group by product and calculate total sales
df_top_products = df_sales.groupBy('StockCode', 'Description') \
                          .agg(sum('TotalSales').alias('TotalSales')) \
                          .orderBy(desc('TotalSales')) \
                          .limit(10)

# Show the top 10 products
df_top_products.show(truncate=False)

+---------+----------------------------------+----------+
|StockCode|Description                       |TotalSales|
+---------+----------------------------------+----------+
|DOT      |DOTCOM POSTAGE                    |206245.48 |
|22423    |REGENCY CAKESTAND 3 TIER          |164762.19 |
|47566    |PARTY BUNTING                     |98302.98  |
|85123A   |WHITE HANGING HEART T-LIGHT HOLDER|97715.99  |
|85099B   |JUMBO BAG RED RETROSPOT           |92356.03  |
|23084    |RABBIT NIGHT LIGHT                |66756.59  |
|POST     |POSTAGE                           |66230.64  |
|22086    |PAPER CHAIN KIT 50'S CHRISTMAS    |63791.94  |
|84879    |ASSORTED COLOUR BIRD ORNAMENT     |58959.73  |
|79321    |CHILLI LIGHTS                     |53768.06  |
+---------+----------------------------------+----------+



## Save the transformed data back to a SQL database.

In [12]:
df_pandas = df_cleaned.toPandas()

# Connect to SQLite database
conn = sqlite3.connect('RetailDataAnalysis.db')

# Save the DataFrame to SQL database
df_pandas.to_sql('TransformedRetailData', conn, if_exists='replace', index=False)

# Close the connection
conn.close()

In [13]:
# Stop the Spark session
spark.stop()