In [1]:
"""
Automated Data Pipeline for Retail Orders Analytics
Implements Bronze-Silver-Gold (Medallion) Architecture

Author: Sarah Bani Issa
Date: September 2025
Purpose: Deloitte Case Study 
"""

import pandas as pd
import numpy as np
import sqlite3
import zipfile
import os
import re
import logging
from datetime import datetime, timedelta
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')


In [1]:
# ============================================================================
# PIPELINE CONFIGURATION
# ============================================================================

class PipelineConfig:
    """Holds the paths, layer names, thresholds and file-name pattern used by the pipeline."""

    def __init__(self):
        # Input archive, target SQLite file and logging verbosity
        self.zip_file_path = "Case_Study_202309_Data.zip"
        self.database_path = "retail_dwh.db"
        self.log_level = logging.INFO

        # Medallion architecture layer names (bronze -> silver -> gold)
        self.bronze_schema, self.silver_schema, self.gold_schema = (
            "bronze_layer",
            "silver_layer",
            "gold_layer",
        )

        # Data quality thresholds:
        #   max_null_percentage   - max % of nulls allowed
        #   min_records_threshold - min records per file
        self.max_null_percentage, self.min_records_threshold = 10, 100

        # Expected file name: YYYYMM_Orders_YYYY_MM_DD_HH_MM_SS.csv
        # Group 1 is the month key; groups 2-7 are the timestamp components.
        self.file_pattern = r'^(\d{6})_Orders_(\d{4})_(\d{2})_(\d{2})_(\d{2})_(\d{2})_(\d{2})\.csv$'

# ============================================================================
# LOGGING SETUP
# ============================================================================

def setup_logging(level=logging.INFO):
    """Configure root logging (file + console) and return this module's logger.

    Args:
        level: Root logger level applied on first configuration.
            Defaults to ``logging.INFO``.

    Returns:
        logging.Logger: The logger named after this module (``__name__``).

    Notes:
        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so handlers are only constructed when none exist. This avoids
        opening 'pipeline_execution.log' again on every call (for example when
        the cell is re-run).
    """
    if not logging.getLogger().handlers:
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                # Explicit UTF-8: the pipeline's log messages contain emoji,
                # which cannot be encoded by some platform default encodings.
                logging.FileHandler('pipeline_execution.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
    return logging.getLogger(__name__)


In [None]:
# ============================================================================
# DATABASE CONNECTION MANAGER
# ============================================================================

class DatabaseManager:
    """Manages SQLite connections and table creation for the pipeline.

    Attributes:
        db_path: Path passed to ``sqlite3.connect`` for every new connection.
        logger: Module logger used for progress and error messages.
    """

    def __init__(self, db_path):
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)

    def get_connection(self):
        """Open and return a new SQLite connection to ``self.db_path``.

        The caller owns the connection. Using it as a context manager
        (``with conn:``) only commits or rolls back; it does not close it.
        """
        return sqlite3.connect(self.db_path)

    def execute_query(self, query, params=None):
        """Execute a single SQL statement and return the resulting cursor.

        Args:
            query: SQL text to execute.
            params: Optional bound parameters; a falsy value means "no
                parameters".

        Returns:
            sqlite3.Cursor: Cursor produced by ``Connection.execute``.

        Raises:
            Exception: Any error raised while connecting or executing is
                logged and re-raised unchanged.
        """
        try:
            # The connection is deliberately left open: the returned cursor
            # is only usable while its connection is alive.
            with self.get_connection() as conn:
                if params:
                    return conn.execute(query, params)
                else:
                    return conn.execute(query)
        except Exception as e:
            self.logger.error(f"Database query failed: {e}")
            raise

    def create_schemas(self):
        """Create the bronze, silver and gold tables if they do not exist.

        Surrogate keys in the gold layer are named ``*_key`` so they do not
        collide with the natural ``*_id`` columns; SQLite rejects a table
        definition that declares the same column name twice.

        Raises:
            sqlite3.Error: If any DDL statement fails.
        """
        self.logger.info("Creating database schemas...")

        # Bronze Layer - Raw data (everything stored as text)
        bronze_ddl = """
        CREATE TABLE IF NOT EXISTS bronze_orders_raw (
            row_id INTEGER,
            order_id TEXT,
            order_date TEXT,
            ship_date TEXT,
            ship_mode TEXT,
            customer_id TEXT,
            customer_name TEXT,
            segment TEXT,
            country TEXT,
            city TEXT,
            state TEXT,
            postal_code TEXT,
            region TEXT,
            product_id TEXT,
            category TEXT,
            sub_category TEXT,
            product_name TEXT,
            sales TEXT,
            quantity TEXT,
            discount TEXT,
            profit TEXT,
            source_file TEXT,
            load_timestamp TEXT,
            file_month TEXT,
            file_timestamp TEXT
        )
        """

        # Silver Layer - Cleaned, typed data
        silver_ddl = """
        CREATE TABLE IF NOT EXISTS silver_orders_clean (
            row_id INTEGER,
            order_id TEXT NOT NULL,
            order_date DATE,
            ship_date DATE,
            ship_mode TEXT,
            customer_id TEXT,
            customer_name TEXT,
            segment TEXT,
            country TEXT,
            city TEXT,
            state TEXT,
            postal_code TEXT,
            region TEXT,
            product_id TEXT,
            category TEXT,
            sub_category TEXT,
            product_name TEXT,
            sales REAL,
            quantity INTEGER,
            discount REAL,
            profit REAL,
            delivery_days INTEGER,
            data_quality_score REAL,
            quality_flags TEXT,
            source_file TEXT,
            processed_timestamp TEXT
        )
        """

        # Gold Layer - Dimensional model
        dim_customer_ddl = """
        CREATE TABLE IF NOT EXISTS gold_dim_customer (
            customer_key INTEGER PRIMARY KEY,
            customer_id TEXT UNIQUE,
            customer_name TEXT,
            segment TEXT,
            created_date TEXT,
            updated_date TEXT
        )
        """

        dim_product_ddl = """
        CREATE TABLE IF NOT EXISTS gold_dim_product (
            product_key INTEGER PRIMARY KEY,
            product_id TEXT UNIQUE,
            product_name TEXT,
            category TEXT,
            sub_category TEXT,
            created_date TEXT,
            updated_date TEXT
        )
        """

        dim_geography_ddl = """
        CREATE TABLE IF NOT EXISTS gold_dim_geography (
            geography_id INTEGER PRIMARY KEY,
            country TEXT,
            state TEXT,
            city TEXT,
            postal_code TEXT,
            region TEXT,
            created_date TEXT,
            UNIQUE(country, state, city, postal_code)
        )
        """

        fact_orders_ddl = """
        CREATE TABLE IF NOT EXISTS gold_fact_orders (
            order_key INTEGER PRIMARY KEY,
            order_id TEXT,
            customer_key INTEGER,
            product_key INTEGER,
            geography_id INTEGER,
            order_date DATE,
            ship_date DATE,
            ship_mode TEXT,
            sales REAL,
            quantity INTEGER,
            discount REAL,
            profit REAL,
            delivery_days INTEGER,
            row_id INTEGER,
            load_date TEXT,
            FOREIGN KEY (customer_key) REFERENCES gold_dim_customer (customer_key),
            FOREIGN KEY (product_key) REFERENCES gold_dim_product (product_key),
            FOREIGN KEY (geography_id) REFERENCES gold_dim_geography (geography_id)
        )
        """

        # Dimensions are created before the fact table that references them
        ddl_statements = [
            bronze_ddl, silver_ddl, dim_customer_ddl,
            dim_product_ddl, dim_geography_ddl, fact_orders_ddl
        ]

        conn = self.get_connection()
        try:
            # "with conn" commits on success and rolls back on error
            with conn:
                for ddl in ddl_statements:
                    conn.execute(ddl)
        finally:
            # The context manager does not close the connection; do it here
            conn.close()

        self.logger.info("✅ Database schemas created successfully")


In [None]:
# ============================================================================
# FILE PROCESSOR - Handles ZIP file extraction and latest file selection
# ============================================================================

class FileProcessor:
    """Reads order CSVs from a ZIP archive, keeping the latest file per month.

    Attributes:
        zip_path: Path of the ZIP archive to read.
        config: Object exposing ``file_pattern``, a regex whose first group is
            the month key and whose next six groups are the year, month, day,
            hour, minute and second of the file timestamp.
        logger: Module logger.
    """

    def __init__(self, zip_path, config):
        self.zip_path = zip_path
        self.config = config
        self.logger = logging.getLogger(__name__)

    def extract_and_get_latest_files(self):
        """Load the most recent CSV of each month from the archive.

        Returns:
            tuple: ``(extracted_data, latest_files)`` where ``extracted_data``
            is a list of DataFrames (all columns read as strings, plus
            ``source_file``, ``file_month``, ``file_timestamp`` and
            ``load_timestamp``) and ``latest_files`` maps each month key to a
            dict with ``month_key``, ``timestamp`` and ``filename``.

        Raises:
            FileNotFoundError: If ``zip_path`` does not exist.
            Exception: Any error while reading the archive is logged and
                re-raised.
        """
        self.logger.info(f"Processing ZIP file: {self.zip_path}")

        if not os.path.exists(self.zip_path):
            raise FileNotFoundError(f"ZIP file not found: {self.zip_path}")

        try:
            with zipfile.ZipFile(self.zip_path, 'r') as zip_ref:
                csv_files = [f for f in zip_ref.namelist() if f.endswith('.csv')]

                self.logger.info(f"Found {len(csv_files)} CSV files in archive")

                latest_files = self._select_latest_per_month(csv_files)

                # Extract latest files only
                extracted_data = []
                for month_key, file_data in latest_files.items():
                    filename = file_data['filename']
                    self.logger.info(f"Processing latest file for {month_key}: {filename}")

                    with zip_ref.open(filename) as file:
                        # dtype=str keeps raw values untouched for the bronze layer
                        df = pd.read_csv(file, encoding='latin-1', dtype=str)

                    df['source_file'] = filename
                    df['file_month'] = month_key
                    df['file_timestamp'] = file_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
                    df['load_timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                    extracted_data.append(df)
                    self.logger.info(f"✅ Loaded {len(df)} records from {filename}")

                self.logger.info(f"✅ Processing complete. Latest files for {len(latest_files)} months")
                return extracted_data, latest_files

        except Exception as e:
            self.logger.error(f"Error processing ZIP file: {e}")
            raise

    def _parse_file_info(self, csv_file):
        """Return month key / timestamp info for one archive member, or None if unusable."""
        match = re.match(self.config.file_pattern, os.path.basename(csv_file))
        if not match:
            self.logger.warning(f"Skipping CSV with unexpected name: {csv_file}")
            return None

        month_key = match.group(1)  # YYYYMM
        try:
            # Remaining groups: year, month, day, hour, minute, second
            file_timestamp = datetime(*(int(part) for part in match.groups()[1:]))
        except ValueError as e:
            # Digits matched the pattern but do not form a real date/time;
            # skip this member instead of aborting the whole extraction.
            self.logger.warning(f"Skipping CSV with invalid timestamp: {csv_file} ({e})")
            return None

        return {
            'month_key': month_key,
            'timestamp': file_timestamp,
            'filename': csv_file
        }

    def _select_latest_per_month(self, csv_files):
        """Map each month key to the info dict of its most recent parsable file."""
        latest_files = {}
        for csv_file in csv_files:
            info = self._parse_file_info(csv_file)
            if info is None:
                continue

            current = latest_files.get(info['month_key'])
            # Strictly newer wins; on equal timestamps the first seen is kept
            if current is None or info['timestamp'] > current['timestamp']:
                latest_files[info['month_key']] = info
        return latest_files


In [None]:
# ============================================================================
# DATA QUALITY CHECKER
# ============================================================================

class DataQualityChecker:
    """Scores a raw orders DataFrame and lists the quality problems found."""

    def __init__(self):
        self.quality_issues = []
        self.logger = logging.getLogger(__name__)

    def validate_and_score_data(self, df):
        """Run four checks on ``df`` and return ``(quality_flags, quality_score)``.

        The score starts at 100 and is reduced per failing column, with a cap
        per check (20 for missing values, 15 for dates, 10 for numerics, 25 for
        duplicate order IDs); it never drops below 0. Columns absent from
        ``df`` are skipped.
        """
        self.logger.info("Starting data quality validation...")

        flags = []
        score = 100.0  # perfect until a check deducts from it

        # Check 1: missing critical values (NaN or empty string)
        for column in ('Order ID', 'Customer ID', 'Product ID', 'Order Date', 'Sales'):
            if column not in df.columns:
                continue
            missing = df[column].isna().sum() + (df[column] == '').sum()
            missing_pct = (missing / len(df)) * 100
            if missing_pct > 0:
                flags.append(f"Missing_{column.replace(' ', '_')}")
                score -= min(missing_pct * 2, 20)

        # Check 2: values that parse under none of the accepted date formats
        for column in ('Order Date', 'Ship Date'):
            if column not in df.columns:
                continue
            bad_dates = sum(
                1 for value in df[column].dropna() if not self._is_valid_date(value)
            )
            if bad_dates > 0:
                flags.append(f"Invalid_{column.replace(' ', '_')}")
                score -= min((bad_dates / len(df)) * 100, 15)

        # Check 3: values that cannot be read as a number
        for column in ('Sales', 'Quantity', 'Discount', 'Profit'):
            if column not in df.columns:
                continue
            bad_numbers = sum(
                1 for value in df[column].dropna() if not self._is_numeric(value)
            )
            if bad_numbers > 0:
                flags.append(f"Invalid_{column}")
                score -= min((bad_numbers / len(df)) * 100, 10)

        # Check 4: repeated Order ID values
        if 'Order ID' in df.columns:
            repeated = df['Order ID'].duplicated().sum()
            if repeated > 0:
                flags.append("Duplicate_Order_IDs")
                score -= min((repeated / len(df)) * 100, 25)

        score = max(score, 0)  # floor at zero

        self.logger.info(f"✅ Data quality validation complete. Average score: {score:.1f}")
        return flags, score

    def _is_valid_date(self, date_str):
        """Return True when ``date_str`` matches at least one accepted date format."""
        for pattern in ('%m/%d/%Y', '%d/%m/%Y', '%Y-%m-%d', '%m-%d-%Y', '%d-%m-%Y'):
            try:
                datetime.strptime(str(date_str), pattern)
            except ValueError:
                continue
            else:
                return True
        return False

    def _is_numeric(self, value):
        """Return True when ``value`` converts to float after removing '$' and ','."""
        try:
            stripped = str(value).replace('$', '').replace(',', '').strip()
            float(stripped)
        except (ValueError, TypeError):
            return False
        return True


In [None]:
# ============================================================================
# DATA CLEANER - Silver Layer Processing
# ============================================================================

class DataCleaner:
    """Cleans raw (bronze) order data into the snake_case silver layout."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def clean_data(self, df):
        """Clean and standardize ``df`` for the silver layer.

        Steps: parse dates, convert numeric fields, tidy text fields, derive
        ``delivery_days``, score data quality, then normalise every remaining
        raw header to snake_case so the result uses the column names the
        silver table and the dimensional model look up.

        Args:
            df: Raw orders DataFrame; it is copied, not modified.

        Returns:
            pandas.DataFrame: Cleaned frame with ``data_quality_score``,
            ``quality_flags`` and ``processed_timestamp`` added.
        """
        self.logger.info("Starting data cleaning process...")

        cleaned_df = df.copy()

        # Each step adds snake_case columns next to the raw ones
        cleaned_df = self._clean_dates(cleaned_df)
        cleaned_df = self._clean_numeric_fields(cleaned_df)
        cleaned_df = self._clean_text_fields(cleaned_df)
        cleaned_df = self._calculate_derived_fields(cleaned_df)

        # Score quality while the raw headers are still present: the checker
        # looks columns up by their original names (e.g. 'Order ID').
        quality_checker = DataQualityChecker()
        quality_flags, quality_score = quality_checker.validate_and_score_data(cleaned_df)

        cleaned_df = self._standardize_column_names(cleaned_df)

        cleaned_df['data_quality_score'] = quality_score
        cleaned_df['quality_flags'] = '|'.join(quality_flags) if quality_flags else ''
        cleaned_df['processed_timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        self.logger.info(f"✅ Data cleaning complete. {len(cleaned_df)} records processed")
        return cleaned_df

    def _clean_dates(self, df):
        """Add ``order_date`` / ``ship_date`` as ``datetime.date`` values.

        Values that cannot be parsed become missing (``errors='coerce'``).
        """
        date_columns = ['Order Date', 'Ship Date']

        for col in date_columns:
            if col in df.columns:
                # infer_datetime_format is deprecated in pandas 2.x, so it is
                # no longer passed.
                df[col.lower().replace(' ', '_')] = pd.to_datetime(
                    df[col], errors='coerce'
                ).dt.date

        return df

    def _clean_numeric_fields(self, df):
        """Add numeric ``sales``/``quantity``/``discount``/``profit`` columns.

        '$' and ',' are stripped first; unparsable values become NaN.
        """
        numeric_mappings = {
            'Sales': 'sales',
            'Quantity': 'quantity',
            'Discount': 'discount',
            'Profit': 'profit'
        }

        for original_col, new_col in numeric_mappings.items():
            if original_col in df.columns:
                df[new_col] = pd.to_numeric(
                    df[original_col].astype(str).str.replace('[$,]', '', regex=True),
                    errors='coerce'
                )

        return df

    def _clean_text_fields(self, df):
        """Add trimmed, title-cased copies of the free-text columns.

        Missing values stay missing instead of becoming the literal 'Nan'.
        """
        text_columns = ['Customer Name', 'Product Name', 'City', 'State']

        for col in text_columns:
            if col in df.columns:
                new_col = col.lower().replace(' ', '_')
                raw = df[col]
                tidy = raw.astype(str).str.strip().str.title()
                # Keep the original (missing) value where raw is NA,
                # otherwise take the tidied text.
                df[new_col] = raw.where(raw.isna(), tidy)

        return df

    def _calculate_derived_fields(self, df):
        """Add ``delivery_days`` = ship_date - order_date in whole days."""
        if 'order_date' in df.columns and 'ship_date' in df.columns:
            df['delivery_days'] = (
                pd.to_datetime(df['ship_date']) - pd.to_datetime(df['order_date'])
            ).dt.days

        return df

    @staticmethod
    def _to_snake_case(name):
        """Lower-case ``name`` and collapse runs of non-alphanumerics to '_'."""
        return re.sub(r'[^0-9a-zA-Z]+', '_', str(name).strip()).strip('_').lower()

    def _standardize_column_names(self, df):
        """Return ``df`` with raw headers normalised to snake_case.

        A raw column whose snake_case name already exists (because a cleaning
        step produced the cleaned version) is dropped; every other raw column
        is renamed. Columns that are already snake_case are left untouched.
        """
        existing = set(df.columns)
        rename_map = {}
        drop_cols = []

        for col in df.columns:
            snake = self._to_snake_case(col)
            if not snake or snake == col:
                continue
            if snake in existing:
                drop_cols.append(col)  # cleaned version supersedes the raw one
            elif snake in rename_map.values():
                continue  # would collide with another rename; keep as is
            else:
                rename_map[col] = snake

        return df.drop(columns=drop_cols).rename(columns=rename_map)


In [None]:
# ============================================================================
# DIMENSIONAL MODEL BUILDER - Gold Layer
# ============================================================================

class DimensionalModelBuilder:
    """Builds the gold-layer star schema (three dimensions and one fact table).

    Surrogate keys are named ``customer_key``, ``product_key``,
    ``geography_id`` and ``order_key`` so that the natural ``*_id`` columns
    coming from the silver layer are preserved.

    Attributes:
        db_manager: Object exposing ``get_connection()`` that returns a
            SQLite connection.
        logger: Module logger.
    """

    def __init__(self, db_manager):
        self.db_manager = db_manager
        self.logger = logging.getLogger(__name__)

    def build_dimensions_and_facts(self, silver_df):
        """Build all dimensions and the fact table, then write them to the database.

        Args:
            silver_df: Cleaned orders DataFrame with snake_case columns.

        Returns:
            dict: DataFrames under the keys ``dim_customer``, ``dim_product``,
            ``dim_geography`` and ``fact_orders``.
        """
        self.logger.info("Building dimensional model...")

        # Build dimensions
        customer_dim = self._build_customer_dimension(silver_df)
        product_dim = self._build_product_dimension(silver_df)
        geography_dim = self._build_geography_dimension(silver_df)

        # Build fact table
        fact_orders = self._build_fact_orders(silver_df, customer_dim, product_dim, geography_dim)

        # Load to database
        self._load_dimensions_to_db(customer_dim, product_dim, geography_dim)
        self._load_facts_to_db(fact_orders)

        return {
            'dim_customer': customer_dim,
            'dim_product': product_dim,
            'dim_geography': geography_dim,
            'fact_orders': fact_orders
        }

    def _build_dimension(self, df, candidate_cols, key_cols, surrogate_col,
                         with_updated_date=True):
        """Return one row per natural key with a 1-based surrogate key column.

        De-duplication uses the available ``key_cols`` (first occurrence
        wins) so that joining the dimension back on those columns can never
        multiply fact rows; when none of them is available it falls back to
        all available columns. Returns an empty DataFrame if no candidate
        column is present in ``df``.
        """
        available_cols = [col for col in candidate_cols if col in df.columns]
        if not available_cols:
            return pd.DataFrame()

        dedup_cols = [col for col in key_cols if col in available_cols] or available_cols
        dim = (
            df[available_cols]
            .drop_duplicates(subset=dedup_cols, keep='first')
            .reset_index(drop=True)
        )
        dim.insert(0, surrogate_col, range(1, len(dim) + 1))

        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        dim['created_date'] = now
        if with_updated_date:
            dim['updated_date'] = now
        return dim

    def _build_customer_dimension(self, df):
        """Build the customer dimension keyed by ``customer_key``."""
        customer_dim = self._build_dimension(
            df, ['customer_id', 'customer_name', 'segment'],
            ['customer_id'], 'customer_key'
        )
        if not customer_dim.columns.empty:
            self.logger.info(f"✅ Customer dimension: {len(customer_dim)} records")
        return customer_dim

    def _build_product_dimension(self, df):
        """Build the product dimension keyed by ``product_key``."""
        product_dim = self._build_dimension(
            df, ['product_id', 'product_name', 'category', 'sub_category'],
            ['product_id'], 'product_key'
        )
        if not product_dim.columns.empty:
            self.logger.info(f"✅ Product dimension: {len(product_dim)} records")
        return product_dim

    def _build_geography_dimension(self, df):
        """Build the geography dimension keyed by ``geography_id``."""
        geography_dim = self._build_dimension(
            df, ['country', 'state', 'city', 'postal_code', 'region'],
            ['country', 'state', 'city', 'postal_code'], 'geography_id',
            with_updated_date=False
        )
        if not geography_dim.columns.empty:
            self.logger.info(f"✅ Geography dimension: {len(geography_dim)} records")
        return geography_dim

    def _attach_surrogate_key(self, fact, dim, natural_cols, surrogate_col):
        """Left-join ``surrogate_col`` from ``dim`` onto ``fact`` via the shared natural columns."""
        if dim.empty or surrogate_col not in dim.columns:
            return fact

        join_cols = [col for col in natural_cols if col in fact.columns and col in dim.columns]
        if not join_cols:
            return fact

        if surrogate_col in fact.columns:
            # Avoid _x/_y suffixed duplicates if the input already has this name
            fact = fact.drop(columns=[surrogate_col])

        return fact.merge(dim[join_cols + [surrogate_col]], on=join_cols, how='left')

    def _build_fact_orders(self, df, customer_dim, product_dim, geography_dim):
        """Build the fact table: one row per input row with surrogate keys attached.

        Only the fact columns actually present are kept; ``order_key`` is a
        1-based row number and ``load_date`` is the build time.
        """
        fact_orders = df.copy()

        # Add surrogate keys by joining with dimensions on their natural keys
        fact_orders = self._attach_surrogate_key(
            fact_orders, customer_dim, ['customer_id'], 'customer_key')
        fact_orders = self._attach_surrogate_key(
            fact_orders, product_dim, ['product_id'], 'product_key')
        fact_orders = self._attach_surrogate_key(
            fact_orders, geography_dim,
            ['country', 'state', 'city', 'postal_code'], 'geography_id')

        # Select final fact table columns
        fact_columns = [
            'order_id', 'customer_key', 'product_key', 'geography_id',
            'order_date', 'ship_date', 'ship_mode', 'sales', 'quantity',
            'discount', 'profit', 'delivery_days', 'row_id'
        ]

        available_fact_cols = [col for col in fact_columns if col in fact_orders.columns]
        # .copy() so the assignments below act on an independent frame
        fact_orders = fact_orders[available_fact_cols].copy()
        fact_orders.insert(0, 'order_key', range(1, len(fact_orders) + 1))
        fact_orders['load_date'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        self.logger.info(f"✅ Fact orders: {len(fact_orders)} records")
        return fact_orders

    def _write_table(self, frame, table_name):
        """Replace ``table_name`` with ``frame`` and close the connection afterwards."""
        conn = self.db_manager.get_connection()
        try:
            # "with conn" commits or rolls back but does not close
            with conn:
                frame.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            conn.close()

    def _load_dimensions_to_db(self, customer_dim, product_dim, geography_dim):
        """Write every non-empty dimension to its gold table (table is replaced)."""
        targets = [
            ('gold_dim_customer', customer_dim),
            ('gold_dim_product', product_dim),
            ('gold_dim_geography', geography_dim),
        ]
        for table_name, dim in targets:
            if not dim.empty:
                self._write_table(dim, table_name)

        self.logger.info("✅ Dimensions loaded to database")

    def _load_facts_to_db(self, fact_orders):
        """Write the fact table to ``gold_fact_orders`` (table is replaced)."""
        self._write_table(fact_orders, 'gold_fact_orders')

        self.logger.info("✅ Facts loaded to database")


In [None]:

# ============================================================================
# MAIN PIPELINE ORCHESTRATOR
# ============================================================================

class DataPipeline:
    """Orchestrates the bronze -> silver -> gold run and the summary report.

    Attributes:
        config: Configuration object providing ``database_path`` and
            ``zip_file_path`` (passed whole to ``FileProcessor`` as well).
        logger: Logger returned by ``setup_logging``.
        db_manager: ``DatabaseManager`` bound to ``config.database_path``.
    """

    def __init__(self, config):
        self.config = config
        self.logger = setup_logging()
        self.db_manager = DatabaseManager(config.database_path)

    def run_pipeline(self):
        """Execute every pipeline step in order.

        Returns:
            dict: On success ``{'status': 'success', 'bronze_records': int,
            'silver_records': int, 'gold_records': dict}``; on any exception
            ``{'status': 'failed', 'error': str}`` (the exception is logged
            with its traceback, not re-raised).
        """
        self.logger.info("🚀 Starting Data Pipeline Execution")
        self.logger.info("="*60)

        try:
            # Step 1: Setup database
            self._setup_database()

            # Step 2: Extract and process files (Bronze Layer)
            bronze_data = self._bronze_layer_processing()

            # Step 3: Clean and validate data (Silver Layer)
            silver_data = self._silver_layer_processing(bronze_data)

            # Step 4: Build dimensional model (Gold Layer)
            gold_data = self._gold_layer_processing(silver_data)

            # Step 5: Generate reports
            self._generate_pipeline_reports(bronze_data, silver_data, gold_data)

            self.logger.info("✅ Pipeline execution completed successfully!")
            self.logger.info("="*60)

            return {
                'status': 'success',
                'bronze_records': len(bronze_data) if bronze_data is not None else 0,
                'silver_records': len(silver_data) if silver_data is not None else 0,
                'gold_records': gold_data
            }

        except Exception as e:
            # exc_info keeps the traceback in the log; the caller only gets str(e)
            self.logger.error(f"❌ Pipeline failed: {e}", exc_info=True)
            return {'status': 'failed', 'error': str(e)}

    def _write_table(self, frame, table_name):
        """Replace ``table_name`` with ``frame`` and close the connection afterwards."""
        conn = self.db_manager.get_connection()
        try:
            # "with conn" commits or rolls back but does not close
            with conn:
                frame.to_sql(table_name, conn, if_exists='replace', index=False)
        finally:
            conn.close()

    def _setup_database(self):
        """Create the pipeline tables through the database manager."""
        self.logger.info("📊 Setting up database...")
        self.db_manager.create_schemas()

    def _bronze_layer_processing(self):
        """Ingest the latest monthly files and store them as ``bronze_orders_raw``.

        Returns:
            pandas.DataFrame or None: All extracted files concatenated, or
            None when nothing was extracted.
        """
        self.logger.info("🥉 Bronze Layer Processing...")

        file_processor = FileProcessor(self.config.zip_file_path, self.config)
        extracted_data, latest_files = file_processor.extract_and_get_latest_files()

        if not extracted_data:
            return None

        # Combine all dataframes and load to bronze layer
        combined_df = pd.concat(extracted_data, ignore_index=True)
        self._write_table(combined_df, 'bronze_orders_raw')

        self.logger.info(f"✅ Bronze layer: {len(combined_df)} records loaded")
        return combined_df

    def _silver_layer_processing(self, bronze_data):
        """Clean ``bronze_data`` and store it as ``silver_orders_clean``.

        Returns:
            pandas.DataFrame or None: Cleaned data, or None when
            ``bronze_data`` is None.
        """
        self.logger.info("🥈 Silver Layer Processing...")

        if bronze_data is None:
            return None

        data_cleaner = DataCleaner()
        silver_data = data_cleaner.clean_data(bronze_data)

        # Load to silver layer
        self._write_table(silver_data, 'silver_orders_clean')

        self.logger.info(f"✅ Silver layer: {len(silver_data)} records processed")
        return silver_data

    def _gold_layer_processing(self, silver_data):
        """Build the dimensional model and return its row counts.

        Returns:
            dict: Counts under ``customers``, ``products``, ``geography`` and
            ``orders``; an empty dict when ``silver_data`` is None.
        """
        self.logger.info("🥇 Gold Layer Processing...")

        if silver_data is None:
            return {}

        dim_builder = DimensionalModelBuilder(self.db_manager)
        gold_data = dim_builder.build_dimensions_and_facts(silver_data)

        counts = {
            'customers': len(gold_data['dim_customer']),
            'products': len(gold_data['dim_product']),
            'geography': len(gold_data['dim_geography']),
            'orders': len(gold_data['fact_orders'])
        }

        self.logger.info("✅ Gold layer: Dimensional model created")
        return counts

    def _generate_pipeline_reports(self, bronze_data, silver_data, gold_data):
        """Write a one-row run summary to 'pipeline_execution_report.csv'.

        Args:
            bronze_data: Bronze DataFrame or None.
            silver_data: Silver DataFrame or None.
            gold_data: Dict of gold-layer counts as returned by
                ``_gold_layer_processing`` (missing keys count as 0).
        """
        self.logger.info("📋 Generating pipeline reports...")

        # Pipeline summary report
        report = {
            'execution_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'bronze_records': len(bronze_data) if bronze_data is not None else 0,
            'silver_records': len(silver_data) if silver_data is not None else 0,
            'gold_dim_customer': gold_data.get('customers', 0),
            'gold_dim_product': gold_data.get('products', 0),
            'gold_dim_geography': gold_data.get('geography', 0),
            'gold_fact_orders': gold_data.get('orders', 0)
        }

        # Save report
        report_df = pd.DataFrame([report])
        report_df.to_csv('pipeline_execution_report.csv', index=False)

        self.logger.info("✅ Pipeline reports generated")


In [None]:

# ============================================================================
# UTILITY FUNCTIONS FOR MANUAL TESTING
# ============================================================================

def test_pipeline_components():
    """Test individual pipeline components"""
    print("🧪 Testing Pipeline Components...")
    
    config = PipelineConfig()
    
    # Test file processor
    if os.path.exists(config.zip_file_path):
        file_processor = FileProcessor(config.zip_file_path, config)
        try:
            data, files = file_processor.extract_and_get_latest_files()
            print(f"✅ File Processor: {len(data)} datasets extracted")
        except Exception as e:
            print(f"❌ File Processor failed: {e}")
    else:
        print(f"⚠️