In [3]:
# FinMark Corporation: Robust Data Pipeline
# **Enhanced with Schema Validation & Fallback Logic**
# Key Features:
# 1. Pre-processing schema validation
# 2. Critical column protection
# 3. Graceful degradation for partial failures
# 4. Comprehensive error logging

In [83]:
import pandas as pd
import numpy as np
import logging
from datetime import datetime
import json

In [6]:
# Configure error logging
logging.basicConfig(
    filename=f'data_pipeline_{datetime.now().strftime("%Y%m%d")}.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('FinMarkPipeline')

In [7]:
# Define expected schemas for each dataset
SCHEMAS = {
    'event_logs': {
        'required': ['user_id', 'event_type', 'event_time', 'product_id', 'amount'],
        'dtypes': {
            'user_id': 'str',
            'event_type': 'str',
            'event_time': 'datetime64[ns]',
            'product_id': 'str',
            'amount': 'float'
        }
    },
    'trend_report': {
        'required': ['week', 'avg_users', 'sales_growth_rate'],
        'dtypes': {
            'week': 'datetime64[ns]',
            'avg_users': 'int',
            'sales_growth_rate': 'float'
        }
    },
    'marketing_summary': {
        'required': ['date', 'users_active', 'total_sales', 'new_customers'],
        'dtypes': {
            'date': 'datetime64[ns]',
            'users_active': 'int',
            'total_sales': 'float',
            'new_customers': 'int'
        }
    }
}

In [8]:
# Fallback defaults for critical columns
FALLBACK_VALUES = {
    'amount': 0.0,
    'total_sales': 0.0,
    'avg_users': 0,
    'users_active': 0,
    'new_customers': 0,
    'sales_growth_rate': 0.0
}

In [9]:
# Save schemas for documentation
with open('data_schemas.json', 'w') as f:
    json.dump(SCHEMAS, f, indent=2)


In [10]:
# ## Validation & Repair Functions

def validate_and_repair(df, dataset_name):
    """Validate schema and repair missing/corrupted columns"""
    schema = SCHEMAS[dataset_name]
    validation_report = {
        'dataset': dataset_name,
        'timestamp': datetime.now().isoformat(),
        'original_columns': list(df.columns),
        'missing_columns': [],
        'repaired_columns': [],
        'type_issues': {},
        'critical_failure': False
    }
        # 1. Check for missing columns
    missing_cols = [col for col in schema['required'] if col not in df.columns]
    validation_report['missing_columns'] = missing_cols
    
    if missing_cols:
        logger.error(f"Missing columns in {dataset_name}: {missing_cols}")
        for col in missing_cols:
            if col in FALLBACK_VALUES:
                df[col] = FALLBACK_VALUES[col]
                logger.warning(f"Created fallback column: {col} with default values")
                validation_report['repaired_columns'].append(col)
            else:
                logger.critical(f"Unrecoverable missing column: {col}")
                validation_report['critical_failure'] = True
    
    # 2. Validate data types
    for col, expected_type in schema['dtypes'].items():
        if col not in df.columns:
            continue
            
        current_type = str(df[col].dtype)
        type_match = current_type == expected_type
        
        # Special handling for datetime columns
        if expected_type == 'datetime64[ns]' and not pd.api.types.is_datetime64_any_dtype(df[col]):
            type_match = False
            
        if not type_match:
            validation_report['type_issues'][col] = {
                'expected': expected_type,
                'actual': current_type
            }
            logger.warning(f"Type mismatch in {dataset_name}.{col}: {current_type} vs {expected_type}")
            
            try:
                # Handle datetime conversion
                if expected_type == 'datetime64[ns]':
                    df[col] = pd.to_datetime(df[col], format='ISO8601', errors='coerce')
                    # Check if conversion created too many NaTs
                    if df[col].isna().mean() > 0.5:
                        logger.error(f"Over 50% date conversion failures for {col}")
                        raise ValueError("Excessive date conversion errors")
                else:
                    df[col] = df[col].astype(expected_type)
                    
                logger.info(f"Successfully converted {col} to {expected_type}")
                validation_report['type_issues'][col]['repaired'] = True
                
            except (TypeError, ValueError) as e:
                logger.error(f"Type conversion failed for {col}: {str(e)}")
                if col in FALLBACK_VALUES:
                    df[col] = FALLBACK_VALUES[col]
                    logger.warning(f"Reset corrupted column: {col} to default values")
                    validation_report['type_issues'][col]['fallback_used'] = True
                else:
                    validation_report['type_issues'][col]['repaired'] = False
    
    return df, validation_report

In [11]:
# ## Core Processing Pipeline
def process_dataset(file_path, dataset_name):
    """Load, validate, and clean dataset with error handling"""
    logger.info(f"Starting processing: {dataset_name}")
    validation_result = None
    
    try:
        # Load data
        df = pd.read_csv(file_path)
        logger.info(f"Loaded {len(df)} records from {dataset_name}")
        
        # Schema validation and repair
        df, validation_result = validate_and_repair(df, dataset_name)
        
        if validation_result.get('critical_failure', False):
            logger.error(f"Critical validation failure for {dataset_name}. Aborting processing.")
            return None, validation_result
        
        # Dataset-specific cleaning
        if dataset_name == 'event_logs':
            df = clean_event_logs(df)
        elif dataset_name == 'trend_report':
            df = clean_trend_report(df)
        elif dataset_name == 'marketing_summary':
            df = clean_marketing_summary(df)
            
        logger.info(f"Successfully processed {dataset_name}")
        return df, validation_result
        
    except Exception as e:
        logger.exception(f"Fatal error processing {dataset_name}")
        return None, validation_result

In [77]:
## Dataset-Specific Cleaning Functions

def clean_event_logs(df):
    """Cleaning logic for event logs"""
    # Remove undefined columns
    df = df[SCHEMAS['event_logs']['required']].copy()
    
    # Handle missing values
    df['event_type'] = df['event_type'].fillna('unknown')
    df['user_id'] = df['user_id'].fillna('UNK').astype(str)
    
    # Product/amount handling: Only relevant for orders
    order_mask = df['event_type'].str.contains('order|purchase', case=False, na=False)
    df.loc[order_mask, 'product_id'] = df.loc[order_mask, 'product_id'].fillna('PROD_UNK')
    df.loc[order_mask, 'amount'] = df.loc[order_mask, 'amount'].fillna(0)
    df.loc[~order_mask, 'product_id'] = df.loc[~order_mask, 'product_id'].fillna('N/A')
    df.loc[~order_mask, 'amount'] = 0
    
    return df

def clean_trend_report(df):
    """Cleaning logic for trend report"""
    # Keep only relevant columns
    df = df[SCHEMAS['trend_report']['required']].copy()
    
    # Handle dates
    if not pd.api.types.is_datetime64_any_dtype(df['week']):
        df['week'] = pd.to_datetime(df['week'], format='ISO8601', errors='coerce')
    
    # Sort before forward-filling
    df.sort_values('week', inplace=True)
    
    # Fill numeric metrics
    df['avg_users'] = df['avg_users'].fillna(0)
    df['sales_growth_rate'] = df['sales_growth_rate'].ffill().fillna(0)
    
    return df

def clean_marketing_summary(df):
    """Cleaning logic for marketing summary"""
    # Keep relevant columns
    df = df[SCHEMAS['marketing_summary']['required']].copy()
    
    # Date conversion
    if not pd.api.types.is_datetime64_any_dtype(df['date']):
        df['date'] = pd.to_datetime(df['date'], format='ISO8601', errors='coerce')
    
    # Fill sequential data
    df.sort_values('date', inplace=True)
    for col in ['users_active', 'total_sales', 'new_customers']:
        df[col] = df[col].ffill().fillna(0)
    
    return df

In [21]:
## Pipeline Execution

if __name__ == "__main__":
    datasets = {
        'event_logs': 'event_logs.csv',
        'trend_report': 'trend_report.csv',
        'marketing_summary': 'marketing_summary.csv'
    }
    
    results = {}
    validation_reports = {}
    
    print("=== Starting FinMark Data Pipeline ===")
    print(f"Execution timestamp: {datetime.now().isoformat()}")
    print("--------------------------------------")
    
    for name, path in datasets.items():
        print(f"\nProcessing {name}...")
        cleaned, report = process_dataset(path, name)
        
        validation_reports[name] = report
        
        if cleaned is not None:
            results[name] = cleaned
            # Save cleaned data
            cleaned.to_csv(f'cleaned_{name}.csv', index=False)
            print(f"✅ Successfully processed {name} ({len(cleaned)} records)")
        else:
            print(f"❌ Failed to process {name} - check error logs")

=== Starting FinMark Data Pipeline ===
Execution timestamp: 2025-07-02T11:01:01.534031
--------------------------------------

Processing event_logs...
✅ Successfully processed event_logs (2000 records)

Processing trend_report...
✅ Successfully processed trend_report (20 records)

Processing marketing_summary...
✅ Successfully processed marketing_summary (100 records)


In [22]:
    # Generate validation summary
    print("\n=== Validation Summary ===")
    for name, report in validation_reports.items():
        status = "PASSED" if name in results else "FAILED"
        print(f"\nDataset: {name} - {status}")
        
        if report:
            print(f"Missing columns: {len(report['missing_columns'])}")
            print(f"Repaired columns: {len(report['repaired_columns'])}")
            print(f"Type issues: {len(report.get('type_issues', {}))}")
    
    print("\nPipeline completed")
    print(f"Successful datasets: {len(results)}/{len(datasets)}")
    print(f"Log file: data_pipeline_{datetime.now().strftime('%Y%m%d')}.log")


=== Validation Summary ===

Dataset: event_logs - PASSED
Missing columns: 0
Repaired columns: 0
Type issues: 5

Dataset: trend_report - PASSED
Missing columns: 0
Repaired columns: 0
Type issues: 3

Dataset: marketing_summary - PASSED
Missing columns: 0
Repaired columns: 0
Type issues: 4

Pipeline completed
Successful datasets: 3/3
Log file: data_pipeline_20250702.log


In [95]:
pip install pandas numpy matplotlib

Note: you may need to restart the kernel to use updated packages.
