![walmartecomm](walmartecomm.jpg)

Walmart is the biggest retail store in the United States. Just like others, they have been expanding their e-commerce part of the business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of total sales of Walmart. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labour Day, Thanksgiving, and Christmas. 

In this project, you have been tasked with creating a data pipeline for the analysis of supply and demand around the holidays, along with conducting a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the `grocery_sales` table in `PostgreSQL` database with the following features:

# `grocery_sales`
- `"index"` - unique ID of the row
- `"Store_ID"` - the store number
- `"Date"` - the week of sales
- `"Weekly_Sales"` - sales for the given store

Also, you have the `extra_data.parquet` file that contains complementary data:

# `extra_data.parquet`
- `"IsHoliday"` - Whether the week contains a public holiday - 1 if yes, 0 if no.
- `"Temperature"` - Temperature on the day of sale
- `"Fuel_Price"` - Cost of fuel in the region
- `"CPI"` – Prevailing consumer price index
- `"Unemployment"` - The prevailing unemployment rate
- `"MarkDown1"`, `"MarkDown2"`, `"MarkDown3"`, `"MarkDown4"` - number of promotional markdowns
- `"Dept"` - Department Number in each store
- `"Size"` - size of the store
- `"Type"` - type of the store (depends on `Size` column)

You will need to merge those files and perform some data manipulations. The transformed DataFrame can then be stored as the `clean_data` variable containing the following columns:
- `"Store_ID"`
- `"Month"`
- `"Dept"`
- `"IsHoliday"`
- `"Weekly_Sales"`
- `"CPI"`
- "`"Unemployment"`"

After merging and cleaning the data, you will have to analyze monthly sales of Walmart and store the results of your analysis as the `agg_data` variable that should look like:

|  Month | Weekly_Sales  | 
|---|---|
| 1.0  |  33174.178494 |
|  2.0 |  34333.326579 |
|  ... | ...  |  

Finally, you should save the `clean_data` and `agg_data` as the csv files.

It is recommended to use `pandas` for this project. 

Build a data pipeline using custom functions to extracts, transforms, aggregates, and loads e-commerce data.

1. Create a function called extract() that combines the grocery_sales table and the extra_data.parquet file, returning a variable called merged_df containing data from both sources.

2. Implement a function named transform() that takes the merged_df as input, fills missing numerical values (using any method of your choice), adds a column "Month", keeps the rows where the weekly sales are over $10,000 and drops the unnecessary columns. Ultimately, it should return a DataFrame and be stored as the clean_data variable.

3. Define a function called avg_monthly_sales() that takes clean_data as input and returns an aggregated DataFrame containing two columns - "Month" and "Avg_Sales" (rounded to 2 decimals). You should call the function and store the results as a variable called agg_data.

4. Create a function called load() that takes the cleaned and aggregated DataFrames, and their paths, and saves them as clean_data.csv and agg_data.csv respectively, without an index.

5. Lastly, define a validation() function that checks whether the two csv files from the load() exist in the current working directory.

Note: No engine is required to connect to the database, just execute your query in the SQL code cell provided, and the output will automatically be stored as a pandas DataFrame called grocery_sales, available for you to use in Python code cells.

In [1]:
# -- Write your SQL query here
SELECT * FROM grocery_sales;

SyntaxError: invalid syntax (1807553262.py, line 2)

In [None]:
import pandas as pd
import os


# Start here...
def extract(db_data, extra_data):
    #     let's load the parquet file
    extra_data_df = pd.read_parquet(extra_data)

    #     let's merge the grocery data with the extra data
    merged_df = db_data.merge(extra_data_df, on="index")

    return merged_df

In [None]:
# implementing the transform function
def transform(merged_dataframe):
    #     let's fill the missing values in the necessary
    merged_dataframe.fillna({
        "Weekly_Sales": merged_dataframe['Weekly_Sales'].mean(),
        "CPI": merged_dataframe['CPI'].mean(),
        "Unemployment": merged_dataframe['Unemployment'].mean()
    }, inplace=True)

    #     let's add the new column - 'Month'
    merged_dataframe['Date'] = pd.to_datetime(merged_dataframe['Date'], format="%Y-%m-%d")
    merged_dataframe['Month'] = merged_dataframe['Month'].dt.month

    # let's filter the rows with weekly sales above $10,000
    merged_dataframe = merged_dataframe.loc[merged_dataframe['Weekly_Sales'] > 10000, :]

    # let's drop all the unnecessary columns
    merged_dataframe = merged_dataframe.drop(
        [
            "index",
            "Date",
            "Temperature",
            "Fuel_Price",
            "Markdown1",
            "Markdown2",
            "Markdown3",
            "Markdown4",
            'Size',
            'Type'
        ],
        axis=1
    )
    
    return merged_dataframe

In [None]:
# let's implement the average monthly sales
def avg_monthly_sales(clean_data):
    # getting the two columns: Month & Weekly_Sales
    # sales = clean_data[['Month', 'Weekly_Sales']]

    # 
    agg_data = (
        clean_data.groupby("Month")["Sales"]
        .mean()
        .round(2)
        .reset_index(name="Avg_Sales")
    )
    
    return agg_data

In [None]:
# let's implement the load function
def load(full_data, full_data_path, aggregated_data, aggregated_data_path):
    # let's convert the dataframes to csv files
    full_data.to_csv(full_data_path, index=False)
    aggregated_data.to_csv(aggregated_data_path, index=False)

In [None]:
def validation():
    """Check whether the two CSV files from the load() exist in the current working directory."""
    # List of CSV files to check
    files_to_check = ["raw_tax_data.csv", "clean_tax_data.parquet"]

    # Check if each file exists in the current working directory
    for file in files_to_check:
        if not os.path.exists(file):
            print(
                f"Error: File '{file}' does not exist in the current working directory."
            )
            return False

    print("Validation successful: Both files exist in the current working directory.")
    return True