TASK - 1

In [0]:
# Load the raw Online Retail export: the first row holds the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    "/Volumes/workspace/default/online_retail",
    header=True,
    inferSchema=True,
)

In [0]:
# Print the column names and the types Spark inferred for them.
df.printSchema()

In [0]:
# Preview the first few rows to eyeball the raw values.
df.show(n=5)

In [0]:
# Total number of rows in the freshly loaded input (shown as the cell output).
df.count()

DATA CLEANING

In [0]:
# NOTE: this `sum` is Spark's aggregate and shadows Python's builtin `sum`
# for the rest of the notebook.
from pyspark.sql.functions import col, sum

# Missing values per column, computed in a single aggregation: each expression
# turns the column's null flag into 0/1, sums it, and keeps the column's name.
null_counts = df.select(
    *[sum(col(name).isNull().cast("int")).alias(name) for name in df.columns]
)
null_counts.show(truncate=False)

In [0]:
# Fill missing product descriptions with a placeholder, then drop rows that are
# exact duplicates across every column.
# Deduplication must happen here, before the subset frames (sales_df,
# customer_df, valid_sales_df, cancelled_df, error_df, returns_df) are derived
# from `df`. Deduplicating `df` only after those frames exist leaves every
# subset, and every parquet file written from them, with duplicate rows.
df = df.fillna({"Description": "Unknown Product"}).dropDuplicates()

In [0]:
# Two views of the data: only the rows that carry a customer identifier, and
# every row (an alias of `df`).
# NOTE(review): `customer_df` applies no other filter, so it still contains
# cancellations, returns and non-positive-price rows. Confirm that is intended
# for the `customers` output.
customer_df = df.where(col("CustomerID").isNotNull())
sales_df = df

In [0]:
# How many rows have a non-positive unit price or quantity?
sales_df.where((col("UnitPrice") <= 0) | (col("Quantity") <= 0)).count()


In [0]:
# Genuine sales: positive quantity, positive unit price, and an invoice number
# that does not start with "C" (the prefix treated as a cancellation below).
valid_sales_df = (
    df.where(col("Quantity") > 0)
    .where(col("UnitPrice") > 0)
    .where(~col("InvoiceNo").startswith("C"))
)


In [0]:
# Invoices whose number starts with "C" are treated as cancellations.
cancelled_df = df.where(col("InvoiceNo").startswith("C"))

# Rows with a zero quantity or a non-positive unit price are treated as data
# errors. A negative quantity alone does not put a row here, and this filter is
# not mutually exclusive with the cancellation / returns filters.
error_df = df.where((col("UnitPrice") <= 0) | (col("Quantity") == 0))


In [0]:
# Negative-quantity rows that are not on a "C" (cancellation) invoice are
# treated as returns.
returns_df = df.where(col("Quantity") < 0).where(~col("InvoiceNo").startswith("C"))

In [0]:
# Drop rows that are exact duplicates across every column. This rebinds `df`
# only; DataFrames already derived from `df` in earlier cells are not affected
# by this reassignment.
df = df.distinct()


In [0]:
from pyspark.sql.functions import col

# Store CustomerID as a string identifier rather than a number. This rebinds
# `df` only; frames derived from `df` in earlier cells keep the inferred type.
# NOTE(review): if inferSchema typed this column as a double, the strings will
# look like "17850.0". Confirm that is the intended key format.
df = df.withColumn("CustomerID", col("CustomerID").astype("string"))
df.printSchema()


In [0]:
# Line-level revenue for every valid sale: unit price times quantity.
valid_sales_df = valid_sales_df.withColumn("TotalPrice", col("UnitPrice") * col("Quantity"))

In [0]:

# Number of rows kept as valid sales (shown as the cell output).
valid_sales_df.count()


In [0]:
# Row counts for the cleaned frame and each derived subset.
# `df` has been deduplicated by an earlier cell, so its count is no longer the
# raw input size; it is labelled accordingly instead of "Original".
print("Deduplicated:", df.count())
print("Valid:", valid_sales_df.count())
print("Cancelled:", cancelled_df.count())
print("Errors:", error_df.count())
print("Customer:", customer_df.count())
print("returns:", returns_df.count())

# Show a sample of cancellations rather than dumping 1,000 rows into the notebook.
cancelled_df.show(20)


TASK - 2

In [0]:
# Products appearing on the most cancellation lines (one per line, regardless
# of the quantity on that line), most frequent first.
most_cancelled = (
    cancelled_df.groupBy("StockCode", "Description")
    .count()
    .sort(col("count").desc())
)
most_cancelled.show()


In [0]:
from pyspark.sql.functions import sum

# Total quantity cancelled per product, sorted ascending.
# NOTE(review): ascending order puts the most-cancelled products first only if
# cancellation lines carry negative quantities. Confirm against the data.
most_cancelled_qty = (
    cancelled_df.groupBy("StockCode", "Description")
    .agg(sum("Quantity").alias("total_cancelled_qty"))
    .sort(col("total_cancelled_qty").asc())
)
most_cancelled_qty.show(10, truncate=False)


Top Selling Products

By Total Quantity Sold

In [0]:
from pyspark.sql.functions import sum, col

# Best sellers by units: total quantity sold per product across valid sales.
top_products = (
    valid_sales_df.groupBy("StockCode", "Description")
    .agg(sum("Quantity").alias("Total_Quantity"))
    .sort(col("Total_Quantity").desc())
)
top_products.show(10, truncate=False)


By Total Revenue

In [0]:
# Best sellers by money: summed line revenue (TotalPrice) per product.
top_revenue_products = (
    valid_sales_df.groupBy("StockCode", "Description")
    .agg(sum("TotalPrice").alias("Total_Revenue"))
    .sort(col("Total_Revenue").desc())
)
top_revenue_products.show(10, truncate=False)

Most Frequently Purchased Products

In [0]:
# Products that appear on the most valid sale lines (purchase frequency).
top_freq = (
    valid_sales_df.groupBy("StockCode", "Description")
    .count()
    .sort(col("count").desc())
)
top_freq.show(10, truncate=False)


High-Frequency, Low-Revenue Products

In [0]:
from pyspark.sql.functions import col

# Expose line revenue under the name `Revenue` used by the analyses below.
# Reuse the existing TotalPrice column (Quantity * UnitPrice) rather than
# recomputing the same product, so the two definitions cannot drift apart.
sales_with_revenue = valid_sales_df.withColumn("Revenue", col("TotalPrice"))


In [0]:
from pyspark.sql.functions import avg, col, count, sum

# Per-product purchase frequency (number of sale lines) and total revenue.
product_stats = sales_with_revenue.groupBy("StockCode", "Description").agg(
    count("*").alias("Purchase_Frequency"),
    sum("Revenue").alias("Total_Revenue"),
)

# Averages across all products, used as the high/low thresholds.
# A global (ungrouped) aggregation always yields exactly one row.
avg_vals = product_stats.agg(
    avg("Purchase_Frequency").alias("avg_freq"),
    avg("Total_Revenue").alias("avg_rev"),
).first()

# Products bought more often than average while earning less than average.
high_freq_low_rev = product_stats.where(
    (col("Total_Revenue") < avg_vals["avg_rev"])
    & (col("Purchase_Frequency") > avg_vals["avg_freq"])
)


In [0]:
# The 20 most frequently purchased products among the high-frequency / low-revenue set.
high_freq_low_rev.sort(col("Purchase_Frequency").desc()).show(20, truncate=False)


Revenue per Country

In [0]:
from pyspark.sql.functions import sum

# Total valid-sales revenue per country (unsorted).
country_sales = sales_with_revenue.groupBy("Country").agg(
    sum("Revenue").alias("Total_Revenue")
)



In [0]:
# Countries ranked from highest to lowest revenue; show the top 10.
top_countries = country_sales.sort(col("Total_Revenue").desc())
top_countries.show(10, truncate=False)

In [0]:
# Countries ranked from lowest to highest revenue (sort is ascending by default);
# show the bottom 10.
least_countries = country_sales.sort("Total_Revenue")
least_countries.show(10, truncate=False)


In [0]:
# Persist the cleaned subsets as parquet, one directory per subset under
# `base_path`, replacing any output from a previous run.
# (error_df is not written out.)
base_path = "/Volumes/workspace/default/online_retail/Cleaned_files"

valid_sales_df.write.parquet(f"{base_path}/valid_sales", mode="overwrite")
cancelled_df.write.parquet(f"{base_path}/cancelled", mode="overwrite")
returns_df.write.parquet(f"{base_path}/returns", mode="overwrite")
customer_df.write.parquet(f"{base_path}/customers", mode="overwrite")
