In [3]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Sales Data Analysis")
    .getOrCreate()
)

# Load the dataset: the first CSV row supplies the column names, and Spark
# infers each column's type from the data.
file_path = "Online_Retail.csv"
csv_options = {"header": True, "inferSchema": True}
df = spark.read.csv(file_path, **csv_options)

# Inspect the inferred schema and the first rows.
df.printSchema()
df.show(10)

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2024-01-01 08:26:00|     2.55|     17850|United Kingdom|
|   536366|    71053| WHITE METAL LANTERN|       6|2024-01-01 08:28:00|     3.39|     17850|United Kingdom|
|   536367|   84406B|CREAM CUPID HEART...|       8|2024-01-01 08:34:00|     2.75|     13047|        France|
|   536368

In [5]:
# Drop exact duplicate rows, then drop every row that has a null in
# InvoiceNo, StockCode or CustomerID.
required_columns = ["InvoiceNo", "StockCode", "CustomerID"]
df = df.dropDuplicates().dropna(subset=required_columns)

df.show(5)


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536366|    71053| WHITE METAL LANTERN|       6|2024-01-01 08:28:00|     3.39|     17850|United Kingdom|
|   536368|   84029G|KNITTED UNION FLA...|       6|2024-01-01 08:35:00|     3.39|     13047|        France|
|   536365|   85123A|WHITE HANGING HEA...|       6|2024-01-01 08:26:00|     2.55|     17850|United Kingdom|
|   536367|   84406B|CREAM CUPID HEART...|       8|2024-01-01 08:34:00|     2.75|     13047|        France|
|   536369|   85123A|RED WOOLLY HOTTIE...|       6|2024-01-01 08:45:00|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+



In [6]:
from pyspark.sql.functions import col

# Per-row total: Quantity multiplied by UnitPrice. UnitPrice is a double, so the
# product is a double and can show binary floating-point noise when displayed.
line_total = col("Quantity") * col("UnitPrice")

df = (
    df.withColumn("TotalPrice", line_total)
    # Cast InvoiceDate to timestamp so its type does not depend on schema inference.
    .withColumn("InvoiceDate", col("InvoiceDate").cast("timestamp"))
)

df.show(5)


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|        TotalPrice|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+
|   536366|    71053| WHITE METAL LANTERN|       6|2024-01-01 08:28:00|     3.39|     17850|United Kingdom|             20.34|
|   536368|   84029G|KNITTED UNION FLA...|       6|2024-01-01 08:35:00|     3.39|     13047|        France|             20.34|
|   536365|   85123A|WHITE HANGING HEA...|       6|2024-01-01 08:26:00|     2.55|     17850|United Kingdom|15.299999999999999|
|   536367|   84406B|CREAM CUPID HEART...|       8|2024-01-01 08:34:00|     2.75|     13047|        France|              22.0|
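|   536369|   85123A|RED WOOLLY HOTTIE...|       6|2024-01-01 08:45:00|     3.39|     17850|United Kingdom|             20.34|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+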

In [7]:
from pyspark.sql.functions import sum

# Total revenue: sum of TotalPrice over the whole DataFrame (no grouping).
# Note: `sum` is the pyspark.sql.functions aggregate imported in this cell,
# which shadows Python's builtin sum for the rest of the notebook.
revenue_sum = sum("TotalPrice").alias("TotalRevenue")
total_revenue = df.agg(revenue_sum)
total_revenue.show()


+------------+
|TotalRevenue|
+------------+
|       98.32|
+------------+



In [8]:
# Total quantity per country: group rows by Country and sum Quantity.
units_sold = sum("Quantity").alias("TotalQuantity")
rows_by_country = df.groupBy("Country")
quantity_by_country = rows_by_country.agg(units_sold)
quantity_by_country.show()


+--------------+-------------+
|       Country|TotalQuantity|
+--------------+-------------+
|        France|           14|
|United Kingdom|           18|
+--------------+-------------+



In [9]:
from pyspark.sql.functions import desc

# Top 5 products with the highest revenue.
# Revenue is the sum of TotalPrice per StockCode, sorted largest first.
# StockCode is a secondary sort key so that products with equal revenue come
# back in a stable order; sorting on revenue alone leaves tied rows in an
# unspecified order, so which of them survive limit(5) could differ between runs.
top_products = (
    df.groupBy("StockCode")
    .agg(sum("TotalPrice").alias("TotalRevenue"))
    .orderBy(desc("TotalRevenue"), "StockCode")
    .limit(5)
)
top_products.show()


+---------+------------+
|StockCode|TotalRevenue|
+---------+------------+
|   85123A|       35.64|
|   84406B|        22.0|
|    71053|       20.34|
|   84029G|       20.34|
+---------+------------+



In [10]:
# Customer who spent the most: sum TotalPrice per CustomerID, sort the totals
# in descending order and keep only the first row.
spending_by_customer = df.groupBy("CustomerID").agg(
    sum("TotalPrice").alias("TotalSpending")
)
top_customer = spending_by_customer.orderBy("TotalSpending", ascending=False).limit(1)
top_customer.show()


+----------+------------------+
|CustomerID|     TotalSpending|
+----------+------------------+
|     17850|55.980000000000004|
+----------+------------------+



In [11]:
# Country with the highest revenue: sum TotalPrice per Country, sort largest
# first and keep the single top row.
country_revenue = sum("TotalPrice").alias("CountryRevenue")
top_country = (
    df.groupBy("Country")
    .agg(country_revenue)
    .orderBy(desc("CountryRevenue"))
    .limit(1)
)
top_country.show()


+--------------+------------------+
|       Country|    CountryRevenue|
+--------------+------------------+
|United Kingdom|55.980000000000004|
+--------------+------------------+



In [12]:
# Persist the results. The writer's default save mode raises an error when the
# target path already exists, so running this cell a second time would fail.
# mode("overwrite") replaces the previous output and keeps the cell re-runnable.

# Save transformed dataset to Parquet
df.write.mode("overwrite").parquet("transformed_sales.parquet")

# Save top products to CSV (header=True writes the column names as the first line)
top_products.write.mode("overwrite").csv("top_products.csv", header=True)
