<a href="https://colab.research.google.com/github/anwar11235/market_predictor/blob/main/market_predictor/notebooks/06_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Market Predictor Pipeline Orchestration

This notebook orchestrates the complete market prediction pipeline:
1. Data Collection
2. Feature Engineering
3. Model Training
4. Strategy Backtesting
5. Performance Monitoring

In [None]:
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# Add project root to path
sys.path.append('..')

# Import project components
from src.data import DataLoader
from src.features import FeatureGenerator
from src.models import create_ensemble, ModelFactory
from src.integrations import create_data_clients
from src.utils import setup_project_logger
from config import Config, load_validated_config

# Set up logging
logger = setup_project_logger('pipeline_orchestration')

# Plotting settings
plt.style.use('seaborn')
%matplotlib inline
sns.set_theme(style="whitegrid")

## 1. Data Collection and Integration

Initialize data sources and collect data from all providers:
- Market data
- News and sentiment data
- Macroeconomic data

In [None]:
# Load configuration
config = load_validated_config('config/parameters.yaml')

# Initialize data clients
logger.info("Initializing data clients...")
clients = create_data_clients(config)

# Initialize data loader
data_loader = DataLoader(config)

# Collect market data
logger.info("Collecting market data...")
market_data = data_loader.get_market_data()
print("\nMarket Data Overview:")
# DataFrame.info() writes its summary to stdout and returns None; wrapping it
# in print() would emit an extra "None" line.
market_data.info()

# Collect news data from whichever news providers were configured
logger.info("Collecting news and sentiment data...")
news_data = {}
# NOTE(review): sentiment_data is never populated or read in this notebook;
# sentiment features are derived from news_data in Section 2.
sentiment_data = {}

for provider in ('newsapi', 'finnhub'):
    if provider in clients:
        news_data[provider] = clients[provider].get_market_news()

# Collect macroeconomic data
logger.info("Collecting macroeconomic data...")
macro_data = data_loader.get_macro_data()
print("\nMacroeconomic Data Overview:")
macro_data.info()

# Plot data overview: price, volume, and the first macro series (if any)
fig, axes = plt.subplots(3, 1, figsize=(15, 12))

# Market data plot
market_data['Close'].plot(ax=axes[0])
axes[0].set_title('Market Price')
axes[0].set_ylabel('Price')

# Volume plot
market_data['Volume'].plot(ax=axes[1])
axes[1].set_title('Trading Volume')
axes[1].set_ylabel('Volume')

# Macro indicators plot
if not macro_data.empty:
    macro_data.iloc[:, 0].plot(ax=axes[2])  # Plot first macro indicator
    axes[2].set_title('Macroeconomic Indicator')

plt.tight_layout()
plt.show()

# Save collected data (create the target directory on a fresh checkout)
logger.info("Saving collected data...")
Path('data/processed').mkdir(parents=True, exist_ok=True)
market_data.to_parquet('data/processed/market_data.parquet')
macro_data.to_parquet('data/processed/macro_data.parquet')

for source, news_df in news_data.items():
    if not news_df.empty:
        news_df.to_parquet(f'data/processed/news_{source}.parquet')

logger.info("Data collection completed")

## 1. Data Collection and Integration

Initialize data sources and collect data from all providers:
- Market data
- News and sentiment data
- Macroeconomic data

In [None]:
# NOTE(review): this cell repeats the Section 1 data-collection cell line for
# line. Running both re-queries every provider and overwrites the same files;
# keep only one copy.

# Load configuration
config = load_validated_config('config/parameters.yaml')

# Initialize data clients
logger.info("Initializing data clients...")
clients = create_data_clients(config)

# Initialize data loader
data_loader = DataLoader(config)

# Collect market data
logger.info("Collecting market data...")
market_data = data_loader.get_market_data()
print("\nMarket Data Overview:")
# DataFrame.info() writes its summary to stdout and returns None; wrapping it
# in print() would emit an extra "None" line.
market_data.info()

# Collect news data from whichever news providers were configured
logger.info("Collecting news and sentiment data...")
news_data = {}
# NOTE(review): sentiment_data is never populated or read in this notebook;
# sentiment features are derived from news_data in Section 2.
sentiment_data = {}

for provider in ('newsapi', 'finnhub'):
    if provider in clients:
        news_data[provider] = clients[provider].get_market_news()

# Collect macroeconomic data
logger.info("Collecting macroeconomic data...")
macro_data = data_loader.get_macro_data()
print("\nMacroeconomic Data Overview:")
macro_data.info()

# Plot data overview: price, volume, and the first macro series (if any)
fig, axes = plt.subplots(3, 1, figsize=(15, 12))

# Market data plot
market_data['Close'].plot(ax=axes[0])
axes[0].set_title('Market Price')
axes[0].set_ylabel('Price')

# Volume plot
market_data['Volume'].plot(ax=axes[1])
axes[1].set_title('Trading Volume')
axes[1].set_ylabel('Volume')

# Macro indicators plot
if not macro_data.empty:
    macro_data.iloc[:, 0].plot(ax=axes[2])  # Plot first macro indicator
    axes[2].set_title('Macroeconomic Indicator')

plt.tight_layout()
plt.show()

# Save collected data (create the target directory on a fresh checkout)
logger.info("Saving collected data...")
Path('data/processed').mkdir(parents=True, exist_ok=True)
market_data.to_parquet('data/processed/market_data.parquet')
macro_data.to_parquet('data/processed/macro_data.parquet')

for source, news_df in news_data.items():
    if not news_df.empty:
        news_df.to_parquet(f'data/processed/news_{source}.parquet')

logger.info("Data collection completed")

## 2. Feature Engineering

Generate and process features from all data sources:
- Technical features
- Sentiment features
- Macroeconomic features
- Feature selection and combination

In [None]:
# Initialize feature generator
logger.info("Initializing feature generation...")
feature_gen = FeatureGenerator(config)

# Generate technical features
logger.info("Generating technical features...")
tech_features = feature_gen.technical_features.calculate_all_features(market_data)
print("\nTechnical Features Overview:")
# DataFrame.info() prints its summary itself and returns None; wrapping it in
# print() would add a stray "None" line.
tech_features.info()

# Generate sentiment features (one group of columns per news source)
logger.info("Generating sentiment features...")
sent_features = pd.DataFrame(index=market_data.index)
for source, news_df in news_data.items():
    if not news_df.empty:
        source_features = feature_gen.sentiment_features.calculate_all_features(news_df)
        # NOTE(review): if two sources return identically named columns this
        # concat produces duplicate column labels; consider
        # source_features.add_prefix(f'{source}_') if that happens.
        sent_features = pd.concat([sent_features, source_features], axis=1)

print("\nSentiment Features Overview:")
sent_features.info()

# Generate macro features
logger.info("Generating macro features...")
macro_features = feature_gen.macro_features.calculate_all_features(
    macro_data,
    market_data
)
print("\nMacro Features Overview:")
macro_features.info()

# Combine all features
logger.info("Combining features...")
all_features = feature_gen.generate_all_features(
    market_data=market_data,
    macro_data=macro_data,
    sentiment_data=sent_features
)

# Target: 1 when the NEXT day's close-to-close return is positive, else 0.
returns = market_data['Close'].pct_change()
target = np.where(returns.shift(-1) > 0, 1, 0)
# The final row has no next-day return (shift(-1) yields NaN), so drop it.
target = pd.Series(target[:-1], index=returns.index[:-1])

# Feature selection is fitted on the TRAINING period only. Selecting on the
# full history would let validation/test labels decide which features are
# kept (look-ahead bias) and inflate every downstream evaluation.
logger.info("Performing feature selection...")
selection_cutoff = pd.Timestamp(config.data.validation_start)
selection_index = target.index[target.index < selection_cutoff]
if selection_index.empty:
    raise ValueError(
        "No rows before config.data.validation_start; cannot fit feature selection"
    )
train_selection = feature_gen.select_features(
    all_features.loc[selection_index],
    target.loc[selection_index],
    n_features=20
)
# Carry the chosen columns across the full history for the later
# train/validation/test split.
# NOTE(review): assumes select_features only picks columns and does not
# transform values; if it also rescales, apply the same transform here.
selected_features = all_features.loc[target.index, train_selection.columns]

# Plot feature importance
# NOTE(review): assumes feature_gen.feature_importance is ordered like the
# selected columns — confirm against FeatureGenerator.select_features.
feature_importance = pd.DataFrame({
    'feature': selected_features.columns,
    'importance': feature_gen.feature_importance
}).sort_values('importance', ascending=False)

plt.figure(figsize=(12, 6))
sns.barplot(x='importance', y='feature', data=feature_importance)
plt.title('Feature Importance')
plt.tight_layout()
plt.show()

# Plot correlation matrix
plt.figure(figsize=(12, 10))
sns.heatmap(
    selected_features.corr(),
    annot=True,
    cmap='coolwarm',
    center=0,
    fmt='.2f'
)
plt.title('Feature Correlation Matrix')
plt.tight_layout()
plt.show()

# Save processed features (create the directory on a fresh checkout)
logger.info("Saving processed features...")
Path('data/features').mkdir(parents=True, exist_ok=True)
selected_features.to_parquet('data/features/selected_features.parquet')
feature_importance.to_csv('data/features/feature_importance.csv')

# Save feature metadata
feature_metadata = {
    'n_technical_features': len(tech_features.columns),
    'n_sentiment_features': len(sent_features.columns),
    'n_macro_features': len(macro_features.columns),
    'n_selected_features': len(selected_features.columns),
    'feature_names': selected_features.columns.tolist(),
    'generation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}

with open('data/features/feature_metadata.json', 'w') as f:
    json.dump(feature_metadata, f, indent=4)

logger.info("Feature engineering completed")

## 3. Model Training and Ensemble Creation

Train base models and create ensemble:
- Train individual models
- Optimize hyperparameters
- Create and train ensemble
- Evaluate performance

In [None]:
# Split data into train, validation, and test sets
logger.info("Preparing training data...")

train_end = pd.Timestamp(config.data.validation_start)
val_end = pd.Timestamp(config.data.test_start)

# Boolean masks give half-open, non-overlapping periods:
#   train = [start, train_end), val = [train_end, val_end), test = [val_end, end].
# Label slicing such as df[:train_end] includes BOTH endpoints, which put the
# rows dated exactly train_end / val_end into two splits at once.
feature_dates = selected_features.index
train_features = selected_features.loc[feature_dates < train_end]
val_features = selected_features.loc[(feature_dates >= train_end) & (feature_dates < val_end)]
test_features = selected_features.loc[feature_dates >= val_end]

target_dates = target.index
train_target = target.loc[target_dates < train_end]
val_target = target.loc[(target_dates >= train_end) & (target_dates < val_end)]
test_target = target.loc[target_dates >= val_end]

# Fail fast with a clear message rather than an opaque error inside training.
for split_name, split_frame in [('train', train_features),
                                ('validation', val_features),
                                ('test', test_features)]:
    if split_frame.empty:
        raise ValueError(
            f"The {split_name} split is empty; check config.data.validation_start "
            f"({train_end}) and config.data.test_start ({val_end})"
        )

# Initialize model factory
model_factory = ModelFactory(config)

# Train base models; keep each model, its per-split probabilities and history
logger.info("Training base models...")
base_models = {}
base_predictions = {}
model_metrics = {}

for model_type in ['random_forest', 'xgboost', 'lightgbm']:
    logger.info(f"Training {model_type}...")

    # Create and train model (validation data passed for early stopping/monitoring)
    model = model_factory.create_model(model_type)
    history = model.train(
        train_features,
        train_target,
        val_features,
        val_target
    )

    # Store model and predictions
    base_models[model_type] = model
    base_predictions[model_type] = {
        'train': model.predict_proba(train_features),
        'val': model.predict_proba(val_features),
        'test': model.predict_proba(test_features)
    }
    model_metrics[model_type] = history

# Create and train ensemble
logger.info("Creating ensemble model...")
ensemble = create_ensemble(
    config=config,
    base_models=list(base_models.values()),
    model_names=list(base_models.keys())
)

# Train ensemble on the base models' train/val outputs (test is held out)
ensemble_history = ensemble.train(
    train_features,
    train_target,
    val_features,
    val_target,
    base_predictions={
        'train': {name: pred['train'] for name, pred in base_predictions.items()},
        'val': {name: pred['val'] for name, pred in base_predictions.items()}
    }
)

# Get ensemble predictions
ensemble_predictions = {
    'train': ensemble.predict_proba(train_features),
    'val': ensemble.predict_proba(val_features),
    'test': ensemble.predict_proba(test_features)
}

# Evaluate models
def evaluate_predictions(y_true, y_prob):
    """Calculate classification metrics for binary up/down predictions.

    Implemented with numpy/pandas only (the notebook never imported the
    scikit-learn metric functions this previously called).

    Args:
        y_true: 0/1 labels (array-like or Series).
        y_prob: array of shape (n, 2) from ``predict_proba``; column 1 is the
            probability of the positive (up) class.

    Returns:
        dict with 'accuracy', 'precision', 'recall', 'f1' (predictions
        thresholded at 0.5; precision/recall/f1 are 0.0 when their
        denominator is zero) and 'auc_roc' (NaN when only one class is
        present, since ROC AUC is undefined then).
    """
    labels = np.asarray(y_true).astype(int).ravel()
    scores = np.asarray(y_prob)[:, 1].astype(float)
    y_pred = (scores > 0.5).astype(int)

    tp = int(np.sum((y_pred == 1) & (labels == 1)))
    fp = int(np.sum((y_pred == 1) & (labels == 0)))
    fn = int(np.sum((y_pred == 0) & (labels == 1)))

    accuracy = float(np.mean(y_pred == labels)) if labels.size else float('nan')
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * tp / (2 * tp + fp + fn) if (2 * tp + fp + fn) else 0.0

    # ROC AUC via the Mann-Whitney U statistic; average ranks give tied
    # scores half credit, matching the usual AUC definition.
    n_pos = int(labels.sum())
    n_neg = labels.size - n_pos
    if n_pos == 0 or n_neg == 0:
        auc = float('nan')
    else:
        ranks = pd.Series(scores).rank(method='average').to_numpy()
        auc = float((ranks[labels == 1].sum() - n_pos * (n_pos + 1) / 2) / (n_pos * n_neg))

    return {
        'accuracy': accuracy,
        'precision': float(precision),
        'recall': float(recall),
        'f1': float(f1),
        'auc_roc': auc
    }

# Score every base model and the ensemble on each split
split_targets = {'train': train_target, 'val': val_target, 'test': test_target}
evaluation_results = {
    name: {
        split: evaluate_predictions(y_split, base_predictions[name][split])
        for split, y_split in split_targets.items()
    }
    for name in base_models
}
evaluation_results['ensemble'] = {
    split: evaluate_predictions(y_split, ensemble_predictions[split])
    for split, y_split in split_targets.items()
}

# Plot evaluation results: one panel per metric, bars grouped by split.
# Distinct names avoid clobbering the `model` / `metrics` / `data` globals.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
plot_metrics = ['accuracy', 'f1', 'precision', 'recall']
for ax, metric_name in zip(axes.flat, plot_metrics):
    scores = pd.DataFrame([
        {
            'Model': model_name,
            'Dataset': split,
            'Score': evaluation_results[model_name][split][metric_name]
        }
        for model_name in evaluation_results
        for split in split_targets
    ])
    sns.barplot(x='Model', y='Score', hue='Dataset', data=scores, ax=ax)
    ax.set_title(f'{metric_name.capitalize()} Score')
    ax.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Save models and results
logger.info("Saving models and results...")
# joblib is used below but was never imported in the setup cell.
import joblib

Path('models').mkdir(parents=True, exist_ok=True)
joblib.dump(ensemble, 'models/final_ensemble.joblib')
for name, model in base_models.items():
    joblib.dump(model, f'models/{name}_base.joblib')

with open('models/evaluation_results.json', 'w') as f:
    json.dump(evaluation_results, f, indent=4)

logger.info("Model training completed")

## 4. Backtesting and Performance Analysis

Backtest the trading strategy and analyze performance:
- Strategy implementation
- Historical simulation
- Performance metrics
- Risk analysis

In [None]:
# Initialize backtesting parameters
logger.info("Setting up backtesting...")

# Settings consumed by StrategyBacktest. Stop-loss / take-profit are fractions
# of the entry price; position_size is a fraction of the cash still available
# when a trade is opened.
backtest_config = dict(
    initial_capital=100000,
    position_size=0.1,        # 10% of portfolio per trade
    stop_loss=0.02,           # 2% stop loss
    take_profit=0.03,         # 3% take profit
    max_positions=5,          # Maximum number of simultaneous positions
    transaction_costs=0.001,  # 0.1% transaction cost
)

class StrategyBacktest:
    """Long-only, bar-by-bar backtest driven by a classifier's up-probability.

    For every date in ``features`` the simulation:
      1. closes open positions whose stop-loss or take-profit level is hit by
         that day's close,
      2. opens a new position when P(up) exceeds the entry threshold and fewer
         than ``max_positions`` positions are open,
      3. records the mark-to-market portfolio value.

    Args:
        model: object exposing ``predict_proba(X)``; column 1 is used as P(up).
        features: feature DataFrame indexed by date.
        market_data: DataFrame with a ``'Close'`` column for every date in
            ``features``.
        config: dict with ``initial_capital``, ``position_size`` (fraction of
            available cash per trade), ``stop_loss``, ``take_profit``,
            ``max_positions``; optional ``transaction_costs`` (fraction of
            traded value charged on entry and exit, default 0) and
            ``entry_threshold`` (default 0.6).

    State is reset only by ``reset()`` (called from ``__init__``); call it
    before re-running the same instance.
    """

    def __init__(self, model, features, market_data, config):
        self.model = model
        self.features = features
        self.market_data = market_data
        self.config = config
        self.reset()

    def reset(self):
        """Reset backtest state to the initial capital with no positions."""
        self.portfolio_value = []
        self.positions = {}  # entry date -> position dict
        self.trades = []     # one dict per closed position
        self.capital = self.config['initial_capital']
        self.available_capital = self.config['initial_capital']

    def run(self):
        """Run the backtest simulation over every date in ``self.features``."""
        if self.features.empty:
            return

        # Score all dates in one call instead of one model call per row. The
        # DataFrame keeps its column names, which models fitted on DataFrames expect.
        up_probs = np.asarray(self.model.predict_proba(self.features))[:, 1]
        entry_threshold = self.config.get('entry_threshold', 0.6)

        for date, prob in zip(self.features.index, up_probs):
            # Exits are evaluated before entries on the same bar
            self._update_positions(date)

            if prob > entry_threshold and len(self.positions) < self.config['max_positions']:
                self._open_position(date)

            self.portfolio_value.append({
                'date': date,
                'value': self._calculate_portfolio_value(date),
                'cash': self.available_capital,
                'positions': len(self.positions)
            })

    def _update_positions(self, date):
        """Close every position whose stop or target is reached at ``date``'s close."""
        current_price = self.market_data.loc[date, 'Close']

        # Decide exits first; closing removes entries from self.positions,
        # which must not change size while it is being iterated.
        exits = []
        for entry_date, position in self.positions.items():
            if current_price <= position['stop_price']:
                exits.append((entry_date, 'stop_loss'))
            elif current_price >= position['target_price']:
                exits.append((entry_date, 'take_profit'))

        for entry_date, reason in exits:
            self._close_position(entry_date, current_price, reason, exit_date=date)

    def _open_position(self, date):
        """Buy whole shares worth ``position_size`` of the available cash."""
        current_price = self.market_data.loc[date, 'Close']
        position_value = self.available_capital * self.config['position_size']
        shares = position_value // current_price
        if shares <= 0:
            return

        cost = shares * current_price
        fee = cost * self.config.get('transaction_costs', 0.0)
        self.positions[date] = {
            'shares': shares,
            'entry_price': current_price,
            'entry_fee': fee,
            'stop_price': current_price * (1 - self.config['stop_loss']),
            'target_price': current_price * (1 + self.config['take_profit'])
        }
        self.available_capital -= cost + fee

    def _close_position(self, entry_date, exit_price, reason, exit_date=None):
        """Sell the position opened on ``entry_date`` and credit net proceeds to cash."""
        position = self.positions.pop(entry_date)
        proceeds = position['shares'] * exit_price
        fee = proceeds * self.config.get('transaction_costs', 0.0)
        self.available_capital += proceeds - fee

        entry_cost = position['shares'] * position['entry_price'] + position.get('entry_fee', 0.0)
        self.trades.append({
            'entry_date': entry_date,
            'exit_date': exit_date,
            'entry_price': position['entry_price'],
            'exit_price': exit_price,
            'shares': position['shares'],
            'reason': reason,
            'pnl': proceeds - fee - entry_cost
        })

    def _calculate_portfolio_value(self, date):
        """Return cash plus open positions marked at ``date``'s close."""
        current_price = self.market_data.loc[date, 'Close']
        position_value = sum(
            pos['shares'] * current_price
            for pos in self.positions.values()
        )
        return self.available_capital + position_value

    def get_results(self):
        """Return the per-date portfolio record as a DataFrame indexed by date."""
        if not self.portfolio_value:
            return pd.DataFrame(
                columns=['value', 'cash', 'positions'],
                index=pd.Index([], name='date')
            )
        return pd.DataFrame(self.portfolio_value).set_index('date')

# Simulate the strategy on the held-out test period only
logger.info("Running backtest simulation...")
test_prices = market_data.loc[test_features.index]
backtest = StrategyBacktest(
    ensemble,
    test_features,
    test_prices,
    backtest_config,
)
backtest.run()
backtest_results = backtest.get_results()

# Calculate performance metrics
def calculate_performance_metrics(results, periods_per_year=252):
    """Summarize a backtest equity curve.

    Args:
        results: DataFrame with a ``'value'`` column of portfolio values, one
            row per period.
        periods_per_year: periods used for annualization (252 = trading days).

    Returns:
        dict of metric name -> float. Returns, volatility, drawdown and win
        rate are in percent. Annual return is the arithmetic mean period
        return times ``periods_per_year``. 'Sharpe Ratio' assumes a zero
        risk-free rate and is NaN when volatility is zero or undefined.
        'Win Rate (%)' is the share of periods with a positive portfolio
        return (not a per-trade win rate). All metrics are NaN for an empty
        ``results``.
    """
    values = results['value']
    # pct_change() leaves a NaN in the first row; drop it so it is not
    # counted as a losing period in the win rate.
    returns = values.pct_change().dropna()

    annual_return = returns.mean() * periods_per_year
    annual_vol = returns.std() * np.sqrt(periods_per_year)
    # NaN > 0 is False, so undefined volatility also yields NaN here.
    sharpe = annual_return / annual_vol if annual_vol > 0 else np.nan

    if len(values):
        total_return = (values.iloc[-1] / values.iloc[0] - 1) * 100
    else:
        total_return = np.nan

    return {
        'Total Return (%)': total_return,
        'Annual Return (%)': annual_return * 100,
        'Volatility (%)': annual_vol * 100,
        'Sharpe Ratio': sharpe,
        'Max Drawdown (%)': ((values / values.cummax() - 1).min()) * 100,
        'Win Rate (%)': (returns > 0).mean() * 100
    }

metrics = calculate_performance_metrics(backtest_results)

# Plot results: equity curve, open-position count, and drawdown
fig, axes = plt.subplots(3, 1, figsize=(15, 15))

# Portfolio value
backtest_results['value'].plot(ax=axes[0])
axes[0].set_title('Portfolio Value')
axes[0].set_ylabel('Value ($)')

# Number of positions
backtest_results['positions'].plot(ax=axes[1])
axes[1].set_title('Number of Active Positions')
axes[1].set_ylabel('Positions')

# Drawdown relative to the running peak, in percent
drawdown = (backtest_results['value'] / backtest_results['value'].cummax() - 1) * 100
drawdown.plot(ax=axes[2])
axes[2].set_title('Drawdown')
axes[2].set_ylabel('Drawdown (%)')

plt.tight_layout()
plt.show()

# Print metrics
print("\nPerformance Metrics:")
print("=" * 50)
for metric, value in metrics.items():
    print(f"{metric}: {value:.2f}")

# Save results (create the output directory on a fresh checkout)
logger.info("Saving backtest results...")
Path('results').mkdir(parents=True, exist_ok=True)
backtest_results.to_parquet('results/backtest_results.parquet')
with open('results/performance_metrics.json', 'w') as f:
    json.dump(metrics, f, indent=4)

logger.info("Backtesting completed")

## 5. Monitoring and Deployment Preparation

Prepare the model for deployment and set up monitoring:
- Model export
- Performance monitoring setup
- Alert configuration
- Deployment checklist

In [None]:
# Monitoring thresholds and alert settings used by ModelMonitor and recorded
# in the deployment package.
# NOTE(review): max_drawdown is a fraction (-0.15 == -15%), whereas the
# backtest reports 'Max Drawdown (%)' in percent; convert before comparing.
monitoring_config = dict(
    performance_thresholds=dict(
        min_accuracy=0.55,
        min_sharpe=1.0,
        max_drawdown=-0.15,
        min_win_rate=0.52,
    ),
    data_quality_thresholds=dict(
        max_missing_data=0.05,
        max_data_delay=300,  # seconds
        min_confidence=0.60,
    ),
    alert_levels=dict(
        warning='yellow',
        critical='red',
    ),
)

class ModelMonitor:
    """Record prediction-quality checks over time and derive alerts/recommendations.

    Args:
        config: project configuration (stored; not read by the methods below).
        monitoring_config: dict providing at least
            ``['performance_thresholds']['min_accuracy']``.
    """

    def __init__(self, config, monitoring_config):
        self.config = config
        self.monitoring_config = monitoring_config
        self.metrics_history = []  # one dict per check_model_health() call

    def check_model_health(self, predictions, actual):
        """Record and return quality metrics for one batch of predictions.

        Args:
            predictions: predicted probabilities of the positive (up) class.
            actual: 0/1 realized labels, same length as ``predictions``.

        Returns:
            dict with 'timestamp', 'accuracy' (predictions thresholded at
            0.5), 'confidence' and 'prediction_spread' (population std);
            the numeric values are NaN for empty input.

        Raises:
            ValueError: if ``predictions`` and ``actual`` differ in length.
        """
        probs = np.asarray(predictions, dtype=float).ravel()
        labels = np.asarray(actual).ravel()
        if probs.shape != labels.shape:
            raise ValueError(
                f"predictions ({probs.size}) and actual ({labels.size}) lengths differ"
            )

        if probs.size:
            accuracy = float(np.mean((probs > 0.5).astype(int) == labels))
            confidence = float(probs.mean())
            spread = float(probs.std())
        else:
            accuracy = confidence = spread = float('nan')

        metrics = {
            'timestamp': datetime.now(),
            'accuracy': accuracy,
            # NOTE(review): this is the mean predicted P(up), not a measure of
            # certainty; mean(max(p, 1 - p)) may be the intended "confidence".
            'confidence': confidence,
            'prediction_spread': spread
        }
        self.metrics_history.append(metrics)
        return metrics

    def check_data_quality(self, features):
        """Summarize input quality: missing-cell rate, mean correlation, mean variance."""
        return {
            'missing_rate': features.isnull().mean().mean(),
            # NOTE(review): this mean includes the diagonal self-correlations of
            # 1.0, so it is biased upward.
            'feature_correlation': features.corr().mean().mean(),
            'feature_variance': features.var().mean()
        }

    def generate_monitoring_report(self):
        """Bundle the metric history with current alerts and recommendations."""
        return {
            'model_performance': pd.DataFrame(self.metrics_history),
            'alerts': self.check_alerts(),
            'recommendations': self.generate_recommendations()
        }

    def check_alerts(self):
        """Return alerts for the most recent health check (empty if none recorded)."""
        alerts = []
        if not self.metrics_history:
            return alerts

        latest = self.metrics_history[-1]
        thresholds = self.monitoring_config['performance_thresholds']

        if latest['accuracy'] < thresholds['min_accuracy']:
            alerts.append({
                'level': 'critical',
                'message': 'Model accuracy below threshold',
                'value': latest['accuracy']
            })

        return alerts

    def generate_recommendations(self):
        """Suggest actions based on the last 10 recorded health checks.

        NOTE(review): the 0.6 / 0.7 cut-offs are hard-coded and differ from
        monitoring_config['performance_thresholds']['min_accuracy'].
        """
        recommendations = []
        metrics_df = pd.DataFrame(self.metrics_history)

        if len(metrics_df) > 0:
            if metrics_df['accuracy'].tail(10).mean() < 0.6:
                recommendations.append("Consider retraining model")
            if metrics_df['confidence'].tail(10).mean() < 0.7:
                recommendations.append("Review feature importance")

        return recommendations

# Create deployment package
logger.info("Preparing deployment package...")

# No cell in this notebook writes a feature scaler, so the scaler path is
# only advertised when the file actually exists; otherwise it is None.
scaler_path = 'models/feature_scaler.joblib'

deployment_package = {
    'model': {
        'ensemble_path': 'models/final_ensemble.joblib',
        'base_models': {
            name: f'models/{name}_base.joblib'
            for name in base_models
        },
        'scaler_path': scaler_path if Path(scaler_path).exists() else None
    },
    'metadata': {
        'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'features': selected_features.columns.tolist(),
        'model_version': '1.0.0',
        'performance_metrics': metrics,
        'monitoring_config': monitoring_config
    },
    'configuration': {
        # Matches the P(up) > 0.6 entry rule used in the backtest
        'prediction_threshold': 0.6,
        'update_frequency': '24h',
        'batch_size': 100
    }
}

# Initialize monitoring
monitor = ModelMonitor(config, monitoring_config)

# Generate a sample monitoring report from the held-out test predictions
test_predictions = ensemble.predict_proba(test_features)[:, 1]
monitor.check_model_health(test_predictions, test_target)
data_quality = monitor.check_data_quality(test_features)
monitoring_report = monitor.generate_monitoring_report()

# Surface the checks instead of silently discarding them
print("\nData Quality:")
for check, value in data_quality.items():
    print(f"{check}: {value:.4f}")
print("\nAlerts:", monitoring_report['alerts'] or "none")
print("Recommendations:", monitoring_report['recommendations'] or "none")

# Plot monitoring metrics. Markers are required: after a single health check
# there is only one point, which a plain line plot does not draw.
performance_history = monitoring_report['model_performance']
if len(performance_history) > 0:
    plt.figure(figsize=(15, 5))
    performance_history.set_index('timestamp')['accuracy'].plot(marker='o')
    plt.title('Model Accuracy Over Time')
    plt.ylabel('Accuracy')
    plt.grid(True)
    plt.show()

# Save deployment artifacts (create the output directories on a fresh checkout)
logger.info("Saving deployment artifacts...")
Path('models').mkdir(parents=True, exist_ok=True)
Path('results').mkdir(parents=True, exist_ok=True)

with open('models/deployment_package.json', 'w') as f:
    json.dump(deployment_package, f, indent=4)

monitoring_report['model_performance'].to_csv('results/monitoring_metrics.csv')

# Print deployment checklist
print("\nDeployment Checklist:")
print("=" * 50)
print("1. Model Artifacts:")
print("   - Ensemble model saved")
print("   - Base models saved")
# No cell in this notebook writes the scaler, so report its real status
# instead of unconditionally claiming it was saved.
if Path('models/feature_scaler.joblib').exists():
    print("   - Feature scaler saved")
else:
    print("   - Feature scaler NOT found (models/feature_scaler.joblib)")
print("   - Model metadata documented")

print("\n2. Performance Validation:")
print("   - Backtesting completed")
print("   - Performance metrics calculated")
print("   - Risk metrics assessed")

print("\n3. Monitoring Setup:")
print("   - Performance thresholds configured")
print("   - Data quality checks implemented")
print("   - Alert system configured")

print("\n4. Next Steps:")
print("   - Set up model API")
print("   - Configure data pipeline")
print("   - Implement monitoring dashboard")
print("   - Establish retraining schedule")

logger.info("Pipeline orchestration completed")