In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
df = spark.read.csv("/Workspace/Users/meganathan5215@gmail.com/retail_sales.csv", header=True, inferSchema=True)

In [0]:
display(df)

sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year
1,Monitor,Electronics,20556,3,Pune,UPI,0,2.6,2022
2,Headphones,Accessories,115048,1,Bangalore,Card,37,2.9,2019
3,Smartwatch,Electronics,111988,5,Mumbai,Cash,14,3.7,2022
4,Monitor,Electronics,134915,4,Pune,UPI,33,1.1,2022
5,Tablet,Electronics,145072,8,Delhi,Cash,34,2.3,2019
6,Smartwatch,Accessories,29875,1,Bangalore,Card,48,4.9,2023
7,Smartwatch,Accessories,141234,1,Bangalore,Cash,24,3.3,2022
8,Monitor,Electronics,17464,4,Delhi,Cash,23,2.0,2022
9,Laptop,Accessories,113929,8,Hyderabad,Cash,11,3.1,2023
10,Tablet,Electronics,67715,5,Pune,Cash,17,2.4,2022


In [0]:
print("Total Rows and Columns")
print(df.count())
print(len(df.columns))

Total Rows and Columns
300
10


In [0]:
df.printSchema()

root
 |-- sale_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- discount_percent: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- sale_year: integer (nullable = true)



In [0]:
display(df.limit(10))

sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year
1,Monitor,Electronics,20556,3,Pune,UPI,0,2.6,2022
2,Headphones,Accessories,115048,1,Bangalore,Card,37,2.9,2019
3,Smartwatch,Electronics,111988,5,Mumbai,Cash,14,3.7,2022
4,Monitor,Electronics,134915,4,Pune,UPI,33,1.1,2022
5,Tablet,Electronics,145072,8,Delhi,Cash,34,2.3,2019
6,Smartwatch,Accessories,29875,1,Bangalore,Card,48,4.9,2023
7,Smartwatch,Accessories,141234,1,Bangalore,Cash,24,3.3,2022
8,Monitor,Electronics,17464,4,Delhi,Cash,23,2.0,2022
9,Laptop,Accessories,113929,8,Hyderabad,Cash,11,3.1,2023
10,Tablet,Electronics,67715,5,Pune,Cash,17,2.4,2022


In [0]:
display(df.describe())

summary,sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year
count,300.0,300,300,300.0,300.0,300,300,300.0,300.0,300.0
mean,150.5,,,70055.09666666666,5.076666666666667,,,24.346666666666668,2.985333333333331,2021.5933333333328
stddev,86.74675786448736,,,42600.51819648856,2.561116935129458,,,14.462030400323751,1.1082473835291555,1.6904083798076637
min,1.0,Headphones,Accessories,904.0,1.0,Bangalore,Card,0.0,1.0,2019.0
max,300.0,Tablet,Electronics,149956.0,9.0,Pune,UPI,49.0,5.0,2024.0


In [0]:
# Missing Value Analysis

In [0]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).display()

sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year
0,0,0,0,0,0,0,0,0,0


In [0]:
# 2. DATA CLEANING
from pyspark.sql.functions import col

# Drop exact duplicate rows
df = df.dropDuplicates()

In [0]:
df = df.withColumn(
        "discount_amount",
        col("price") * (col("discount_percent") / 100)
     ) \
     .withColumn(
        "net_price_per_unit",
        col("price") - col("discount_amount")
     ) \
     .withColumn(
        "total_amount",
        col("net_price_per_unit") * col("quantity")
     )

display(df.limit(10))


sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year,discount_amount,net_price_per_unit,total_amount
298,Headphones,Accessories,97739,2,Hyderabad,UPI,13,1.3,2023,12706.07,85032.93,170065.86
30,Smartwatch,Accessories,23789,3,Hyderabad,UPI,2,4.5,2024,475.78,23313.22,69939.66
191,Headphones,Electronics,142547,7,Bangalore,UPI,33,2.8,2023,47040.51,95506.49,668545.4299999999
96,Keyboard,Accessories,113356,8,Hyderabad,Cash,25,3.6,2019,28339.0,85017.0,680136.0
150,Tablet,Electronics,25789,5,Hyderabad,Card,45,4.5,2020,11605.05,14183.95,70919.75
128,Tablet,Accessories,9848,9,Pune,Cash,46,1.0,2023,4530.08,5317.92,47861.28
83,Keyboard,Accessories,39256,2,Hyderabad,UPI,9,3.1,2022,3533.04,35722.96,71445.92
130,Mobile,Accessories,74960,8,Chennai,UPI,10,3.1,2024,7496.0,67464.0,539712.0
40,Monitor,Accessories,104682,2,Chennai,Card,19,2.4,2023,19889.58,84792.42,169584.84
224,Mobile,Accessories,57070,7,Bangalore,Cash,40,3.4,2024,22828.0,34242.0,239694.0


In [0]:
from pyspark.sql.functions import when

df = df.withColumn("is_high_value",when(col("total_amount") > 100000, 1).otherwise(0)
)

df = df.withColumn("good_rating",when(col("rating") >= 4, 1).otherwise(0)
)

display(df.limit(10))


sale_id,product,category,price,quantity,city,payment_method,discount_percent,rating,sale_year,discount_amount,net_price_per_unit,total_amount,is_high_value,good_rating
298,Headphones,Accessories,97739,2,Hyderabad,UPI,13,1.3,2023,12706.07,85032.93,170065.86,1,0
30,Smartwatch,Accessories,23789,3,Hyderabad,UPI,2,4.5,2024,475.78,23313.22,69939.66,0,1
191,Headphones,Electronics,142547,7,Bangalore,UPI,33,2.8,2023,47040.51,95506.49,668545.4299999999,1,0
96,Keyboard,Accessories,113356,8,Hyderabad,Cash,25,3.6,2019,28339.0,85017.0,680136.0,1,0
150,Tablet,Electronics,25789,5,Hyderabad,Card,45,4.5,2020,11605.05,14183.95,70919.75,0,1
128,Tablet,Accessories,9848,9,Pune,Cash,46,1.0,2023,4530.08,5317.92,47861.28,0,0
83,Keyboard,Accessories,39256,2,Hyderabad,UPI,9,3.1,2022,3533.04,35722.96,71445.92,0,0
130,Mobile,Accessories,74960,8,Chennai,UPI,10,3.1,2024,7496.0,67464.0,539712.0,1,0
40,Monitor,Accessories,104682,2,Chennai,Card,19,2.4,2023,19889.58,84792.42,169584.84,1,0
224,Mobile,Accessories,57070,7,Bangalore,Cash,40,3.4,2024,22828.0,34242.0,239694.0,1,0


In [0]:
from pyspark.sql.functions import sum as _sum

display(
    df.groupBy("city")
      .agg(_sum("total_amount").alias("city_revenue"))
      .orderBy(col("city_revenue").desc())
)


city,city_revenue
Bangalore,17078803.309999995
Chennai,14979168.129999995
Hyderabad,14454358.160000002
Pune,13430927.38
Delhi,13069764.7
Mumbai,10156785.229999997


In [0]:
display(
    df.groupBy("product")
      .agg(_sum("total_amount").alias("revenue"))
      .orderBy(col("revenue").desc())
      .limit(10)
)


product,revenue
Headphones,16395586.47
Tablet,12067688.51
Monitor,11986488.619999995
Mobile,11439722.85
Keyboard,11135350.160000002
Laptop,10569171.399999997
Smartwatch,9575798.9


In [0]:
display(
    df.groupBy("category")
      .agg(_sum("total_amount").alias("category_revenue"))
      .orderBy(col("category_revenue").desc())
)


category,category_revenue
Accessories,45820599.18999999
Electronics,37349207.72000001


In [0]:
df.createOrReplaceTempView("retail")

display(spark.sql("""
SELECT city,payment_method,SUM(total_amount) AS revenue
FROM retail
GROUP BY city, payment_method
ORDER BY revenue DESC
"""))


city,payment_method,revenue
Bangalore,Cash,6661711.879999999
Chennai,UPI,6458911.51
Delhi,UPI,5876444.29
Hyderabad,Cash,5555048.430000001
Bangalore,UPI,5411050.6000000015
Pune,Cash,5076172.470000001
Bangalore,Card,5006040.829999999
Hyderabad,UPI,4918440.790000001
Mumbai,Card,4625187.04
Chennai,Cash,4471145.34


In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

numeric_cols = ["price", "quantity", "discount_percent", "rating", "sale_year"]
categorical_cols = ["product", "category", "city", "payment_method"]

# Index categorical values
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx") for c in categorical_cols]

# One-hot encode
encoders = [OneHotEncoder(inputCols=[c+"_idx"], outputCols=[c+"_oh"]) for c in categorical_cols]

# Assemble all features
assembler = VectorAssembler(
    inputCols=numeric_cols + [c+"_oh" for c in categorical_cols],
    outputCol="features"
)

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="total_amount"
)

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])


In [0]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_df)

predictions = model.transform(test_df)

display(predictions.select("sale_id", "total_amount", "prediction").limit(10))


sale_id,total_amount,prediction
9,811174.48,615553.1557503139
12,495407.52,424215.9851070239
14,14703.8,52873.7834476118
22,352837.92,259738.38756370783
24,427114.8,412030.89607661794
25,199224.54,202925.88216018668
26,289931.04,317408.3978724232
28,78528.9,148110.57953499694
31,448440.3,519392.9840303519
35,35642.0,87772.76239447792


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

rmse = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse"
).evaluate(predictions)

r2 = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="r2"
).evaluate(predictions)

print("RMSE =", rmse)
print("R2 =", r2)


RMSE = 137952.28791315205
R2 = 0.7712612845451102
