
# 🧠 Rossmann Store Sales Forecasting with Apache Spark

## 📍 Problem Description

Rossmann operates over 3,000 drug stores across 7 European countries. In this project, I take on the challenge of forecasting daily sales for 1,115 stores in Germany over a six-week period.

Sales are influenced by a wide range of factors, including:

- Promotions and marketing campaigns  
- Local competition  
- School and public holidays  
- Seasonality and events  
- Store-specific and geographic characteristics

## 🎯 Project Objective

As a solo data practitioner, my goal is to build a scalable and reliable forecasting model using **Apache Spark** within the **Databricks Community Edition** environment.

This notebook will serve as a complete, end-to-end pipeline — from data cleaning and exploration, through feature engineering and modeling, to performance evaluation — all executed with **PySpark**.

## 💡 Why This Matters

Accurate forecasts help Rossmann store managers to:

- Plan staff schedules more effectively  
- Boost operational productivity  
- Improve team morale  
- Deliver better customer service  

By building this predictive solution, I aim to support data-driven decision-making and demonstrate the practical use of big data tools in real-world retail operations.

---

This is a personal project designed to enhance my skills in **data engineering**, **machine learning**, and **big data analytics** using **Spark**.


# Imports

In [0]:
from pyspark.sql import functions as F
import pandas as pd
import os
import glob
from pyspark.sql.functions import lit

# Functions

In [0]:
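def see_nulls(df):
    """Return {column: null_count} for every column that contains at least one null."""
    # A single aggregation pass: F.when yields a non-null marker only where the column is null,
    # and F.count counts those markers.
    null_counts_row = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first().asDict()
    return {k: v for k, v in null_counts_row.items() if v > 0}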
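
The logic of `see_nulls` can be mirrored in plain Python for a quick sanity check without a Spark session. This is a minimal sketch under the assumption that rows are dicts and `None` stands in for a Spark null. `count_nulls` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List


def count_nulls(rows: List[Dict[str, Any]]) -> Dict[str, int]:
    """Plain-Python analogue of see_nulls: per-column null counts, zero counts dropped."""
    columns = list(rows[0].keys()) if rows else []
    counts = {c: sum(1 for r in rows if r.get(c) is None) for c in columns}
    # Keep only the columns that actually contain nulls, as see_nulls does.
    return {c: n for c, n in counts.items() if n > 0}


# Hypothetical rows shaped like store.csv; None plays the role of a Spark null.
sample = [
    {"Store": 1, "CompetitionDistance": 1270, "PromoInterval": None},
    {"Store": 2, "CompetitionDistance": None, "PromoInterval": "Jan,Apr,Jul,Oct"},
    {"Store": 3, "CompetitionDistance": 14130, "PromoInterval": None},
]
print(count_nulls(sample))  # {'CompetitionDistance': 1, 'PromoInterval': 2}
```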

In [0]:
def read_with_state(spark_path):
    """Read one Google Trends CSV and tag every row with the code taken from its file name."""
    # e.g. "Rossmann_DE_BE.csv" -> "DE_BE"; the "DE_" prefix is stripped later with substring_index.
    # Note: store_states.csv labels Bremen/Lower Saxony stores as "HB,NI", so a plain "NI" code will not match it on a join.
    filename = os.path.basename(spark_path)
    state_code = filename.replace("Rossmann_", "").replace(".csv", "")
    return spark.read.option("header", True).csv(spark_path).withColumn("State", F.lit(state_code))
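
A trend file's name goes through two string steps before it becomes a state code: `read_with_state` strips `Rossmann_` and `.csv`, and the Google Trends section later keeps only the text after the last underscore with `F.substring_index(..., "_", -1)`. The sketch below traces both steps in plain Python. The helper `state_from_trend_file` is hypothetical, and the file names assume the `Rossmann_DE_<STATE>.csv` pattern implied by the notebook's filters.

```python
import os


def state_from_trend_file(path: str) -> str:
    """Hypothetical helper: the state code a trend file ends up with in this notebook."""
    filename = os.path.basename(path)
    # Step 1 (read_with_state): "Rossmann_DE_BE.csv" -> "DE_BE"
    code = filename.replace("Rossmann_", "").replace(".csv", "")
    # Step 2 (F.substring_index(..., "_", -1)): keep the text after the last "_";
    # a code without "_" (the national "DE") is returned unchanged.
    return code.rsplit("_", 1)[-1]


print(state_from_trend_file("dbfs:/FileStore/tables/Rossmann_DE_BE.csv"))  # BE
print(state_from_trend_file("dbfs:/FileStore/tables/Rossmann_DE.csv"))     # DE
```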


In [0]:
def process_train_test(df):
    return df.fillna({"Open": 0}) \
        .withColumn("week_of_year", F.weekofyear(F.col("Date"))) \
        .withColumn("year", F.year(F.col("Date"))) \
        .withColumn("month", F.month(F.col("Date"))) \
        .withColumn("day", F.dayofmonth(F.col("Date")))
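
`F.weekofyear` numbers weeks by ISO 8601: weeks start on Monday, and week 1 is the first week with more than three days in the new year. `F.year`, by contrast, returns the calendar year. Python's `date.isocalendar()` uses the same ISO definition, so the columns added by `process_train_test` can be checked for a single date as sketched below. `date_parts` is a hypothetical helper. Around New Year the two definitions can disagree: 2013-12-30 falls in ISO week 1 while its `year` stays 2013.

```python
from datetime import date


def date_parts(d: date) -> dict:
    """Hypothetical helper mirroring the columns that process_train_test adds for one date."""
    return {
        # ISO 8601 week number, the same definition Spark's weekofyear uses.
        "week_of_year": d.isocalendar()[1],
        # Plain calendar fields, not ISO fields.
        "year": d.year,
        "month": d.month,
        "day": d.day,
    }


print(date_parts(date(2015, 7, 31)))   # {'week_of_year': 31, 'year': 2015, 'month': 7, 'day': 31}
print(date_parts(date(2013, 12, 30)))  # week_of_year is 1, but year stays 2013
```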

# Competition Data

In [0]:
df_train = spark.read.option("Header", True).option("InferSchema", True).csv("/FileStore/tables/train.csv")
display(df_train.head(10))

df_train.printSchema()


Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday
1,5,2015-07-31,5263,555,1,1,0,1
2,5,2015-07-31,6064,625,1,1,0,1
3,5,2015-07-31,8314,821,1,1,0,1
4,5,2015-07-31,13995,1498,1,1,0,1
5,5,2015-07-31,4822,559,1,1,0,1
6,5,2015-07-31,5651,589,1,1,0,1
7,5,2015-07-31,15344,1414,1,1,0,1
8,5,2015-07-31,8492,833,1,1,0,1
9,5,2015-07-31,8565,687,1,1,0,1
10,5,2015-07-31,7185,681,1,1,0,1


root
 |-- Store: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- Customers: integer (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)



In [0]:
df_store = spark.read.option("Header", True).option("InferSchema", True).csv("/FileStore/tables/store.csv")
display(df_store.head(10))

df_store.printSchema()

Store,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval
1,c,a,1270,9,2008,0,,,
2,a,a,570,11,2007,1,13.0,2010.0,"Jan,Apr,Jul,Oct"
3,a,a,14130,12,2006,1,14.0,2011.0,"Jan,Apr,Jul,Oct"
4,c,c,620,9,2009,0,,,
5,a,a,29910,4,2015,0,,,
6,a,a,310,12,2013,0,,,
7,a,c,24000,4,2013,0,,,
8,a,a,7520,10,2014,0,,,
9,a,c,2030,8,2000,0,,,
10,a,a,3160,9,2009,0,,,


root
 |-- Store: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceMonth: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceWeek: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)



In [0]:
df_test = spark.read.option("Header", True).option("InferSchema", True).csv("/FileStore/tables/test.csv")
display(df_test.head(10))

df_test.printSchema()

Id,Store,DayOfWeek,Date,Open,Promo,StateHoliday,SchoolHoliday
1,1,4,2015-09-17,1,1,0,0
2,3,4,2015-09-17,1,1,0,0
3,7,4,2015-09-17,1,1,0,0
4,8,4,2015-09-17,1,1,0,0
5,9,4,2015-09-17,1,1,0,0
6,10,4,2015-09-17,1,1,0,0
7,11,4,2015-09-17,1,1,0,0
8,12,4,2015-09-17,1,1,0,0
9,13,4,2015-09-17,1,1,0,0
10,14,4,2015-09-17,1,1,0,0


root
 |-- Id: integer (nullable = true)
 |-- Store: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)



# External Data

## Store States

In [0]:
df_store_states = spark.read.option("Header", True).option("InferSchema", True).csv("/FileStore/tables/store_states.csv")
display(df_store_states.select(F.col('State')).distinct().orderBy(F.col('State')).limit(10))

df_store_states.printSchema()

display(see_nulls(df_store_states))

State
BE
BW
BY
"HB,NI"
HE
HH
NW
RP
SH
SN


root
 |-- Store: integer (nullable = true)
 |-- State: string (nullable = true)

{}

## Google Trends by State

In [0]:
file_list = dbutils.fs.ls("dbfs:/FileStore/tables/")

valid_files = [
    f.path for f in file_list 
    if f.name.startswith("Rossmann_DE_") and f.name.endswith(".csv") and "-1" not in f.name
]

df_google_trends = None

for f in valid_files:
    df_tmp = read_with_state(f)
    df_google_trends = df_tmp if df_google_trends is None else df_google_trends.union(df_tmp)
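
The `valid_files` filter keeps only the per-state trend files. The predicate can be checked in isolation with plain Python, as shown below. The directory listing is hypothetical, and the `-1` exclusion is assumed to skip duplicate copies of a re-uploaded file. Note that the national `Rossmann_DE.csv` fails the `Rossmann_DE_` prefix test, which is why it is loaded on its own in the next cell.

```python
def is_state_trend_file(name: str) -> bool:
    """Same test as the valid_files filter: per-state trend CSVs, no '-1' copies (assumed re-uploads)."""
    return name.startswith("Rossmann_DE_") and name.endswith(".csv") and "-1" not in name


# Hypothetical directory listing.
names = [
    "Rossmann_DE.csv",
    "Rossmann_DE_BE.csv",
    "Rossmann_DE_BE-1.csv",
    "store.csv",
    "Rossmann_DE_NW.csv",
]
print([n for n in names if is_state_trend_file(n)])  # ['Rossmann_DE_BE.csv', 'Rossmann_DE_NW.csv']
```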


In [0]:
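# Match the national file exactly: a "Rossmann_DE" prefix test would also match every
# per-state file (Rossmann_DE_BE.csv, ...) and the result would depend on the listing order.
de_file_path = next(f.path for f in file_list if f.name == "Rossmann_DE.csv")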


df_de = spark.read.option("header", True).csv(de_file_path)


df_de = df_de \
    .withColumn("week_start_str", F.split("Week", " - ").getItem(0)) \
    .withColumn("week_end_str", F.split("Week", " - ").getItem(1)) \
    .withColumn("week_start", F.to_date("week_start_str", "yyyy-MM-dd")) \
    .withColumn("week_end", F.to_date("week_end_str", "yyyy-MM-dd"))


df_de = df_de.withColumn("Date", F.explode(F.sequence(F.col("week_start"), F.col("week_end"))))


df_de = df_de.withColumn("DE_trend", F.col('rossmann'))


df_de = df_de.select("Date", "DE_trend")

display(
    df_de.limit(1)
)


Date,DE_trend
2004-01-04,8
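
Google Trends reports one value per week, so the cell above splits the `Week` range and uses `F.sequence` plus `F.explode` to emit one row per day, each carrying the weekly value. The same expansion for a single range looks like this in plain Python. `expand_week` is a hypothetical helper, and the sample range assumes the `start - end` format implied by the split on `" - "`.

```python
from datetime import date, timedelta
from typing import List


def expand_week(week: str) -> List[date]:
    """Hypothetical helper: turn a 'YYYY-MM-DD - YYYY-MM-DD' range into daily dates."""
    start_str, end_str = week.split(" - ")
    start, end = date.fromisoformat(start_str), date.fromisoformat(end_str)
    # Like F.sequence(start, end) on dates: one-day steps, both endpoints included.
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


days = expand_week("2004-01-04 - 2004-01-10")
print(len(days), days[0], days[-1])  # 7 2004-01-04 2004-01-10
```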


In [0]:

df_google_trends = df_google_trends.dropDuplicates(["Week", "State"])


df_google_trends = df_google_trends \
    .withColumn("week_start_str", F.split("Week", " - ").getItem(0)) \
    .withColumn("week_end_str", F.split("Week", " - ").getItem(1)) \
    .withColumn("week_start", F.to_date("week_start_str", "yyyy-MM-dd")) \
    .withColumn("week_end", F.to_date("week_end_str", "yyyy-MM-dd"))


df_google_trends = df_google_trends \
    .withColumn("Date", F.explode(F.sequence(F.col("week_start"), F.col("week_end"))))


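df_google_trends = df_google_trends.drop("week_start_str", "week_end_str", "week_start", "week_end", "Week")


# "DE_BE" -> "BE": keep only the text after the last underscore.
df_google_trends = df_google_trends.withColumn(
    "State", F.substring_index("State", "_", -1)
)

# Attach the national (DE) trend to each state-level row on the same date.
df_google_trends = df_google_trends.join(df_de, on="Date", how="left")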

In [0]:
display(
    df_google_trends.filter( 
                            (F.col("rossmann") != 0) 
                            ).orderBy(F.col("Date")).limit(20)
)

Date,rossmann,State,DE_trend
2004-01-11,13,NW,10
2004-01-12,13,NW,10
2004-01-13,13,NW,10
2004-01-14,13,NW,10
2004-01-15,13,NW,10
2004-01-16,13,NW,10
2004-01-17,13,NW,10
2004-04-04,16,NW,10
2004-04-05,16,NW,10
2004-04-06,16,NW,10


# Data Exploration

## Store

In [0]:
display(df_store.limit(1))

Store,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval
1,c,a,1270,9,2008,0,,,


In [0]:
display(see_nulls(df_store))
df_store.printSchema()

{'CompetitionDistance': 3,
 'CompetitionOpenSinceMonth': 354,
 'CompetitionOpenSinceYear': 354,
 'Promo2SinceWeek': 544,
 'Promo2SinceYear': 544,
 'PromoInterval': 544}
root
 |-- Store: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceMonth: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceWeek: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)



In [0]:
display(df_store.describe())

summary,Store,StoreType,Assortment,CompetitionDistance,CompetitionOpenSinceMonth,CompetitionOpenSinceYear,Promo2,Promo2SinceWeek,Promo2SinceYear,PromoInterval
count,1115.0,1115,1115,1112.0,761.0,761.0,1115.0,571.0,571.0,571
mean,558.0,,,5404.901079136691,7.224704336399474,2008.6688567674116,0.5121076233183857,23.595446584938703,2011.7635726795097,
stddev,322.0170802923348,,,7663.174720367942,3.21234779661471,6.19598255932908,0.5000776843468315,14.141983542922194,1.6749350926066222,
min,1.0,a,a,20.0,1.0,1900.0,0.0,1.0,2009.0,"Feb,May,Aug,Nov"
max,1115.0,d,c,75860.0,12.0,2015.0,1.0,50.0,2015.0,"Mar,Jun,Sept,Dec"


In [0]:
display(
    df_store.groupBy('PromoInterval').agg(F.count("*").alias("count")).orderBy("count")
    )

PromoInterval,count
"Mar,Jun,Sept,Dec",106
"Feb,May,Aug,Nov",130
"Jan,Apr,Jul,Oct",335
,544
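
According to the competition's data description, each non-empty `PromoInterval` names the months in which a new Promo2 round starts for that store. Note that September is written `Sept`. A hypothetical helper that turns the string into month numbers could look like the sketch below. It also treats the `"0"` placeholder, which the preprocessing step later assigns to missing intervals, as "no schedule".

```python
from typing import Optional, Set

# Month abbreviations as they appear in store.csv (note "Sept").
MONTH_ABBR = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sept": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def promo_months(interval: Optional[str]) -> Set[int]:
    """Hypothetical helper: PromoInterval string -> set of month numbers (empty if no Promo2)."""
    if interval in (None, "", "0"):
        return set()
    return {MONTH_ABBR[m] for m in interval.split(",")}


print(sorted(promo_months("Mar,Jun,Sept,Dec")))  # [3, 6, 9, 12]
print(promo_months(None))                        # set()
```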


In [0]:
display(
    df_store.groupBy('StoreType').agg(F.count("*").alias("count")).orderBy("count")
    )

StoreType,count
b,17
c,148
d,348
a,602


## Train

In [0]:
display(see_nulls(df_train))

{}

In [0]:
display(df_train.describe())

summary,Store,DayOfWeek,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday
count,1017209.0,1017209.0,1017209.0,1017209.0,1017209.0,1017209.0,1017209,1017209.0
mean,558.4297268309659,3.998340557348588,5773.818972305593,633.1459464082602,0.8301066939045958,0.3815145166824124,0.0,0.1786466694651738
stddev,321.9086511434535,1.99739096494053,3849.926175234754,464.4117338866316,0.3755392246931954,0.4857586048774204,0.0,0.3830563681809261
min,1.0,1.0,0.0,0.0,0.0,0.0,0,0.0
max,1115.0,7.0,41551.0,7388.0,1.0,1.0,c,1.0


In [0]:
display(
    df_train.filter(
    (F.col("Open") == 0) &
    (F.col("Sales") == 0)
).count()
)

172817

In [0]:
display(
    df_train.filter(
    (F.col("Open") == 1) &
    (F.col("Sales") == 0)
).count()
)

54

## Test

In [0]:
display(
    df_test.limit(1)
)

Id,Store,DayOfWeek,Date,Open,Promo,StateHoliday,SchoolHoliday
1,1,4,2015-09-17,1,1,0,0


In [0]:
display(see_nulls(df_test))

{'Open': 11}

In [0]:
display(df_test.describe())

summary,Id,Store,DayOfWeek,Open,Promo,StateHoliday,SchoolHoliday
count,41088.0,41088.0,41088.0,41077.0,41088.0,41088,41088.0
mean,20544.5,555.8995327102804,3.9791666666666665,0.8543223701828274,0.3958333333333333,0.0,0.4434871495327103
stddev,11861.228266920758,320.2744959417983,2.015481291705622,0.3527870290635694,0.4890348925327371,0.0,0.4968020779460085
min,1.0,1.0,1.0,0.0,0.0,0,0.0
max,41088.0,1115.0,7.0,1.0,1.0,a,1.0


In [0]:
display(
    df_test.filter(F.col('Open').isNull())
)

Id,Store,DayOfWeek,Date,Open,Promo,StateHoliday,SchoolHoliday
480,622,4,2015-09-17,,1,0,0
1336,622,3,2015-09-16,,1,0,0
2192,622,2,2015-09-15,,1,0,0
3048,622,1,2015-09-14,,1,0,0
4760,622,6,2015-09-12,,0,0,0
5616,622,5,2015-09-11,,0,0,0
6472,622,4,2015-09-10,,0,0,0
7328,622,3,2015-09-09,,0,0,0
8184,622,2,2015-09-08,,0,0,0
9040,622,1,2015-09-07,,0,0,0


# Data Preprocessing

In [0]:
df_train = process_train_test(df_train)
display(
    df_train.limit(1)
)

Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday,week_of_year,year,month,day
1,5,2015-07-31,5263,555,1,1,0,1,31,2015,7,31


In [0]:
df_test = process_train_test(df_test)
display(
    df_test.limit(1)
)

Id,Store,DayOfWeek,Date,Open,Promo,StateHoliday,SchoolHoliday,week_of_year,year,month,day
1,1,4,2015-09-17,1,1,0,0,38,2015,9,17


In [0]:
display(
    see_nulls(df_store)
)
df_store.printSchema()

{'CompetitionDistance': 3,
 'CompetitionOpenSinceMonth': 354,
 'CompetitionOpenSinceYear': 354,
 'Promo2SinceWeek': 544,
 'Promo2SinceYear': 544,
 'PromoInterval': 544}
root
 |-- Store: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceMonth: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceWeek: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)



In [0]:
df_store = df_store.fillna(0)
df_store = df_store.fillna({'PromoInterval': '0'})
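
`DataFrame.fillna` only fills columns whose type matches the value. `fillna(0)` therefore touches the numeric columns and leaves the string column `PromoInterval` null, which is why it gets a second, string-valued fill. The per-row effect of the two calls is sketched below in plain Python. `impute_store_row` is hypothetical, and the set of string columns is read off the printed `df_store` schema.

```python
from typing import Any, Dict

# String-typed columns in store.csv (from the printed schema); all others are numeric.
STRING_COLUMNS = {"StoreType", "Assortment", "PromoInterval"}


def impute_store_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical per-row equivalent of fillna(0) followed by fillna({'PromoInterval': '0'})."""
    # fillna(0): only numeric columns are filled; string columns keep their nulls.
    out = {
        col: (0 if val is None and col not in STRING_COLUMNS else val)
        for col, val in row.items()
    }
    # fillna({'PromoInterval': '0'}): the string column gets its own placeholder.
    if out.get("PromoInterval") is None:
        out["PromoInterval"] = "0"
    return out


store_1 = {  # values from the first row of store.csv
    "Store": 1, "StoreType": "c", "Assortment": "a", "CompetitionDistance": 1270,
    "CompetitionOpenSinceMonth": 9, "CompetitionOpenSinceYear": 2008, "Promo2": 0,
    "Promo2SinceWeek": None, "Promo2SinceYear": None, "PromoInterval": None,
}
filled = impute_store_row(store_1)
print(filled["Promo2SinceWeek"], filled["Promo2SinceYear"], repr(filled["PromoInterval"]))  # 0 0 '0'
```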

In [0]:
display(
    see_nulls(df_store)
)
display(
    see_nulls(df_train)
)
display(
    see_nulls(df_test)
)

{}
{}
{}

# Feature Engineering and Filtering

In [0]:
df_train = df_train.withColumn(
    "SalePerCustomer", F.round(F.col("Sales")/F.col("Customers"), 2)
)
display(df_train.head(5))

df_train.select(F.col("SalePerCustomer")).describe().show()

Store,DayOfWeek,Date,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday,week_of_year,year,month,day,SalePerCustomer
1,5,2015-07-31,5263,555,1,1,0,1,31,2015,7,31,9.48
2,5,2015-07-31,6064,625,1,1,0,1,31,2015,7,31,9.7
3,5,2015-07-31,8314,821,1,1,0,1,31,2015,7,31,10.13
4,5,2015-07-31,13995,1498,1,1,0,1,31,2015,7,31,9.34
5,5,2015-07-31,4822,559,1,1,0,1,31,2015,7,31,8.63


+-------+-----------------+
|summary|  SalePerCustomer|
+-------+-----------------+
|  count|           844340|
|   mean|9.493638285524783|
| stddev|2.197496666048751|
|    min|              0.0|
|    max|            64.96|
+-------+-----------------+
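
`SalePerCustomer` has a count of 844,340 rather than 1,017,209. Under Spark's default non-ANSI arithmetic, dividing by `Customers = 0` yields null, and `describe()` only counts non-null values. That leaves 1,017,209 - 844,340 = 172,869 rows with zero customers. The sketch below reproduces the per-row computation in plain Python. `sale_per_customer` is a hypothetical helper, and `Decimal` with `ROUND_HALF_UP` is used to follow the HALF_UP rounding that Spark documents for `round`.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional


def sale_per_customer(sales: int, customers: int) -> Optional[float]:
    """Hypothetical helper: Sales / Customers rounded to 2 decimals, None where Spark gives null."""
    if customers == 0:
        # Non-ANSI Spark returns null for x / 0; describe() then skips the row.
        return None
    ratio = Decimal(sales) / Decimal(customers)
    # HALF_UP, like F.round (Python's built-in round() rounds ties to even instead).
    return float(ratio.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))


print(sale_per_customer(5263, 555))  # 9.48 (store 1 on 2015-07-31)
print(sale_per_customer(0, 0))       # None
```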



In [0]:
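# Keep only days on which the store was open and recorded sales. This one condition removes
# both the 172,817 closed days with zero sales and the 54 open days with zero sales counted above.
df_train = df_train.filter(
    (F.col("Open") != 0) &
    (F.col("Sales") != 0)
)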

In [0]:
display(df_train.describe())

summary,Store,DayOfWeek,Sales,Customers,Open,Promo,StateHoliday,SchoolHoliday,week_of_year,year,month,day,SalePerCustomer
count,844338.0,844338.0,844338.0,844338.0,844338.0,844338.0,844338,844338.0,844338.0,844338.0,844338.0,844338.0,844338.0
mean,558.4213739047633,3.5203496703926627,6955.959133664481,762.777166253325,1.0,0.4463556064040704,0.0,0.1935776904509805,23.64694589133736,2013.8319452636267,5.845773848861475,15.835705606048762,9.493660773292207
stddev,321.73086140626936,1.7237123646860368,3103.815515490556,401.19415256680463,0.0,0.4971142441308316,0.0,0.3951019527866048,14.38993090242554,0.7772713805425885,3.323959483467046,8.68339176080937,2.19745069213118
min,1.0,1.0,46.0,8.0,1.0,0.0,0,0.0,1.0,2013.0,1.0,1.0,2.75
max,1115.0,7.0,41551.0,7388.0,1.0,1.0,c,1.0,52.0,2015.0,12.0,31.0,64.96
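
The post-filter count of 844,338 can be reconciled with the counts from the exploration section. The arithmetic below uses only numbers printed earlier in the notebook. It also recovers the number of closed days from the mean of `Open` in the first `describe()`, which confirms that no closed day recorded sales.

```python
total_rows = 1_017_209          # df_train count before filtering
closed_zero_sales = 172_817     # Open == 0 and Sales == 0
open_zero_sales = 54            # Open == 1 and Sales == 0

remaining = total_rows - closed_zero_sales - open_zero_sales
print(remaining)  # 844338, the count reported by describe() after filtering

open_share = 0.8301066939045958  # mean of Open before filtering
closed_days = total_rows - round(total_rows * open_share)
print(closed_days)  # 172817, identical to closed_zero_sales
```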
