In [None]:
# Mount Google Drive into the Colab runtime at /content/drive.
# NOTE(review): the data files below are read via relative paths
# ('grocery_sales.csv', 'extra_data.parquet'), not from /content/drive — confirm this mount is needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [57]:
import pandas as pd
import numpy as np
import holidays
import logging
import datetime as dt

import os

In this project, we’ll create a data pipeline to analyze supply and demand patterns using two key data sources: **grocery sales data** and **complementary data**.

**In brief:**

**grocery_sales.csv**:
- **index**: Unique row ID
- **Store_ID**: Store number
- **Date**: Week of sales
- **Weekly_Sales**: Sales amount for the specific store

**extra_data.parquet**:
- **IsHoliday**: 1 if the week includes a public holiday, 0 otherwise
- **Temperature**: Temperature on the sale date
- **Fuel_Price**: Regional fuel cost
- **CPI**: Consumer Price Index
- **Unemployment**: Unemployment rate at the time
- **MarkDown1** to **MarkDown5**: Number of promotional markdowns
- **Dept**: Department number within the store
- **Size**: Store size
- **Type**: Store type, based on size

# ▶ Extract

Create an `extract()` function that reads data from a CSV file and a Parquet file and merges them into a single DataFrame on a specified key column. The function returns the merged DataFrame.

In [29]:
def extract(csv_file, parquet_file, merge_id, how='inner'):
  """
  Extracts data from a CSV file and a Parquet file and merges them on a shared key column.

  Args:
    csv_file: The path to the CSV file.
    parquet_file: The path to the Parquet file.
    merge_id: The column name to merge on (must be present in both files).
    how: Join type forwarded to `pd.merge` ('inner', 'left', 'right', 'outer').
      Defaults to 'inner', which drops rows whose key is absent from either file.

  Returns:
    A merged DataFrame.
  """
  csv_df = pd.read_csv(csv_file)
  parquet_df = pd.read_parquet(parquet_file)

  # The join type is explicit so callers can keep unmatched rows (e.g. how='left')
  # instead of losing them silently to the default inner join.
  merged_df = pd.merge(csv_df, parquet_df, on=merge_id, how=how)
  return merged_df

In [30]:
# Merge the sales CSV with the complementary Parquet data on their shared 'index' key
merged_data = extract(
    csv_file='grocery_sales.csv',
    parquet_file='extra_data.parquet',
    merge_id='index',
)

# Show the merged result
display(merged_data)

Unnamed: 0.1,Unnamed: 0,index,Store_ID,Date,Dept,Weekly_Sales,IsHoliday,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,Type,Size
0,0,0,1,2010-02-05,1,24924.50,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
1,1,1,1,2010-02-05,26,11737.12,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
2,2,2,1,2010-02-05,17,13223.76,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,8.106,3.0,151315.0
3,3,3,1,2010-02-05,45,37.44,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
4,4,4,1,2010-02-05,28,1085.29,0,42.31,2.572,0.0,0.0,0.0,0.0,0.0,211.096358,,3.0,151315.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
231517,231517,232414,24,2011-05-06,8,49471.07,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231518,231518,232415,24,2011-05-06,50,1210.00,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231519,231519,232416,24,2011-05-06,87,25893.32,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0
231520,231520,232417,24,2011-05-06,85,1357.83,0,55.75,4.192,0.0,0.0,0.0,0.0,0.0,134.514367,8.212,3.0,203819.0


In [None]:
# Print the summary information of the merged data.
# DataFrame.info() writes its report itself and returns None, so it is not wrapped in print().
merged_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 231522 entries, 0 to 231521
Data columns (total 18 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   Unnamed: 0    231522 non-null  int64  
 1   index         231522 non-null  int64  
 2   Store_ID      231522 non-null  int64  
 3   Date          231483 non-null  object 
 4   Dept          231522 non-null  int64  
 5   Weekly_Sales  231484 non-null  float64
 6   IsHoliday     231522 non-null  int64  
 7   Temperature   231522 non-null  float64
 8   Fuel_Price    231522 non-null  float64
 9   MarkDown1     231522 non-null  float64
 10  MarkDown2     231522 non-null  float64
 11  MarkDown3     231522 non-null  float64
 12  MarkDown4     231521 non-null  float64
 13  MarkDown5     231521 non-null  float64
 14  CPI           231475 non-null  float64
 15  Unemployment  231485 non-null  float64
 16  Type          231521 non-null  float64
 17  Size          231521 non-null  float64
dtypes: f

# ▶   Transform

Implement a transform() function that takes merged_df as input. This function will fill missing values, add a "Month" column and keep only rows where "Weekly_Sales" exceeds $10,000. It will also drop any unnecessary columns, keeping only the following:

- "Store_ID"
- "Month"
- "Dept"
- "IsHoliday"
- "Weekly_Sales"
- "CPI"
- "Unemployment"

The function will return a cleaned DataFrame.


In [46]:
# Configure the root logger once, so errors raised by transform() render as "LEVEL: message"
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)


def transform(df, min_weekly_sales=10000):
  """
      Cleans the merged sales DataFrame and keeps only the columns needed downstream.

      Steps:
      - raises ValueError if 'Store_ID' or 'Dept' has missing values; before raising,
        a 0/1 flag column ('missing_store_id' or 'missing_dept') is added to the input
        DataFrame so the affected rows can be investigated
      - converts 'Date' to datetime and forward-fills missing dates
      - creates a 'Month' column from 'Date'
      - fills missing 'CPI', 'Weekly_Sales' and 'Unemployment' with the column means
      - fills missing 'IsHoliday' using the holidays library (1 if 'Date' is a US holiday)
      - keeps only rows whose 'Weekly_Sales' exceeds `min_weekly_sales`

      On success the input DataFrame is left unchanged; all work happens on a copy.

      Args:
        df: The merged DataFrame (output of extract()).
        min_weekly_sales: Rows with 'Weekly_Sales' at or below this value are dropped.
          Defaults to 10000.

      Returns:
        A DataFrame with columns Store_ID, Month, Dept, IsHoliday, Weekly_Sales,
        CPI and Unemployment.

      Raises:
        ValueError: if 'Store_ID' or 'Dept' contains missing values.
  """
  # Key columns cannot be imputed: flag the bad rows on the caller's frame and stop.
  for key_col, flag_col in (('Store_ID', 'missing_store_id'), ('Dept', 'missing_dept')):
    key_missing = df[key_col].isna()
    if key_missing.any():
      df[flag_col] = key_missing.astype(int)
      logging.error(f"There are missing values in the '{key_col}' column.")
      raise ValueError(f"Check on missing values of '{key_col}' in output DataFrame before proceeding.")

  # Work on a copy so the caller's merged DataFrame is not altered.
  out = df.copy()

  # Convert Date from object to datetime, then fill gaps with the previous known date
  out['Date'] = pd.to_datetime(out['Date'], format='%Y-%m-%d').ffill()
  out['Month'] = out['Date'].dt.month

  # Mean imputation for the numeric measures (means are taken before any filtering)
  out = out.fillna({
      'CPI': out['CPI'].mean(),
      'Weekly_Sales': out['Weekly_Sales'].mean(),
      'Unemployment': out['Unemployment'].mean(),
  })

  # Build the holiday calendar only when there is something to fill, and skip rows
  # whose date is still unknown (NaT) since they cannot be looked up.
  # NOTE(review): this checks only the single 'Date' value, not every day of that week;
  # confirm this matches the "week includes a public holiday" meaning of IsHoliday.
  holiday_missing = out['IsHoliday'].isna() & out['Date'].notna()
  if holiday_missing.any():
    us_holidays = holidays.US()
    out.loc[holiday_missing, 'IsHoliday'] = (
        out.loc[holiday_missing, 'Date'].apply(lambda d: 1 if d in us_holidays else 0)
    )

  # Keep high-sales rows and only the columns required downstream
  keep_columns = ["Store_ID", "Month", "Dept", "IsHoliday", "Weekly_Sales", "CPI", "Unemployment"]
  cleaned_df = out.loc[out['Weekly_Sales'] > min_weekly_sales, keep_columns]

  return cleaned_df


In [47]:
# Clean the merged data: imputation, Month column, sales filter and column selection
cleaned_data = transform(df=merged_data)

# Show the cleaned result
display(cleaned_data)

Unnamed: 0,Store_ID,Month,Dept,IsHoliday,Weekly_Sales,CPI,Unemployment
0,1,2,1,0,24924.50,211.096358,8.106000
1,1,2,26,0,11737.12,211.096358,8.106000
2,1,2,17,0,13223.76,211.096358,8.106000
5,1,2,79,0,46729.77,211.096358,7.500052
6,1,2,55,0,21249.31,211.096358,7.500052
...,...,...,...,...,...,...,...
231513,24,5,40,0,45396.26,134.514367,8.212000
231515,24,5,93,0,41295.84,134.514367,8.212000
231516,24,5,9,0,24024.18,134.514367,8.212000
231517,24,5,8,0,49471.07,134.514367,8.212000


In [48]:
# Count the missing values remaining in each column of the cleaned data
missing_per_column = cleaned_data.isna().sum()
print(missing_per_column)

Store_ID        0
Month           0
Dept            0
IsHoliday       0
Weekly_Sales    0
CPI             0
Unemployment    0
dtype: int64


In [49]:
def avg_weekly_sales_per_month(df):
  """
      Averages 'Weekly_Sales' for every (Store_ID, Month) pair.

      Args:
        df: DataFrame containing 'Store_ID', 'Month' and 'Weekly_Sales' columns.

      Returns:
        A DataFrame with columns Store_ID, Month and Weekly_Sales, where
        Weekly_Sales is the group mean rounded to 2 decimals.
  """
  sales_by_store_month = df.groupby(['Store_ID', 'Month'])['Weekly_Sales']
  monthly_mean = sales_by_store_month.mean().round(2)
  return monthly_mean.reset_index()

In [50]:
# Aggregate the cleaned data into average weekly sales per store and month
avg_weekly_sales = avg_weekly_sales_per_month(df=cleaned_data)

# Show the aggregated result
display(avg_weekly_sales)


Unnamed: 0,Store_ID,Month,Weekly_Sales
0,1,1,38059.25
1,1,2,37813.26
2,1,3,37422.25
3,1,4,38343.56
4,1,5,38081.70
...,...,...,...
283,24,8,32660.73
284,24,9,31436.47
285,24,10,30824.13
286,24,11,34865.07


# ▶ Load

Create a `load()` function that takes the cleaned and aggregated DataFrames along with their file paths as input. The function saves each DataFrame to its CSV path without writing the DataFrame index.

In [52]:
def load(clean_data, agg_data, clean_data_path, agg_data_path):
  """
      Writes the cleaned and the aggregated DataFrames to CSV files.

      The DataFrame index is not written to either file.

      Args:
        clean_data: The cleaned DataFrame.
        agg_data: The aggregated DataFrame.
        clean_data_path: Destination path for the cleaned DataFrame.
        agg_data_path: Destination path for the aggregated DataFrame.
  """
  for frame, destination in ((clean_data, clean_data_path), (agg_data, agg_data_path)):
    frame.to_csv(destination, index=False)

In [53]:
# Persist the cleaned and aggregated DataFrames as index-free CSV files
load(
    clean_data=cleaned_data,
    agg_data=avg_weekly_sales,
    clean_data_path='clean_data.csv',
    agg_data_path='agg_data.csv',
)

# ▶ Validate

Create a `validation()` function to check whether each CSV file generated by the `load()` function exists in the working directory.

In [56]:
def validation(csv_path):
  """
      Reports whether a single CSV path exists.

      Args:
        csv_path: Path to the CSV file, absolute or relative to the current working directory.

      Returns:
        True if the path exists, False otherwise. Note that os.path.exists is also
        True when the path names a directory.
  """
  return bool(os.path.exists(csv_path))

In [55]:
# Confirm that each CSV written by load() is present in the working directory
for output_path in ('clean_data.csv', 'agg_data.csv'):
    print(validation(output_path))

True
True
