Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in `PostgreSQL` database with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save the `clean_data` and `agg_data` as the csv files.

It is recommended to use `pandas` for this project. 

In [14]:
pip install pyarrow fastparquet

Note: you may need to restart the kernel to use updated packages.


In [15]:
import pyarrow
import fastparquet
import pandas as pd
import os

In [16]:
#J'ai téléchargé grocery_sales en format csv depuis la table PostgreSQL
grocery_sales = pd.read_csv("grocery_sales.csv")

In [17]:
# Fonction d'extraction
def extract(store_data, extra_data):
    extra_df = pd.read_parquet(extra_data)
    merged_df = store_data.merge(extra_df, on = "index")
    return merged_df

In [18]:
#J'appelle la fonction extract() qui fusionne les données extraites avec celles qu'on a déjà pour stocker dans la variable "merged_df"
merged_df = extract(grocery_sales, "extra_data.parquet")

In [19]:
merged_df

Unnamed: 0.1,Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,0,1,2010-02-05,1,24924.50,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
231517,231517,232414,24,2011-05-06,8,49471.07,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231518,231518,232415,24,2011-05-06,50,1210.00,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231519,231519,232416,24,2011-05-06,87,25893.32,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231520,231520,232417,24,2011-05-06,85,1357.83,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0


In [20]:
print(merged_df.columns.tolist())

['Unnamed: 0', 'index', 'Store_ID', 'Date', 'Dept', 'Weekly_Sales', 'IsHoliday', 'Temperature', 'Fuel_Price', 'MarkDown1', 'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5', 'CPI', 'Unemployment', 'Type', 'Size']


In [21]:
print(merged_df.dtypes)

Unnamed: 0        int64
index             int64
Store_ID          int64
Date             object
Dept              int64
Weekly_Sales    float64
IsHoliday         int64
Temperature     float64
Fuel_Price      float64
MarkDown1       float64
MarkDown2       float64
MarkDown3       float64
MarkDown4       float64
MarkDown5       float64
CPI             float64
Unemployment    float64
Type            float64
Size            float64
dtype: object


In [24]:
# Une fonction transform() avec un seul paramètre : raw_data
def transform(raw_data):
    raw_data.fillna(raw_data.select_dtypes(include=["number"]).mean(), inplace=True)
    raw_data["Date"] = pd.to_datetime(raw_data["Date"], errors="coerce")
    raw_data['Month'] = raw_data['Date'].dt.month
    raw_data = raw_data.loc[raw_data['Weekly_Sales'] > 10000]
    raw_data = raw_data.drop(["index",'Temperature','Date', 'Fuel_Price', 'MarkDown1', 'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5','Type', 'Size'], axis=1)
    return raw_data

In [25]:
#On appelle transform() sur la variable "merged_df"
clean_data = transform(merged_df)
print(clean_data)

        Unnamed: 0  Store_ID  Dept  Weekly_Sales  IsHoliday         CPI  \
0                0         1     1      24924.50          0  211.096358   
1                1         1    26      11737.12          0  211.096358   
2                2         1    17      13223.76          0  211.096358   
5                5         1    79      46729.77          0  211.096358   
6                6         1    55      21249.31          0  211.096358   
...            ...       ...   ...           ...        ...         ...   
231513      231513        24    40      45396.26          0  134.514367   
231515      231515        24    93      41295.84          0  134.514367   
231516      231516        24     9      24024.18          0  134.514367   
231517      231517        24     8      49471.07          0  134.514367   
231519      231519        24    87      25893.32          0  134.514367   

        Unemployment  Month  
0           8.106000    2.0  
1           8.106000    2.0  
2        

In [26]:
print(clean_data.columns.tolist())

['Unnamed: 0', 'Store_ID', 'Dept', 'Weekly_Sales', 'IsHoliday', 'CPI', 'Unemployment', 'Month']


In [27]:
# Fonction avg_weekly_sales_per_month qui prend en paramètre le dataset nettoyé
def avg_weekly_sales_per_month(clean_data):
    data = clean_data[['Month','Weekly_Sales']]
    data = data.groupby('Month').agg(Avg_Sales = ("Weekly_Sales",'mean')).reset_index().round(2)
    return data

In [28]:
#Exécution de avg_weekly_sales_per_month(clean_data)
agg_data = avg_weekly_sales_per_month(clean_data)
print(agg_data)

    Month  Avg_Sales
0     1.0   33174.18
1     2.0   34333.33
2     3.0   33220.89
3     4.0   33392.37
4     5.0   33339.89
5     6.0   34582.47
6     7.0   33922.76
7     8.0   33644.79
8     9.0   33258.05
9    10.0   32736.99
10   11.0   36594.03
11   12.0   39238.80


In [29]:
# Fonction load qui prend en paramètres clean_data , agg_data et leurs chemins
def load(full_data,full_data_file_path, agg_data, agg_data_file_path):
    full_data.to_csv(full_data_file_path , index=False)
    agg_data.to_csv(agg_data_file_path, index=False)
    pass

In [30]:
#Appel de load() pour créer les fichiers csv clean_data et agg_data 
load(clean_data,"clean_data.csv",agg_data,"agg_data.csv")

In [31]:
# Si le fichier existe validation() retourne "The file exists" sinon "The file does not exist"
from pathlib import Path

def validation(file_path):
    try:
        my_file = Path(file_path)
        with open(my_file, 'r') as file:
            print("The file exists.")
    except FileNotFoundError:
        print("The file does not exist.")

In [32]:
# On vérifie l'existence de clean_data.csv
validation("clean_data.csv")

The file exists.


In [33]:
# On vérifie l'existence de agg_data.csv
validation("agg_data.csv")

The file exists.
