## Data Extraction

In [2]:
import pandas as pd
import logging

def configure_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)

def extract_retail_data(file_path='online_retail.csv'):
    """
    Strictly extracts data with:
    - Column names: InvoiceNo, StockCode, Description, Quantity, 
                   InvoiceDate, UnitPrice, CustomerID, Country
    - Proper data type conversion
    - Missing value handling
    """
    logger = configure_logging()
    
    try:
        logger.info("Starting extraction")
        df = pd.read_csv(
            file_path,
            encoding='ISO-8859-1',
            dtype={
                'InvoiceNo': str,
                'StockCode': str,
                'Description': str,
                'Quantity': float,
                'InvoiceDate': str,
                'UnitPrice': float,
                'CustomerID': str,
                'Country': str
            }
        )
        df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
        
        logger.info(f"Extracted {len(df)} records")
        return df
    
    except Exception as e:
        logger.error(f"Extraction failed: {str(e)}")
        raise



extract_retail_data()

2025-08-14 19:03:24,961 - INFO - Starting extraction
2025-08-14 19:03:25,802 - INFO - Extracted 541909 records


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6.0,2010-12-01 08:26:00,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8.0,2010-12-01 08:26:00,2.75,17850,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6.0,2010-12-01 08:26:00,3.39,17850,United Kingdom
...,...,...,...,...,...,...,...,...
541904,581587,22613,PACK OF 20 SPACEBOY NAPKINS,12.0,2011-12-09 12:50:00,0.85,12680,France
541905,581587,22899,CHILDREN'S APRON DOLLY GIRL,6.0,2011-12-09 12:50:00,2.10,12680,France
541906,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4.0,2011-12-09 12:50:00,4.15,12680,France
541907,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4.0,2011-12-09 12:50:00,4.15,12680,France


In [3]:
df = extract_retail_data()

2025-08-14 19:03:25,826 - INFO - Starting extraction
2025-08-14 19:03:26,712 - INFO - Extracted 541909 records


## Data Transformation

In [5]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging

def configure_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)

def transform_retail_data(df):
    """
    Strict transformation per requirements:
    1. Calculate TotalSales
    2. Remove Quantity < 0 and UnitPrice <= 0
    3. Filter last year (dynamic cutoff: max_date - 1 year)
    4. Create CustomerDim and TimeDim
    """
    logger = configure_logging()
    
    try:
        logger.info("Starting transformation")
        
        # Ensure InvoiceDate is datetime
        if not pd.api.types.is_datetime64_any_dtype(df['InvoiceDate']):
            df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
        
        # 1. Calculate TotalSales
        df = df.copy()
        df['TotalSales'] = df['Quantity'] * df['UnitPrice']
        
        # 2. Remove invalid rows
        valid_df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)].copy()
        
        # 3. Filter for last year (dynamic)
        max_date = valid_df['InvoiceDate'].max()
        if pd.isna(max_date):
            raise ValueError("No valid dates found in InvoiceDate column")
            
        cutoff_date = max_date - timedelta(days=365)
        filtered_df = valid_df[valid_df['InvoiceDate'] >= cutoff_date].copy()
        
        # 4. Create dimensions
        customer_dim = filtered_df.groupby('CustomerID').agg({
            'Country': 'first',
            'TotalSales': ['sum', 'count']
        }).reset_index()
        customer_dim.columns = ['CustomerID', 'Country', 'TotalPurchases', 'TransactionCount']
        
        time_dim = pd.DataFrame({'Date': filtered_df['InvoiceDate'].dt.date.unique()})
        time_dim['DateKey'] = time_dim['Date'].astype(str).str.replace('-', '')
        time_dim['Day'] = pd.to_datetime(time_dim['Date']).dt.day
        time_dim['Month'] = pd.to_datetime(time_dim['Date']).dt.month
        time_dim['Quarter'] = pd.to_datetime(time_dim['Date']).dt.quarter
        time_dim['Year'] = pd.to_datetime(time_dim['Date']).dt.year
        
        # Prepare fact table
        fact_data = filtered_df[[
            'InvoiceNo', 'StockCode', 'Description', 'Quantity',
            'UnitPrice', 'TotalSales', 'CustomerID', 'InvoiceDate'
        ]].copy()
        fact_data['DateKey'] = fact_data['InvoiceDate'].dt.strftime('%Y%m%d')
        
        logger.info("Transformation completed")
        return fact_data, customer_dim, time_dim
        
    
    except Exception as e:
        logger.error(f"Transformation failed: {str(e)}")
        raise

transform_retail_data(df)

2025-08-14 19:03:26,747 - INFO - Starting transformation
2025-08-14 19:03:33,202 - INFO - Transformation completed


(       InvoiceNo StockCode                         Description  Quantity  \
 20240     538032     22669                   RED BABY BUNTING        5.0   
 20241     538032     22465          HANGING METAL STAR LANTERN      12.0   
 20242     538032    85123A  WHITE HANGING HEART T-LIGHT HOLDER       6.0   
 20243     538032     22727           ALARM CLOCK BAKELIKE RED        4.0   
 20244     538032     22726          ALARM CLOCK BAKELIKE GREEN       4.0   
 ...          ...       ...                                 ...       ...   
 541904    581587     22613         PACK OF 20 SPACEBOY NAPKINS      12.0   
 541905    581587     22899        CHILDREN'S APRON DOLLY GIRL        6.0   
 541906    581587     23254       CHILDRENS CUTLERY DOLLY GIRL        4.0   
 541907    581587     23255     CHILDRENS CUTLERY CIRCUS PARADE       4.0   
 541908    581587     22138       BAKING SET 9 PIECE RETROSPOT        3.0   
 
         UnitPrice  TotalSales CustomerID         InvoiceDate   DateKey  


In [6]:
fact_data, customer_dim, time_dim = transform_retail_data(df)

2025-08-14 19:03:33,270 - INFO - Starting transformation
2025-08-14 19:03:47,689 - INFO - Transformation completed


## Data Loading

In [8]:
# LOADING COMPONENT
import sqlite3

def load_retail_data(fact_data, customer_dim, time_dim, db_name='retail_dw.db'):
    """Load transformed data into SQLite database"""
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    conn = None
    try:
        logger.info("Starting loading")
        conn = sqlite3.connect(db_name)
        
        # Create schema
        conn.executescript("""
        DROP TABLE IF EXISTS CustomerDim;
        CREATE TABLE CustomerDim (
            CustomerID TEXT PRIMARY KEY,
            Country TEXT,
            TotalPurchases REAL,
            TransactionCount INTEGER
        );
        
        DROP TABLE IF EXISTS TimeDim;
        CREATE TABLE TimeDim (
            DateKey TEXT PRIMARY KEY,
            Date DATE,
            Day INTEGER,
            Month INTEGER,
            Quarter INTEGER,
            Year INTEGER
        );
        
        DROP TABLE IF EXISTS SalesFact;
        CREATE TABLE SalesFact (
            InvoiceNo TEXT,
            StockCode TEXT,
            Description TEXT,
            Quantity REAL,
            UnitPrice REAL,
            TotalSales REAL,
            CustomerID TEXT,
            InvoiceDate DATETIME,
            DateKey TEXT,
            FOREIGN KEY (CustomerID) REFERENCES CustomerDim(CustomerID),
            FOREIGN KEY (DateKey) REFERENCES TimeDim(DateKey)
        );
        """)
        
        # Load data
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
        time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
        fact_data.to_sql('SalesFact', conn, if_exists='append', index=False)
        
        logger.info("Data successfully loaded")
        return True
        
    except Exception as e:
        logger.error(f"Loading failed: {str(e)}")
        return False
        
    finally:
        if conn:
            conn.close()

# Execute loading
load_success = load_retail_data(fact_data, customer_dim, time_dim)
if load_success:
    print("ETL completed successfully!")
    # Verify loading
    conn = sqlite3.connect('retail_dw.db')
    print("\nSalesFact sample:")
    display(pd.read_sql("SELECT * FROM SalesFact LIMIT 5", conn))
    print("\nCustomerDim sample:")
    display(pd.read_sql("SELECT * FROM CustomerDim LIMIT 5", conn))
    print("\nTimeDim sample:")
    display(pd.read_sql("SELECT * FROM TimeDim LIMIT 5", conn))
    conn.close()
else:
    print("ETL failed during loading")

2025-08-14 19:03:47,774 - INFO - Starting loading
2025-08-14 19:03:51,757 - INFO - Data successfully loaded


ETL completed successfully!

SalesFact sample:


Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,UnitPrice,TotalSales,CustomerID,InvoiceDate,DateKey
0,538032,22669,RED BABY BUNTING,5.0,2.95,14.75,14479,2010-12-09 12:59:00,20101209
1,538032,22465,HANGING METAL STAR LANTERN,12.0,1.65,19.8,14479,2010-12-09 12:59:00,20101209
2,538032,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6.0,2.95,17.7,14479,2010-12-09 12:59:00,20101209
3,538032,22727,ALARM CLOCK BAKELIKE RED,4.0,3.75,15.0,14479,2010-12-09 12:59:00,20101209
4,538032,22726,ALARM CLOCK BAKELIKE GREEN,4.0,3.75,15.0,14479,2010-12-09 12:59:00,20101209



CustomerDim sample:


Unnamed: 0,CustomerID,Country,TotalPurchases,TransactionCount
0,12346,United Kingdom,77183.6,1
1,12347,Iceland,3598.21,151
2,12348,Finland,1797.24,31
3,12349,Italy,1757.55,73
4,12350,Norway,334.4,17



TimeDim sample:


Unnamed: 0,DateKey,Date,Day,Month,Quarter,Year
0,20101209,2010-12-09,9,12,4,2010
1,20101210,2010-12-10,10,12,4,2010
2,20101212,2010-12-12,12,12,4,2010
3,20101213,2010-12-13,13,12,4,2010
4,20101214,2010-12-14,14,12,4,2010


## Full ETL Function

In [10]:
import pandas as pd
import sqlite3
import numpy as np
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('retail_etl.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# --------------------------
# EXTRACTION 
# --------------------------
def extract_data(file_path='online_retail.csv'):
    """
    Extract data from CSV file into pandas DataFrame
    Strictly handles:
    - Column names as provided
    - Data type conversions
    - Missing values
    """
    try:
        logger.info("Starting data extraction")
        
        # Read CSV with specified columns and data types
        df = pd.read_csv(
            file_path,
            encoding='ISO-8859-1',
            dtype={
                'InvoiceNo': str,
                'StockCode': str,
                'Description': str,
                'Quantity': float,
                'InvoiceDate': str,
                'UnitPrice': float,
                'CustomerID': str,
                'Country': str
            }
        )
        
        # Convert InvoiceDate to datetime
        df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'], errors='coerce')
        
        logger.info(f"Extracted {len(df)} records with columns: {list(df.columns)}")
        return df
    
    except Exception as e:
        logger.error(f"Extraction failed: {str(e)}")
        raise

# --------------------------
# TRANSFORMATION
# --------------------------
def transform_data(df):
    """
    Transform data according to exact requirements:
    1. Calculate TotalSales
    2. Create customer summary
    3. Filter for last year (2024-08-12 to 2025-08-12)
    4. Remove invalid rows
    """
    try:
        original_count = len(df)
        logger.info("Starting data transformation")
        
        # 1. Calculate TotalSales
        df['TotalSales'] = df['Quantity'] * df['UnitPrice']
        
        # 2. Remove outliers and invalid data
        df = df[(df['Quantity'] > 0) & (df['UnitPrice'] > 0)]
        valid_count = len(df)
        logger.info(f"Removed {original_count - valid_count} invalid records")
        
        # 3. Filter for last year (2024-08-12 to 2025-08-12)
        latest_date = df['InvoiceDate'].max()
        cutoff_date = latest_date - timedelta(days=365)
        df = df[df['InvoiceDate'] >= cutoff_date]
        logger.info(f"Filtered to {len(df)} records in last year")
        
        # 4. Create dimension tables
        # Customer dimension
        customer_dim = df.groupby('CustomerID').agg({
            'Country': 'first',
            'TotalSales': 'sum',
            'InvoiceNo': 'count'
        }).reset_index()
        customer_dim.columns = ['CustomerID', 'Country', 'TotalPurchases', 'TransactionCount']
        
        # Time dimension
        time_dim = pd.DataFrame({
            'Date': df['InvoiceDate'].dt.date.unique()
        })
        time_dim['DateKey'] = time_dim['Date'].astype(str).str.replace('-', '')
        time_dim['Day'] = pd.to_datetime(time_dim['Date']).dt.day
        time_dim['Month'] = pd.to_datetime(time_dim['Date']).dt.month
        time_dim['Quarter'] = pd.to_datetime(time_dim['Date']).dt.quarter
        time_dim['Year'] = pd.to_datetime(time_dim['Date']).dt.year
        
        # Prepare fact table
        fact_data = df[[
            'InvoiceNo', 'StockCode', 'Description', 'Quantity',
            'UnitPrice', 'TotalSales', 'CustomerID', 'InvoiceDate'
        ]].copy()
        fact_data['DateKey'] = fact_data['InvoiceDate'].dt.strftime('%Y%m%d')
        
        logger.info(f"Created dimensions: {len(customer_dim)} customers, {len(time_dim)} dates")
        return fact_data, customer_dim, time_dim
    
    except Exception as e:
        logger.error(f"Transformation failed: {str(e)}")
        raise

# --------------------------
# LOADING
# --------------------------
def load_data(fact_data, customer_dim, time_dim, db_name='retail_dw.db'):
    """
    Load data into SQLite database with exact schema:
    - SalesFact table
    - CustomerDim table
    - TimeDim table
    """
    try:
        logger.info("Starting data loading")
        conn = sqlite3.connect(db_name)
        
        # Create tables with exact required schema
        conn.executescript("""
        DROP TABLE IF EXISTS CustomerDim;
        CREATE TABLE CustomerDim (
            CustomerID TEXT PRIMARY KEY,
            Country TEXT,
            TotalPurchases REAL,
            TransactionCount INTEGER
        );
        
        DROP TABLE IF EXISTS TimeDim;
        CREATE TABLE TimeDim (
            DateKey TEXT PRIMARY KEY,
            Date DATE,
            Day INTEGER,
            Month INTEGER,
            Quarter INTEGER,
            Year INTEGER
        );
        
        DROP TABLE IF EXISTS SalesFact;
        CREATE TABLE SalesFact (
            InvoiceNo TEXT,
            StockCode TEXT,
            Description TEXT,
            Quantity REAL,
            UnitPrice REAL,
            TotalSales REAL,
            CustomerID TEXT,
            InvoiceDate DATETIME,
            DateKey TEXT,
            FOREIGN KEY (CustomerID) REFERENCES CustomerDim(CustomerID),
            FOREIGN KEY (DateKey) REFERENCES TimeDim(DateKey)
        );
        """)
        
        # Load data
        customer_dim.to_sql('CustomerDim', conn, if_exists='append', index=False)
        time_dim.to_sql('TimeDim', conn, if_exists='append', index=False)
        fact_data.to_sql('SalesFact', conn, if_exists='append', index=False)
        
        logger.info(f"Successfully loaded data into {db_name}")
        conn.close()
        return True
    
    except Exception as e:
        logger.error(f"Loading failed: {str(e)}")
        if 'conn' in locals():
            conn.close()
        return False

# --------------------------
# FULL ETL PIPELINE
# --------------------------
def run_etl_pipeline():
    """Complete ETL process with logging at each stage"""
    try:
        logger.info("=== STARTING ETL PIPELINE ===")
        
        # 1. EXTRACT
        df = extract_data()
        logger.info(f"Extracted {len(df)} records")
        
        # 2. TRANSFORM
        fact_data, customer_dim, time_dim = transform_data(df)
        logger.info(f"Transformed data: {len(fact_data)} fact records")
        
        # 3. LOAD
        if load_data(fact_data, customer_dim, time_dim):
            logger.info("=== ETL COMPLETED SUCCESSFULLY ===")
            return True
        else:
            logger.error("=== ETL FAILED DURING LOADING ===")
            return False
            
    except Exception as e:
        logger.error(f"ETL pipeline failed: {str(e)}")
        return False

# To execute the pipeline uncomment the following code block
# if __name__ == "__main__":
#     if run_etl_pipeline():
#         # Verify results
#         conn = sqlite3.connect('retail_dw.db')
#         print("\nSalesFact sample:")
#         print(pd.read_sql("SELECT * FROM SalesFact LIMIT 5", conn))
#         print("\nCustomerDim sample:")
#         print(pd.read_sql("SELECT * FROM CustomerDim LIMIT 5", conn))
#         print("\nTimeDim sample:")
#         print(pd.read_sql("SELECT * FROM TimeDim LIMIT 5", conn))
#         conn.close()
#     else:
#         print("ETL process failed - check retail_etl.log for details")