![walmartecomm](walmartecomm.jpg)

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in `PostgreSQL` database with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save the `clean_data` and `agg_data` as the csv files.

It is recommended to use `pandas` for this project. 

Project:

Build a data pipeline using custom functions to extract, transform, aggregate, and load e-commerce data. The SQL query for grocery_sales and the extract() function have already been implemented for you.

To start the project, run the first two cells, then proceed with the following steps:

1. Implement a function named transform() with one argument, taking merged_df as input, filling missing numerical values (using any method of your choice), adding a column "Month", keeping the rows where the weekly sales are over $10,000 and drops the unnecessary columns. Ultimately, it should return a DataFrame and be stored as the clean_data variable.

2. Implement the function avg_weekly_sales_per_month with one argument (the cleaned data). This function will calculate the average monthly sales. For implementing this function you must select the "Month" and "Weekly_Sales" columns as they are the only ones needed for this analysis, then create a chain operation with groupby(), agg(), reset_index(), and round() functions, then group by the "Month" column and calculate the average monthly sales, then call reset_index() to start a new index order and finally round the results to two decimal places.

3. Create a function called load() that takes the cleaned and aggregated DataFrames, and their paths, and saves them as clean_data.csv and agg_data.csv respectively, without an index.

4. Lastly, define a validation() function that checks whether the two csv files from the load() exist in the current working directory.


In [None]:
import pandas as pd
import os

# Extract function
def extract():
    merged_df = pd.read_csv("merged_df.csv")
    return merged_df

# Call the extract() function and store it as the "merged_df" variable
merged_df = extract()

In [178]:
# Overview of the merged_df content
print(merged_df.head())
print(merged_df.info())

   index  Store_ID       Date  Dept  ...         CPI  Unemployment  Type      Size
0      0         1 2010-02-05     1  ...  211.096358         8.106   3.0  151315.0
1      1         1 2010-02-05    26  ...  211.096358         8.106   3.0  151315.0
2      2         1 2010-02-05    17  ...  211.096358         8.106   3.0  151315.0
3      3         1 2010-02-05    45  ...  211.096358           NaN   3.0  151315.0
4      4         1 2010-02-05    28  ...  211.096358           NaN   3.0  151315.0

[5 rows x 17 columns]
<class 'pandas.core.frame.DataFrame'>
Int64Index: 231522 entries, 0 to 231521
Data columns (total 17 columns):
 #   Column        Non-Null Count   Dtype         
---  ------        --------------   -----         
 0   index         231522 non-null  int64         
 1   Store_ID      231522 non-null  int64         
 2   Date          231483 non-null  datetime64[ns]
 3   Dept          231522 non-null  int64         
 4   Weekly_Sales  231484 non-null  float64       
 5   IsHoli

In [182]:
# Create the transform() 
def transform(raw_data):
    
    # Drops rows with Weekly_Sales less equal than 10000
    raw_data = raw_data.dropna(subset='Date').loc[raw_data['Weekly_Sales'] > 10000, :]
    
    na_values_dict = {  # Dict with values to replace into the df
        'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
        'CPI': raw_data['CPI'].mean(),
        'Unemployment': raw_data['Unemployment'].mean(),
    }
    
    # Fill NA fields with the values specified above
    raw_data = raw_data.fillna(value=na_values_dict)
    
    # Creation of the new Month column
    raw_data['Month'] = raw_data['Date'].dt.month
    
    return raw_data.loc[:, ['Weekly_Sales', 'Month', 'CPI', 'Unemployment']]

In [183]:
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)

In [184]:
clean_data.head()

Unnamed: 0,Weekly_Sales,Month
0,24924.5,2
1,11737.12,2
2,13223.76,2
5,46729.77,2
6,21249.31,2


In [185]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    # Finding the average weekly sales per month using aggregate functions
    result_data = clean_data.groupby('Month').agg(avg_weekly_sales_per_month=('Weekly_Sales', 'mean'))
    
    # Reindexing the rows and round the average weekly sales per month to two decimal places
    return result_data.reset_index()['avg_weekly_sales_per_month'].round(2)

In [186]:
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)

In [187]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going 
# to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    full_data.to_csv(full_data_file_path)
    agg_data.to_csv(agg_data_file_path)

In [188]:
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths
load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')

In [189]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
import os

def validation(file_path):
    # Write your code here
    assert os.path.exists(file_path + 'clean_data.csv') or os.path.exists(file_path + 'agg_data.csv')

In [190]:
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation('')