![walmartecomm](walmartecomm.jpg)

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in `PostgreSQL` database with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save the `clean_data` and `agg_data` as the csv files.

It is recommended to use `pandas` for this project. 

In [27]:
# # Install necessary packages
!pip install pyarrow




[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [28]:
import pandas as pd
import os

# Extract function is already implemented for you 
def extract(store_data, extra_data):
    try:
        store_data = pd.read_csv(store_data)
        extra_df = pd.read_parquet(extra_data)
        merged_df = store_data.merge(extra_df, on = "index")
        print("Merge complete.")
        return merged_df
    except Exception as e:
        print(e)

# Call the extract() function and store it as the "merged_df" variable
# Replaced the data from the database with a csv file for demonstration purposes
merged_df = extract("sample_grocery_sales.csv", "extra_data.parquet")

Merge complete.


In [29]:
# Looking at the data
print(merged_df.head)

<bound method NDFrame.head of          index  Store_ID        Date  Dept  Weekly_Sales  IsHoliday  \
0            0         1  2010-02-05     1      24924.50          0   
1            1         1  2010-02-05    26      11737.12          0   
2            2         1  2010-02-05    17      13223.76          0   
3            3         1  2010-02-05    45         37.44          0   
4            4         1  2010-02-05    28       1085.29          0   
...        ...       ...         ...   ...           ...        ...   
231517  232414        24  2011-05-06     8      49471.07          0   
231518  232415        24  2011-05-06    50       1210.00          0   
231519  232416        24  2011-05-06    87      25893.32          0   
231520  232417        24  2011-05-06    85       1357.83          0   
231521  232418        24  2011-05-06    35       3648.91          0   

        Temperature  Fuel_Price  MarkDown1  MarkDown2  MarkDown3  MarkDown4  \
0             42.31       2.572       

In [30]:
# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
    try:
        # Fill missing cells with their column mean values
        # raw_data[['Weekly_Sales', 'CPI', 'Unemployment']].fillna(0, inplace=True)
        raw_data.fillna(
            {
                'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
                'CPI': raw_data['CPI'].mean(),
                'Unemployment': raw_data['Unemployment'].mean(),
            },
            inplace=True
        )

        # Convert Date to datetime format
        raw_data['Date'] = pd.to_datetime(raw_data['Date'], format='%Y-%m-%d')
        # Create Month column
        raw_data["Month"] = raw_data['Date'].dt.month

        # Drop rows with weekly sales $10,000 or less
        raw_data = raw_data.loc[raw_data['Weekly_Sales'] > 10000, :]
        
        # Filtering unnecessary columns
        raw_data = raw_data[["Store_ID", "Month", "Dept", 
                 "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"]]

        print(raw_data.head)
        return raw_data
        
    except Exception as e:
        print(e)

In [31]:
# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)

<bound method NDFrame.head of         Store_ID  Month  Dept  IsHoliday  Weekly_Sales         CPI  \
0              1    2.0     1          0      24924.50  211.096358   
1              1    2.0    26          0      11737.12  211.096358   
2              1    2.0    17          0      13223.76  211.096358   
5              1    2.0    79          0      46729.77  211.096358   
6              1    2.0    55          0      21249.31  211.096358   
...          ...    ...   ...        ...           ...         ...   
231513        24    5.0    40          0      45396.26  134.514367   
231515        24    5.0    93          0      41295.84  134.514367   
231516        24    5.0     9          0      24024.18  134.514367   
231517        24    5.0     8          0      49471.07  134.514367   
231519        24    5.0    87          0      25893.32  134.514367   

        Unemployment  
0           8.106000  
1           8.106000  
2           8.106000  
5           7.500052  
6           7.

In [32]:
# Create the avg_weekly_sales_per_month function that takes in the cleaned data from the last step
def avg_weekly_sales_per_month(clean_data):
    try:
        # Slicing columns
        clean_data = clean_data[["Month", "Weekly_Sales"]]
        
        # Grouping by month, finding the avg weekly sales, reseting the index, rounding to 2 decimals
        clean_data = clean_data.groupby("Month").agg({'Weekly_Sales': 'mean'}).reset_index().round(2)

        # Renaming weekly_sales column
        clean_data.rename(columns={'Weekly_Sales': 'Avg_Sales'}, inplace=True)
        
        print(clean_data)
        return clean_data
    except Exception as e:
        print(e)

In [33]:
# Call the avg_weekly_sales_per_month() function and pass the cleaned DataFrame
agg_data = avg_weekly_sales_per_month(clean_data)

    Month  Avg_Sales
0     1.0   33174.18
1     2.0   34333.33
2     3.0   33220.89
3     4.0   33392.37
4     5.0   33339.89
5     6.0   34582.47
6     7.0   33922.76
7     8.0   33644.79
8     9.0   33258.05
9    10.0   32736.99
10   11.0   36594.03
11   12.0   39238.80


In [34]:
# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(full_data, full_data_file_path, agg_data, agg_data_file_path):
    # Write your code here
    try:
        # Load full data to csv
        full_data.to_csv(full_data_file_path, index=False)
        print(f'Full data loaded to: {full_data_file_path}')

        #Load agg. data to csv
        agg_data.to_csv(agg_data_file_path, index=False)
        print(f'Agg. data loaded to: {agg_data_file_path}')
        
    except Exception as e:
        print(e)

In [35]:
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths  
load(clean_data, 'clean_data.csv', agg_data, 'agg_data.csv')

Full data loaded to: clean_data.csv
Agg. data loaded to: agg_data.csv


In [36]:
# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Write your code here
    try:
        if os.path.exists(file_path):
            print(f'File Path: {file_path} exists in the current directory!')
        else:
            print(f'File Path: {file_path} does not exist!')
    except Exception as e:
        print(e)

In [37]:
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation('clean_data.csv')
validation('agg_data.csv')

File Path: clean_data.csv exists in the current directory!
File Path: agg_data.csv exists in the current directory!
