# ETL Project: Analyzing Factors Affecting Sales in a Large Retailer

### Author: Mirunan

---

## Project Overview

This project aims to analyze the factors affecting sales in a large retail chain. Using a structured ETL (Extract, Transform, Load) process, we will analyze data from various sources to understand the impact of external factors like temperature, fuel price, consumer price index (CPI), and unemployment on weekly sales.

In [1]:
# findspark.init() has to run before the pyspark import in the next cell;
# per findspark's documentation it makes the local Spark installation importable.
import findspark
findspark.init()


In [2]:
from pyspark.sql import SparkSession

# Create the Spark session (or reuse one that is already running);
# it is the entry point used below to read the data.
spark = (
    SparkSession.builder
    .appName("My PySpark Notebook")
    .getOrCreate()
)


In [3]:
# Load the Walmart weekly-sales CSV into a DataFrame.
# The location is read from the WALMART_SALES_CSV environment variable so the
# notebook is not tied to one machine's absolute path; when the variable is
# unset, the file is expected in the kernel's working directory.
import os

sales_csv_path = os.environ.get("WALMART_SALES_CSV", "Walmart_Sales.csv")

# header=True takes column names from the first row; inferSchema=True lets
# Spark derive column types from the data instead of reading everything as strings.
df = spark.read.csv(sales_csv_path, header=True, inferSchema=True)


In [4]:
import os

# Show the kernel's working directory (handy for checking where relative paths resolve).
working_dir = os.getcwd()
print(working_dir)


C:\Users\mirun\pyspark


In [5]:
# STEP 1 - EXTRACT
# Preview the first five rows of the raw data to confirm the load worked.
df.show(n=5)

+-----+----------+------------+------------+-----------+----------+-----------+------------+
|Store|      Date|Weekly_Sales|Holiday_Flag|Temperature|Fuel_Price|        CPI|Unemployment|
+-----+----------+------------+------------+-----------+----------+-----------+------------+
|    1|05-02-2010|   1643690.9|           0|      42.31|     2.572|211.0963582|       8.106|
|    1|12-02-2010|  1641957.44|           1|      38.51|     2.548|211.2421698|       8.106|
|    1|19-02-2010|  1611968.17|           0|      39.93|     2.514|211.2891429|       8.106|
|    1|26-02-2010|  1409727.59|           0|      46.63|     2.561|211.3196429|       8.106|
|    1|05-03-2010|  1554806.68|           0|       46.5|     2.625|211.3501429|       8.106|
+-----+----------+------------+------------+-----------+----------+-----------+------------+
only showing top 5 rows



In [6]:
# Print the inferred schema. Note that Date comes back as a string column,
# so it has to be parsed before any date functions can be applied to it.
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Holiday_Flag: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)



In [7]:
# Summary statistics (count, mean, stddev, min, ...) for every column.
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+----------+------------------+-------------------+------------------+-------------------+-----------------+------------------+
|summary|             Store|      Date|      Weekly_Sales|       Holiday_Flag|       Temperature|         Fuel_Price|              CPI|      Unemployment|
+-------+------------------+----------+------------------+-------------------+------------------+-------------------+-----------------+------------------+
|  count|              6435|      6435|              6435|               6435|              6435|               6435|             6435|              6435|
|   mean|              23.0|      null|1046964.8775617732|0.06993006993006994| 60.66378243978229|  3.358606837606832|171.5783938487799| 7.999151048951067|
| stddev|12.988182381175454|      null| 564366.6220536977| 0.2550489443698279|18.444932875811585|0.45901970719285223|39.35671229566419|1.8758847818627944|
|    min|                 1|01-04-2011|         209986.25|            

In [8]:
# TRANSFORM

In [9]:
from pyspark.sql.functions import col, to_date, month, sum, avg, count, when


# Data-quality check: count the null entries in every column.
# when(...) yields a non-null value only for null cells, and count(...) tallies
# non-null values, so each aggregate is the number of nulls in that column.
null_count_exprs = []
for column_name in df.columns:
    is_missing = col(column_name).isNull()
    null_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))

null_counts = df.select(null_count_exprs)
null_counts.show()

+-----+----+------------+------------+-----------+----------+---+------------+
|Store|Date|Weekly_Sales|Holiday_Flag|Temperature|Fuel_Price|CPI|Unemployment|
+-----+----+------------+------------+-----------+----------+---+------------+
|    0|   0|           0|           0|          0|         0|  0|           0|
+-----+----+------------+------------+-----------+----------+---+------------+



In [10]:
# Check for incorrect values: weekly sales should never be negative.
is_negative_sale = col("Weekly_Sales") < 0
negative_sales = df.where(is_negative_sale)
negative_sales_count = negative_sales.count()
print(f"Number of rows with negative Weekly Sales: {negative_sales_count}")



Number of rows with negative Weekly Sales: 0


In [11]:
# Add a Celsius version of the temperature: C = (F - 32) * 5 / 9.
fahrenheit = col("Temperature")
df = df.withColumn("Temperature_Celsius", (fahrenheit - 32) * 5 / 9)

# Flag readings outside the [-50, 50] degrees Celsius window as unrealistic.
too_cold = col("Temperature_Celsius") < -50
too_hot = col("Temperature_Celsius") > 50
unrealistic_temperature = df.filter(too_cold | too_hot)
unrealistic_temperature_count = unrealistic_temperature.count()
print(f"Number of rows with unrealistic Temperature in Celsius: {unrealistic_temperature_count}")


Number of rows with unrealistic Temperature in Celsius: 0


In [12]:
# Preview the first ten rows, now including the Temperature_Celsius column.
df.show(n=10)

+-----+----------+------------+------------+-----------+----------+-----------+------------+-------------------+
|Store|      Date|Weekly_Sales|Holiday_Flag|Temperature|Fuel_Price|        CPI|Unemployment|Temperature_Celsius|
+-----+----------+------------+------------+-----------+----------+-----------+------------+-------------------+
|    1|05-02-2010|   1643690.9|           0|      42.31|     2.572|211.0963582|       8.106|  5.727777777777779|
|    1|12-02-2010|  1641957.44|           1|      38.51|     2.548|211.2421698|       8.106| 3.6166666666666654|
|    1|19-02-2010|  1611968.17|           0|      39.93|     2.514|211.2891429|       8.106|  4.405555555555555|
|    1|26-02-2010|  1409727.59|           0|      46.63|     2.561|211.3196429|       8.106|  8.127777777777778|
|    1|05-03-2010|  1554806.68|           0|       46.5|     2.625|211.3501429|       8.106|  8.055555555555555|
|    1|12-03-2010|  1439541.59|           0|      57.79|     2.667|211.3806429|       8.106| 14.

In [25]:

# Parse the "DD-MM-YYYY" date strings into real dates, then derive the month number.
parsed_date = to_date(col("Date"), "dd-MM-yyyy")
clean_df = (
    df.withColumn("Date", parsed_date)
    .withColumn("Month", month(col("Date")))
)

# Aggregate per store and calendar month: total sales and mean temperature.
# NOTE(review): only the month is used as a key, so the same month from
# different years is pooled together - confirm this is the intended grouping.
total_sales = sum("Weekly_Sales").alias("Total_Weekly_Sales")
mean_temperature = avg("Temperature").alias("Average_Temperature")
monthly_sales = clean_df.groupBy("Store", "Month").agg(total_sales, mean_temperature)

# Display the aggregated result.
monthly_sales.show()


+-----+-----+--------------------+-------------------+
|Store|Month|  Total_Weekly_Sales|Average_Temperature|
+-----+-----+--------------------+-------------------+
|   39|   11|       1.296493728E7|  62.58500000000001|
|   43|    7|          8797377.36|  83.90357142857144|
|   10|    2|2.4941794599999998E7|  54.14666666666667|
|   23|    6|       1.935221371E7|  64.19461538461539|
|   33|    8|  3290940.3099999996|  95.39846153846155|
|   41|    3|1.4893696289999997E7|  37.60230769230769|
|   16|    3|          6241637.49| 32.823076923076925|
|   17|    2|          9955253.06|  22.46083333333333|
|   17|    1|   6984197.720000001|           22.09375|
|   27|    9|        2.17501172E7|  69.44384615384615|
|   34|    5|1.1517461929999998E7|  63.33833333333333|
|   13|   10|       2.518715197E7|              55.25|
|   32|    8|1.5575600919999998E7|  73.90230769230769|
|    4|   10|       2.671301505E7|  62.51923076923078|
|    9|   10|          6876571.84|  66.90384615384616|
|   24|   

In [49]:
# SALES ANALYSIS BY SEASON AND HOLIDAYS

In [None]:
#WEEK AND YEAR
removed_null_df = removed_null_df.withColumn(wee
