# 🚀 End-to-End Data Engineering Pipeline
## Production-Grade Cryptocurrency Analytics Platform

**Author**: Data Engineering Portfolio Project  
**Architecture**: Medallion (Bronze → Silver → Gold)  
**Data Source**: CoinGecko Public API  
**Storage**: SQLite Analytical Warehouse  

---

### 📋 Pipeline Overview

This project demonstrates a complete data engineering workflow:

1. **Data Ingestion** - Continuous API data collection
2. **Bronze Layer** - Raw data storage (no modifications)
3. **Silver Layer** - Data cleaning, validation, deduplication
4. **Gold Layer** - Analytics-ready aggregations
5. **Statistics Engine** - Advanced metrics computation
6. **Visualization** - Business intelligence dashboards
7. **Orchestration** - Automated pipeline execution
8. **Streaming Simulation** - Continuous data processing

---

## 📦 1. Environment Setup & Imports

In [None]:
# Core libraries
import requests
import pandas as pd
import sqlite3
import logging
import time
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import warnings

# Visualization
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.gridspec import GridSpec

# Numerical operations
import numpy as np

# Configuration
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-darkgrid')
%matplotlib inline

print("✅ All libraries imported successfully")

## 🔧 2. Configuration & Logging Setup

In [None]:
# Pipeline Configuration
class PipelineConfig:
    """Central configuration for the data pipeline"""
    
    # API Configuration
    API_BASE_URL = "https://api.coingecko.com/api/v3"
    CRYPTO_IDS = ["bitcoin", "ethereum", "cardano", "solana", "polkadot"]
    VS_CURRENCY = "usd"
    API_TIMEOUT = 10
    
    # Database Configuration
    DB_PATH = "crypto_analytics.db"
    
    # Table Names (Medallion Architecture)
    BRONZE_TABLE = "bronze_raw_prices"
    SILVER_TABLE = "silver_cleaned_prices"
    GOLD_TABLE = "gold_analytics"
    
    # Processing Configuration
    BATCH_SIZE = 100
    ROLLING_WINDOW = 5  # for rolling averages
    
    # Streaming Simulation
    INGESTION_INTERVAL = 10  # seconds between API calls
    MAX_ITERATIONS = 20  # for demo purposes


# Logging Configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logger = logging.getLogger('DataPipeline')
logger.info("🎯 Pipeline configuration loaded successfully")

## 🗄️ 3. Database Initialization

Set up the SQLite warehouse with one table per Medallion layer.

In [None]:
def initialize_database() -> None:
    """
    Initialize SQLite database with Bronze, Silver, and Gold layer tables.
    
    Bronze: Raw data with full API response
    Silver: Cleaned, validated, deduplicated data
    Gold: Aggregated analytics-ready data
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    cursor = conn.cursor()
    
    # Bronze Layer - Raw ingestion
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {PipelineConfig.BRONZE_TABLE} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            crypto_id TEXT NOT NULL,
            price REAL,
            market_cap REAL,
            volume_24h REAL,
            price_change_24h REAL,
            ingestion_timestamp TEXT NOT NULL,
            raw_json TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    # Silver Layer - Cleaned data
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {PipelineConfig.SILVER_TABLE} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            crypto_id TEXT NOT NULL,
            price REAL NOT NULL,
            market_cap REAL NOT NULL,
            volume_24h REAL NOT NULL,
            price_change_24h REAL,
            ingestion_timestamp TEXT NOT NULL,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(crypto_id, ingestion_timestamp)
        )
    """)
    
    # Gold Layer - Analytics
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {PipelineConfig.GOLD_TABLE} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            crypto_id TEXT NOT NULL,
            avg_price REAL,
            min_price REAL,
            max_price REAL,
            std_price REAL,
            total_volume REAL,
            avg_market_cap REAL,
            data_points INTEGER,
            calculation_timestamp TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    conn.commit()
    conn.close()
    
    logger.info("✅ Database initialized with Bronze, Silver, and Gold tables")


# Initialize the database
initialize_database()
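
The functions in this notebook open and close their SQLite connections by hand, so an exception raised between `connect()` and `close()` leaves the connection open. A minimal sketch of one way to tighten that follows. It assumes an in-memory database, and both the helper `open_db` and the table `demo_prices` are hypothetical names used only for illustration; they are not part of the pipeline.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def open_db(path: str = ":memory:"):
    """Hypothetical helper: commit on success, roll back on error, always close."""
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


with open_db() as conn:
    # demo_prices is an illustrative table, not one of the Medallion tables
    conn.execute(
        "CREATE TABLE IF NOT EXISTS demo_prices (crypto_id TEXT NOT NULL, price REAL)"
    )
    conn.execute("INSERT INTO demo_prices VALUES (?, ?)", ("bitcoin", 1.0))

    # sqlite_master lists every table in the database, handy for checking a schema
    tables = [
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    ]
    row_count = conn.execute("SELECT COUNT(*) FROM demo_prices").fetchone()[0]

print(tables, row_count)
```

The same pattern could wrap the body of `initialize_database()` or any of the layer functions, pointing `path` at `PipelineConfig.DB_PATH`.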

## 🥉 4. Bronze Layer - Raw Data Ingestion

Fetch live cryptocurrency data from the CoinGecko API and store it as-is.

In [None]:
def fetch_crypto_prices() -> Optional[Dict[str, Dict]]:
    """
    Fetch current cryptocurrency prices from CoinGecko API.
    
    Returns:
        Dictionary mapping each crypto ID to its metrics dictionary,
        or None on failure
    """
    try:
        url = f"{PipelineConfig.API_BASE_URL}/simple/price"
        params = {
            'ids': ','.join(PipelineConfig.CRYPTO_IDS),
            'vs_currencies': PipelineConfig.VS_CURRENCY,
            'include_market_cap': 'true',
            'include_24hr_vol': 'true',
            'include_24hr_change': 'true'
        }
        
        response = requests.get(url, params=params, timeout=PipelineConfig.API_TIMEOUT)
        response.raise_for_status()
        
        data = response.json()
        logger.info(f"✅ Successfully fetched data for {len(data)} cryptocurrencies")
        return data
        
    except requests.exceptions.RequestException as e:
        logger.error(f"❌ API request failed: {e}")
        return None
    except Exception as e:
        logger.error(f"❌ Unexpected error in data fetch: {e}")
        return None


def ingest_to_bronze(data: Dict) -> int:
    """
    Insert raw API data into Bronze layer without any transformation.
    
    Args:
        data: Raw API response dictionary
        
    Returns:
        Number of records inserted
    """
    if not data:
        logger.warning("⚠️ No data to ingest to Bronze layer")
        return 0
    
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    cursor = conn.cursor()
    
    ingestion_time = datetime.now().isoformat()
    records_inserted = 0
    
    for crypto_id, metrics in data.items():
        try:
            cursor.execute(f"""
                INSERT INTO {PipelineConfig.BRONZE_TABLE} 
                (crypto_id, price, market_cap, volume_24h, price_change_24h, 
                 ingestion_timestamp, raw_json)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                crypto_id,
                metrics.get(f'{PipelineConfig.VS_CURRENCY}'),
                metrics.get(f'{PipelineConfig.VS_CURRENCY}_market_cap'),
                metrics.get(f'{PipelineConfig.VS_CURRENCY}_24h_vol'),
                metrics.get(f'{PipelineConfig.VS_CURRENCY}_24h_change'),
                ingestion_time,
                json.dumps(metrics)
            ))
            records_inserted += 1
        except Exception as e:
            logger.error(f"❌ Failed to insert {crypto_id} to Bronze: {e}")
    
    conn.commit()
    conn.close()
    
    logger.info(f"✅ Bronze Layer: Inserted {records_inserted} raw records")
    return records_inserted


# Test the ingestion
test_data = fetch_crypto_prices()
if test_data:
    ingest_to_bronze(test_data)
    print("\n📊 Sample Bronze Layer Data:")
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    df_bronze = pd.read_sql_query(
        f"SELECT * FROM {PipelineConfig.BRONZE_TABLE} LIMIT 5", 
        conn
    )
    conn.close()  # close explicitly rather than leaking an inline connection
    print(df_bronze[['crypto_id', 'price', 'market_cap', 'ingestion_timestamp']])
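
To make the Bronze mapping concrete, the sketch below walks a small payload through the same key lookups that `ingest_to_bronze` performs. The numbers are invented for illustration. The key names (`usd`, `usd_market_cap`, `usd_24h_vol`, `usd_24h_change`) are the ones the ingestion code reads, and `to_bronze_row` is a hypothetical helper, not a function from the pipeline.

```python
import json

VS_CURRENCY = "usd"  # mirrors PipelineConfig.VS_CURRENCY

# Illustrative payload only: values are made up, keys follow what ingest_to_bronze reads
sample_response = {
    "bitcoin": {
        "usd": 50000.0,
        "usd_market_cap": 1.0e12,
        "usd_24h_vol": 3.0e10,
        "usd_24h_change": -1.25,
    },
    # The 24h change is deliberately missing to show how .get() behaves
    "ethereum": {"usd": 3000.0, "usd_market_cap": 3.6e11, "usd_24h_vol": 1.5e10},
}


def to_bronze_row(crypto_id, metrics, ingestion_time):
    """Hypothetical helper: build the parameter tuple for the Bronze INSERT."""
    return (
        crypto_id,
        metrics.get(VS_CURRENCY),
        metrics.get(f"{VS_CURRENCY}_market_cap"),
        metrics.get(f"{VS_CURRENCY}_24h_vol"),
        metrics.get(f"{VS_CURRENCY}_24h_change"),  # None when the key is absent
        ingestion_time,
        json.dumps(metrics),  # per-asset raw JSON kept for auditing
    )


rows = [
    to_bronze_row(crypto_id, metrics, "2024-01-01T00:00:00")
    for crypto_id, metrics in sample_response.items()
]
for row in rows:
    print(row[:5])
```

Because missing keys become `None` (SQL `NULL`) rather than raising, incomplete records still land in Bronze and are filtered later by the Silver validation rules.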

## 🥈 5. Silver Layer - Data Cleaning & Validation

Clean, validate, and deduplicate data from the Bronze layer.

In [None]:
def validate_record(record: pd.Series) -> bool:
    """
    Validate a single data record against business rules.
    
    Validation rules:
    - Price must be positive
    - Market cap must be positive
    - Volume must be non-negative
    - No null values in critical fields
    """
    try:
        if pd.isna(record['price']) or record['price'] <= 0:
            return False
        if pd.isna(record['market_cap']) or record['market_cap'] <= 0:
            return False
        if pd.isna(record['volume_24h']) or record['volume_24h'] < 0:
            return False
        if pd.isna(record['crypto_id']) or pd.isna(record['ingestion_timestamp']):
            return False
        return True
    except Exception:
        return False


def clean_and_load_silver() -> int:
    """
    Process Bronze layer data: clean, validate, deduplicate, and load to Silver.
    
    Returns:
        Number of records successfully processed to Silver layer
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    
    # Read unprocessed Bronze records
    query = f"""
        SELECT DISTINCT b.* 
        FROM {PipelineConfig.BRONZE_TABLE} b
        LEFT JOIN {PipelineConfig.SILVER_TABLE} s 
            ON b.crypto_id = s.crypto_id 
            AND b.ingestion_timestamp = s.ingestion_timestamp
        WHERE s.id IS NULL
    """
    
    df_bronze = pd.read_sql_query(query, conn)
    
    if df_bronze.empty:
        logger.info("ℹ️ No new Bronze records to process")
        conn.close()
        return 0
    
    logger.info(f"🔄 Processing {len(df_bronze)} Bronze records")
    
    # Data Quality Checks
    initial_count = len(df_bronze)
    
    # Remove duplicates
    df_clean = df_bronze.drop_duplicates(
        subset=['crypto_id', 'ingestion_timestamp'], 
        keep='first'
    ).copy()  # independent frame, so adding the is_valid column below is safe
    duplicates_removed = initial_count - len(df_clean)
    
    # Validate records
    df_clean['is_valid'] = df_clean.apply(validate_record, axis=1)
    df_valid = df_clean[df_clean['is_valid']].copy()
    invalid_records = len(df_clean) - len(df_valid)
    
    # Handle nulls in optional fields
    df_valid['price_change_24h'] = df_valid['price_change_24h'].fillna(0)
    
    # Insert into Silver layer
    records_inserted = 0
    cursor = conn.cursor()
    
    for _, row in df_valid.iterrows():
        try:
            cursor.execute(f"""
                INSERT OR IGNORE INTO {PipelineConfig.SILVER_TABLE}
                (crypto_id, price, market_cap, volume_24h, price_change_24h, ingestion_timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                row['crypto_id'],
                row['price'],
                row['market_cap'],
                row['volume_24h'],
                row['price_change_24h'],
                row['ingestion_timestamp']
            ))
            records_inserted += cursor.rowcount
        except Exception as e:
            logger.error(f"❌ Failed to insert to Silver: {e}")
    
    conn.commit()
    conn.close()
    
    logger.info(f"✅ Silver Layer: Processed {records_inserted} records")
    logger.info(f"   📊 Duplicates removed: {duplicates_removed}")
    logger.info(f"   📊 Invalid records filtered: {invalid_records}")
    
    return records_inserted


# Test Silver layer processing
clean_and_load_silver()
print("\n📊 Sample Silver Layer Data:")
conn = sqlite3.connect(PipelineConfig.DB_PATH)
df_silver = pd.read_sql_query(
    f"SELECT * FROM {PipelineConfig.SILVER_TABLE} ORDER BY processed_at DESC LIMIT 5",
    conn
)
conn.close()  # close explicitly rather than leaking an inline connection
print(df_silver[['crypto_id', 'price', 'market_cap', 'volume_24h']])
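
The Silver load relies on the `UNIQUE(crypto_id, ingestion_timestamp)` constraint together with `INSERT OR IGNORE` so that replaying the same batch does not create duplicates. The sketch below shows that behaviour in isolation. It assumes an in-memory database and a cut-down, hypothetical table `silver_demo` with invented values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE silver_demo (
        crypto_id TEXT NOT NULL,
        price REAL NOT NULL,
        ingestion_timestamp TEXT NOT NULL,
        UNIQUE(crypto_id, ingestion_timestamp)
    )
    """
)

# Illustrative rows only
batch = [
    ("bitcoin", 50000.0, "2024-01-01T00:00:00"),
    ("ethereum", 3000.0, "2024-01-01T00:00:00"),
]


def load(rows):
    """Insert rows, skipping any that violate the UNIQUE constraint."""
    conn.executemany("INSERT OR IGNORE INTO silver_demo VALUES (?, ?, ?)", rows)
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM silver_demo").fetchone()[0]


count_after_first = load(batch)
count_after_replay = load(batch)  # same batch again: every row is ignored
conn.close()

print(count_after_first, count_after_replay)
```

The row count stays the same after the replay, which is the property that lets `clean_and_load_silver()` be re-run safely.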

## 🥇 6. Gold Layer - Analytics Aggregations

Create business-ready analytics tables with aggregated metrics.

In [None]:
def compute_gold_analytics() -> int:
    """
    Compute aggregated analytics from Silver layer and store in Gold layer.
    
    Metrics computed per cryptocurrency:
    - Average, min, max, std deviation of price
    - Total trading volume
    - Average market capitalization
    - Number of data points
    
    Returns:
        Number of analytics records created
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    
    # Read Silver layer data
    df_silver = pd.read_sql_query(
        f"SELECT * FROM {PipelineConfig.SILVER_TABLE}",
        conn
    )
    
    if df_silver.empty:
        logger.warning("⚠️ No Silver data available for analytics")
        conn.close()
        return 0
    
    # Compute aggregations per cryptocurrency
    analytics = df_silver.groupby('crypto_id').agg({
        'price': ['mean', 'min', 'max', 'std'],
        'volume_24h': 'sum',
        'market_cap': 'mean',
        'id': 'count'
    }).reset_index()
    
    # Flatten column names
    analytics.columns = [
        'crypto_id', 'avg_price', 'min_price', 'max_price', 'std_price',
        'total_volume', 'avg_market_cap', 'data_points'
    ]
    
    # Handle NaN in std (occurs when only 1 data point)
    analytics['std_price'] = analytics['std_price'].fillna(0)
    
    # Insert into Gold layer
    cursor = conn.cursor()
    calculation_time = datetime.now().isoformat()
    records_inserted = 0
    
    for _, row in analytics.iterrows():
        try:
            cursor.execute(f"""
                INSERT INTO {PipelineConfig.GOLD_TABLE}
                (crypto_id, avg_price, min_price, max_price, std_price,
                 total_volume, avg_market_cap, data_points, calculation_timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                row['crypto_id'],
                row['avg_price'],
                row['min_price'],
                row['max_price'],
                row['std_price'],
                row['total_volume'],
                row['avg_market_cap'],
                int(row['data_points']),  # bind a plain Python int, not a NumPy scalar
                calculation_time
            ))
            records_inserted += 1
        except Exception as e:
            logger.error(f"❌ Failed to insert Gold analytics: {e}")
    
    conn.commit()
    conn.close()
    
    logger.info(f"✅ Gold Layer: Created {records_inserted} analytics records")
    return records_inserted


# Test Gold layer
compute_gold_analytics()
print("\n📊 Sample Gold Layer Analytics:")
conn = sqlite3.connect(PipelineConfig.DB_PATH)
df_gold = pd.read_sql_query(
    f"SELECT * FROM {PipelineConfig.GOLD_TABLE} ORDER BY created_at DESC LIMIT 5",
    conn
)
conn.close()  # close explicitly rather than leaking an inline connection
print(df_gold[['crypto_id', 'avg_price', 'min_price', 'max_price', 'std_price', 'data_points']])
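
`compute_gold_analytics()` aggregates with a dictionary and then renames the resulting multi-level columns by position. As an alternative sketch, pandas named aggregation yields the final column names directly, so a later change to the metric list cannot silently misalign the names. The tiny `silver_demo` frame below uses invented values and is not pipeline data.

```python
import pandas as pd

# Illustrative Silver-like rows: two bitcoin observations, one ethereum
silver_demo = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "crypto_id": ["bitcoin", "bitcoin", "ethereum"],
        "price": [100.0, 110.0, 2000.0],
        "volume_24h": [10.0, 20.0, 5.0],
        "market_cap": [1000.0, 1100.0, 500.0],
    }
)

analytics = (
    silver_demo.groupby("crypto_id")
    .agg(
        avg_price=("price", "mean"),
        min_price=("price", "min"),
        max_price=("price", "max"),
        std_price=("price", "std"),  # sample std (ddof=1); NaN for a single row
        total_volume=("volume_24h", "sum"),
        avg_market_cap=("market_cap", "mean"),
        data_points=("id", "count"),
    )
    .reset_index()
)

# Same convention as the Gold layer: report 0 when only one data point exists
analytics["std_price"] = analytics["std_price"].fillna(0)
print(analytics)
```

The output columns match the Gold table schema, so the insert loop in `compute_gold_analytics()` would work unchanged with this frame.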

## 📊 7. Advanced Statistics Engine

Compute more sophisticated metrics: rolling averages, returns, and correlations.

In [None]:
def compute_advanced_statistics() -> pd.DataFrame:
    """
    Compute advanced statistical metrics on Silver layer data.
    
    Metrics:
    - Rolling averages (configurable window)
    - Percentage returns
    - Price volatility (std deviation)
    - Cross-asset correlation matrix
    
    Returns:
        DataFrame with enriched statistics
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    
    # Load Silver data with proper timestamp parsing
    df = pd.read_sql_query(
        f"SELECT * FROM {PipelineConfig.SILVER_TABLE} ORDER BY ingestion_timestamp",
        conn
    )
    conn.close()
    
    if len(df) < 2:  # also covers the empty case
        logger.warning("⚠️ Insufficient data for advanced statistics")
        return pd.DataFrame()
    
    df['ingestion_timestamp'] = pd.to_datetime(df['ingestion_timestamp'])
    
    # Compute statistics per cryptocurrency
    results = []
    
    for crypto_id in df['crypto_id'].unique():
        crypto_df = df[df['crypto_id'] == crypto_id].sort_values('ingestion_timestamp').copy()
        
        if len(crypto_df) < 2:
            continue
        
        # Rolling average
        crypto_df['rolling_avg'] = crypto_df['price'].rolling(
            window=min(PipelineConfig.ROLLING_WINDOW, len(crypto_df)),
            min_periods=1
        ).mean()
        
        # Percentage returns
        crypto_df['returns_pct'] = crypto_df['price'].pct_change() * 100
        
        # Volatility (rolling std)
        crypto_df['volatility'] = crypto_df['price'].rolling(
            window=min(PipelineConfig.ROLLING_WINDOW, len(crypto_df)),
            min_periods=1
        ).std()
        
        results.append(crypto_df)
    
    if not results:
        return pd.DataFrame()
    
    df_enriched = pd.concat(results, ignore_index=True)
    
    logger.info(f"✅ Computed advanced statistics for {df_enriched['crypto_id'].nunique()} assets")
    
    return df_enriched


def compute_correlation_matrix() -> Optional[pd.DataFrame]:
    """
    Compute price correlation matrix across all cryptocurrencies.
    
    Returns:
        Correlation matrix DataFrame or None if insufficient data
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    
    df = pd.read_sql_query(
        f"SELECT crypto_id, price, ingestion_timestamp FROM {PipelineConfig.SILVER_TABLE}",
        conn
    )
    conn.close()
    
    if df.empty:
        return None
    
    # Pivot to get prices per crypto over time
    df_pivot = df.pivot_table(
        index='ingestion_timestamp',
        columns='crypto_id',
        values='price',
        aggfunc='mean'
    )
    
    if df_pivot.shape[0] < 2 or df_pivot.shape[1] < 2:
        logger.warning("⚠️ Insufficient data for correlation matrix")
        return None
    
    corr_matrix = df_pivot.corr()
    
    logger.info(f"✅ Computed correlation matrix for {len(corr_matrix)} assets")
    
    return corr_matrix


# Test statistics computation
df_stats = compute_advanced_statistics()
if not df_stats.empty:
    print("\n📊 Advanced Statistics Sample:")
    print(df_stats[['crypto_id', 'price', 'rolling_avg', 'returns_pct', 'volatility']].head(10))

corr_matrix = compute_correlation_matrix()
if corr_matrix is not None:
    print("\n📊 Price Correlation Matrix:")
    print(corr_matrix)
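
A small worked example may help when reading the statistics columns. The sketch below applies the same three pandas calls used in `compute_advanced_statistics()` to a made-up price series, with a window of 2 instead of `PipelineConfig.ROLLING_WINDOW` to keep the numbers easy to follow.

```python
import pandas as pd

# Invented prices for a single asset, ordered by time
prices = pd.Series([100.0, 110.0, 99.0, 108.9])
window = 2  # assumption for the example; the pipeline uses ROLLING_WINDOW

# Mean over the last `window` prices; min_periods=1 lets the first row use itself
rolling_avg = prices.rolling(window=window, min_periods=1).mean()

# Percentage change from the previous observation; the first row has no predecessor
returns_pct = prices.pct_change() * 100

# Sample standard deviation of price levels; undefined (NaN) for a single value
volatility = prices.rolling(window=window, min_periods=1).std()

summary = pd.DataFrame(
    {
        "price": prices,
        "rolling_avg": rolling_avg,
        "returns_pct": returns_pct,
        "volatility": volatility,
    }
)
print(summary)
```

Two details are worth noting: the first row of `returns_pct` and `volatility` is always `NaN`, and `volatility` here measures the spread of price levels rather than of returns, so it is not comparable across assets with very different prices.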

## 📈 8. Visualization Dashboard

Create visualizations of price trends, rolling averages, returns, and correlations.

In [None]:
def create_visualizations(df_stats: pd.DataFrame, corr_matrix: Optional[pd.DataFrame]) -> None:
    """
    Generate comprehensive visualization dashboard.
    
    Visualizations:
    1. Price trends over time
    2. Rolling averages
    3. Percentage returns
    4. Correlation heatmap
    
    Args:
        df_stats: DataFrame with computed statistics
        corr_matrix: Correlation matrix DataFrame
    """
    if df_stats.empty:
        logger.warning("⚠️ No data available for visualization")
        return
    
    # Create figure with subplots
    fig = plt.figure(figsize=(16, 12))
    gs = GridSpec(3, 2, figure=fig, hspace=0.3, wspace=0.3)
    
    # 1. Price Trends
    ax1 = fig.add_subplot(gs[0, :])
    for crypto_id in df_stats['crypto_id'].unique():
        crypto_data = df_stats[df_stats['crypto_id'] == crypto_id]
        ax1.plot(
            crypto_data['ingestion_timestamp'],
            crypto_data['price'],
            marker='o',
            label=crypto_id.capitalize(),
            linewidth=2
        )
    ax1.set_title('Cryptocurrency Price Trends', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Time', fontsize=11)
    ax1.set_ylabel('Price (USD)', fontsize=11)
    ax1.legend(loc='best')
    ax1.grid(True, alpha=0.3)
    ax1.tick_params(axis='x', rotation=45)
    
    # 2. Rolling Averages
    ax2 = fig.add_subplot(gs[1, 0])
    for crypto_id in df_stats['crypto_id'].unique():
        crypto_data = df_stats[df_stats['crypto_id'] == crypto_id]
        ax2.plot(
            crypto_data['ingestion_timestamp'],
            crypto_data['rolling_avg'],
            marker='s',
            label=crypto_id.capitalize(),
            linewidth=2,
            alpha=0.7
        )
    ax2.set_title(f'Rolling Average (Window={PipelineConfig.ROLLING_WINDOW})', 
                  fontsize=12, fontweight='bold')
    ax2.set_xlabel('Time', fontsize=10)
    ax2.set_ylabel('Rolling Avg Price (USD)', fontsize=10)
    ax2.legend(loc='best', fontsize=8)
    ax2.grid(True, alpha=0.3)
    ax2.tick_params(axis='x', rotation=45)
    
    # 3. Percentage Returns
    ax3 = fig.add_subplot(gs[1, 1])
    for crypto_id in df_stats['crypto_id'].unique():
        crypto_data = df_stats[df_stats['crypto_id'] == crypto_id]
        ax3.plot(
            crypto_data['ingestion_timestamp'],
            crypto_data['returns_pct'],
            marker='D',
            label=crypto_id.capitalize(),
            linewidth=1.5,
            alpha=0.7
        )
    ax3.axhline(y=0, color='red', linestyle='--', linewidth=1, alpha=0.5)
    ax3.set_title('Percentage Returns', fontsize=12, fontweight='bold')
    ax3.set_xlabel('Time', fontsize=10)
    ax3.set_ylabel('Returns (%)', fontsize=10)
    ax3.legend(loc='best', fontsize=8)
    ax3.grid(True, alpha=0.3)
    ax3.tick_params(axis='x', rotation=45)
    
    # 4. Correlation Heatmap
    ax4 = fig.add_subplot(gs[2, :])
    if corr_matrix is not None and not corr_matrix.empty:
        im = ax4.imshow(corr_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        ax4.set_xticks(range(len(corr_matrix.columns)))
        ax4.set_yticks(range(len(corr_matrix.index)))
        ax4.set_xticklabels([col.capitalize() for col in corr_matrix.columns], rotation=45)
        ax4.set_yticklabels([idx.capitalize() for idx in corr_matrix.index])
        
        # Add correlation values
        for i in range(len(corr_matrix.index)):
            for j in range(len(corr_matrix.columns)):
                ax4.text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
                         ha="center", va="center", color="black", fontsize=9)
        
        plt.colorbar(im, ax=ax4, label='Correlation Coefficient')
        ax4.set_title('Price Correlation Matrix', fontsize=12, fontweight='bold')
    else:
        ax4.text(0.5, 0.5, 'Insufficient data for correlation matrix',
                ha='center', va='center', fontsize=12)
        ax4.set_xlim(0, 1)
        ax4.set_ylim(0, 1)
        ax4.axis('off')
    
    plt.suptitle('Cryptocurrency Analytics Dashboard', 
                 fontsize=16, fontweight='bold', y=0.995)
    
    plt.tight_layout()
    plt.show()
    
    logger.info("✅ Visualizations generated successfully")


# Generate visualizations
if not df_stats.empty:
    create_visualizations(df_stats, corr_matrix)

## 🔄 9. Pipeline Orchestration

Master function to execute the complete end-to-end pipeline.

In [None]:
def run_pipeline(visualize: bool = True) -> Dict[str, int]:
    """
    Execute the complete data engineering pipeline.
    
    Pipeline stages:
    1. Fetch data from API
    2. Ingest to Bronze layer (raw)
    3. Clean and load to Silver layer
    4. Compute Gold layer analytics
    5. Calculate advanced statistics
    6. Generate visualizations (optional)
    
    Args:
        visualize: Whether to generate visualizations
        
    Returns:
        Dictionary with record counts per stage
    """
    logger.info("="*60)
    logger.info("🚀 STARTING DATA PIPELINE EXECUTION")
    logger.info("="*60)
    
    pipeline_stats = {
        'bronze_records': 0,
        'silver_records': 0,
        'gold_records': 0
    }
    
    try:
        # Stage 1: Data Ingestion
        logger.info("\n📥 Stage 1: Data Ingestion")
        raw_data = fetch_crypto_prices()
        
        if not raw_data:
            logger.error("❌ Pipeline failed: No data fetched")
            return pipeline_stats
        
        # Stage 2: Bronze Layer
        logger.info("\n🥉 Stage 2: Bronze Layer (Raw Storage)")
        pipeline_stats['bronze_records'] = ingest_to_bronze(raw_data)
        
        # Stage 3: Silver Layer
        logger.info("\n🥈 Stage 3: Silver Layer (Cleaning & Validation)")
        pipeline_stats['silver_records'] = clean_and_load_silver()
        
        # Stage 4: Gold Layer
        logger.info("\n🥇 Stage 4: Gold Layer (Analytics)")
        pipeline_stats['gold_records'] = compute_gold_analytics()
        
        # Stage 5: Advanced Statistics
        logger.info("\n📊 Stage 5: Advanced Statistics")
        df_stats = compute_advanced_statistics()
        corr_matrix = compute_correlation_matrix()
        
        # Stage 6: Visualization
        if visualize and not df_stats.empty:
            logger.info("\n📈 Stage 6: Generating Visualizations")
            create_visualizations(df_stats, corr_matrix)
        
        logger.info("\n" + "="*60)
        logger.info("✅ PIPELINE EXECUTION COMPLETED SUCCESSFULLY")
        logger.info("="*60)
        logger.info("📊 Pipeline Statistics:")
        logger.info(f"   Bronze records: {pipeline_stats['bronze_records']}")
        logger.info(f"   Silver records: {pipeline_stats['silver_records']}")
        logger.info(f"   Gold records: {pipeline_stats['gold_records']}")
        
    except Exception as e:
        logger.error(f"❌ Pipeline failed with error: {e}")
        import traceback
        logger.error(traceback.format_exc())
    
    return pipeline_stats


# Execute the pipeline
stats = run_pipeline(visualize=True)

## 🔁 10. Continuous Ingestion Simulation

Simulate a streaming data pipeline with periodic execution.

In [None]:
def run_streaming_pipeline(iterations: Optional[int] = None, interval: Optional[int] = None) -> None:
    """
    Simulate continuous data ingestion and processing.
    
    This function runs the pipeline in a loop, simulating a real-time
    streaming data pipeline that processes data incrementally.
    
    Args:
        iterations: Number of iterations (default from config)
        interval: Seconds between iterations (default from config)
    """
    # Compare against None so an explicit 0 is respected rather than replaced
    if iterations is None:
        iterations = PipelineConfig.MAX_ITERATIONS
    if interval is None:
        interval = PipelineConfig.INGESTION_INTERVAL
    
    logger.info("="*60)
    logger.info("🌊 STARTING STREAMING PIPELINE SIMULATION")
    logger.info(f"   Iterations: {iterations}")
    logger.info(f"   Interval: {interval} seconds")
    logger.info("="*60)
    
    total_stats = {
        'total_bronze': 0,
        'total_silver': 0,
        'total_gold': 0
    }
    
    for i in range(1, iterations + 1):
        logger.info(f"\n{'='*60}")
        logger.info(f"🔄 Iteration {i}/{iterations}")
        logger.info(f"{'='*60}")
        
        # Run pipeline without visualization for intermediate iterations
        visualize = (i == iterations)  # Only visualize on last iteration
        stats = run_pipeline(visualize=visualize)
        
        # Accumulate statistics
        total_stats['total_bronze'] += stats['bronze_records']
        total_stats['total_silver'] += stats['silver_records']
        total_stats['total_gold'] += stats['gold_records']
        
        # Wait before next iteration (except on last iteration)
        if i < iterations:
            logger.info(f"\n⏳ Waiting {interval} seconds before next iteration...")
            time.sleep(interval)
    
    # Final summary
    logger.info("\n" + "="*60)
    logger.info("🎉 STREAMING PIPELINE SIMULATION COMPLETED")
    logger.info("="*60)
    logger.info("📊 Total Records Processed:")
    logger.info(f"   Bronze: {total_stats['total_bronze']}")
    logger.info(f"   Silver: {total_stats['total_silver']}")
    logger.info(f"   Gold: {total_stats['total_gold']}")
    logger.info("="*60)
    
    # Show final database state
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    print("\n📊 Final Database State:")
    
    for table in [PipelineConfig.BRONZE_TABLE, PipelineConfig.SILVER_TABLE, PipelineConfig.GOLD_TABLE]:
        count = pd.read_sql_query(f"SELECT COUNT(*) as count FROM {table}", conn).iloc[0]['count']
        print(f"   {table}: {count} records")
    
    conn.close()


# Run streaming simulation
# Uncomment the line below to run continuous ingestion
# WARNING: This will make multiple API calls and take several minutes

# run_streaming_pipeline(iterations=5, interval=10)
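
One property of the loop above is that it sleeps for `interval` seconds after each run finishes, so the real period is the run time plus the interval. If a fixed cadence matters, a deadline-based loop is one option. The sketch below is a hypothetical alternative, not the notebook's method: `run_every` is an invented helper, and the `sleep` and `clock` parameters exist only so the timing can be swapped out in tests.

```python
import time


def run_every(task, iterations, interval, sleep=time.sleep, clock=time.monotonic):
    """Hypothetical scheduler: aim to start each run `interval` seconds after the last start."""
    next_start = clock()
    for i in range(iterations):
        task(i)
        next_start += interval
        remaining = next_start - clock()
        # Skip the wait after the final run, or when the task overran its slot
        if i < iterations - 1 and remaining > 0:
            sleep(remaining)


calls = []
run_every(calls.append, iterations=3, interval=0.01)
print(calls)
```

In the notebook this could be driven with something like `run_every(lambda i: run_pipeline(visualize=False), 5, 10)`, keeping in mind that each run still makes a live API call.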

## 📋 11. Data Quality & Monitoring

Query and inspect pipeline health.

In [None]:
def generate_pipeline_report() -> None:
    """
    Generate comprehensive pipeline health and data quality report.
    """
    conn = sqlite3.connect(PipelineConfig.DB_PATH)
    
    print("="*70)
    print("📊 DATA PIPELINE HEALTH REPORT")
    print("="*70)
    
    # Record counts per layer
    print("\n🗄️ RECORD COUNTS BY LAYER:")
    for table in [PipelineConfig.BRONZE_TABLE, PipelineConfig.SILVER_TABLE, PipelineConfig.GOLD_TABLE]:
        count = pd.read_sql_query(f"SELECT COUNT(*) as count FROM {table}", conn).iloc[0]['count']
        print(f"   {table.upper()}: {count:,} records")
    
    # Data quality metrics
    print("\n✅ DATA QUALITY METRICS:")
    
    # Bronze to Silver conversion rate
    bronze_count = pd.read_sql_query(
        f"SELECT COUNT(*) as count FROM {PipelineConfig.BRONZE_TABLE}", conn
    ).iloc[0]['count']
    
    silver_count = pd.read_sql_query(
        f"SELECT COUNT(*) as count FROM {PipelineConfig.SILVER_TABLE}", conn
    ).iloc[0]['count']
    
    if bronze_count > 0:
        quality_rate = (silver_count / bronze_count) * 100
        print(f"   Data Quality Rate: {quality_rate:.2f}%")
        print(f"   Records Filtered: {bronze_count - silver_count}")
    
    # Latest data timestamps
    print("\n🕐 LATEST DATA TIMESTAMPS:")
    for table in [PipelineConfig.BRONZE_TABLE, PipelineConfig.SILVER_TABLE]:
        latest = pd.read_sql_query(
            f"SELECT MAX(ingestion_timestamp) as latest FROM {table}", conn
        ).iloc[0]['latest']
        print(f"   {table.upper()}: {latest}")
    
    # Asset coverage
    print("\n💰 ASSET COVERAGE:")
    assets = pd.read_sql_query(
        f"SELECT crypto_id, COUNT(*) as records FROM {PipelineConfig.SILVER_TABLE} GROUP BY crypto_id",
        conn
    )
    for _, row in assets.iterrows():
        print(f"   {row['crypto_id'].capitalize()}: {row['records']} data points")
    
    # Latest analytics
    print("\n📈 LATEST ANALYTICS (GOLD LAYER):")
    gold_latest = pd.read_sql_query(
        f"""
        SELECT crypto_id, avg_price, min_price, max_price, std_price, data_points
        FROM {PipelineConfig.GOLD_TABLE}
        WHERE id IN (
            SELECT MAX(id) FROM {PipelineConfig.GOLD_TABLE} GROUP BY crypto_id
        )
        ORDER BY crypto_id
        """,
        conn
    )
    
    if not gold_latest.empty:
        print(gold_latest.to_string(index=False))
    else:
        print("   No analytics data available yet")
    
    conn.close()
    
    print("\n" + "="*70)


# Generate report
generate_pipeline_report()

## 🎓 12. Usage Instructions & Next Steps

### Quick Start Guide

```python
# 1. Run single pipeline execution
run_pipeline(visualize=True)

# 2. Run streaming simulation (5 iterations, 10 seconds apart)
run_streaming_pipeline(iterations=5, interval=10)

# 3. Generate health report
generate_pipeline_report()
```

### Architecture Summary

**Bronze Layer** (`bronze_raw_prices`):
- Raw API data storage
- No cleaning or filtering; key fields are copied into columns as received
- Includes the raw JSON payload for each asset
- Audit trail for debugging

**Silver Layer** (`silver_cleaned_prices`):
- Cleaned and validated data
- Deduplication enforced
- Schema validation
- Production-ready quality

**Gold Layer** (`gold_analytics`):
- Aggregated business metrics
- Analytics-ready format
- Optimized for reporting
- Historical snapshots

### Key Features Demonstrated

✅ **Medallion Architecture** - Industry-standard data lakehouse pattern  
✅ **Data Quality Checks** - Validation, deduplication, null handling  
✅ **Incremental Processing** - Silver loads only Bronze rows it has not already stored  
✅ **Observability** - Comprehensive logging throughout  
✅ **Error Handling** - Failures are logged and the run exits cleanly  
✅ **Modular Design** - Reusable, testable functions  
✅ **Statistics Engine** - Rolling averages, returns, correlations  
✅ **Visualization** - Business intelligence dashboards  
✅ **Orchestration** - Single master pipeline function  
✅ **Streaming Simulation** - Continuous data processing  

### Portfolio Talking Points

1. **Scalability**: Modular design allows easy addition of new data sources
2. **Data Quality**: Multi-layer validation ensures high-quality analytics
3. **Observability**: Logging enables monitoring and debugging
4. **Best Practices**: Follows industry-standard Medallion Architecture
5. **Production-Ready**: Error handling, incremental processing, and idempotent Silver loads (enforced by `UNIQUE` plus `INSERT OR IGNORE`)

### Potential Enhancements

- Add Apache Airflow for scheduling
- Implement data versioning (Delta Lake)
- Add data lineage tracking
- Create dbt models for transformations
- Implement CDC (Change Data Capture)
- Add data quality tests (Great Expectations)
- Create alerting for pipeline failures
- Add partitioning for large datasets

---

**License**: MIT  
**Contact**: [Your Contact Info]  