# 📉 Retail Sales Data Analysis: ETL Pipeline

## 🎯 Project Goal

This notebook details the **Extract, Transform, Load (ETL)** pipeline for the retail sales data. The primary objective is to clean and prepare the raw data from three CSV files into a single, cohesive dataset ready for analysis. The pipeline is designed to be automated and includes documentation and validation checks to ensure data integrity.

---

### 1.1 Extract: Loading Data from CSV

This initial step loads the raw data from the provided CSV files into pandas DataFrames; this is the "extraction" phase of the ETL process. The code below loads the three files and, when `preview=True`, prints each DataFrame's shape and displays its first few rows to show the initial state of the data.


In [1]:
import pandas as pd
import numpy as np
import os


def extract_raw_data(raw_data_dir='../data/raw_data/', preview=True):
    """
    Extracts raw retail sales data from CSV files.

    Parameters:
        raw_data_dir (str): Path to the directory containing raw CSV files.
        preview (bool): Whether to display head and shape of each DataFrame.

    Returns:
        stores_df, features_df, sales_df: Loaded pandas DataFrames.
    """
    try:
        stores_file = os.path.join(raw_data_dir, 'Stores.csv')
        features_file = os.path.join(raw_data_dir, 'Features.csv')
        sales_file = os.path.join(raw_data_dir, 'Sales.csv')

        stores_df = pd.read_csv(stores_file)
        features_df = pd.read_csv(features_file)
        sales_df = pd.read_csv(sales_file)

        if preview:
            print("\n✅ Data extracted successfully!")
            print(f"📦 Stores data shape:   {stores_df.shape}")
            print(f"📊 Features data shape: {features_df.shape}")
            print(f"🛒 Sales data shape:    {sales_df.shape}")
            display(stores_df.head())
            display(features_df.head())
            display(sales_df.head())

        return stores_df, features_df, sales_df

    except FileNotFoundError as e:
        print(f"\n❌ File not found: {e.filename}")
        print(f"🔍 Please ensure the CSV files are located in the '{raw_data_dir}' directory.")
        return None, None, None


# Run extraction
stores_df, features_df, sales_df = extract_raw_data()


✅ Data extracted successfully!
📦 Stores data shape:   (45, 3)
📊 Features data shape: (8190, 12)
🛒 Sales data shape:    (421570, 5)


|   | Store | Type | Size |
|---|-------|------|--------|
| 0 | 1 | A | 151315 |
| 1 | 2 | A | 202307 |
| 2 | 3 | B | 37392 |
| 3 | 4 | A | 205863 |
| 4 | 5 | B | 34875 |


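|   | Store | Date | Temperature | Fuel_Price | MarkDown1 | MarkDown2 | MarkDown3 | MarkDown4 | MarkDown5 | CPI | Unemployment | IsHoliday |
|---|-------|------|-------------|------------|-----------|-----------|-----------|-----------|-----------|-----|--------------|-----------|
| 0 | 1 | 05/02/2010 | 42.31 | 2.572 | NaN | NaN | NaN | NaN | NaN | 211.096358 | 8.106 | False |
| 1 | 1 | 12/02/2010 | 38.51 | 2.548 | NaN | NaN | NaN | NaN | NaN | 211.24217 | 8.106 | True |
| 2 | 1 | 19/02/2010 | 39.93 | 2.514 | NaN | NaN | NaN | NaN | NaN | 211.289143 | 8.106 | False |
| 3 | 1 | 26/02/2010 | 46.63 | 2.561 | NaN | NaN | NaN | NaN | NaN | 211.319643 | 8.106 | False |
| 4 | 1 | 05/03/2010 | 46.5 | 2.625 | NaN | NaN | NaN | NaN | NaN | 211.350143 | 8.106 | False |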


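|   | Store | Dept | Date | Weekly_Sales | IsHoliday |
|---|-------|------|------|--------------|-----------|
| 0 | 1 | 1 | 05/02/2010 | 24924.5 | False |
| 1 | 1 | 1 | 12/02/2010 | 46039.49 | True |
| 2 | 1 | 1 | 19/02/2010 | 41595.55 | False |
| 3 | 1 | 1 | 26/02/2010 | 19403.54 | False |
| 4 | 1 | 1 | 05/03/2010 | 21827.9 | False |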


### 1.2 Transform: Cleaning and Preprocessing Data

This is the core of the ETL pipeline where data is cleaned, transformed, and merged. The transformation logic is as follows:

- **Data Type Conversion**: The `Date` column in the `features_df` and `sales_df` is converted from a string to a datetime object for proper time-series analysis.
- **Data Merging**: `sales_df` is left-merged with `features_df` on `Store`, `Date`, and `IsHoliday`, and the result is left-merged with `stores_df` on `Store`, producing a single, comprehensive DataFrame (`merged_df`).
- **Handling Missing Values**:
  - `MarkDown` columns: Missing values are filled with `0.0`, as `NaN` likely indicates that no markdown was applied.
  - `CPI` and `Unemployment`: Missing values are imputed by forward-fill followed by backward-fill.
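- **Feature Engineering**: A new feature, `Sales_Holiday_Difference`, is created as each row's `Weekly_Sales` minus the mean `Weekly_Sales` of its group (holiday weeks or non-holiday weeks). Two further columns, `Inflation_Adjusted_Weekly_Sales` and `Is_Promotional_Period`, are added in Step 5.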


Step 1: Convert Date Columns

- Convert Date columns in features_df and sales_df to datetime format.
- Use dayfirst=True to correctly interpret DD/MM/YYYY.


In [2]:
for df, name in [(features_df, 'features_df'), (sales_df, 'sales_df')]:
    if 'Date' in df.columns:
        df['Date'] = pd.to_datetime(df['Date'], dayfirst=True)
    else:
        print(f"⚠️ 'Date' column missing in {name}")
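
To see why `dayfirst=True` matters here, the short sketch below parses the first date from the raw files both ways. The sample strings are copied from the preview above; the variable names are illustrative only.

```python
import pandas as pd

# Two date strings in DD/MM/YYYY format, taken from the raw data preview.
raw_dates = pd.Series(['05/02/2010', '12/02/2010'])

# With dayfirst=True, '05/02/2010' is read as 5 February 2010.
parsed_dayfirst = pd.to_datetime(raw_dates, dayfirst=True)

# An explicit format string is an alternative that fails loudly on malformed dates.
parsed_explicit = pd.to_datetime(raw_dates, format='%d/%m/%Y')

# Without dayfirst, the ambiguous string '05/02/2010' is read month-first (2 May 2010).
ambiguous_default = pd.to_datetime('05/02/2010')

print(parsed_dayfirst.dt.strftime('%Y-%m-%d').tolist())
print(ambiguous_default.strftime('%Y-%m-%d'))
```

Since the sales data is weekly, a month-first misread would not raise an error for days 1 to 12; it would quietly shift those rows to the wrong month, which is why the explicit setting is worth keeping.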

Step 2: Merge DataFrames

- Merge sales_df with features_df on Store, Date, and IsHoliday.
- Then merge the result with stores_df on Store.


In [3]:
merged_df = pd.merge(sales_df, features_df, on=[
                     'Store', 'Date', 'IsHoliday'], how='left')
merged_df = pd.merge(merged_df, stores_df, on='Store', how='left')

print("\n✅ Data merged successfully. Final shape:")
print(f"🧮 {merged_df.shape}")


✅ Data merged successfully. Final shape:
🧮 (421570, 16)
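
A left merge silently keeps sales rows that find no match in the features table, and a duplicated key on the right side would silently multiply rows. As a hedged sketch on toy data (the small frames below are invented for illustration, not taken from the dataset), the `validate` and `indicator` arguments of `pd.merge` can surface both problems:

```python
import pandas as pd

# Toy stand-ins for sales_df and features_df (values invented for illustration).
sales = pd.DataFrame({
    'Store': [1, 1, 2],
    'Date': pd.to_datetime(['2010-02-05', '2010-02-12', '2010-02-05']),
    'IsHoliday': [False, True, False],
    'Weekly_Sales': [100.0, 200.0, 300.0],
})
features = pd.DataFrame({
    'Store': [1, 1],
    'Date': pd.to_datetime(['2010-02-05', '2010-02-12']),
    'IsHoliday': [False, True],
    'CPI': [211.1, 211.2],
})

# validate='many_to_one' raises MergeError if the right-hand keys are not unique;
# indicator=True adds a '_merge' column recording where each row came from.
merged = pd.merge(
    sales, features,
    on=['Store', 'Date', 'IsHoliday'],
    how='left',
    validate='many_to_one',
    indicator=True,
)

# Rows marked 'left_only' had no matching features record.
unmatched = merged[merged['_merge'] == 'left_only']
print(f"Rows in: {len(sales)}, rows out: {len(merged)}, unmatched: {len(unmatched)}")
```

On the real data, the merged row count (421570) equals the row count of `sales_df`, which is consistent with the merge not duplicating rows.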


Step 3: Handle Missing Values

- Fill missing MarkDown values with 0.0 (assumed no promotion).
- Impute missing CPI and Unemployment using forward-fill followed by backward-fill.


In [4]:
markdown_cols = ['MarkDown1', 'MarkDown2',
                 'MarkDown3', 'MarkDown4', 'MarkDown5']
for col in markdown_cols:
    if col in merged_df.columns:
        merged_df[col] = merged_df[col].fillna(0)

for col in ['CPI', 'Unemployment']:
    if col in merged_df.columns:
        merged_df[col] = merged_df[col].ffill().bfill()

print("\n🧼 Missing values imputed.")
print(merged_df.isnull().sum())


🧼 Missing values imputed.
Store           0
Dept            0
Date            0
Weekly_Sales    0
IsHoliday       0
Temperature     0
Fuel_Price      0
MarkDown1       0
MarkDown2       0
MarkDown3       0
MarkDown4       0
MarkDown5       0
CPI             0
Unemployment    0
Type            0
Size            0
dtype: int64
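
One caveat with a plain `ffill().bfill()` on the merged frame is that it fills along the current row order, so a gap at the start of one store's rows could be filled from the previous store's last value. The sketch below shows a per-store variant on toy data; this is an alternative for comparison, not what the cell above does, and the toy values are invented.

```python
import numpy as np
import pandas as pd

# Toy frame: store 2 is missing its first CPI value (values invented for illustration).
toy = pd.DataFrame({
    'Store': [1, 1, 2, 2],
    'Date': pd.to_datetime(['2010-02-05', '2010-02-12', '2010-02-05', '2010-02-12']),
    'CPI': [211.0, 211.2, np.nan, 126.5],
})

# Plain fill: store 2's gap is filled from store 1's last value (211.2).
plain = toy['CPI'].ffill().bfill()

# Per-store fill: sort by date, then fill within each store only.
toy = toy.sort_values(['Store', 'Date'])
per_store = toy.groupby('Store')['CPI'].transform(lambda s: s.ffill().bfill())

print(plain.tolist())
print(per_store.tolist())
```

Whether the difference matters depends on where the gaps actually fall in the data, so it is worth counting the missing values per store before choosing between the two.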


Step 4: Feature Engineering

- Create Sales_Holiday_Difference: deviation from average sales for holiday vs. non-holiday weeks.


In [5]:
merged_df['Sales_Holiday_Difference'] = merged_df.groupby('IsHoliday')['Weekly_Sales'].transform(
    lambda x: x - x.mean()
)
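
The same deviation can be computed without a Python-level lambda by passing the string `'mean'` to `transform`, which returns each row's group mean. A minimal sketch on invented toy values:

```python
import pandas as pd

# Toy data: two non-holiday weeks and two holiday weeks (values invented).
toy = pd.DataFrame({
    'IsHoliday': [False, False, True, True],
    'Weekly_Sales': [10.0, 30.0, 100.0, 300.0],
})

# Group means broadcast back to the original rows: 20 for non-holiday, 200 for holiday.
group_mean = toy.groupby('IsHoliday')['Weekly_Sales'].transform('mean')

# Deviation of each week from the mean of its own group.
toy['Sales_Holiday_Difference'] = toy['Weekly_Sales'] - group_mean

print(toy['Sales_Holiday_Difference'].tolist())
```

Read this way, the first row of the real output (sales of 24924.50, difference of 9023.05) implies a non-holiday average of about 15901.45 across the whole dataset.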

Step 5: Inflation Adjustment & Base Sales

- Adjust Weekly_Sales for inflation using the first CPI value as baseline.
- Identify promotional periods and calculate average non-promotional sales.


In [6]:
baseline_cpi = merged_df['CPI'].iloc[0]
merged_df['Inflation_Adjusted_Weekly_Sales'] = merged_df['Weekly_Sales'] / \
    (merged_df['CPI'] / baseline_cpi)

merged_df['Is_Promotional_Period'] = (merged_df[markdown_cols] > 0).any(axis=1)
base_sales = merged_df.loc[~merged_df['Is_Promotional_Period'],
                           'Weekly_Sales'].mean()

print("\n📈 Additional Metrics Calculated:")
print(f"🧮 Baseline CPI: {baseline_cpi}")
print(f"📊 Avg Weekly Sales (Non-Promotional): {base_sales:.2f}")


📈 Additional Metrics Calculated:
🧮 Baseline CPI: 211.0963582
📊 Avg Weekly Sales (Non-Promotional): 15871.52
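
The adjustment divides each week's sales by the ratio of that week's CPI to the baseline CPI. As a quick check, the arithmetic below reproduces the adjusted value for the week of 2010-02-12 (Store 1, Dept 1) from numbers printed in this notebook. The displayed CPI values are rounded, so the result should only be expected to agree to within a small tolerance.

```python
# Values copied from the notebook output for Store 1, Dept 1, week of 2010-02-12.
baseline_cpi = 211.0963582   # CPI of the first row, used as the baseline
weekly_sales = 46039.49
cpi = 211.24217

# Deflate sales to baseline-period prices.
inflation_factor = cpi / baseline_cpi
adjusted_sales = weekly_sales / inflation_factor

print(f"Inflation factor: {inflation_factor:.6f}")
print(f"Adjusted sales:   {adjusted_sales:.2f}")
```

Because the baseline is simply the first row's CPI, rows with a CPI above it are scaled down and rows below it are scaled up.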


Step 6: Final Sorting and Preview

- Sort by Store, Dept, and Date, then reset index.


In [7]:
merged_df = merged_df.sort_values(
    by=['Store', 'Dept', 'Date']).reset_index(drop=True)
print("\n✅ Final data preparation complete. Preview:")
display(merged_df.head())


✅ Final data preparation complete. Preview:


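|   | Store | Dept | Date | Weekly_Sales | IsHoliday | Temperature | Fuel_Price | MarkDown1 | MarkDown2 | MarkDown3 | MarkDown4 | MarkDown5 | CPI | Unemployment | Type | Size | Sales_Holiday_Difference | Inflation_Adjusted_Weekly_Sales | Is_Promotional_Period |
|---|-------|------|------|--------------|-----------|-------------|------------|-----------|-----------|-----------|-----------|-----------|-----|--------------|------|------|--------------------------|---------------------------------|-----------------------|
| 0 | 1 | 1 | 2010-02-05 | 24924.5 | False | 42.31 | 2.572 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 211.096358 | 8.106 | A | 151315 | 9023.054931 | 24924.5 | False |
| 1 | 1 | 1 | 2010-02-12 | 46039.49 | True | 38.51 | 2.548 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 211.24217 | 8.106 | A | 151315 | 29003.666813 | 46007.710873 | False |
| 2 | 1 | 1 | 2010-02-19 | 41595.55 | False | 39.93 | 2.514 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 211.289143 | 8.106 | A | 151315 | 25694.104931 | 41557.597337 | False |
| 3 | 1 | 1 | 2010-02-26 | 19403.54 | False | 46.63 | 2.561 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 211.319643 | 8.106 | A | 151315 | 3502.094931 | 19383.037819 | False |
| 4 | 1 | 1 | 2010-03-05 | 21827.9 | False | 46.5 | 2.625 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 211.350143 | 8.106 | A | 151315 | 5926.454931 | 21801.689528 | False |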


### 1.5 Validate: Data Quality Checks

This step ensures the integrity and quality of the transformed data. We will validate that:

1.  There are no missing values after the imputation process.
2.  The new `Sales_Holiday_Difference` column has been created successfully.


Step 1: Check for Missing Values

- Ensure all missing values have been handled.
- This includes MarkDown, CPI, and Unemployment columns.


In [8]:
print("🔎 Checking for null values after transformation:")
null_counts = merged_df.isnull().sum()
print(null_counts)

if null_counts.sum() == 0:
    print("\n✅ No missing values detected.")
else:
    print("\n⚠️ Warning: Missing values still present. Please review the imputation steps.")

🔎 Checking for null values after transformation:
Store                              0
Dept                               0
Date                               0
Weekly_Sales                       0
IsHoliday                          0
Temperature                        0
Fuel_Price                         0
MarkDown1                          0
MarkDown2                          0
MarkDown3                          0
MarkDown4                          0
MarkDown5                          0
CPI                                0
Unemployment                       0
Type                               0
Size                               0
Sales_Holiday_Difference           0
Inflation_Adjusted_Weekly_Sales    0
Is_Promotional_Period              0
dtype: int64

✅ No missing values detected.


Step 2: Validate Feature Creation

- Confirm that Sales_Holiday_Difference exists and contains valid values.


In [9]:
if 'Sales_Holiday_Difference' in merged_df.columns:
    print("\n✅ 'Sales_Holiday_Difference' column created successfully.")
    print("📊 First 5 values:")
    print(merged_df['Sales_Holiday_Difference'].head())
else:
    print("\n❌ Error: 'Sales_Holiday_Difference' column was not created.")


✅ 'Sales_Holiday_Difference' column created successfully.
📊 First 5 values:
0     9023.054931
1    29003.666813
2    25694.104931
3     3502.094931
4     5926.454931
Name: Sales_Holiday_Difference, dtype: float64
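
Checking that the column exists does not confirm that its values are right. Because each value is a deviation from its own group's mean, the deviations within each `IsHoliday` group should average to roughly zero. A hedged sketch of that check follows; `check_group_deviation` is a hypothetical helper name, and the toy frame stands in for `merged_df`.

```python
import numpy as np
import pandas as pd


def check_group_deviation(df, group_col='IsHoliday',
                          diff_col='Sales_Holiday_Difference', tol=1e-6):
    """Return True if deviations average to ~0 within every group."""
    group_means = df.groupby(group_col)[diff_col].mean()
    return bool(np.all(np.abs(group_means) < tol))


# Toy stand-in for merged_df (values invented for illustration).
toy = pd.DataFrame({
    'IsHoliday': [False, False, True, True],
    'Weekly_Sales': [10.0, 30.0, 100.0, 300.0],
})
toy['Sales_Holiday_Difference'] = (
    toy['Weekly_Sales']
    - toy.groupby('IsHoliday')['Weekly_Sales'].transform('mean')
)

print(check_group_deviation(toy))
```

The default tolerance is an assumption chosen for the toy data; on several hundred thousand rows, floating-point rounding may call for a looser value.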


Additional Checks (for deeper validation):

- Confirm expected data types for key columns (Date, Weekly_Sales, CPI)
- Check for duplicate rows
- Validate value ranges (e.g. no negative sales)


In [10]:
# 🧪 Data Validation Function
def validate_data(df):
    """
    Validates key aspects of the transformed dataset.
    Prints warnings or confirmations for:
    - Expected data types
    - Duplicate rows
    - Value ranges (e.g. no negative sales)
    """
    print("🔍 Starting data validation...\n")

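    # 1. Check expected data types
    # The comparison below uses str(dtype), so full dtype names are required
    # (pandas reports float columns as 'float64', not 'float').
    expected_types = {
        'Date': 'datetime64[ns]',
        'Weekly_Sales': 'float64',
        'CPI': 'float64'
    }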

    print("📌 Checking column data types:")
    for col, expected_type in expected_types.items():
        if col in df.columns:
            actual_type = df[col].dtype
            if str(actual_type) == expected_type:
                print(f"✅ {col}: {actual_type} (as expected)")
            else:
                print(f"⚠️ {col}: {actual_type} (expected {expected_type})")
        else:
            print(f"❌ {col} column not found in DataFrame.")

    # 2. Check for duplicate rows
    duplicate_count = df.duplicated().sum()
    if duplicate_count == 0:
        print("\n✅ No duplicate rows found.")
    else:
        print(f"\n⚠️ {duplicate_count} duplicate rows detected.")

    # 3. Validate value ranges
    print("\n📊 Checking for negative values in 'Weekly_Sales':")
    if 'Weekly_Sales' in df.columns:
        negative_sales = (df['Weekly_Sales'] < 0).sum()
        if negative_sales == 0:
            print("✅ No negative sales values.")
        else:
            print(f"⚠️ {negative_sales} rows have negative sales values.")
    else:
        print("❌ 'Weekly_Sales' column not found.")

    print("\n✅ Data validation complete.\n")


# Run validation on the transformed data
validate_data(merged_df)
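
Comparing `str(dtype)` against a fixed string is brittle: the datetime resolution in a name such as `datetime64[ns]` can differ depending on the pandas version and how the column was created. As a sketch of an alternative, the helpers in `pandas.api.types` test the dtype family instead. The function name `check_dtypes` and the toy frame are hypothetical.

```python
import pandas as pd
from pandas.api import types as ptypes

# Map each column to a predicate that accepts any dtype of the expected family.
DTYPE_CHECKS = {
    'Date': ptypes.is_datetime64_any_dtype,
    'Weekly_Sales': ptypes.is_float_dtype,
    'CPI': ptypes.is_float_dtype,
}


def check_dtypes(df, checks=DTYPE_CHECKS):
    """Return a dict of column -> True/False/None (None means column missing)."""
    results = {}
    for col, predicate in checks.items():
        results[col] = bool(predicate(df[col])) if col in df.columns else None
    return results


# Toy frame (values invented); CPI is deliberately stored as text to trigger a failure.
toy = pd.DataFrame({
    'Date': pd.to_datetime(['2010-02-05']),
    'Weekly_Sales': [24924.5],
    'CPI': ['211.09'],
})
print(check_dtypes(toy))
```

Returning a dictionary rather than printing makes the result easy to assert on if the checks are later moved into an automated test.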

### 1.3 Load: Storing the Transformed Data

The final step of the ETL process involves saving the cleaned and transformed DataFrame to a new CSV file. This file, `cleaned_sales_data.csv`, is the final output of the pipeline and is ready for in-depth analysis.


In [11]:
print("Type of column index:", type(merged_df.columns))
print("Column index contents:", merged_df.columns.tolist())

Type of column index: <class 'pandas.core.indexes.base.Index'>
Column index contents: ['Store', 'Dept', 'Date', 'Weekly_Sales', 'IsHoliday', 'Temperature', 'Fuel_Price', 'MarkDown1', 'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5', 'CPI', 'Unemployment', 'Type', 'Size', 'Sales_Holiday_Difference', 'Inflation_Adjusted_Weekly_Sales', 'Is_Promotional_Period']


In [12]:
import os
import pandas as pd

# 🧼 Sanitize index and column headers
merged_df.index = pd.RangeIndex(start=0, stop=len(merged_df), step=1)
merged_df.columns = merged_df.columns.astype(str)

# 📁 Define output path using os.getcwd() for interactive environments
project_root = os.getcwd()
output_dir = os.path.join(project_root, 'data', 'processed_data')
output_file = os.path.join(output_dir, 'cleaned_sales_data.csv')

# 📂 Create directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# 💾 Save to CSV
merged_df.to_csv(output_file, index=False)

print(f"\n✅ Data has been successfully saved to '{output_file}'")


✅ Data has been successfully saved to 'c:\Users\masna\OneDrive\Documents\vscode-projects\Project Assessment Folder\Retail_Sales_Analysis_Project\Retail_Sales_Analysis_Project\jupyter_notebooks\data\processed_data\cleaned_sales_data.csv'
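
CSV does not store dtypes, so the `Date` column comes back as text unless it is parsed on read. The sketch below is a hedged round-trip check on a toy frame written to a temporary directory; the toy values and the temporary path are illustrative and are not the notebook's real output location.

```python
import os
import tempfile

import pandas as pd

# Toy stand-in for merged_df (a few values borrowed from the preview).
toy = pd.DataFrame({
    'Store': [1, 1],
    'Date': pd.to_datetime(['2010-02-05', '2010-02-12']),
    'Weekly_Sales': [24924.5, 46039.49],
})

with tempfile.TemporaryDirectory() as tmp_dir:
    output_file = os.path.join(tmp_dir, 'cleaned_sales_data.csv')
    toy.to_csv(output_file, index=False)

    # parse_dates restores the datetime dtype that CSV cannot store.
    reloaded = pd.read_csv(output_file, parse_dates=['Date'])

# The reloaded frame should have the same shape and columns as what was written.
same_shape = reloaded.shape == toy.shape
same_columns = list(reloaded.columns) == list(toy.columns)
print(same_shape, same_columns)
```

Anyone loading `cleaned_sales_data.csv` for analysis would likely want the same `parse_dates=['Date']` argument so that time-series operations work without a second conversion step.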


### 1.4 Automate & 1.6 Document: Final Notes

This entire notebook serves as the automation and documentation for the ETL process.

- **Automation**: Running all cells in this notebook sequentially executes the full ETL pipeline, from data extraction through to saving the final cleaned dataset. The notebook can be scheduled or integrated into a larger workflow.

- **Documentation**: The markdown cells provide clear explanations of the data sources, the transformation logic, and the validation checks performed. This documentation makes the process transparent and easy for others to understand and replicate.
