conda install -c conda-forge xgboost

pip install pandas numpy scikit-learn matplotlib seaborn joblib pyyaml

In [31]:
import pandas as pd
import numpy as np
import os
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

In [32]:
from sklearn.model_selection import train_test_split, cross_val_score, TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [33]:
from xgboost import XGBClassifier

In [34]:
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

In [35]:
import joblib
import yaml
import logging
from typing import Dict, Tuple, List, Any
import warnings
warnings.filterwarnings('ignore')

In [36]:
class AlertPredictionSystem:
    def __init__(self, config_path: str = 'config.yaml'):
        """Initialize the Alert Prediction System with configuration."""
        self.config = self._load_config(config_path)
        self.setup_logging()
        self.models = {}
        self.scalers = {}
        self.best_models = {}
        self.feature_importances = {}

    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file."""
        default_config = {
            'prediction_window': 7,
            'probability_threshold': 0.7,
            'test_size': 0.2,
            'validation_size': 0.2,
            'random_state': 42,
            'model_params': {
                'RandomForest': {
                    'n_estimators': [100, 200, 300],
                    'max_depth': [10, 20, None],
                    'min_samples_split': [2, 5, 10]
                },
                'XGBoost': {
                    'n_estimators': [100, 200],
                    'max_depth': [3, 5, 7],
                    'learning_rate': [0.01, 0.1]
                },
                'GradientBoosting': {
                    'n_estimators': [100, 200],
                    'max_depth': [3, 5],
                    'learning_rate': [0.01, 0.1]
                }
            }
        }

        try:
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            return {**default_config, **config}
        except FileNotFoundError:
            return default_config

    def setup_logging(self):
        """Configure logging settings."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('alert_prediction.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_and_preprocess_data(self, folder: str) -> pd.DataFrame:
        """Load and preprocess data from multiple HTOL machines."""
        self.logger.info("Loading and preprocessing data...")

        dfs = []
        for i in range(9, 16):
            file_name = f"HTOL-{i:02d}_alerts.csv"
            try:
                file = os.path.join(folder, file_name)
                print(file)
                df = pd.read_csv(file)
                df['machine_id'] = f'HTOL-{i:02d}'
                dfs.append(df)
            except FileNotFoundError:
                self.logger.warning(f"File not found: {file_name}")
                continue

        if not dfs:
            raise ValueError("No data files were successfully loaded")

        combined_df = pd.concat(dfs, ignore_index=True)
        combined_df['Time'] = pd.to_datetime(combined_df['Time'])
        combined_df = combined_df.sort_values(['machine_id', 'Time'])
        print(combined_df.head())

        return combined_df

    def engineer_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Engineer features for the model."""
        self.logger.info("Engineering features...")

        # Time-based features
        df['hour'] = df['Time'].dt.hour
        df['day_of_week'] = df['Time'].dt.dayofweek
        df['month'] = df['Time'].dt.month
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        df['time_of_day'] = pd.cut(df['hour'],
                                 bins=[0, 6, 12, 18, 24],
                                 labels=['night', 'morning', 'afternoon', 'evening'])

        # Rolling statistics with different windows
        windows = [12, 24, 48]  # hours
        for window in windows:
            df[f'rolling_mean_{window}h'] = df.groupby('machine_id')['ChlPrs'].rolling(
                window=window, min_periods=1).mean().reset_index(0, drop=True)
            df[f'rolling_std_{window}h'] = df.groupby('machine_id')['ChlPrs'].rolling(
                window=window, min_periods=1).std().reset_index(0, drop=True)

        # Alert history features
        alert_types = ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']
        for alert_type in alert_types:
            # Time since last alert
            df[f'time_since_{alert_type}'] = df.groupby('machine_id').apply(
                lambda x: x['Time'] - x[x['ALERT'] == alert_type]['Time'].shift(1)
            ).reset_index(level=0, drop=True)
            df[f'time_since_{alert_type}'] = df[f'time_since_{alert_type}'].dt.total_seconds() / 3600

            # Alert frequency in last week
            df[f'{alert_type}_freq_7d'] = df.groupby('machine_id').apply(
                lambda x: x['ALERT'].eq(alert_type).rolling('7D', on='Time').sum()
            ).reset_index(level=0, drop=True)

        return df

    def prepare_data_for_classification(self, df: pd.DataFrame, target_alert_type: str) -> Tuple[pd.DataFrame, pd.Series]:
        """Prepare data for classification."""
        self.logger.info(f"Preparing data for {target_alert_type} alert prediction...")

        # Create target variable
        df['target'] = df.groupby('machine_id').apply(
            lambda x: (x['ALERT'] == target_alert_type)
            .rolling(window=self.config['prediction_window'])
            .max()
            .shift(-self.config['prediction_window']+1)
        ).reset_index(level=0, drop=True)

        # Select features
        feature_cols = [
            'ChlPrs', 'hour', 'day_of_week', 'month', 'is_weekend',
            *[f'rolling_mean_{w}h' for w in [12, 24, 48]],
            *[f'rolling_std_{w}h' for w in [12, 24, 48]],
            *[f'time_since_{at}' for at in ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']],
            *[f'{at}_freq_7d' for at in ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']]
        ]

        X = df[feature_cols].copy()
        y = df['target'].fillna(0)

        return X, y

    def split_data(self, X: pd.DataFrame, y: pd.Series) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]:
        """Split data into train, validation, and test sets using time-based splitting."""
        self.logger.info("Splitting data into train, validation, and test sets...")

        # First split: train + validation vs test
        X_temp, X_test, y_temp, y_test = train_test_split(
            X, y,
            test_size=self.config['test_size'],
            random_state=self.config['random_state'],
            shuffle=False  # Time-based split
        )

        # Second split: train vs validation
        validation_size = self.config['validation_size'] / (1 - self.config['test_size'])
        X_train, X_val, y_train, y_val = train_test_split(
            X_temp, y_temp,
            test_size=validation_size,
            random_state=self.config['random_state'],
            shuffle=False  # Time-based split
        )

        return X_train, X_val, X_test, y_train, y_val, y_test

    def train_and_evaluate_models(self, X_train: pd.DataFrame, X_val: pd.DataFrame, X_test: pd.DataFrame,
                                y_train: pd.Series, y_val: pd.Series, y_test: pd.Series, alert_type: str) -> Dict:
        """Train and evaluate multiple models."""
        self.logger.info(f"Training and evaluating models for {alert_type} alerts...")

        models = {
            'RandomForest': RandomForestClassifier(),
            'XGBoost': XGBClassifier(),
            'GradientBoosting': GradientBoostingClassifier()
        }

        results = {}

        # Scale features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_val_scaled = scaler.transform(X_val)
        X_test_scaled = scaler.transform(X_test)

        for model_name, model in models.items():
            self.logger.info(f"Training {model_name}...")

            # Train model
            model.fit(X_train_scaled, y_train)

            # Make predictions
            y_val_pred = model.predict(X_val_scaled)
            y_val_prob = model.predict_proba(X_val_scaled)[:, 1]

            # Calculate metrics
            results[model_name] = {
                'model': model,
                'validation_report': classification_report(y_val, y_val_pred, output_dict=True),
                'confusion_matrix': confusion_matrix(y_val, y_val_pred),
                'roc_auc': auc(roc_curve(y_val, y_val_prob)[0], roc_curve(y_val, y_val_prob)[1])
            }

            # Store feature importances for tree-based models
            if hasattr(model, 'feature_importances_'):
                results[model_name]['feature_importances'] = dict(zip(X_train.columns,
                                                                    model.feature_importances_))

        # Select best model based on ROC AUC
        best_model_name = max(results.items(), key=lambda x: x[1]['roc_auc'])[0]
        best_model = results[best_model_name]['model']

        # Evaluate best model on test set
        y_test_pred = best_model.predict(X_test_scaled)
        y_test_prob = best_model.predict_proba(X_test_scaled)[:, 1]

        results['best_model'] = {
            'name': best_model_name,
            'model': best_model,
            'test_report': classification_report(y_test, y_test_pred, output_dict=True),
            'test_confusion_matrix': confusion_matrix(y_test, y_test_pred),
            'test_roc_auc': auc(roc_curve(y_test, y_test_prob)[0], roc_curve(y_test, y_test_prob)[1])
        }

        return results

    def visualize_results(self, results: Dict, alert_type: str):
        """Visualize model results."""
        self.logger.info(f"Visualizing results for {alert_type} alerts...")

        # Create a figure with multiple subplots
        fig = plt.figure(figsize=(20, 15))

        # 1. ROC curves comparison
        plt.subplot(2, 2, 1)
        for model_name, result in results.items():
            if model_name != 'best_model':
                fpr, tpr, _ = roc_curve(y_val, result['model'].predict_proba(X_val_scaled)[:, 1])
                plt.plot(fpr, tpr, label=f'{model_name} (AUC = {result["roc_auc"]:.2f})')
        plt.plot([0, 1], [0, 1], 'k--')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curves Comparison')
        plt.legend()

        # 2. Feature Importance
        plt.subplot(2, 2, 2)
        best_model_name = results['best_model']['name']
        if 'feature_importances' in results[best_model_name]:
            importances = pd.Series(results[best_model_name]['feature_importances']).sort_values(ascending=True)
            importances.plot(kind='barh')
            plt.title(f'Feature Importance ({best_model_name})')

        # 3. Confusion Matrix
        plt.subplot(2, 2, 3)
        sns.heatmap(results['best_model']['test_confusion_matrix'],
                   annot=True, fmt='d', cmap='Blues')
        plt.title(f'Confusion Matrix - Test Set ({best_model_name})')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')

        # 4. Precision-Recall Trade-off
        plt.subplot(2, 2, 4)
        metrics = results['best_model']['test_report']
        plt.bar(['Precision', 'Recall', 'F1-Score'],
                [metrics['1']['precision'], metrics['1']['recall'], metrics['1']['f1-score']])
        plt.title(f'Precision-Recall Metrics - Test Set ({best_model_name})')

        plt.tight_layout()
        plt.savefig(f'model_evaluation_{alert_type}.png')
        plt.close()

    def save_model(self, model, scaler, alert_type: str):
        """Save the trained model and scaler."""
        self.logger.info(f"Saving model for {alert_type} alerts...")

        model_path = f'models/model_{alert_type}.joblib'
        scaler_path = f'models/scaler_{alert_type}.joblib'

        os.makedirs('models', exist_ok=True)
        joblib.dump(model, model_path)
        joblib.dump(scaler, scaler_path)

    def load_model(self, alert_type: str) -> Tuple[Any, Any]:
        """Load a trained model and scaler."""
        model_path = f'models/model_{alert_type}.joblib'
        scaler_path = f'models/scaler_{alert_type}.joblib'

        model = joblib.load(model_path)
        scaler = joblib.load(scaler_path)

        return model, scaler

In [37]:

class AlertPredictionAPI:
    """API class for making predictions in production."""

    def __init__(self, model_dir: str = 'models'):
        self.model_dir = model_dir
        self.models = {}
        self.scalers = {}
        self.load_models()

    def load_models(self):
        """Load all trained models and scalers."""
        alert_types = ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']
        for alert_type in alert_types:
            try:
                model_path = os.path.join(self.model_dir, f'model_{alert_type}.joblib')
                scaler_path = os.path.join(self.model_dir, f'scaler_{alert_type}.joblib')

                self.models[alert_type] = joblib.load(model_path)
                self.scalers[alert_type] = joblib.load(scaler_path)
            except FileNotFoundError as e:
                logging.error(f"Failed to load model for {alert_type}: {str(e)}")
                continue

    def preprocess_data(self, data: Dict) -> pd.DataFrame:
        """Preprocess incoming data for prediction."""
        df = pd.DataFrame([data])

        # Convert timestamp if provided as string
        if 'Time' in df.columns and isinstance(df['Time'].iloc[0], str):
            df['Time'] = pd.to_datetime(df['Time'])

        # Extract time-based features
        df['hour'] = df['Time'].dt.hour
        df['day_of_week'] = df['Time'].dt.dayofweek
        df['month'] = df['Time'].dt.month
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)

        # Ensure all required features are present
        required_features = [
            'ChlPrs', 'hour', 'day_of_week', 'month', 'is_weekend',
            *[f'rolling_mean_{w}h' for w in [12, 24, 48]],
            *[f'rolling_std_{w}h' for w in [12, 24, 48]],
            *[f'time_since_{at}' for at in ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']],
            *[f'{at}_freq_7d' for at in ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']]
        ]

        missing_features = set(required_features) - set(df.columns)
        if missing_features:
            raise ValueError(f"Missing required features: {missing_features}")

        return df[required_features]

    def predict(self, data: Dict, threshold: float = 0.7) -> Dict[str, Dict[str, float]]:
        """
        Make predictions for all alert types.

        Args:
            data: Dictionary containing the required features
            threshold: Probability threshold for considering an alert likely

        Returns:
            Dictionary with predictions for each alert type
        """
        try:
            processed_data = self.preprocess_data(data)
            predictions = {}

            for alert_type in self.models:
                # Scale features
                scaled_data = self.scalers[alert_type].transform(processed_data)

                # Get probability predictions
                prob = self.models[alert_type].predict_proba(scaled_data)[0, 1]

                predictions[alert_type] = {
                    'probability': float(prob),
                    'likely_alert': bool(prob > threshold)
                }

            return predictions

        except Exception as e:
            logging.error(f"Prediction error: {str(e)}")
            raise

In [38]:

def main():
    """Example usage of the Alert Prediction System."""

    # Initialize and train the system
    system = AlertPredictionSystem()

    try:
        # Load and process data
        df = system.load_and_preprocess_data("../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01")
        df = system.engineer_features(df)

        # Train models for each alert type
        alert_types = ['LOW', 'MEDIUM', 'HIGH', 'SIGMA']
        for alert_type in alert_types:
            # Prepare data
            X, y = system.prepare_data_for_classification(df, alert_type)
            X_train, X_val, X_test, y_train, y_val, y_test = system.split_data(X, y)

            # Train and evaluate models
            results = system.train_and_evaluate_models(
                X_train, X_val, X_test,
                y_train, y_val, y_test,
                alert_type
            )

            # Visualize results
            system.visualize_results(results, alert_type)

            # Save the best model
            best_model = results['best_model']['model']
            scaler = StandardScaler().fit(X_train)
            system.save_model(best_model, scaler, alert_type)

        # Initialize the API for production use
        api = AlertPredictionAPI()

        # Example prediction
        sample_data = {
            'Time': datetime.now(),
            'ChlPrs': 2.5,
            'rolling_mean_12h': 2.4,
            'rolling_mean_24h': 2.3,
            'rolling_mean_48h': 2.2,
            'rolling_std_12h': 0.1,
            'rolling_std_24h': 0.2,
            'rolling_std_48h': 0.3,
            'time_since_LOW': 48.0,
            'time_since_MEDIUM': 72.0,
            'time_since_HIGH': 96.0,
            'time_since_SIGMA': 120.0,
            'LOW_freq_7d': 1,
            'MEDIUM_freq_7d': 0,
            'HIGH_freq_7d': 0,
            'SIGMA_freq_7d': 0
        }

        predictions = api.predict(sample_data)
        print("\nPredictions for sample data:")
        for alert_type, pred in predictions.items():
            print(f"{alert_type}: Probability = {pred['probability']:.2f}, "
                  f"Likely Alert = {pred['likely_alert']}")

    except Exception as e:
        logging.error(f"Error in main execution: {str(e)}")
        raise

if True:
    main()

2024-11-02 18:18:44,807 - INFO - Loading and preprocessing data...


../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-09_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-10_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-11_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-12_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-13_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-14_alerts.csv
../../../outlier_tolerance=5_grouping_time_window=200_anomaly_threshold=6_start_date=2022-01-01_end_date=2026-01-01/HTOL-15_alerts.csv


2024-11-02 18:18:46,450 - INFO - Engineering features...


                 Time  ChlPrs  alert_index ALERT                   file_name  \
0 2024-03-14 09:50:49   32.66            0   NaN  HTOL-09-20240314095049.csv   
1 2024-03-14 09:50:49   32.63            1   NaN  HTOL-09-20240314095049.csv   
2 2024-03-14 09:50:50   32.58            2   NaN  HTOL-09-20240314095049.csv   
3 2024-03-14 09:50:51   32.69            3   NaN  HTOL-09-20240314095049.csv   
4 2024-03-14 09:50:53   32.62            4   NaN  HTOL-09-20240314095049.csv   

  machine_id  
0    HTOL-09  
1    HTOL-09  
2    HTOL-09  
3    HTOL-09  
4    HTOL-09  


2024-11-02 18:18:48,431 - ERROR - Error in main execution: invalid on specified as Time, must be a column (of DataFrame), an Index or None


ValueError: invalid on specified as Time, must be a column (of DataFrame), an Index or None