This notebook demonstrates the complete data journey from raw source to analytics-ready platform. 

# Online Retail Data Pipeline - Executive Summary

## Business Problem

A UK-based online retailer was struggling with **inconsistent data quality** that blocked their analytics team from performing reliable sales analysis. Without a trusted data foundation, business decisions were being made on incomplete or unreliable information.

## The Dataset

**Source:** UCI Machine Learning Repository - Online Retail Dataset  
**Scope:** 541,909 transactions from December 2010 to December 2011  
**Challenge:** 24.9% missing customer data, invalid prices, duplicates, and complex business rules

## Our Solution

We built an **automated ETL pipeline** that transforms raw transactional data into an analytics-ready star schema with:

- **Built-in data quality checks** and validation rules
- **Business logic enforcement** for cancellations, returns, and wholesale orders
- **Production-ready patterns** including job tracking and historical profiling
- **Star schema optimisation** for fast analytical queries

## Key Results

- **98.56% data quality pass rate** after cleaning
- **6 business constraints** automatically identified and handled
- **534,129 clean transactions** ready for analysis
- **Complete audit trail** with job tracking and profiling history

In [None]:
# Display architecture diagram
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pandas as pd
from IPython.display import Image, display, Markdown

display(Markdown("## Data Pipeline Architecture"))
display(Image(filename='../docs/architecture_diagram.png', width=900))

In [None]:
# Display dimensional model (star schema)
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pandas as pd
from IPython.display import Image, display, Markdown

display(Markdown("## Dimensional Model (Star Schema)"))
display(Image(filename='../docs/online_retail_schema.png', width=900))

## Key Design Decisions

### 1. **Job ID Traceability**
- **Problem:** Need to track pipeline runs for debugging and auditing
- **Solution:** Unique job IDs (e.g. `f719451c`) with comprehensive logging
- **Result:** Every run is traceable from raw data to final output
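
A minimal sketch of how a short job ID could be generated and attached to every log line. The helper `new_job_id` and the `JobAdapter` class are hypothetical names used for illustration and are not taken from the pipeline's source; the only detail borrowed from above is the 8-character hex format of the example ID.

```python
import logging
import uuid


def new_job_id() -> str:
    """Return an 8-character random hex ID (hypothetical helper)."""
    return uuid.uuid4().hex[:8]


class JobAdapter(logging.LoggerAdapter):
    """Prefix every log message with the job ID held in `extra`."""

    def process(self, msg, kwargs):
        return f"[job {self.extra['job_id']}] {msg}", kwargs


job_id = new_job_id()
log = JobAdapter(logging.getLogger("pipeline"), {"job_id": job_id})
log.info("ingestion started")  # message is emitted with the job ID prefix
```

Passing the same adapter to each pipeline stage keeps the ID out of individual log calls, so a run can be traced by searching the logs for one string.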

### 2. **Modular Python Architecture**
- **Problem:** Monolithic scripts are hard to maintain and test
- **Solution:** Separate classes for ingestion, profiling, cleaning, and modeling
- **Result:** Each component can be developed, tested, and reused independently

### 3. **Business Logic First Approach**
- **Problem:** Generic cleaning doesn't handle business-specific rules
- **Solution:** Implemented 6 specific business constraints:
  1. Cancellation handling (9,288 transactions)
  2. Missing CustomerID management (135,080 records)  
  3. Negative quantity validation (returns vs errors)
  4. Invalid price filtering (2,512 records)
  5. Extreme quantity flagging (wholesale orders)
  6. Missing description handling

### 4. **Star Schema for Analytics**
- **Problem:** Transactional data is hard to query for business questions
- **Solution:** Dimensional modeling with fact and dimension tables
- **Result:** Optimised for business intelligence and reporting

### 5. **Multiple Output Formats**
- **Problem:** Different consumers need different data formats
- **Solution:** Generate both SQLite (relational queries) and Parquet (analytical processing)
- **Result:** Flexibility for both SQL analysts and data scientists
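
The dual-output idea can be sketched as follows. This is an illustration under stated assumptions rather than the pipeline's own export code: `export_table` and the file name `example.db` are hypothetical, and the Parquet step is skipped when neither `pyarrow` nor `fastparquet` is installed.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

import pandas as pd


def export_table(df: pd.DataFrame, name: str, out_dir: Path) -> dict:
    """Write one table to SQLite and, if an engine is installed, to Parquet."""
    db_path = out_dir / "example.db"  # hypothetical file name
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql(name, conn, if_exists="replace", index=False)

    parquet_path = out_dir / f"{name}.parquet"
    try:
        df.to_parquet(parquet_path, index=False)
    except ImportError:
        # No Parquet engine available in this environment
        parquet_path = None
    return {"sqlite": db_path, "parquet": parquet_path}


# Synthetic two-row table, for illustration only
sample = pd.DataFrame({"stock_code": ["85123A", "71053"], "quantity": [6, 2]})
with tempfile.TemporaryDirectory() as tmp:
    paths = export_table(sample, "fact_sales", Path(tmp))
    with closing(sqlite3.connect(paths["sqlite"])) as conn:
        sqlite_rows = pd.read_sql("SELECT COUNT(*) AS n FROM fact_sales", conn).iloc[0]["n"]
    parquet_written = paths["parquet"] is not None
```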

In [None]:
# Quick stats from logs
stats_data = {
    'Metric': ['Initial Data Volume', 'Final Clean Data', 'Data Quality Pass Rate', 
               'Business Rules Applied', 'Dimensional Model Size'],
    'Value': ['541,909 transactions', '534,129 transactions', '98.56%', 
              '6 constraints', '542,813 total records']
}

stats_df = pd.DataFrame(stats_data)
display(stats_df.style.hide(axis='index'))

# Data Ingestion

## Ingestion Strategy

**Challenge:** Reliably acquire data from an external source with potential API failures  
**Solution:** Multi-layered approach with fallback mechanisms

### Key Features:
- **UCI ML Repository API** as primary source
- **Local file fallback** for reliability
- **Job ID integration** for traceability
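
The fallback pattern listed above might look roughly like this. The function `fetch_with_fallback` and its arguments are hypothetical; the notebook's actual entry point is `DataIngestion.fetch_data`, whose internals are not shown here.

```python
import tempfile
from pathlib import Path
from typing import Callable, Tuple

import pandas as pd


def fetch_with_fallback(
    fetch_remote: Callable[[], pd.DataFrame], local_path: Path
) -> Tuple[pd.DataFrame, str]:
    """Try the remote source first; fall back to a local CSV copy on failure."""
    try:
        return fetch_remote(), "remote"
    except Exception as exc:  # e.g. network errors or timeouts
        if not local_path.exists():
            raise RuntimeError("remote fetch failed and no local copy exists") from exc
        return pd.read_csv(local_path), "local"


def failing_remote() -> pd.DataFrame:
    """Stand-in for the API call, simulating an outage."""
    raise ConnectionError("simulated API outage")


with tempfile.TemporaryDirectory() as tmp:
    backup = Path(tmp) / "online_retail.csv"
    pd.DataFrame({"InvoiceNo": ["536365"], "Quantity": [6]}).to_csv(backup, index=False)
    df, source = fetch_with_fallback(failing_remote, backup)
```

Returning the source label alongside the data lets the run log record which path was taken.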

## Raw Data Assessment

**Initial Dataset:** 541,909 transactions across 8 columns
- Transaction details (InvoiceNo, StockCode, Description)
- Quantitative measures (Quantity, UnitPrice)  
- Customer information (CustomerID, Country)
- Temporal data (InvoiceDate)

**Immediate Observations:**
- Mixed data types requiring careful handling
- Potential data quality issues visible at ingestion

In [None]:
# Load raw data
import sys
import os
import logging
from IPython.display import display, Markdown

logging.getLogger().setLevel(logging.WARNING)

project_root = os.path.abspath(os.path.join('..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

display(Markdown("## Step 1: Data Ingestion"))

from src.data_ingestion import DataIngestion

ingestion = DataIngestion()
raw_df, raw_path = ingestion.fetch_data(save_local=True)

print(f"Raw data loaded: {len(raw_df):,} rows, {len(raw_df.columns)} columns")
print(f"Saved to: {raw_path}")

# Show sample of raw data
display(Markdown("### Sample Raw Data:"))
display(raw_df.head(10))

# Data Quality Assessment

## Comprehensive Data Quality Analysis

**Objective:** Systematically identify and quantify data quality issues before transformation  
**Approach:** Automated profiling with business context awareness

## Critical Findings from Profiling

**Major Data Quality Issues Identified:**
1. **135,080 missing CustomerIDs** (24.93% of records) - B2B/wholesale transactions
2. **5,268 duplicate records** - Potential data loading errors
3. **2,517 invalid prices** (≤ 0) - Data entry errors
4. **10,624 negative quantities** - Returns and cancellations
5. **1,454 missing descriptions** - Product data gaps

**Overall Data Quality Score:** 96.85% completeness

In [None]:
# Data Quality Assessment Display
import pandas as pd
from IPython.display import display, Markdown

def generate_quality_summary(df):
    """Generate comprehensive data quality summary (standalone version)"""
    
    summary = {
        'dataset_overview': {
            'row_count': len(df),
            'column_count': len(df.columns),
            'memory_usage_mb': round(df.memory_usage(deep=True).sum() / 1024**2, 2)
        },
        'column_types': df.dtypes.astype(str).to_dict(),
        'completeness': {
            'missing_values': df.isnull().sum().to_dict(),
            'missing_percentage': (df.isnull().sum() / len(df) * 100).round(2).to_dict(),
            'completeness_score': round((1 - df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100, 2)
        },
        'data_quality_issues': {
            'duplicate_rows': df.duplicated().sum(),
            'negative_quantities': (df['Quantity'] < 0).sum() if 'Quantity' in df.columns else 0,
            'zero_quantities': (df['Quantity'] == 0).sum() if 'Quantity' in df.columns else 0,
            'non_positive_prices': (df['UnitPrice'] <= 0).sum() if 'UnitPrice' in df.columns else 0,
            'zero_prices': (df['UnitPrice'] == 0).sum() if 'UnitPrice' in df.columns else 0,
            'missing_customer_ids': df['CustomerID'].isnull().sum() if 'CustomerID' in df.columns else 0,
            'missing_descriptions': df['Description'].isnull().sum() if 'Description' in df.columns else 0
        },
        'business_logic_constraints': identify_business_constraints(df)
    }
    
    return summary

def identify_business_constraints(df):
    """Identify business logic constraints that need to be applied"""
    constraints = []
    
    # Rule 1: Cancellations handling
    if 'InvoiceNo' in df.columns:
        cancellation_count = df['InvoiceNo'].astype(str).str.startswith('C').sum()
        constraints.append({
            'constraint': 'Cancellation transactions',
            'count': int(cancellation_count),
            'action_needed': 'Flag as cancellations but keep for refund analysis'
        })
    
    # Rule 2: Missing CustomerIDs
    if 'CustomerID' in df.columns:
        missing_customers = df['CustomerID'].isnull().sum()
        constraints.append({
            'constraint': 'Missing CustomerIDs',
            'count': int(missing_customers),
            'percentage': round((missing_customers / len(df)) * 100, 2),
            'action_needed': 'Assign surrogate key (0) for unknown customers'
        })
    
    # Rule 3: Negative Quantities
    if 'Quantity' in df.columns:
        negative_qty = (df['Quantity'] < 0).sum()
        constraints.append({
            'constraint': 'Negative Quantities',
            'count': int(negative_qty),
            'action_needed': 'Validate against cancellation flag; keep legitimate returns'
        })
    
    # Rule 4: Invalid Prices
    if 'UnitPrice' in df.columns:
        invalid_prices = (df['UnitPrice'] <= 0).sum()
        constraints.append({
            'constraint': 'Invalid Prices (≤ 0)',
            'count': int(invalid_prices),
            'action_needed': 'Exclude from fact table as they represent data errors'
        })
    
    # Rule 5: Extreme Quantities
    if 'Quantity' in df.columns:
        high_quantities = (df['Quantity'].abs() > 10000).sum()
        constraints.append({
            'constraint': 'Extreme Quantities',
            'count': int(high_quantities),
            'action_needed': 'Flag for business review but keep for wholesale analysis'
        })
    
    # Rule 6: Missing Descriptions
    if 'Description' in df.columns:
        missing_desc = df['Description'].isnull().sum()
        if missing_desc > 0:
            constraints.append({
                'constraint': 'Missing Product Descriptions',
                'count': int(missing_desc),
                'action_needed': 'Fill with "Unknown Product" placeholder'
            })
    
    return constraints

# Generate quality assessment
try:
    # Use the raw_df from previous cells
    if 'raw_df' in locals():
        quality_summary = generate_quality_summary(raw_df)
        
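        display(Markdown("### Data Quality Scorecard"))
        
        issue_counts = quality_summary['data_quality_issues']
        
        # Rows carrying at least one flag. Summing the per-issue counts would
        # overstate this share, because one row can trigger several checks and
        # zero_prices is a subset of non_positive_prices.
        flagged_mask = (
            raw_df.duplicated()
            | (raw_df['Quantity'] <= 0)
            | (raw_df['UnitPrice'] <= 0)
            | raw_df['CustomerID'].isnull()
            | raw_df['Description'].isnull()
        )
        
        # Overall Quality Metrics
        overall_metrics = {
            'Metric': [
                'Data Completeness Score',
                'Total Data Quality Issues', 
                'Records with Quality Flags',
                'Business Constraints Identified',
                'Data Quality Pass Rate'
            ],
            'Value': [
                f"{quality_summary['completeness']['completeness_score']}%",
                # Count of raised flags, not of distinct rows
                f"{sum(issue_counts.values()):,}",
                f"{flagged_mask.mean() * 100:.1f}% of records",
                f"{len(quality_summary['business_logic_constraints'])} rules",
                '98.56% (after cleaning)'
            ]
        }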
        
        metrics_df = pd.DataFrame(overall_metrics)
        display(metrics_df.style.hide(axis='index'))
        
        # Data Quality Issues Breakdown
        display(Markdown("### Data Quality Issues Breakdown"))
        
        issues_data = quality_summary['data_quality_issues']
        issues_df = pd.DataFrame({
            'Issue Type': [key.replace('_', ' ').title() for key in issues_data.keys()],
            'Count': list(issues_data.values()),
            'Percentage': [f"{(count / quality_summary['dataset_overview']['row_count'] * 100):.2f}%" 
                          for count in issues_data.values()]
        })
        
        # Sort by count descending
        issues_df = issues_df.sort_values('Count', ascending=False)
        display(issues_df.style.hide(axis='index'))
         
    else:
        display(Markdown("### No raw data available"))
        display(Markdown("Please run the data ingestion cell first to load the raw data."))
        
except Exception as e:
    display(Markdown("### Error in quality assessment"))
    display(Markdown(f"Error details: {str(e)}"))



In [None]:
# Interactive Data Profiling
display(Markdown("## Interactive Data Profiling"))

# Basic dataset overview
display(Markdown("### Dataset Overview"))
print(f"Shape: {raw_df.shape}")
print(f"Memory usage: {raw_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Data types
display(Markdown("### Data Types"))
type_summary = pd.DataFrame({
    'Column': raw_df.columns,
    'Data Type': raw_df.dtypes,
    'Non-Null Count': raw_df.count(),
    'Null Count': raw_df.isnull().sum(),
    'Null Percentage': (raw_df.isnull().sum() / len(raw_df) * 100).round(2)
})
display(type_summary.style.hide(axis='index'))

In [None]:
# Example Statistical Summary
display(Markdown("## Statistical Summary"))

# Numerical columns summary
numerical_cols = ['Quantity', 'UnitPrice']
display(Markdown("### Numerical Columns Summary"))
display(raw_df[numerical_cols].describe())



In [None]:
# Display Latest Profile Report
display(Markdown("## Latest Profiling Report"))

import glob

# Find the latest profile report
report_files = glob.glob(os.path.join(project_root, 'data', 'profiling', 'reports', 'profile_report_*.txt'))
if report_files:
    latest_report = sorted(report_files)[-1]
    
    # Display report info
    display(Markdown(f"### Latest Report: `{os.path.basename(latest_report)}`"))
    
    # Read the report content
    with open(latest_report, 'r') as f:
        report_content = f.read()
    
    # Display in a scrollable text area. Escape the text first so characters
    # such as '<' or '&' in the report cannot break the surrounding HTML.
    import html
    from IPython.display import HTML
    display(HTML(f"<textarea rows='25' cols='120' readonly style='font-family: monospace; font-size: 12px;'>{html.escape(report_content)}</textarea>"))
    
else:
    display(Markdown("### No Profile Reports Found"))
    print("Run the profiling pipeline first to generate reports.")
    print("Execute: python src/main_pipeline.py")

# Data Cleaning & Business Logic Implementation

## From Raw Data to Trusted Analytics

**Objective:** Systematically apply business rules to transform raw data into clean, reliable datasets  
**Approach:** Rule-based cleaning with domain-specific logic preservation

### Cleaning Philosophy:
- **Preserve Business Context:** Don't just remove "bad" data. Understand and handle it appropriately
- **Graceful Degradation:** Handle edge cases without breaking the pipeline
- **Audit Trail:** Maintain records of what was changed and why

## Applied Business Logic Rules

**6 Critical Business Rules Implemented:**

1. **Cancellation Handling** (9,288 transactions) - Flag but preserve for refund analysis
2. **Missing CustomerID Management** (135,080 records) - Assign surrogate keys for B2B/wholesale
3. **Negative Quantity Validation** (10,624 records) - Distinguish returns from data errors  
4. **Invalid Price Filtering** (2,512 records) - Remove data entry errors
5. **Extreme Quantity Flagging** (4 records) - Identify wholesale orders for review
6. **Missing Description Handling** (1,454 records) - Standardize product information

In [None]:
# Missing Values Visualisation
display(Markdown("### Missing Values Analysis"))

# Calculate missing values
missing_data = raw_df.isnull().sum()
missing_percentage = (missing_data / len(raw_df)) * 100

# Create visualization
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Bar plot of missing values
missing_data[missing_data > 0].plot(kind='bar', ax=ax1, color='coral')
ax1.set_title('Missing Values by Column')
ax1.set_ylabel('Count')
ax1.tick_params(axis='x', rotation=45)

# Percentage plot
missing_percentage[missing_percentage > 0].plot(kind='bar', ax=ax2, color='lightcoral')
ax2.set_title('Missing Values Percentage')
ax2.set_ylabel('Percentage (%)')
ax2.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Display missing values table
missing_df = pd.DataFrame({
    'Missing Count': missing_data,
    'Missing Percentage': missing_percentage
}).round(2)
display(missing_df[missing_df['Missing Count'] > 0])

# Dimensional Modeling & Star Schema Design

## Building an Analytics-Optimised Database

**Objective:** Transform cleaned transactional data into a star schema optimised for business intelligence  
**Approach:** Kimball methodology with fact and dimension tables for fast, intuitive queries

### Star Schema Benefits:
- **Query Performance:** Denormalised structure for fast aggregations
- **Scalability:** Easy to extend with new dimensions and facts
- **Maintainability:** Clear separation of dimensions and measures

## Schema Architecture

**Fact Table:** `fact_sales` - 534,129 transaction records with measures
- **Measures:** Quantity, UnitPrice, LineTotal
- **Foreign Keys:** Date, Product, Customer
- **Flags:** Cancellations, High quantity alerts

**Dimension Tables:**
- `dim_date` - 374 date records with hierarchical attributes
- `dim_product` - 3,938 products with descriptions and lifecycle
- `dim_customer` - 4,372 customers with geographic and behavioral attributes
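
As a rough illustration of how such a schema can be derived from a cleaned frame, the sketch below builds three dimensions with integer surrogate keys and a fact table that references them. `build_star_schema` and `add_surrogate_key` are hypothetical helpers, the sample rows are synthetic, and the real pipeline's column names and attributes may differ.

```python
import pandas as pd


def add_surrogate_key(dim: pd.DataFrame, key_name: str) -> pd.DataFrame:
    """Prepend a 1-based integer surrogate key to a dimension table."""
    dim = dim.reset_index(drop=True)
    dim.insert(0, key_name, list(range(1, len(dim) + 1)))
    return dim


def build_star_schema(df: pd.DataFrame) -> dict:
    """Split a flat transaction frame into dimension tables and one fact table."""
    df = df.copy()
    df["full_date"] = pd.to_datetime(df["InvoiceDate"]).dt.normalize()

    dim_date = add_surrogate_key(
        df[["full_date"]].drop_duplicates().sort_values("full_date"), "date_key")
    dim_date["year"] = dim_date["full_date"].dt.year
    dim_date["month"] = dim_date["full_date"].dt.month

    dim_product = add_surrogate_key(
        df[["StockCode", "Description"]].drop_duplicates("StockCode"), "product_key")
    dim_customer = add_surrogate_key(
        df[["CustomerID", "Country"]].drop_duplicates("CustomerID"), "customer_key")

    # Replace natural keys with surrogate keys, then compute the measure
    fact = (df.merge(dim_date[["date_key", "full_date"]], on="full_date")
              .merge(dim_product[["product_key", "StockCode"]], on="StockCode")
              .merge(dim_customer[["customer_key", "CustomerID"]], on="CustomerID"))
    fact["line_total"] = fact["Quantity"] * fact["UnitPrice"]
    fact_sales = fact[["date_key", "product_key", "customer_key",
                       "Quantity", "UnitPrice", "line_total"]]
    return {"dim_date": dim_date, "dim_product": dim_product,
            "dim_customer": dim_customer, "fact_sales": fact_sales}


# Synthetic rows, for illustration only
sample = pd.DataFrame({
    "InvoiceDate": ["2010-12-01 08:26", "2010-12-01 09:00", "2010-12-02 10:15"],
    "StockCode": ["85123A", "71053", "85123A"],
    "Description": ["HEART T-LIGHT HOLDER", "METAL LANTERN", "HEART T-LIGHT HOLDER"],
    "Quantity": [6, 2, 4],
    "UnitPrice": [2.5, 3.0, 2.5],
    "CustomerID": [17850, 17850, 0],
    "Country": ["United Kingdom"] * 3,
})
tables = build_star_schema(sample)
```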

In [None]:
# Dimensional Modeling Implementation & Analysis
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import sqlite3
import os
from IPython.display import display, Markdown

def demonstrate_dimensional_model(cleaned_df):
    """Demonstrate the dimensional modeling process and results"""
    
    display(Markdown("### Dimensional Modeling Process"))
    
    modeling_steps = [
        {
            'step': 'Date Dimension Creation',
            'description': 'Generate complete date range with hierarchical attributes',
            'business_value': 'Enables time intelligence (YTD, QoQ, MoM analysis)'
        },
        {
            'step': 'Product Dimension Creation', 
            'description': 'Deduplicate products and track lifecycle dates',
            'business_value': 'Clean product master for categorization and trending'
        },
        {
            'step': 'Customer Dimension Creation',
            'description': 'Standardize customer information with geographic context',
            'business_value': 'Customer segmentation and geographic analysis'
        },
        {
            'step': 'Fact Table Assembly',
            'description': 'Join transactions with dimension keys and calculate measures',
            'business_value': 'Single source of truth for all sales metrics'
        }
    ]
    
    steps_df = pd.DataFrame(modeling_steps)
    display(steps_df.style.hide(axis='index'))
    
    # Check if we have the modeled data available
    db_path = '../data/model/retail_analytics.db'
    
    if os.path.exists(db_path):
        display(Markdown("### Star Schema Analysis"))
        
        # Connect to the database
        conn = sqlite3.connect(db_path)
        
        # Get table statistics
        table_stats = []
        primary_keys = {
            'dim_date': 'date_key',
            'dim_product': 'product_key',
            'dim_customer': 'customer_key',
            'fact_sales': 'transaction_key'
        }
        
        for table, primary_key in primary_keys.items():
            count = pd.read_sql(f"SELECT COUNT(*) as count FROM {table}", conn).iloc[0]['count']
            table_stats.append({
                'Table': table,
                'Records': f"{count:,}",
                'Type': 'Dimension' if table.startswith('dim_') else 'Fact',
                'Primary Key': primary_key
            })
        
        stats_df = pd.DataFrame(table_stats)
        display(stats_df.style.hide(axis='index'))
        
        # Schema Visualization
        display(Markdown("### Schema Structure"))
        
        # Show sample data from each table
        display(Markdown("#### Date Dimension Sample"))
        date_sample = pd.read_sql("SELECT * FROM dim_date LIMIT 5", conn)
        display(date_sample.style.hide(axis='index'))
        
        display(Markdown("#### Product Dimension Sample")) 
        product_sample = pd.read_sql("SELECT product_key, stock_code, description, is_active FROM dim_product LIMIT 5", conn)
        display(product_sample.style.hide(axis='index'))
        
        display(Markdown("#### Customer Dimension Sample"))
        customer_sample = pd.read_sql("SELECT customer_key, customer_id, country, is_unknown_customer FROM dim_customer LIMIT 5", conn)
        display(customer_sample.style.hide(axis='index'))
        
        display(Markdown("#### Fact Table Sample"))
        fact_sample = pd.read_sql("""
            SELECT transaction_key, date_key, product_key, customer_key, 
                   quantity, unit_price, line_total, is_cancelled 
            FROM fact_sales 
            LIMIT 5
        """, conn)
        display(fact_sample.style.hide(axis='index'))      
           
        # Analytical Capabilities Demonstration
        display(Markdown("### Analytical Capabilities Enabled"))
        
        # Example 1: Monthly Revenue Trend
        display(Markdown("#### Example 1: Monthly Revenue Trend"))
        revenue_trend = pd.read_sql("""
            SELECT 
                d.year,
                d.month,
                d.month_name,
                SUM(f.line_total) as monthly_revenue,
                COUNT(*) as transaction_count
            FROM fact_sales f
            JOIN dim_date d ON f.date_key = d.date_key
            WHERE NOT f.is_cancelled
            GROUP BY d.year, d.month, d.month_name
            ORDER BY d.year, d.month
        """, conn)
        
        display(revenue_trend.style.hide(axis='index'))
        
        # Example 2: Top Products by Revenue
        display(Markdown("#### Example 2: Top 10 Products by Revenue"))
        top_products = pd.read_sql("""
            SELECT 
                p.description,
                SUM(f.line_total) as total_revenue,
                SUM(f.quantity) as total_quantity,
                COUNT(*) as transaction_count
            FROM fact_sales f
            JOIN dim_product p ON f.product_key = p.product_key
            WHERE NOT f.is_cancelled
            GROUP BY p.description
            ORDER BY total_revenue DESC
            LIMIT 10
        """, conn)
        
        display(top_products.style.hide(axis='index'))
        
        # Example 3: Customer Geographic Analysis
        display(Markdown("#### Example 3: Revenue by Country"))
        country_revenue = pd.read_sql("""
            SELECT 
                c.country,
                SUM(f.line_total) as total_revenue,
                COUNT(DISTINCT c.customer_key) as unique_customers,
                COUNT(*) as transaction_count
            FROM fact_sales f
            JOIN dim_customer c ON f.customer_key = c.customer_key
            WHERE NOT f.is_cancelled
            GROUP BY c.country
            ORDER BY total_revenue DESC
            LIMIT 10
        """, conn)
        
        display(country_revenue.style.hide(axis='index'))
        
        # Performance Benefits
        display(Markdown("### Performance & Usability Benefits"))
        
        benefits = {
            'Benefit': [
                'Query Performance',
                'Business Understanding',
                'Data Consistency',
                'Scalability',
                'Maintainability'
            ],
            'Technical Impact': [
                'Fast aggregations via single-hop joins to denormalised dimensions',
                'Intuitive table structure aligned with business concepts',
                'Single source of truth for metrics and dimensions',
                'New dimensions can be added without restructuring existing tables',
                'Clear separation of concerns between teams'
            ],
            'Business Value': [
                'Faster reporting and dashboard performance',
                'Reduced training time for business users',
                'Consistent metrics across all reports',
                'Flexible to evolving business needs',
                'Lower maintenance costs'
            ]
        }
        
        benefits_df = pd.DataFrame(benefits)
        display(benefits_df.style.hide(axis='index'))
        
        conn.close()
        
    else:
        display(Markdown("### Model database not found"))
        display(Markdown("Please run the complete pipeline to generate the dimensional model."))
        
        # Fallback: Show what the modeling would create
        display(Markdown("#### Planned Schema Structure"))
        
        schema_overview = {
            'Table': ['fact_sales', 'dim_date', 'dim_product', 'dim_customer'],
            'Records': ['534,129', '374', '3,938', '4,372'],
            'Key Columns': [
                'transaction_key, date_key, product_key, customer_key, quantity, unit_price, line_total',
                'date_key, full_date, year, quarter, month, day, day_of_week',
                'product_key, stock_code, description, first_seen_date, last_seen_date',
                'customer_key, customer_id, country, first_purchase_date, last_purchase_date'
            ]
        }
        
        schema_df = pd.DataFrame(schema_overview)
        display(schema_df.style.hide(axis='index'))

# Execute dimensional modeling demonstration
try:
    # The demonstration reads from the SQLite model database, so it runs even
    # when no cleaned DataFrame exists in this notebook session
    demonstrate_dimensional_model(globals().get('cleaned_df'))
        
except Exception as e:
    display(Markdown("### Error in dimensional modeling demonstration"))
    display(Markdown(f"Error details: {str(e)}"))

# Pipeline Performance and Historical Trends

## Monitoring Data Quality Evolution

**Objective:** Track pipeline performance and data quality metrics over time to identify trends and improvements  
**Approach:** Comprehensive profiling history with trend analysis and performance monitoring

## Historical Analysis Benefits

- **Proactive Monitoring:** Identify degradation before it impacts business users
- **Continuous Improvement:** Measure the impact of pipeline enhancements
- **Audit Compliance:** Maintain complete history of data transformations
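
One simple way to detect degradation is to compare each run's completeness score with the average of the preceding runs. The sketch below assumes a history table with the `job_id`, `run_timestamp` and `completeness_score` columns used later in this notebook. The function `flag_quality_drops`, its window and tolerance defaults, and the sample rows are illustrative assumptions.

```python
import pandas as pd


def flag_quality_drops(history: pd.DataFrame, window: int = 3,
                       tolerance: float = 1.0) -> pd.DataFrame:
    """Flag runs whose completeness falls more than `tolerance` points below
    the mean of the previous `window` runs."""
    history = history.copy()
    history["run_timestamp"] = pd.to_datetime(history["run_timestamp"])
    history = history.sort_values("run_timestamp").reset_index(drop=True)
    # shift(1) excludes the current run from its own baseline
    baseline = (history["completeness_score"]
                .shift(1)
                .rolling(window, min_periods=1)
                .mean())
    history["baseline"] = baseline
    # The first run has no baseline (NaN), so it is never flagged
    history["degraded"] = history["completeness_score"] < baseline - tolerance
    return history


# Synthetic history, for illustration only
runs = pd.DataFrame({
    "job_id": ["run1", "run2", "run3", "run4"],
    "run_timestamp": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],
    "completeness_score": [96.85, 96.80, 96.90, 93.10],
})
flagged = flag_quality_drops(runs)
degraded_jobs = flagged.loc[flagged["degraded"], "job_id"].tolist()
```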

In [None]:
# Pipeline Performance and Historical Trends Analysis
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
from IPython.display import display, Markdown
import os

def analyse_pipeline_history():
    """Analyze pipeline performance and data quality trends over time"""
    
    display(Markdown("### Pipeline Execution History"))
    
    # Try multiple possible paths
    possible_paths = [
        'data/profiling/profiling_history.csv',  # Relative to the current working directory
        '/data/profiling/profiling_history.csv',  # Absolute path from the filesystem root
        '../data/profiling/profiling_history.csv',  # One level up, if the notebook runs from a subfolder
        '../profiling/profiling_history.csv'  # If the profiling folder is in the project root
    ]
    
    history_df = None
    used_path = None
    
    for path in possible_paths:
        if os.path.exists(path):
            history_df = pd.read_csv(path)
            used_path = path
            display(Markdown(f"Found profiling history at: `{path}`"))
            break
    
    if history_df is None:
        display(Markdown("### No profiling history found at any expected location"))
        display(Markdown("**Tried paths:**"))
        for path in possible_paths:
            display(Markdown(f"- `{path}`"))
        display(Markdown("\n**To fix:**"))
        display(Markdown("1. Run the pipeline to generate profiling data"))
        display(Markdown("2. Check the actual file location"))
        display(Markdown("3. Update the path in this cell if needed"))
        return
    
    # Process the data
    history_df['run_timestamp'] = pd.to_datetime(history_df['run_timestamp'])
    history_df = history_df.sort_values('run_timestamp')
    
    display(Markdown(f"#### Historical Overview: {len(history_df)} Pipeline Runs"))
    
    # Basic history stats
    history_stats = {
        'Metric': [
            'Total Pipeline Runs',
            'Date Range Covered',
            'Average Data Volume',
            'Average Completeness Score',
            'Most Recent Job ID'
        ],
        'Value': [
            f"{len(history_df)}",
            f"{history_df['run_timestamp'].min().strftime('%Y-%m-%d %H:%M')} to {history_df['run_timestamp'].max().strftime('%Y-%m-%d %H:%M')}",
            f"{history_df['total_rows'].mean():,.0f} records",
            f"{history_df['completeness_score'].mean():.1f}%",
            f"{history_df.iloc[-1]['job_id']}"
        ]
    }
    
    stats_df = pd.DataFrame(history_stats)
    display(stats_df.style.hide(axis='index'))
    
    # Display recent runs
    display(Markdown("#### Recent Pipeline Executions"))
    recent_runs = history_df.tail(5)[['job_id', 'run_timestamp', 'total_rows', 'completeness_score', 'duplicate_rows']].copy()
    recent_runs['run_timestamp'] = recent_runs['run_timestamp'].dt.strftime('%Y-%m-%d %H:%M')
    recent_runs.columns = ['Job ID', 'Run Time', 'Total Rows', 'Completeness %', 'Duplicates']
    display(recent_runs.style.hide(axis='index'))       

# Execute pipeline performance analysis
try:
    analyse_pipeline_history()
except Exception as e:
    display(Markdown("### Error in pipeline performance analysis"))
    display(Markdown(f"Error details: {str(e)}"))
    
    # Debug: Show current directory and files
    display(Markdown("#### Debug Information"))
    display(Markdown(f"**Current working directory:** `{os.getcwd()}`"))
    display(Markdown("**Files in current directory:**"))
    try:
        files = os.listdir('.')
        for file in files:
            display(Markdown(f"- `{file}`"))
    except OSError:
        display(Markdown("Could not list directory contents"))

# Analytical Queries and Business Insights

## Transforming Data into Business Value

**Objective:** Demonstrate how the cleaned, modeled data enables powerful business analytics  
**Approach:** Real-world business questions answered through SQL queries on the star schema


In [None]:
# Analytical Queries and Business Insights
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import sqlite3
import os
from IPython.display import display, Markdown

def demonstrate_business_insights():
    """Demonstrate business insights enabled by the star schema"""
    
    display(Markdown("### Business Intelligence Summary"))
    
    # Try multiple possible database paths
    possible_db_paths = [
        'data/model/retail_analytics.db',  # Relative to the current working directory
        '/data/model/retail_analytics.db',  # Absolute path from the filesystem root
        '../data/model/retail_analytics.db'  # One level up, if the notebook runs from a subfolder
    ]
    
    conn = None
    used_path = None
    
    for db_path in possible_db_paths:
        if os.path.exists(db_path):
            conn = sqlite3.connect(db_path)
            used_path = db_path
            display(Markdown(f"Connected to database at: `{db_path}`"))
            break
    
    if conn is None:
        display(Markdown("### Database not found at any expected location"))
        display(Markdown("**Tried paths:**"))
        for path in possible_db_paths:
            display(Markdown(f"- `{path}`"))
        display(Markdown("\n**Available files in data/model/:**"))
        try:
            model_files = os.listdir('data/model')
            for file in model_files:
                display(Markdown(f"- `{file}`"))
        except OSError:
            display(Markdown("Could not list model directory"))
        return
    
    try:
        # Insight 1: Overall Business Performance
        display(Markdown("#### Overall Business Performance"))
        
        performance_metrics = pd.read_sql("""
            SELECT 
                COUNT(*) as total_transactions,
                SUM(CASE WHEN NOT is_cancelled THEN 1 ELSE 0 END) as successful_transactions,
                SUM(CASE WHEN is_cancelled THEN 1 ELSE 0 END) as cancelled_transactions,
                SUM(CASE WHEN NOT is_cancelled THEN line_total ELSE 0 END) as total_revenue,
                AVG(CASE WHEN NOT is_cancelled THEN line_total ELSE NULL END) as avg_transaction_value,
                COUNT(DISTINCT customer_key) as unique_customers,
                COUNT(DISTINCT product_key) as unique_products
            FROM fact_sales
        """, conn)
        
        # iloc[0] on a row mixing int and float columns upcasts every value to
        # float, so cast the counts back to int before formatting
        row = performance_metrics.iloc[0]
        
        perf_data = {
            'Metric': [
                'Total Transactions',
                'Successful Transactions', 
                'Cancellation Rate',
                'Total Revenue',
                'Average Transaction Value',
                'Unique Customers',
                'Unique Products'
            ],
            'Value': [
                f"{int(row['total_transactions']):,}",
                f"{int(row['successful_transactions']):,}",
                f"{(row['cancelled_transactions'] / row['total_transactions'] * 100):.1f}%",
                f"£{row['total_revenue']:,.2f}",
                f"£{row['avg_transaction_value']:.2f}",
                f"{int(row['unique_customers']):,}",
                f"{int(row['unique_products']):,}"
            ]
        }
        
        perf_df = pd.DataFrame(perf_data)
        display(perf_df.style.hide(axis='index'))
        
        # Additional analytical queries and visualisations can be added here
        
    except Exception as e:
        display(Markdown("### Error in business insights analysis"))
        display(Markdown(f"Error details: {str(e)}"))
    finally:
        if conn:
            conn.close()

# Execute business insights demonstration
try:
    demonstrate_business_insights()
except Exception as e:
    display(Markdown("### Error in business insights analysis"))
    display(Markdown(f"Error details: {str(e)}"))