# Goal

Build an automated ETL pipeline to process the e-commerce sales data (dataset1.csv), validate it, store it in a SQLite database with a structured schema, and generate an interactive dashboard to analyze sales and profit trends.

Why more difficult?

Automation: Write reusable functions to process data automatically.
Data Validation: Check data quality (e.g., negative sales, invalid dates).
Modular Code: Organize code into functions for scalability.
Interactive Dashboard: Use Plotly for a web-based dashboard.
ETL Focus: Mimic real-world data engineering workflows.

# Define the Objective

Build an ETL pipeline to:

- Extract data from dataset1.csv.
- Transform it (clean, validate, enrich).
- Load it into a SQLite database.
- Create an interactive dashboard showing sales and profit trends.

In [1]:
import pandas as pd
import logging

In [2]:
# Set up logging to track progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Write the ETL Pipeline (Extract)

- What: Load the data and validate it.
- Why: Ensure the data is reliable before processing.

In [3]:
def extract_data(file_path):
    "Load CSV file and validate basic structure"
    try:
        df = pd.read_csv(file_path)
        logging.info(f"Loaded {len(df)} row from {file_path}")
        if df.empty:
            raise ValueError("Dataset is empty")
        if not all (col in df.columns for col in ['Order_Date', 'Sales', 'Profit', 'Product_Category']):
            raise ValueError("Required columns missing")
        return df
    except Exception as e:
        logging.error(f"Error Loading Data {e}")
        raise
    

In [4]:
# # Test the extract function
# df = extract_data('dataset1.csv')
# df.head()

# What’s new?

- Logging: Tracks progress and errors (useful for debugging).
- Validation: Checks for empty data or missing key columns.

# Transform the Data

- What: Clean, validate, and enrich the data.
- Why: To ensure quality and add useful features.

In [5]:
def transform_data(df):
    "Clean, validate, and enrich the dataset."
    try:
        # Create a copy to avoid modifying the original
        df_clean = df.copy()
        
        # Remove Missing values
        df_clean.dropna()
        logging.info(f"Remove Null Values")
        
        # Convert data types
        df_clean['Order_Date'] = pd.to_datetime(df_clean['Order_Date'])
        categorical_Cols = ['Gender', 'Product_Category', 'Order_Priority', 'Payment_method']
        for col in categorical_Cols:
            df_clean[col] = df_clean[col].astype('category')
            
        # Validate numerical data
        if (df_clean['Sales'] < 0).any():
            logging.warning("Negative sales detected, setting to 0")
            df_clean['Sales'] = df_clean['Sales'].clip(lower=0)
        if (df_clean['Profit'] < 0).any():
            logging.warning("Negative profit detected, setting to 0")
            df_clean['Profit'] = df_clean['Profit'].clip(lower=0)
            
        # Remove duplicates
        df_clean = df_clean.drop_duplicates()
        logging.info(f"Removed {len(df) - len(df_clean)} duplicates")
        
        # Enrich: Add year, month, and profit margin
        df_clean['Year'] = df_clean['Order_Date'].dt.year
        df_clean['Month'] = df_clean['Order_Date'].dt.month
        df_clean['Profit_Margin'] = df_clean['Profit'] / df_clean['Sales'] * 100
        
        if df_clean['Order_Date'].isnull().any():
            logging.warning("Invalid dates detected, dropping rows")
            df_clean = df_clean.dropna(subset=['Order_Date'])
        logging.info(f"Transformed data: {len(df_clean)} rows")
        return df_clean
    except Exception as e:
        logging.error(f"Error transforming data: {e}")
        raise

# any()

any() checks if at least one value in a list, array, or Pandas Series is True. In this case, it’s used to check if there are any negative values in the Sales or Profit columns

In [6]:
# # Test the transform function
# df_transformed = transform_data(df)
# df_transformed.info()

In [7]:
# df_transformed[['Sales', 'Profit', 'Profit_Margin']].head()

# Load the Data

- What: Store the transformed data in a SQLite database.
- Why: To practice data storage, a core data engineering skill.

In [8]:
import sqlite3

In [9]:
def load_data(df, db_name, table_name):
    """Save data to SQLite database."""
    try:
        conn = sqlite3.connect(db_name)
        # Define schema explicitly
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        conn.commit()
        logging.info(f"Saved {len(df)} rows to {db_name}.{table_name}")
        conn.close()
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        raise

In [10]:
# Test the Load function
# load_data(df_transformed, 'sales_data.db', 'sales_cleaned')
# # Verify
# conn = sqlite3.connect('sales_data.db')
# test_df = pd.read_sql('SELECT * FROM sales_cleaned', conn)
# print(test_df.head())
# conn.close()

# Automate the Pipeline

- What: Combine extract, transform, and load into a single pipeline.
- Why: Automation is a key data engineering skill.

In [11]:
def run_pipeline(file_path, db_name, table_name):
    """Run the full ETL pipeline."""
    logging.info("Starting ETL pipeline")
    df = extract_data(file_path)
    df_transformed = transform_data(df)
    load_data(df_transformed, db_name, table_name)
    logging.info("ETL pipeline completed")
    return df_transformed

In [12]:
df_final = run_pipeline('dataset1.csv', 'sales_data.db', 'sales_cleaned')

2025-04-20 10:58:49,081 - INFO - Starting ETL pipeline
2025-04-20 10:58:49,214 - INFO - Loaded 51290 row from dataset1.csv
2025-04-20 10:58:49,247 - INFO - Remove Null Values
2025-04-20 10:58:49,342 - INFO - Removed 0 duplicates
2025-04-20 10:58:49,352 - INFO - Transformed data: 51290 rows
2025-04-20 10:58:49,870 - INFO - Saved 51290 rows to sales_data.db.sales_cleaned
2025-04-20 10:58:49,872 - INFO - ETL pipeline completed


# Create an Interactive Dashboard

- What: Build a Plotly dashboard to visualize trends.
- Why: To share insights interactively, a valuable skill.

In [13]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
pio.renderers.default = "browser"

In [27]:
def plot_dashboard(df):
    "Create an interactive dashboard with sales and profit trends."
    # Create subplots
    fig = make_subplots(rows=2, cols=2, 
                        subplot_titles=("Sales by Month", "Profit by Product Category", "Profit Margin Distribution", "Discount vs. Profit"))

    # Plot 1: Sales by month
    sales_by_month = df.groupby('Month')['Sales'].sum().reset_index()
    trace1 = go.Scatter(x=sales_by_month['Month'], y=sales_by_month['Sales'], mode='lines+markers')
    fig.add_trace(trace1, row=1, col=1)

    # Plot 2: Profit by product category
    profit_by_category = df.groupby('Product_Category', observed=True)['Profit'].mean().reset_index()
    trace2 = go.Bar(x=profit_by_category['Product_Category'], y=profit_by_category['Profit'])
    fig.add_trace(trace2, row=1, col=2)

    # Plot 3: Profit margin distribution
    trace3 = go.Histogram(x=df['Profit_Margin'], nbinsx=20)
    fig.add_trace(trace3, row=2, col=1)

    # Plot 4: Discount vs. profit
    discount_profit = df.groupby('Discount')['Profit'].mean().reset_index()
    trace4 = go.Bar(x=discount_profit['Discount'], y=discount_profit['Profit'], marker_color='yellow')
    fig.add_trace(trace4, row=2, col=2)

    # Update layout
    fig.update_layout(title_text="Sales and Profit Dashboard", showlegend=False, height=800)
    fig.show(renderer="browser")

In [28]:
print("Starting to generate dashboard...")
plot_dashboard(df_final)
print("Dashboard rendering complete.")


Starting to generate dashboard...
Dashboard rendering complete.


# What’s new?

- Interactive Plots: Click, zoom, and hover to explore data.
- Multiple Visuals: Combines four plots in one dashboard.
- Plotly: Industry-standard tool for dashboards.