![walmartecomm](walmartecomm.jpg)

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in `PostgreSQL` database with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save the `clean_data` and `agg_data` as the csv files.

It is recommended to use `pandas` for this project. 

In [1]:
# import necessary libraries
import pandas as pd
import os

In [2]:
# Define an extract function    
def extract(store_sales, extra_data):
    """
    Extracts data from the store_sales and extra_data files.
    
    Parameters:
    store_sales (str): Path to the store sales CSV file.
    extra_data (str): Path to the extra data parquet file.
    
    Returns:
    pd.DataFrame: DataFrame containing the extracted data.
    """
    # Read the store sales data
    store_sales_df = pd.read_csv(store_sales)
    
    # Read the extra data
    extra_data_df = pd.read_parquet(extra_data)
    
    # Merge the two DataFrames on 'index'
    merged_df = store_sales_df.merge(extra_data_df, on='index')    
    return merged_df 

In [None]:
# Define the path to the store sales CSV file
store_sales = "/data/grocery_sales.csv"  # Update this path to your actual raw sales data file
extra_data = "/data/extra_data.parquet"

# Call the extract() function and store it as the "merged_df" variable
merged_df = extract(store_sales, extra_data)

print(merged_df.head())

FileNotFoundError: [Errno 2] No such file or directory: '/data/agg_data.csv'

In [181]:
# Check for missing values 
merged_df.isna().sum()

index            0
Store_ID         0
Date            39
Dept             0
Weekly_Sales    38
IsHoliday        0
Temperature      0
Fuel_Price       0
MarkDown1        0
MarkDown2        0
MarkDown3        0
MarkDown4        1
MarkDown5        1
CPI             47
Unemployment    37
Type             1
Size             1
dtype: int64

In [182]:
# Create the transform() function with one parameter: "raw_data"
def transform(raw_data): 
    # fill missing numerical values with mean
    raw_data.fillna(
        {
            'CPI': raw_data['CPI'].mean(), 
            'Weekly_Sales': raw_data['Weekly_Sales'].mean(), 
            'Unemployment': raw_data['Unemployment'].mean()
        }, inplace=True
    ) 

    # Convert Date column to date_time_type
    raw_data["Date"] = pd.to_datetime(raw_data["Date"], format = "%Y-%m-%d") 
    
    # Extract Month value from date
    raw_data['Month'] = raw_data['Date'].dt.month 

    # Filter rows where weekly_sales > 10,000
    raw_data = raw_data.loc[raw_data['Weekly_Sales'] > 10000, :] 

    # Filter for required columns 
    raw_data = raw_data.drop(["index", "Temperature", "Fuel_Price", "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "Type", "Size", "Date"], axis=1)

    return raw_data


In [183]:
clean_data = transform(merged_df)
# clean_data.drop(clean_data.columns[0], axis=1, inplace=True)

print(clean_data.head())

   Store_ID  Dept  Weekly_Sales  IsHoliday         CPI  Unemployment  Month
0         1     1      24924.50          0  211.096358      8.106000    2.0
1         1    26      11737.12          0  211.096358      8.106000    2.0
2         1    17      13223.76          0  211.096358      8.106000    2.0
5         1    79      46729.77          0  211.096358      7.500052    2.0
6         1    55      21249.31          0  211.096358      7.500052    2.0


In [184]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    holiday_sales = clean_data[['Month', 'Weekly_Sales']]

    holiday_sales = holiday_sales.groupby('Month').agg(Avg_Sales = ('Weekly_Sales', 'mean')).reset_index().round(2)
    
    return holiday_sales

In [185]:
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)

print(agg_data.head())

   Month  Avg_Sales
0    1.0   33174.18
1    2.0   34333.33
2    3.0   33220.89
3    4.0   33392.37
4    5.0   33339.89


In [186]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    full_data.to_csv(full_data_file_path, index=False)
    agg_data.to_csv(agg_data_file_path, index=False) 

In [187]:
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths    
load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')

In [188]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    file_exists = os.path.exists(file_path) 

    if not file_exists:
        raise Exception (f'There is no file at the path {file_path}')

In [189]:
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation('clean_data.csv')
validation('agg_data.csv')