In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Visuelle Sales Forecasting") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()


In [9]:
sales_df = spark.read.option("header", "true").csv("C:/Documents/m5-sales-forecasting/data/raw/sales.csv", inferSchema=True)
sales_df.printSchema()
sales_df.show(5)

root
 |-- _c0: integer (nullable = true)
 |-- external_code: integer (nullable = true)
 |-- retail: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- category: string (nullable = true)
 |-- color: string (nullable = true)
 |-- image_path: string (nullable = true)
 |-- fabric: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- restock: integer (nullable = true)
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)

+---+-------------+------+------+-----------+------+--------------+-----------+------------+-------+---+---+---+---+---+---+---+---+---+---+---+---+
|_c0|external_code|retail|season|   category| col

In [10]:
from pyspark.sql import functions as F

# Melt the sales columns into a long format
sales_long_df = sales_df.selectExpr("external_code", "retail", "season", "category", "color", "fabric", "release_date", "restock", 
                                   "stack(12, '0', `0`, '1', `1`, '2', `2`, '3', `3`, '4', `4`, '5', `5`, '6', `6`, '7', `7`, '8', `8`, '9', `9`, '10', `10`, '11', `11`) as (month, sales)")
sales_long_df.show(5)


+-------------+------+------+-----------+-----+-------+------------+-------+-----+-----+
|external_code|retail|season|   category|color| fabric|release_date|restock|month|sales|
+-------------+------+------+-----------+-----+-------+------------+-------+-----+-----+
|            5|    36|  SS17|long sleeve| grey|acrylic|  2016-11-28|     22|    0|  1.0|
|            5|    36|  SS17|long sleeve| grey|acrylic|  2016-11-28|     22|    1|  3.0|
|            5|    36|  SS17|long sleeve| grey|acrylic|  2016-11-28|     22|    2|  1.0|
|            5|    36|  SS17|long sleeve| grey|acrylic|  2016-11-28|     22|    3|  1.0|
|            5|    36|  SS17|long sleeve| grey|acrylic|  2016-11-28|     22|    4|  2.0|
+-------------+------+------+-----------+-----+-------+------------+-------+-----+-----+
only showing top 5 rows



Here we are merging multiple datasets to include sesonality, trends etc

In [11]:
price_discount_df = spark.read.option("header", "true").csv("C:/Documents/m5-sales-forecasting/data/raw/price_discount_series.csv", inferSchema=True)
weather_df = spark.read.option("header", "true").csv("C:/Documents/m5-sales-forecasting/data/raw/vis2_weather_data.csv", inferSchema=True)


In [30]:
weather_df.printSchema()






root
 |-- locality: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- avg temp °C: double (nullable = true)
 |-- min temp °C: double (nullable = true)
 |-- max temp °C: double (nullable = true)
 |-- dew point °C: double (nullable = true)
 |-- humidity %: double (nullable = true)
 |-- visibility km: double (nullable = true)
 |-- avg wind km/h: double (nullable = true)
 |-- max wind km/h: double (nullable = true)
 |-- gust km/h: double (nullable = true)
 |-- slm pressure mb: double (nullable = true)
 |-- avg pressure mb: double (nullable = true)
 |-- rain mm: double (nullable = true)
 |-- month: string (nullable = true)



In [15]:
# Rename sales columns in sales_df first
sales_df = sales_df.withColumnRenamed("_c0", "id")  # Rename '_c0' to 'id'

# Rename the sales columns from '0', '1', '2', ... to 'jan_sales', 'feb_sales', 'mar_sales', etc.
sales_df = sales_df.withColumnRenamed("0", "jan_sales") \
                   .withColumnRenamed("1", "feb_sales") \
                   .withColumnRenamed("2", "mar_sales") \
                   .withColumnRenamed("3", "apr_sales") \
                   .withColumnRenamed("4", "may_sales") \
                   .withColumnRenamed("5", "jun_sales") \
                   .withColumnRenamed("6", "jul_sales") \
                   .withColumnRenamed("7", "aug_sales") \
                   .withColumnRenamed("8", "sep_sales") \
                   .withColumnRenamed("9", "oct_sales") \
                   .withColumnRenamed("10", "nov_sales") \
                   .withColumnRenamed("11", "dec_sales")

# Now merge with other DataFrames, e.g., price_discount_df
sales_price_df = sales_df.join(price_discount_df, on="external_code", how="inner")

# Check the result of the merge
sales_price_df.show(5)


+-------------+---+------+------+-----------+-----+--------------+-------+------------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------+---+---+---+---+---+---+---+---+---+---+---+---+------------------+
|external_code| id|retail|season|   category|color|    image_path| fabric|release_date|restock|jan_sales|feb_sales|mar_sales|apr_sales|may_sales|jun_sales|jul_sales|aug_sales|sep_sales|oct_sales|nov_sales|dec_sales|retail|  0|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11|             price|
+-------------+---+------+------+-----------+-----+--------------+-------+------------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------+---+---+---+---+---+---+---+---+---+---+---+---+------------------+
|            5|  0|    36|  SS17|long sleeve| grey|PE17/00005.png|acrylic|  2016-11-28|     22|      1.0|      3.0|      1.0|     

Merge Sales and Price discounts

In [21]:
# Merge sales with price discount data based on external_code, retail, and the date
sales_price_df = sales_df.join(price_discount_df, on=["external_code", "retail"], how="left")

# Show the merged data
print(sales_price_df.columns)

['external_code', 'retail', 'id', 'season', 'category', 'color', 'image_path', 'fabric', 'release_date', 'restock', 'jan_sales', 'feb_sales', 'mar_sales', 'apr_sales', 'may_sales', 'jun_sales', 'jul_sales', 'aug_sales', 'sep_sales', 'oct_sales', 'nov_sales', 'dec_sales', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', 'price']


In [38]:
# Drop columns '0' to '11' manually
sales_price_df = sales_price_df.drop("0")
sales_price_df = sales_price_df.drop("1")
sales_price_df = sales_price_df.drop("2")
sales_price_df = sales_price_df.drop("3")
sales_price_df = sales_price_df.drop("4")
sales_price_df = sales_price_df.drop("5")
sales_price_df = sales_price_df.drop("6")
sales_price_df = sales_price_df.drop("7")
sales_price_df = sales_price_df.drop("8")
sales_price_df = sales_price_df.drop("9")
sales_price_df = sales_price_df.drop("10")
sales_price_df = sales_price_df.drop("11")

# Check final column list
print(sales_price_df.columns)




['external_code', 'retail', 'id', 'season', 'category', 'color', 'image_path', 'fabric', 'release_date', 'restock', 'jan_sales', 'feb_sales', 'mar_sales', 'apr_sales', 'may_sales', 'jun_sales', 'jul_sales', 'aug_sales', 'sep_sales', 'oct_sales', 'nov_sales', 'dec_sales']


In [42]:
sales_price_df = sales_price_df.join(price_discount_df.select("external_code", "price"), on="external_code", how="left")
print(sales_price_df.columns)

['external_code', 'retail', 'id', 'season', 'category', 'color', 'image_path', 'fabric', 'release_date', 'restock', 'jan_sales', 'feb_sales', 'mar_sales', 'apr_sales', 'may_sales', 'jun_sales', 'jul_sales', 'aug_sales', 'sep_sales', 'oct_sales', 'nov_sales', 'dec_sales', 'price']


In [43]:
sales_price_df.show(5)

+-------------+------+---+------+-----------+-----+--------------+-------+------------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------------+
|external_code|retail| id|season|   category|color|    image_path| fabric|release_date|restock|jan_sales|feb_sales|mar_sales|apr_sales|may_sales|jun_sales|jul_sales|aug_sales|sep_sales|oct_sales|nov_sales|dec_sales|             price|
+-------------+------+---+------+-----------+-----+--------------+-------+------------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+------------------+
|            5|    36|  0|  SS17|long sleeve| grey|PE17/00005.png|acrylic|  2016-11-28|     22|      1.0|      3.0|      1.0|      1.0|      2.0|      1.0|      0.0|      0.0|      2.0|      0.0|      0.0|      0.0|0.0549442093112735|
|            5|    36|  0|  SS17|long sleeve| grey|PE17/0000

Store your data as parquet format

In [44]:
# Save as Parquet to "processed_data" folder
sales_price_df.write.mode("overwrite").parquet("C:/Documents/m5-sales-forecasting/data/processed")
