In [6]:
# Install Java

!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [7]:
!java -version

openjdk version "11.0.26" 2025-01-21
OpenJDK Runtime Environment (build 11.0.26+4-post-Ubuntu-1ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.26+4-post-Ubuntu-1ubuntu122.04, mixed mode, sharing)


In [8]:
!pip install pyspark



In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [10]:
# Create a Spark session

spark = SparkSession.builder \
    .appName("RetailSalesETLPipeline") \
    .getOrCreate()

In [11]:
# Upload files

from google.colab import files
uploaded = files.upload()

Saving BigMart Sales.csv to BigMart Sales (1).csv


**EXTRACT**

In [12]:
df = spark.read.option("header", True).option("inferSchema", True).csv("BigMart Sales.csv")

In [13]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [14]:
df.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



**TRANSFORM**

In [15]:
# Checking nulls in all columns

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|              0|       1463|               0|              0|        0|       0|                0|                        0|       2410|                   0|          0|                0|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+



In [16]:
# -------[Filling nulls in Item_Identifier column with average weight] ---------

In [17]:
# Group by Item_Identifier and calculate average weight

avg_weight_df = df.groupBy("Item_Identifier") \
                  .agg(avg("Item_Weight").alias("avg_weight"))

In [18]:
# Join this average weight data back to the original dataset

df = df.join(avg_weight_df, on="Item_Identifier", how="left")

In [19]:
# Fill missing Item_Weight using avg_weight

df = df.withColumn("Item_Weight",
                   when(col("Item_Weight").isNull(), col("avg_weight")) \
                   .otherwise(col("Item_Weight")))

In [20]:
#  Drop the helper column

df = df.drop("avg_weight")

In [21]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [22]:
# Checking nulls

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|              0|          4|               0|              0|        0|       0|                0|                        0|       2410|                   0|          0|                0|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+



In [23]:
# Remaining 4 nulls

df.filter(col("Item_Weight").isNull()).show()

+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|   Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDN52|       NULL|         Regular|     0.13093275|Frozen Foods| 86.9198|           OUT027|                     1985|     Medium|              Tier 3|Supermarket Type3|        1569.9564|
|          FDK57|       NULL|         Low Fat|    0.079904068| Snack Foods| 120.044|           OUT027|                     1985|     Medium|              Tier 3|Supermarket Type3|         4434.228|
|         

In [24]:
# Filling those 4 nulls

df = df.withColumn("Item_Weight", when(col("Item_Identifier") == "FDN52", 15.8)
                                     .when(col("Item_Identifier") == "FDK57", 20.5)
                                     .when(col("Item_Identifier") == "FDE52", 16.3)
                                     .when(col("Item_Identifier") == "FDQ60", 21.7)
                                     .otherwise(col("Item_Weight")))


In [25]:
# checking those 4 nulls

df.filter(df.Item_Identifier.isin("FDN52", "FDK57", "FDE52", "FDQ60")).select("Item_Identifier", "Item_Type", "Item_Weight").show()

+---------------+------------+-----------+
|Item_Identifier|   Item_Type|Item_Weight|
+---------------+------------+-----------+
|          FDN52|Frozen Foods|       15.8|
|          FDK57| Snack Foods|       20.5|
|          FDE52|       Dairy|       16.3|
|          FDQ60|Baking Goods|       21.7|
+---------------+------------+-----------+



In [26]:
# Rounding off Item_Weight values

df = df.withColumn("Item_Weight", round(df["Item_Weight"], 1))

In [27]:
# Checking nulls ,,, 0 nulls in Item_Weight column

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|              0|          0|               0|              0|        0|       0|                0|                        0|       2410|                   0|          0|                0|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+



In [28]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [29]:
# ------- [Filling nulls in Outlet_Size column with Most Frequent Value i.e Small or Medium or High] --------

In [30]:
# Get the most frequent outlet size

mode_outlet_size = df.groupBy("Outlet_Size").count().orderBy("count", ascending=False).first()[0]

In [31]:
# Fill nulls with the most frequent outlet size

df = df.fillna({"Outlet_Size": mode_outlet_size})

In [32]:
# Checking nulls ,,, 0 nulls in Outlet_Size column

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+
|              0|          0|               0|              0|        0|       0|                0|                        0|          0|                   0|          0|                0|
+---------------+-----------+----------------+---------------+---------+--------+-----------------+-------------------------+-----------+--------------------+-----------+-----------------+



In [33]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [34]:
# ------ [Fix Inconsistent Categorical Data] ------

In [35]:
# Standardizing values in 'Item_Fat_Content'

df = df.withColumn(
    "Item_Fat_Content",
    when(df["Item_Fat_Content"] == "low fat", "Low Fat")
    .when(df["Item_Fat_Content"] == "LF", "Low Fat")
    .otherwise(df["Item_Fat_Content"])
)

In [36]:
# Getting distinct values from the 'Item_Fat_Content' column

df.select('Item_Fat_Content').distinct().show()

+----------------+
|Item_Fat_Content|
+----------------+
|         Low Fat|
|         Regular|
|             reg|
+----------------+



In [37]:
# Replacing 'reg' with 'Regular' in the 'Item_Fat_Content' column

df = df.withColumn('Item_Fat_Content',
                   when(df['Item_Fat_Content'] == 'reg', 'Regular')
                   .otherwise(df['Item_Fat_Content']))

In [38]:
# Verify the changes

df.select('Item_Fat_Content').distinct().show()

+----------------+
|Item_Fat_Content|
+----------------+
|         Low Fat|
|         Regular|
+----------------+



In [39]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [40]:
# ------- [Rounding off 'Item_Visibility' column values] --------

In [41]:
# Checking 'Item_Visibility' column values

df.select('Item_Visibility').show(10)

+---------------+
|Item_Visibility|
+---------------+
|    0.016047301|
|    0.019278216|
|    0.016760075|
|            0.0|
|            0.0|
|            0.0|
|    0.012741089|
|    0.127469857|
|    0.016687114|
|     0.09444959|
+---------------+
only showing top 10 rows



In [42]:
# Round off 'Item_Visibility' to 2 decimal places

df = df.withColumn('Item_Visibility', round(df['Item_Visibility'], 2))

In [43]:
# Checking 'Item_Visibility' column values

df.select('Item_Visibility').show(10)

+---------------+
|Item_Visibility|
+---------------+
|           0.02|
|           0.02|
|           0.02|
|            0.0|
|            0.0|
|            0.0|
|           0.01|
|           0.13|
|           0.02|
|           0.09|
+---------------+
only showing top 10 rows



In [44]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|           0.02|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|           0.02|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [45]:
# ------- [Correcting names in 'Item_Type' column] --------

In [46]:
# Checking 'Item_Type' column distinct names

df.select('Item_Type').distinct().show()

+--------------------+
|           Item_Type|
+--------------------+
|       Starchy Foods|
|        Baking Goods|
|              Breads|
|Fruits and Vegeta...|
|                Meat|
|         Hard Drinks|
|         Soft Drinks|
|           Household|
|           Breakfast|
|               Dairy|
|         Snack Foods|
|              Others|
|             Seafood|
|              Canned|
|        Frozen Foods|
|  Health and Hygiene|
+--------------------+



In [47]:
# Update 'Fruits and Vegetables' value properly

df = df.withColumn(
    'Item_Type',
    when(col('Item_Type').like('Fruits and Vegeta%'), 'Fruits and Veg')
    .otherwise(col('Item_Type'))
)

In [48]:
# Checking 'Item_Type' column distinct names

df.select('Item_Type').distinct().show()

+------------------+
|         Item_Type|
+------------------+
|    Fruits and Veg|
|     Starchy Foods|
|      Baking Goods|
|            Breads|
|              Meat|
|       Hard Drinks|
|       Soft Drinks|
|         Household|
|         Breakfast|
|             Dairy|
|       Snack Foods|
|            Others|
|           Seafood|
|            Canned|
|      Frozen Foods|
|Health and Hygiene|
+------------------+



In [49]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|     Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|           0.02|         Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|           0.02|   Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|


In [50]:
# ------- [Rounding off 'Item_MRP' column values] --------

In [51]:
# Checking 'Item_MRP' column

df.select('Item_MRP').show(10)

+--------+
|Item_MRP|
+--------+
|249.8092|
| 48.2692|
| 141.618|
| 182.095|
| 53.8614|
| 51.4008|
| 57.6588|
|107.7622|
| 96.9726|
|187.8214|
+--------+
only showing top 10 rows



In [52]:
# Rounding off 'Item_MRP' values to 2 decimal places

df = df.withColumn("Item_MRP", round(col("Item_MRP"), 2))

In [53]:
# Checking 'Item_MRP' column round off values

df.select('Item_MRP').show(10)

+--------+
|Item_MRP|
+--------+
|  249.81|
|   48.27|
|  141.62|
|   182.1|
|   53.86|
|    51.4|
|   57.66|
|  107.76|
|   96.97|
|  187.82|
+--------+
only showing top 10 rows



In [54]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|     Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|           0.02|         Dairy|  249.81|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|        5.9|         Regular|           0.02|   Soft Drinks|   48.27|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|         443.4228|


In [55]:
# ------- [Rounding off 'Item_Outlet_Sales' column values] --------

In [56]:
# Checking 'Item_MRP' column

df.select('Item_Outlet_Sales').show(10)

+-----------------+
|Item_Outlet_Sales|
+-----------------+
|         3735.138|
|         443.4228|
|          2097.27|
|           732.38|
|         994.7052|
|         556.6088|
|         343.5528|
|        4022.7636|
|        1076.5986|
|         4710.535|
+-----------------+
only showing top 10 rows



In [57]:
# Rounding off 'Item_Outlet_Sales' values to 2 decimal places

df = df.withColumn("Item_Outlet_Sales", round(col("Item_Outlet_Sales"), 2))

In [58]:
# Checking 'Item_Outlet_Sales' column round off values

df.select('Item_Outlet_Sales').show(10)

+-----------------+
|Item_Outlet_Sales|
+-----------------+
|          3735.14|
|           443.42|
|          2097.27|
|           732.38|
|           994.71|
|           556.61|
|           343.55|
|          4022.76|
|           1076.6|
|          4710.54|
+-----------------+
only showing top 10 rows



In [59]:
df.show(10)

+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|     Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|           0.02|         Dairy|  249.81|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|          3735.14|
|          DRC01|        5.9|         Regular|           0.02|   Soft Drinks|   48.27|           OUT018|                     2009|     Medium|              Tier 3|Supermarket Type2|           443.42|


In [61]:
# ------ [Removing Duplicates] ------

In [87]:
# Step 1: Count rows before removing duplicates
df.count()

8523

In [88]:
# Step 2: Remove duplicates
df = df.dropDuplicates()

In [89]:
# Step 3: Count rows again after removing duplicates
df.count()

8523

In [65]:
# ------ [Check data types of all columns] ------

In [90]:
df.dtypes

[('Item_Identifier', 'string'),
 ('Item_Weight', 'double'),
 ('Item_Fat_Content', 'string'),
 ('Item_Visibility', 'double'),
 ('Item_Type', 'string'),
 ('Item_MRP', 'double'),
 ('Outlet_Identifier', 'string'),
 ('Outlet_Establishment_Year', 'int'),
 ('Outlet_Size', 'string'),
 ('Outlet_Location_Type', 'string'),
 ('Outlet_Type', 'string'),
 ('Item_Outlet_Sales', 'double')]

In [69]:
# ------ [Create Derived Columns] -------

In [91]:
# Add Total_Revenue column

df = df.withColumn(
    "Total_Revenue",
    col("Item_MRP") * col("Item_Outlet_Sales")
)

In [92]:
# Add Price_Range column (Low/Medium/High)

df = df.withColumn(
    "Price_Range",
    when(col("Item_MRP") < 70, "Low")
    .when((col("Item_MRP") >= 70) & (col("Item_MRP") < 150), "Medium")
    .otherwise("High")
)

In [93]:
# View sample output

df.select("Item_MRP", "Item_Outlet_Sales", "Total_Revenue", "Price_Range").show(10)

+--------+-----------------+------------------+-----------+
|Item_MRP|Item_Outlet_Sales|     Total_Revenue|Price_Range|
+--------+-----------------+------------------+-----------+
|   91.45|          2082.62|        190455.599|     Medium|
|    76.9|           607.21| 46694.44900000001|     Medium|
|   55.89|           226.37|12651.819300000001|        Low|
|  158.99|          1757.71|       279458.3129|       High|
|  153.47|          2287.02|       350988.9594|       High|
|   41.55|          1358.23|        56434.4565|        Low|
|  167.11|          2874.92|480427.88120000006|       High|
|  199.81|           2380.9|        475727.629|       High|
|   39.65|          1024.67|        40628.1655|        Low|
|   54.66|           884.18| 48329.27879999999|        Low|
+--------+-----------------+------------------+-----------+
only showing top 10 rows



In [94]:
# Round off Total_Revenue column

df = df.withColumn("Total_Revenue", round(col("Total_Revenue"), 2))

In [95]:
# View sample output

df.select("Item_MRP", "Item_Outlet_Sales", "Total_Revenue", "Price_Range").show(10)

+--------+-----------------+-------------+-----------+
|Item_MRP|Item_Outlet_Sales|Total_Revenue|Price_Range|
+--------+-----------------+-------------+-----------+
|   91.45|          2082.62|     190455.6|     Medium|
|    76.9|           607.21|     46694.45|     Medium|
|   55.89|           226.37|     12651.82|        Low|
|  158.99|          1757.71|    279458.31|       High|
|  153.47|          2287.02|    350988.96|       High|
|   41.55|          1358.23|     56434.46|        Low|
|  167.11|          2874.92|    480427.88|       High|
|  199.81|           2380.9|    475727.63|       High|
|   39.65|          1024.67|     40628.17|        Low|
|   54.66|           884.18|     48329.28|        Low|
+--------+-----------------+-------------+-----------+
only showing top 10 rows



In [96]:
df.show(10)

+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|
+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|          FDL22|       16.9|         Low Fat|           0.04|       Snack Foods|   91.45|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|          2082.62|     190455.6|     Medium|
|          FDS19|       13.8|         Regular|           0.06|    Fruits and Veg

In [109]:
# Backup Dataframe

df_backup2 = df

In [110]:
df.show(5)

+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|     Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|
+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|          FDL22|       16.9|         Low Fat|           0.04|   Snack Foods|   91.45|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|          2082.62|     190455.6|     Medium|
|          FDS19|       13.8|         Regular|           0.06|Fruits and Veg|    76.9|          

In [111]:
df_backup2.show(5)

+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|     Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|
+---------------+-----------+----------------+---------------+--------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+
|          FDL22|       16.9|         Low Fat|           0.04|   Snack Foods|   91.45|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|          2082.62|     190455.6|     Medium|
|          FDS19|       13.8|         Regular|           0.06|Fruits and Veg|    76.9|          

In [78]:
# ------ [Aggregation for Insights] -------
# Total and Average Sales by Outlet_Establishment_Year

In [112]:
# Total Sales by Outlet_Establishment_Year

df_backup2_total_sales_year = df_backup2.groupBy("Outlet_Establishment_Year").agg(
    sum("Item_Outlet_Sales").alias("Total_Sales_Year")
)

In [113]:
# Average Sales by Outlet_Establishment_Year

df_backup2_avg_sales_year = df_backup2.groupBy("Outlet_Establishment_Year").agg(
    avg("Item_Outlet_Sales").alias("Average_Sales_Year")
)

In [114]:
 # Join the aggregated results back into df_backup1

df_backup2 = df_backup2.join(
    df_backup2_total_sales_year, on="Outlet_Establishment_Year", how="left"
).join(
    df_backup2_avg_sales_year, on="Outlet_Establishment_Year", how="left"
)

In [115]:
# Display the result

df_backup2.show(10)

+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+------------------+------------------+
|Outlet_Establishment_Year|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|  Total_Sales_Year|Average_Sales_Year|
+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+------------------+------------------+
|                     1997|          FDL22|       16.9|         Low Fat|           0.04|       Snack Foods|   91.45|           OUT046|      Small|              Tier 1|Supermarket Type1|          2

In [116]:
# Round off Total_Sales_Year & Average_Sales_Year column

df_backup2 = df_backup2.withColumn("Total_Sales_Year", round(col("Total_Sales_Year"), 2)) \
       .withColumn("Average_Sales_Year", round(col("Average_Sales_Year"), 2))

In [117]:
# View sample output

df_backup2.select("Total_Sales_Year", "Average_Sales_Year").show(10)

+----------------+------------------+
|Total_Sales_Year|Average_Sales_Year|
+----------------+------------------+
|      2118395.35|           2277.84|
|      2142663.66|            2299.0|
|       188340.22|            339.35|
|      2118395.35|           2277.84|
|      3633620.22|           2483.68|
|      3633620.22|           2483.68|
|      2268123.09|           2438.84|
|      2118395.35|           2277.84|
|      3633620.22|           2483.68|
|      2167465.52|           2340.68|
+----------------+------------------+
only showing top 10 rows



In [118]:
df_backup2.show(10)

+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+----------------+------------------+
|Outlet_Establishment_Year|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|Total_Sales_Year|Average_Sales_Year|
+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+----------------+------------------+
|                     1997|          FDL22|       16.9|         Low Fat|           0.04|       Snack Foods|   91.45|           OUT046|      Small|              Tier 1|Supermarket Type1|          2082.62

In [119]:
# Final Dataframe

df_final = df_backup2

In [120]:
df_final.show(10)

+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+----------------+------------------+
|Outlet_Establishment_Year|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|Total_Revenue|Price_Range|Total_Sales_Year|Average_Sales_Year|
+-------------------------+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-----------+--------------------+-----------------+-----------------+-------------+-----------+----------------+------------------+
|                     1997|          FDL22|       16.9|         Low Fat|           0.04|       Snack Foods|   91.45|           OUT046|      Small|              Tier 1|Supermarket Type1|          2082.62

# **Loading**

In [121]:
# Save the final cleaned DataFrame to CSV for Power BI (Load step)

df_final.write.csv("Retail_Sales_ETL.csv", header=True, mode="overwrite")

In [122]:
df_final.coalesce(1).write.csv("Retail_ETL_Final_Single.csv", header=True, mode="overwrite")