In [17]:
import pandas as pd
import os
from datetime import datetime
import logging

SETUP & CONFIGURATION

In [None]:
# Notebook display: never truncate columns or wrap on width, but cap printed rows.
pd.set_option('display.max_rows', 10)
pd.set_option('display.width', None)
pd.set_option('display.max_columns', None)

# Logging: timestamped INFO-level records, e.g.
# "2025-10-07 12:30:20 - INFO - message".
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Pipeline settings shared by the helper functions below.
CONFIG = dict(
    source_file='dynamic_pricing.csv',      # CSV read by the pipeline
    raw_folder='data/raw',                  # destination of the raw copy
    processed_folder='data/processed',      # destination of the processed copy
    logs_folder='logs',                     # destination of ingestion reports
    date_format='%Y%m%d',                   # strftime pattern for saved file names
)


HELPER FUNCTIONS

In [19]:
def create_directories(folders):
    """Make sure every directory listed in ``folders`` exists.

    Args:
        folders: Mapping whose values are directory paths. The keys are not
            used by this function.

    Returns:
        True when every directory was created or already existed; False when
        any exception occurred (the exception is logged, not re-raised).
    """
    try:
        logger.info("Creating project directories...")
        for path in folders.values():
            # exist_ok=True makes re-running the cell harmless.
            os.makedirs(path, exist_ok=True)
            logger.info(f" Created/verified: {path}")
    except Exception as err:
        logger.error(f"Failed to create directories: {err}")
        return False
    else:
        return True


def validate_source_file(filepath):
    """Check that ``filepath`` points at an existing CSV file.

    Args:
        filepath: Path to the source file, as a ``str`` or any
            ``os.PathLike`` object (e.g. ``pathlib.Path``).

    Returns:
        True when the path is an existing regular file whose name ends in
        ``.csv`` (any letter case); False otherwise. The reason for a
        rejection is written to the log.
    """
    # Normalise PathLike objects so the string suffix check below works.
    path_text = os.fspath(filepath)

    # isfile (rather than exists) rejects a directory that happens to be
    # named like a CSV file, which could never be read as one.
    if not os.path.isfile(path_text):
        logger.error(f"Source file not found: {filepath}")
        return False

    # Compare case-insensitively so "DATA.CSV" is accepted as well.
    if not path_text.lower().endswith('.csv'):
        logger.error("Invalid file format. Expected CSV file.")
        return False

    logger.info(f"Source file validated: {filepath}")
    return True


def extract_data(filepath):
    """Load the CSV at ``filepath`` into a DataFrame.

    Args:
        filepath: Location of the CSV file, passed straight to
            ``pd.read_csv``.

    Returns:
        The loaded DataFrame, or None when reading raised any exception
        (the exception is logged, not re-raised).
    """
    try:
        logger.info(f"Extracting data from: {filepath}")
        frame = pd.read_csv(filepath)
        row_count, column_count = len(frame), len(frame.columns)
        logger.info(f" Extracted {row_count} rows, {column_count} columns")
    except Exception as err:
        logger.error(f"Failed to extract data: {err}")
        return None
    return frame


def validate_schema(df):
    """Verify that ``df`` contains every column the pipeline requires.

    Args:
        df: DataFrame whose ``columns`` are checked.

    Returns:
        A ``(is_valid, missing)`` tuple. ``missing`` lists the required
        column names absent from ``df`` in the order they are declared
        below, and is an empty list when validation passes.
    """
    expected = (
        'Number_of_Riders',
        'Number_of_Drivers',
        'Location_Category',
        'Customer_Loyalty_Status',
        'Number_of_Past_Rides',
        'Average_Ratings',
        'Time_of_Booking',
        'Vehicle_Type',
        'Expected_Ride_Duration',
        'Historical_Cost_of_Ride',
    )
    present = df.columns

    missing = []
    for name in expected:
        if name not in present:
            missing.append(name)

    if not missing:
        logger.info(" Schema validation passed")
        return True, []

    logger.error(f"Missing required columns: {missing}")
    return False, missing


def check_data_quality(df):
    """Compute basic quality metrics for ``df`` and log any findings.

    Args:
        df: DataFrame to inspect.

    Returns:
        Dict with the keys ``total_rows``, ``total_columns``,
        ``missing_values`` (count of null cells across the whole frame),
        ``duplicate_rows`` (count of rows flagged by ``df.duplicated()``)
        and ``empty_dataset`` (True when the frame has zero rows).
    """
    logger.info("Checking data quality...")

    row_count = len(df)
    null_total = df.isnull().sum().sum()
    duplicate_total = df.duplicated().sum()

    metrics = {
        'total_rows': row_count,
        'total_columns': len(df.columns),
        'missing_values': null_total,
        'duplicate_rows': duplicate_total,
        'empty_dataset': row_count == 0,
    }

    # Each non-zero count produces one warning, missing values first.
    for key, label in (('missing_values', 'missing values'),
                       ('duplicate_rows', 'duplicate rows')):
        if metrics[key] > 0:
            logger.warning(f"Found {metrics[key]} {label}")

    if not metrics['empty_dataset']:
        logger.info(" Data quality check completed")
    else:
        logger.error("Dataset is empty!")

    return metrics


def save_data(df, folder, filename_prefix):
    """Write ``df`` to a date-stamped CSV file inside ``folder``.

    The file is named ``<filename_prefix>_<date>.csv`` where ``<date>`` is
    today's date rendered with ``CONFIG['date_format']``. The index is not
    written.

    Args:
        df: DataFrame to persist.
        folder: Directory that receives the file.
        filename_prefix: Leading part of the file name.

    Returns:
        Path of the written file, or None when any exception occurred
        (the exception is logged, not re-raised).
    """
    try:
        stamp = datetime.now().strftime(CONFIG['date_format'])
        filename = f"{filename_prefix}_{stamp}.csv"
        target = os.path.join(folder, filename)
        df.to_csv(target, index=False)
        size_kb = os.path.getsize(target) / 1024  # bytes -> KB for the log line
        logger.info(f" Saved: {target} ({size_kb:.1f} KB)")
    except Exception as err:
        logger.error(f"Failed to save data: {err}")
        return None
    return target


def generate_ingestion_report(metrics, files_created):
    """Write a plain-text ingestion report into ``CONFIG['logs_folder']``.

    The report is named ``ingestion_report_<YYYY-MM-DD>.txt`` and contains
    the generation time, a SUCCESS/FAILED status (FAILED when
    ``metrics['empty_dataset']`` is true), the quality metrics and a
    numbered list of the files created.

    Args:
        metrics: Dict with the keys ``empty_dataset``, ``total_rows``,
            ``total_columns``, ``missing_values`` and ``duplicate_rows``.
        files_created: Iterable of file paths to list in the report.

    Returns:
        Path of the written report, or None when any exception occurred
        (the exception is logged, not re-raised).
    """
    try:
        # Read the clock once so the file name and the header cannot disagree
        # when the call happens to straddle midnight.
        now = datetime.now()
        logs_folder = CONFIG['logs_folder']
        # Create the folder on demand so the function also works when called
        # on its own; an empty folder setting means "current directory".
        if logs_folder:
            os.makedirs(logs_folder, exist_ok=True)
        report_path = os.path.join(
            logs_folder, f"ingestion_report_{now.strftime('%Y-%m-%d')}.txt"
        )

        status = "FAILED" if metrics['empty_dataset'] else "SUCCESS"

        # Build the whole report before opening the file, so a missing metric
        # key cannot leave a truncated report on disk.
        lines = [
            f"Report Generated: {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n",
            "INGESTION STATUS\n",
            f"Status: {status}\n\n",
            "DATA QUALITY METRICS\n",
            f"Total Records:      {metrics['total_rows']:,}\n",
            f"Total Features:     {metrics['total_columns']}\n",
            f"Missing Values:     {metrics['missing_values']}\n",
            f"Duplicate Records:  {metrics['duplicate_rows']}\n\n",
            "FILES CREATED\n",
        ]
        lines.extend(
            f"{idx}. {filepath}\n" for idx, filepath in enumerate(files_created, 1)
        )

        # Explicit UTF-8: the locale default encoding (e.g. cp1252 on Windows)
        # cannot represent every character a file path may contain.
        with open(report_path, 'w', encoding='utf-8') as f:
            f.writelines(lines)

        logger.info(f"Report generated: {report_path}")
        return report_path
    except Exception as e:
        logger.error(f"Failed to generate report: {e}")
        return None


INGESTION PIPELINE

In [20]:
def run_ingestion_pipeline():
    """Run the eight ingestion steps in order, printing progress as it goes.

    Steps: create the project directories, validate the source file, load
    it, validate its schema, compute quality metrics, save a copy under the
    raw folder, save a copy under the processed folder, and write the
    ingestion report. All paths come from ``CONFIG``.

    Returns:
        True when every step succeeded and the dataset contains at least one
        row. False as soon as a step fails, or after the report has been
        written when the dataset is empty (the report then carries the
        FAILED status).
    """
    print("EXECUTING DATA INGESTION PIPELINE")

    print("Step 1: Creating project directories...")
    folders = {
        'raw': CONFIG['raw_folder'],
        'processed': CONFIG['processed_folder'],
        'logs': CONFIG['logs_folder']
    }
    if not create_directories(folders):
        print(" Failed to create directories")
        return False
    print("Directories ready\n")

    print("Step 2: Validating source file...")
    if not validate_source_file(CONFIG['source_file']):
        print("Source file validation failed")
        return False
    print("Source file validated\n")

    print("Step 3: Extracting data...")
    df = extract_data(CONFIG['source_file'])
    if df is None:
        print("Data extraction failed")
        return False
    print(f"Extracted {len(df):,} records\n")

    print("Step 4: Validating data schema...")
    is_valid, missing_cols = validate_schema(df)
    if not is_valid:
        print(f"Schema validation failed. Missing: {missing_cols}")
        return False
    print("Schema validated\n")

    print("Step 5: Checking data quality...")
    metrics = check_data_quality(df)
    print("Quality check completed")
    print(f" Total rows: {metrics['total_rows']:,}")
    print(f" Missing values: {metrics['missing_values']}")
    print(f" Duplicates: {metrics['duplicate_rows']}\n")

    print("Step 6: Saving raw data backup...")
    raw_file = save_data(df, CONFIG['raw_folder'], 'pricing')
    if raw_file is None:
        print("Failed to save raw data")
        return False
    print("Raw data saved\n")

    # No transformation happens in this notebook: the processed copy is the
    # same frame written to a different folder.
    print("Step 7: Saving processed data...")
    processed_file = save_data(df, CONFIG['processed_folder'], 'pricing')
    if processed_file is None:
        print("Failed to save processed data")
        return False
    print("Processed data saved\n")

    print("Step 8: Generating ingestion report...")
    files_created = [raw_file, processed_file]
    report_file = generate_ingestion_report(metrics, files_created)
    if report_file is None:
        print("Failed to generate report")
        return False
    print("Report generated\n")
    print("\n Files Created:")
    print(f"  1. {raw_file}")
    print(f"  2. {processed_file}")
    print(f"  3. {report_file}")

    # The report marks an empty dataset as FAILED, so the pipeline must not
    # claim success for it. The check runs after the report is written so the
    # failure is still documented on disk.
    if metrics['empty_dataset']:
        print("Source dataset is empty")
        logger.error("Pipeline failed: source dataset contains no rows")
        return False

    logger.info("Pipeline execution completed successfully")
    return True


EXECUTE PIPELINE

In [21]:
if __name__ == "__main__":
    # Run the pipeline when this code is executed directly. ``success`` is
    # kept as a module-level name so later cells can inspect the outcome.
    try:
        success = run_ingestion_pipeline()
        if not success:
            print("\n Pipeline completed with errors. Check logs for details.")
    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback;
        # str(e) alone is often too terse to locate the failure
        # (a KeyError, for instance, renders as just the missing key).
        logger.exception(f"Unexpected error: {e}")
        print(f"\n Pipeline failed with error: {e}")


2025-10-07 12:30:20 - INFO - Creating project directories...
2025-10-07 12:30:20 - INFO -  Created/verified: data/raw
2025-10-07 12:30:20 - INFO -  Created/verified: data/processed
2025-10-07 12:30:20 - INFO -  Created/verified: logs
2025-10-07 12:30:20 - INFO - Source file validated: dynamic_pricing.csv
2025-10-07 12:30:20 - INFO - Extracting data from: dynamic_pricing.csv
2025-10-07 12:30:20 - INFO -  Extracted 1000 rows, 10 columns
2025-10-07 12:30:20 - INFO -  Schema validation passed
2025-10-07 12:30:20 - INFO - Checking data quality...
2025-10-07 12:30:20 - INFO -  Data quality check completed
2025-10-07 12:30:20 - INFO -  Saved: data/raw\pricing_20251007.csv (64.5 KB)
2025-10-07 12:30:20 - INFO -  Saved: data/processed\pricing_20251007.csv (64.5 KB)
2025-10-07 12:30:20 - INFO - Report generated: logs\ingestion_report_2025-10-07.txt
2025-10-07 12:30:20 - INFO - Pipeline execution completed successfully


EXECUTING DATA INGESTION PIPELINE
Step 1: Creating project directories...
Directories ready

Step 2: Validating source file...
Source file validated

Step 3: Extracting data...
Extracted 1,000 records

Step 4: Validating data schema...
Schema validated

Step 5: Checking data quality...
Quality check completed
 Total rows: 1,000
 Missing values: 0
 Duplicates: 0

Step 6: Saving raw data backup...
Raw data saved

Step 7: Saving processed data...
Processed data saved

Step 8: Generating ingestion report...
Report generated


 Files Created:
  1. data/raw\pricing_20251007.csv
  2. data/processed\pricing_20251007.csv
  3. logs\ingestion_report_2025-10-07.txt
