# Data Processing Pipeline Demo

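This notebook walks through the complete data processing pipeline end to end. It covers configuration, execution, validation, feature engineering, monitoring and batch processing, using a synthetic customer dataset generated in section 1.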

In [None]:
# Setup and imports
import copy
import json
import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd

# Make modules in the parent directory importable.
# NOTE(review): append() places '..' at the END of sys.path, so an installed
# package that shares a name with a local one (e.g. Hugging Face `transformers`
# vs. the local `transformers.feature_engineer` module used later) takes
# precedence; consider sys.path.insert(0, '..') if that happens.
sys.path.append('..')

# Import pipeline modules
from core.pipeline import DataPipeline
from core.config import PipelineConfig, TransformConfig, ValidationConfig
from monitoring.visualizer import QualityVisualizer

## 1. Generate Sample Data

In [None]:
# Create a synthetic customer dataset; the seed makes every run produce the same data
np.random.seed(42)

n_samples = 1000
sample_data = pd.DataFrame({
    'customer_id': range(1, n_samples + 1),
    'age': np.random.randint(18, 80, n_samples),
    'income': np.random.lognormal(10.5, 0.5, n_samples),
    'purchase_amount': np.random.lognormal(4, 1.5, n_samples),
    'category': np.random.choice(['Electronics', 'Clothing', 'Food', 'Books', 'Sports'], n_samples),
    'rating': np.random.uniform(1, 5, n_samples),
    # 'h' is the hourly alias; uppercase 'H' is deprecated since pandas 2.2
    'signup_date': pd.date_range('2023-01-01', periods=n_samples, freq='h'),
    'city': np.random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'], n_samples),
    'has_premium': np.random.choice([0, 1], n_samples, p=[0.7, 0.3])
})

# Blank out roughly 5% of ratings so the pipeline has missing values to handle
missing_mask = np.random.random(n_samples) < 0.05
sample_data.loc[missing_mask, 'rating'] = np.nan

# Append exact copies of existing rows to exercise duplicate detection.
# No random_state is passed, so pandas draws from NumPy's global RNG seeded above.
n_duplicates = 20
duplicates = sample_data.sample(n_duplicates)
sample_data = pd.concat([sample_data, duplicates], ignore_index=True)

# Invariants: every appended row duplicates an original (customer_id is unique per original row)
assert len(sample_data) == n_samples + n_duplicates
assert sample_data.duplicated().sum() == n_duplicates

# Save to CSV so the pipeline can read it as its source
sample_data.to_csv('sample_data.csv', index=False)
print(f"Created sample dataset with {len(sample_data)} rows and {len(sample_data.columns)} columns")
sample_data.head()

## 2. Configure Pipeline

In [None]:
# Pipeline-level settings: reads sample_data.csv and writes processed_data.csv,
# with monitoring and profiling switched on.
pipeline_settings = dict(
    name="Customer Analytics Pipeline",
    source_type="csv",
    source_path="sample_data.csv",
    output_path="processed_data.csv",
    enable_monitoring=True,
    enable_profiling=True,
    chunk_size=500,
)
config = PipelineConfig(**pipeline_settings)

# Transformation options: missing-value strategy, outlier removal, scaling, encoding
transform_config = TransformConfig(
    handle_missing="median",
    remove_outliers=True,
    scale_numeric=True,
    encode_categorical=True,
)

# Validation options: schema check plus the named quality checks
validation_config = ValidationConfig(
    check_schema=True,
    check_quality=True,
    quality_checks=["missing_values", "duplicates", "outliers", "data_types"],
)

print("Pipeline configured successfully")

## 3. Run Basic Pipeline

In [None]:
# Build the pipeline from `config` and run it with the transform/validation settings
pipeline = DataPipeline(config)
result = pipeline.execute(
    transform_config=transform_config,
    validation_config=validation_config
)

# Summarise the run; on failure, list the reported errors instead
if result['success']:
    print("[SUCCESS] Pipeline executed successfully")
    print("\nProcessing Summary:")
    print(f"- Original shape: {result['original_shape']}")
    print(f"- Final shape: {result['final_shape']}")
    print(f"- Rows processed: {result['rows_processed']}")
    print(f"- Execution time: {result['execution_time']:.2f} seconds")

    # .get() skips both a missing and a None report instead of raising TypeError on None
    validation_report = result.get('validation_report')
    if validation_report:
        status = 'PASSED' if validation_report['valid'] else 'FAILED'
        print(f"\nValidation Status: {status}")
else:
    print("[ERROR] Pipeline failed:")
    for error in result.get('errors', []):
        print(f"  - {error}")

## 4. Data Validation Deep Dive

In [None]:
# Read back the file written by the basic pipeline run; later cells reuse `processed_df`
processed_df = pd.read_csv('processed_data.csv')

# Plot the validation report only when the run returned a non-empty one
validation_report = result.get('validation_report')
if validation_report:
    visualizer = QualityVisualizer()
    visualizer.plot_validation_results(validation_report)

## 5. Feature Engineering Example

In [None]:
from transformers.feature_engineer import FeatureEngineer

# Build the feature engineer from the same transform settings used by the pipeline
engineer = FeatureEngineer(transform_config)

# Custom feature specifications: an age x income 'multiply' interaction and a 'log'
# transform spec for purchase_amount
feature_specs = [
    {
        'type': 'interaction',
        'column1': 'age',
        'column2': 'income',
        'operation': 'multiply'
    },
    {
        'type': 'transform',
        'column': 'purchase_amount',
        'transform': 'log'
    }
]

# Pass a copy so the engineer cannot alter `sample_data` itself
df_with_features = engineer.engineer(sample_data.copy(), feature_specs)

MAX_FEATURES_SHOWN = 10
original_columns = set(sample_data.columns)
# Keep the engineered frame's column order: iterating a set of strings gives a
# different order after every kernel restart (hash randomisation)
new_features = [col for col in df_with_features.columns if col not in original_columns]

print(f"Original features: {len(sample_data.columns)}")
print(f"After engineering: {len(df_with_features.columns)}")
print(f"\nNew features created ({len(new_features)}):")
for feat in new_features[:MAX_FEATURES_SHOWN]:
    print(f"  - {feat}")
if len(new_features) > MAX_FEATURES_SHOWN:
    print(f"  ... and {len(new_features) - MAX_FEATURES_SHOWN} more")

## 6. Performance Monitoring

In [None]:
# Report stage timings, resource usage and issue counts from the run's monitoring
# metrics, then plot a summary. Skipped entirely when no (or empty) metrics came back.
if result.get('monitoring_metrics'):
    metrics = result['monitoring_metrics']

    print("Pipeline Performance Metrics:")
    print("=" * 50)

    # Per-stage durations; a stage without a 'duration' entry is shown as 0
    if 'stages' in metrics:
        print("\nStage Durations:")
        for stage_name, stage_stats in metrics['stages'].items():
            duration = stage_stats.get('duration', 0)
            print(f"  {stage_name}: {duration:.2f}s")

    # Resource usage; absent figures default to 0
    if 'summary' in metrics:
        summary = metrics['summary']
        print(f"\nResource Usage:")
        peak_mb = summary.get('peak_memory_mb', 0)
        print(f"  Peak Memory: {peak_mb:.1f} MB")
        cpu_pct = summary.get('average_cpu_percent', 0)
        print(f"  Avg CPU: {cpu_pct:.1f}%")

    # Counts of recorded errors and warnings (empty when the key is absent)
    print(f"\nIssues Encountered:")
    error_count = len(metrics.get('errors', []))
    print(f"  Errors: {error_count}")
    warning_count = len(metrics.get('warnings', []))
    print(f"  Warnings: {warning_count}")

    # Visual overview of the whole run
    visualizer = QualityVisualizer()
    visualizer.plot_pipeline_summary(result)

## 7. Data Profiling and Correlation Analysis

In [None]:
# Profile the processed data, then inspect correlations between its numeric features
PROFILE_MAX_COLS = 12
CORRELATION_MAX_FEATURES = 15

visualizer = QualityVisualizer()

print("Data Profiling:")
visualizer.plot_data_profiling(processed_df, max_cols=PROFILE_MAX_COLS)

print("\nCorrelation Analysis:")
visualizer.plot_correlation_matrix(processed_df, max_features=CORRELATION_MAX_FEATURES)

## 8. Advanced Pipeline with Custom Transformations

In [None]:
# Custom transformation functions, handed to the pipeline via `custom_transforms`
def custom_age_group(df):
    """Add a categorical ``age_group`` column derived from ``age``.

    Ages fall into right-closed intervals (0, 25], (25, 35], (35, 50], (50, 65]
    and (65, 100]; ages outside (0, 100] become NaN. The column is written onto
    ``df`` in place and the same frame is returned.
    """
    # Ordered mapping of group label -> inclusive upper age bound
    upper_bounds = {
        'Gen Z': 25,
        'Millennial': 35,
        'Gen X': 50,
        'Boomer': 65,
        'Senior': 100,
    }
    edges = [0, *upper_bounds.values()]
    df['age_group'] = pd.cut(df['age'], bins=edges, labels=list(upper_bounds))
    return df

def custom_revenue_segment(df):
    """Add a categorical ``revenue_segment`` column from ``purchase_amount`` quantiles.

    The bottom quarter is labelled 'Low', the middle half 'Medium' and the top
    quarter 'High'. The column is written onto ``df`` in place and the same
    frame is returned.
    """
    quantile_edges = [0, 0.25, 0.75, 1]
    segment_names = ['Low', 'Medium', 'High']
    segments = pd.qcut(df['purchase_amount'], q=quantile_edges, labels=segment_names)
    df['revenue_segment'] = segments
    return df

# Configure a second pipeline that also applies the two custom transforms
advanced_config = PipelineConfig(
    name="Advanced Customer Pipeline",
    source_type="csv",
    source_path="sample_data.csv",
    output_path="advanced_processed.csv",
    custom_transforms=[custom_age_group, custom_revenue_segment],
    enable_monitoring=True
)

# Run advanced pipeline
advanced_pipeline = DataPipeline(advanced_config)
advanced_result = advanced_pipeline.execute(
    transform_config=transform_config,
    validation_config=validation_config
)

# Inspect the output only if the run succeeded; otherwise there may be no file to read
if advanced_result['success']:
    advanced_df = pd.read_csv(advanced_config.output_path)
    preview_cols = ['age', 'age_group', 'purchase_amount', 'revenue_segment']
    # NOTE(review): with encode_categorical=True the pipeline may encode the new
    # categorical columns, in which case they are not present under these names — confirm
    missing_cols = [col for col in preview_cols if col not in advanced_df.columns]
    if missing_cols:
        print(f"Expected columns not found in output: {missing_cols}")
    print("Custom features added:")
    print(advanced_df[[col for col in preview_cols if col in advanced_df.columns]].head(10))
else:
    print("[ERROR] Advanced pipeline failed:")
    for error in advanced_result.get('errors', []):
        print(f"  - {error}")

## 9. Batch Processing Example

In [None]:
# Write random subsets of the sample data to separate CSV files.
# No random_state is passed, so rows are drawn from NumPy's global RNG.
N_BATCHES = 3
BATCH_ROWS = 300
batch_files = [f'batch_{i}.csv' for i in range(N_BATCHES)]
for batch_file in batch_files:
    sample_data.sample(n=BATCH_ROWS).to_csv(batch_file, index=False)

# Run one pipeline per file. Each run gets its own shallow copy of `config`, so the
# shared config (and the `pipeline`/`result` names from the basic run) stay unchanged
# and earlier cells still behave the same if re-run.
batch_results = []
for batch_file in batch_files:
    batch_config = copy.copy(config)
    batch_config.source_path = batch_file
    batch_config.output_path = f'processed_{batch_file}'

    batch_result = DataPipeline(batch_config).execute(
        transform_config=transform_config,
        validation_config=validation_config
    )
    batch_results.append(batch_result)
    print(f"Processed {batch_file}: {batch_result['success']}")

# Aggregate only successful runs; failed results may lack the row-count/timing keys
successful = [r for r in batch_results if r['success']]
total_rows = sum(r['rows_processed'] for r in successful)
avg_time = np.mean([r['execution_time'] for r in successful]) if successful else float('nan')
print("\nBatch Processing Summary:")
print(f"  Successful batches: {len(successful)}/{len(batch_results)}")
print(f"  Total rows processed: {total_rows}")
print(f"  Average processing time: {avg_time:.2f}s")

## 10. Export and Save Results

In [None]:
# Save the monitoring metrics as JSON; default=str stringifies values json can't encode
if result.get('monitoring_metrics'):
    with open('pipeline_metrics.json', 'w') as f:
        json.dump(result['monitoring_metrics'], f, indent=2, default=str)
    print("Metrics saved to pipeline_metrics.json")

# Write a text performance report when a `pipeline_monitor` object exists in the
# notebook namespace.
# NOTE(review): no cell in this notebook defines `pipeline_monitor`, so on a fresh
# kernel this branch never runs — confirm how the monitor should be obtained.
if 'pipeline_monitor' in globals():
    report = pipeline_monitor.generate_performance_report()
    with open('performance_report.txt', 'w') as f:
        f.write(report)
    print("Performance report saved to performance_report.txt")

# Remove the CSV files this notebook generated, including the processed batch outputs
# (the JSON/TXT reports above are kept)
generated_files = (
    ['sample_data.csv', 'processed_data.csv', 'advanced_processed.csv']
    + batch_files
    + [f'processed_{name}' for name in batch_files]
)
for path in generated_files:
    if os.path.exists(path):
        os.remove(path)
print("\nCleanup completed")