![walmartecomm](walmartecomm.jpg)

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. There are two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

The two files need to be merged and manipulated. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, the monthly sales of Walmart are stored  as the `agg_data` variable that look like:

|  Month | Weekly_Sales  | 
|---|---|
| 01  |  33174.178494 |
|  02 |  34333.326579 |
|  ... | ...  |  

Finally, the `clean_data` and `agg_data` files are stored as csv files.

In [8]:
# load libraries
import pandas as pd
import os

In [9]:
# functions to extract data from sources

def extract_parq(parquet_file_path, col):
    parquet_df = pd.read_parquet(parquet_file_path, engine='fastparquet')
    return parquet_df[col]

def extract_csv(csv_file_path):
    csv_df = pd.read_csv(csv_file_path)
    return csv_df

In [10]:
# cleaning data into desired format

def transform(merged_df):
    # Fill in the NAs in Date, CPI, Unemployment with the last non-null values
    merged_df['Date'] = merged_df['Date'].fillna(method = 'ffill')
    merged_df['CPI'] = merged_df['CPI'].fillna(method = 'ffill')
    merged_df['Unemployment'] = merged_df['Unemployment'].fillna(method = 'ffill')

    # drop any rows with no recording of weekly sales
    merged_df.dropna(subset='Weekly_Sales', inplace=True)

    # get the month form the dates
    merged_df['Date'] = pd.to_datetime(merged_df['Date'], format='%Y-%m-%d')
    merged_df['Month'] = merged_df['Date'].dt.strftime('%m')

    # keep the rows where the weekly sales are over $10,000
    merged_df = merged_df.loc[merged_df["Weekly_Sales"] > 10000, :]

    return merged_df[['Store_ID','Month','Dept','IsHoliday','Weekly_Sales','CPI','Unemployment']]

In [11]:
# get the average weekly sales by month
def avg_monthly_sales(clean_data):
    # group the weekly sales by month
    groups = clean_data.groupby(by=['Month'])['Weekly_Sales'].sum()
    # create a dataframe for the averge weekly sales by month
    agg_data = pd.DataFrame(groups)
    # rename the column names
    agg_data = agg_data.reset_index()
    agg_data = agg_data.rename(columns={'Weekly_Sales':'Avg_Sales'})

    return agg_data

In [12]:
# save the data to csv files
def load_csv(clean_data, csv_path):
    clean_data.to_csv(csv_path, index=False)

In [13]:
# to validate the existence of clean_data and agg_data file
def validation(file_path):
    if os.path.isfile(file_path):
        print('CSV file exist in the current directory')
    else:
        print("The CSV file do not exist in the current working directory")

In [14]:
import logging

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

try:
    # Extract, Transform, Aggregate and Load Data
    grocery_sales_df = extract_csv('data/grocery_sales.csv')
    extra_data_df = extract_parq('data/extra_data.parquet',  ['index','IsHoliday', 'CPI','Unemployment'])

    # merge two tables together
    merged_df = grocery_sales_df.merge(extra_data_df, left_on='index', right_on='index')

    # transform the data to get clean data
    clean_data = transform(merged_df)

    # gather average weekly sales by month
    agg_data = avg_monthly_sales(clean_data)
    
    # give the file path for loading
    clean_data_path = 'data/clean_data.csv'
    agg_data_path = 'data/agg_data.csv'

    # loading the cleaned data and aggregated data
    load_csv(clean_data,clean_data_path)
    load_csv(agg_data,agg_data_path)

    # validate the existence of clean_data and agg_data file
    validation(clean_data_path)
    validation(agg_data_path)

    logging.info("Successfully extracted, transformed and loaded data.") # Log success message

# Handle exceptions, log message
except Exception as e:
    logging.error(f"Pipeline failed with error: {e}")
    

INFO:Successfully extracted, transformed and loaded data.


CSV file exist in the current directory
CSV file exist in the current directory
