In [1]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import logging
import json
from datetime import datetime
import os
import warnings
from typing import Tuple, Dict, Any
import pickle
from scipy.stats import ks_2samp
import numpy as np

class IrisClassificationSystem:
    """Train, evaluate and monitor a Random Forest classifier on the Iris dataset.

    The system standardizes the features, fits a ``RandomForestClassifier``,
    writes logs/metrics/model artifacts to disk and offers a per-feature
    Kolmogorov-Smirnov drift check against the training distribution.
    """

    def __init__(self, log_dir: str = "logs", model_dir: str = "models"):
        """Initialize the classification system with logging and monitoring.

        Args:
            log_dir: Directory receiving log files and metric dumps
                (created if missing).
            model_dir: Directory receiving pickled model artifacts
                (created if missing).
        """
        # Create directories if they don't exist
        os.makedirs(log_dir, exist_ok=True)
        os.makedirs(model_dir, exist_ok=True)
        self.log_dir = log_dir
        self.model_dir = model_dir

        # Setup logging
        self.setup_logging()

        # Initialize model and scaler
        self.model = None
        self.scaler = StandardScaler()
        self.feature_names = None
        self.target_names = None

        # Performance thresholds for monitoring
        self.accuracy_threshold = 0.9
        self.drift_threshold = 0.05  # p-value below which drift is reported

        # Baseline used by detect_drift: summary statistics plus the scaled
        # training sample itself (filled in by load_and_preprocess_data).
        self.baseline_stats = {}
        self.baseline_data = None

    def setup_logging(self):
        """Configure logging with both file and console handlers.

        The logger is looked up by a fixed name, so it is shared by every
        instance in the process. Handlers left over from an earlier call are
        closed and removed first; otherwise each new instance (or each re-run
        of a notebook cell) would emit every message one more time and leak
        an open log file.
        """
        self.logger = logging.getLogger('IrisClassification')
        self.logger.setLevel(logging.INFO)

        # Drop handlers installed by a previous setup to avoid duplicates.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_path = os.path.join(self.log_dir, f'iris_classification_{timestamp}.log')
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        for handler in (logging.FileHandler(log_path), logging.StreamHandler()):
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def _require_model(self):
        """Raise a descriptive error when no model has been trained yet."""
        if self.model is None:
            raise RuntimeError("Model has not been trained; call train_model() first")

    def _drift_reference(self, sample_size: int) -> np.ndarray:
        """Return the reference sample that new data is compared against.

        The scaled training data is used when available. If only summary
        statistics exist (``baseline_stats`` populated without
        ``baseline_data``), a Gaussian sample is drawn from them with a fixed
        seed so that repeated drift checks give the same answer.

        Args:
            sample_size: Number of rows to synthesize in the fallback case.

        Returns:
            A 2-D array with one column per feature.

        Raises:
            RuntimeError: If no baseline has been recorded at all.
        """
        baseline = getattr(self, 'baseline_data', None)
        if baseline is not None:
            return baseline

        if not self.baseline_stats:
            raise RuntimeError(
                "No baseline available; call load_and_preprocess_data() first"
            )

        means = np.asarray(self.baseline_stats['feature_means'], dtype=float)
        stds = np.asarray(self.baseline_stats['feature_stds'], dtype=float)
        rng = np.random.default_rng(0)  # fixed seed keeps the check reproducible
        return rng.normal(means, stds, size=(sample_size, len(means)))

    def load_and_preprocess_data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Load and preprocess the Iris dataset.

        Splits the data 80/20 with a fixed seed, fits the scaler on the
        training part only, and records the drift baseline.

        Returns:
            ``(X_train_scaled, X_test_scaled, y_train, y_test)``.

        Raises:
            Exception: Any error raised while loading or transforming the data
                is logged and re-raised unchanged.
        """
        self.logger.info("Loading and preprocessing data...")

        try:
            # Load dataset. The scaler is fitted on the plain ndarray (not a
            # DataFrame) because predict()/detect_drift() later pass ndarrays;
            # fitting with column names and transforming without them makes
            # scikit-learn warn about mismatched feature names.
            iris = load_iris()
            X = iris.data
            y = iris.target

            self.feature_names = list(iris.feature_names)
            self.target_names = iris.target_names

            # Split the data
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

            # Scale the features (fit on the training split only)
            X_train_scaled = self.scaler.fit_transform(X_train)
            X_test_scaled = self.scaler.transform(X_test)

            # Store baseline: the scaled training sample and its summary stats
            self.baseline_data = X_train_scaled.copy()
            self.baseline_stats = {
                'feature_means': X_train_scaled.mean(axis=0).tolist(),
                'feature_stds': X_train_scaled.std(axis=0).tolist(),
                'timestamp': datetime.now().isoformat()
            }

            self.logger.info("Data preprocessing completed successfully")
            return X_train_scaled, X_test_scaled, y_train, y_test

        except Exception as e:
            self.logger.error(f"Error in data preprocessing: {str(e)}")
            raise

    def train_model(self, X_train: np.ndarray, y_train: np.ndarray):
        """Train the Random Forest model and pickle it together with the scaler.

        Args:
            X_train: Scaled training features.
            y_train: Training labels.

        Raises:
            Exception: Any fitting or I/O error is logged and re-raised.
        """
        self.logger.info("Training Random Forest Classifier...")

        try:
            self.model = RandomForestClassifier(
                n_estimators=100,
                random_state=42,
                n_jobs=-1
            )
            self.model.fit(X_train, y_train)

            # Save the model alongside the scaler it was trained with
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            model_path = os.path.join(self.model_dir, f"rf_model_{timestamp}.pkl")
            with open(model_path, 'wb') as f:
                pickle.dump((self.model, self.scaler), f)

            self.logger.info(f"Model trained and saved to {model_path}")

        except Exception as e:
            self.logger.error(f"Error in model training: {str(e)}")
            raise

    def evaluate_model(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, Any]:
        """Evaluate the model and log performance metrics.

        Args:
            X_test: Scaled test features.
            y_test: True labels for ``X_test``.

        Returns:
            Dict with ``accuracy``, ``classification_report`` (text),
            ``confusion_matrix`` (nested lists) and ``timestamp``. The same
            dict is written as JSON into the log directory.

        Raises:
            RuntimeError: If the model has not been trained.
            Exception: Any other error is logged and re-raised.
        """
        self.logger.info("Evaluating model performance...")

        try:
            self._require_model()

            # Make predictions
            y_pred = self.model.predict(X_test)

            # Calculate metrics
            accuracy = float(accuracy_score(y_test, y_pred))
            class_report = classification_report(y_test, y_pred, target_names=self.target_names)
            conf_matrix = confusion_matrix(y_test, y_pred)

            metrics = {
                'accuracy': accuracy,
                'classification_report': class_report,
                'confusion_matrix': conf_matrix.tolist(),
                'timestamp': datetime.now().isoformat()
            }

            # Save metrics to file
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            metrics_path = os.path.join(self.log_dir, f"metrics_{timestamp}.json")
            with open(metrics_path, 'w', encoding='utf-8') as f:
                json.dump(metrics, f, indent=4)

            self.logger.info(f"Model Accuracy: {accuracy:.4f}")
            self.logger.info(f"Classification Report:\n{class_report}")

            # Check if accuracy is below threshold
            if accuracy < self.accuracy_threshold:
                self.logger.warning(f"Model accuracy {accuracy:.4f} is below threshold {self.accuracy_threshold}")

            return metrics

        except Exception as e:
            self.logger.error(f"Error in model evaluation: {str(e)}")
            raise

    def detect_drift(self, new_data: np.ndarray) -> Dict[str, Dict[str, float]]:
        """Detect potential data drift using the two-sample Kolmogorov-Smirnov test.

        Each feature of ``new_data`` (after scaling) is compared with the same
        feature of the scaled training data, so the outcome is deterministic
        and does not assume the features are normally distributed.

        Args:
            new_data: Unscaled feature matrix with the training column order.

        Returns:
            Mapping ``feature name -> {'ks_statistic': ..., 'p_value': ...}``.
            A warning is logged for every feature whose p-value is below
            ``drift_threshold``.

        Raises:
            RuntimeError: If no baseline has been recorded.
            Exception: Any other error is logged and re-raised.
        """
        self.logger.info("Checking for data drift...")

        try:
            # Scale new data
            new_data_scaled = self.scaler.transform(new_data)
            reference = self._drift_reference(sample_size=len(new_data_scaled))

            drift_scores = {}
            for i, feature in enumerate(self.feature_names):
                # Perform KS test against the baseline column
                ks_statistic, p_value = ks_2samp(new_data_scaled[:, i], reference[:, i])

                drift_scores[feature] = {
                    'ks_statistic': float(ks_statistic),
                    'p_value': float(p_value)
                }

                # Alert if significant drift detected
                if p_value < self.drift_threshold:
                    self.logger.warning(f"Drift detected in feature {feature}: KS statistic = {ks_statistic:.4f}, p-value = {p_value:.4f}")

            return drift_scores

        except Exception as e:
            self.logger.error(f"Error in drift detection: {str(e)}")
            raise

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Make predictions with logging.

        Args:
            X: Unscaled feature matrix; it is scaled here before prediction.

        Returns:
            Array of predicted class indices, one per row of ``X``.

        Raises:
            RuntimeError: If the model has not been trained.
            Exception: Any other error is logged and re-raised.
        """
        self.logger.info("Making predictions...")

        try:
            self._require_model()

            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            # Log each prediction with the highest class probability
            for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
                self.logger.info(f"Prediction {i+1}: Class={self.target_names[pred]} (Probability={max(prob):.4f})")

            return predictions

        except Exception as e:
            self.logger.error(f"Error in prediction: {str(e)}")
            raise

# Demo run: preprocess, train, evaluate, check drift and predict
if __name__ == "__main__":
    # Build the system (creates the log/model directories and the logger)
    iris_system = IrisClassificationSystem()

    # Fetch the dataset and obtain the scaled train/test partitions
    (X_train, X_test,
     y_train, y_test) = iris_system.load_and_preprocess_data()

    # Fit the classifier on the training partition
    iris_system.train_model(X_train=X_train, y_train=y_train)

    # Score the held-out partition; the metrics dict is kept for inspection
    metrics = iris_system.evaluate_model(X_test=X_test, y_test=y_test)

    # Run the drift check on the first 50 raw rows of the dataset
    iris_system.detect_drift(new_data=load_iris().data[:50])

    # Classify the first five raw rows
    sample_data = load_iris().data[:5]
    predictions = iris_system.predict(X=sample_data)

2024-11-25 19:18:45,579 - IrisClassification - INFO - Loading and preprocessing data...
2024-11-25 19:18:45,585 - IrisClassification - INFO - Data preprocessing completed successfully
2024-11-25 19:18:45,585 - IrisClassification - INFO - Training Random Forest Classifier...
2024-11-25 19:18:45,695 - IrisClassification - INFO - Model trained and saved to models/rf_model_20241125_191845.pkl
2024-11-25 19:18:45,696 - IrisClassification - INFO - Evaluating model performance...
2024-11-25 19:18:45,715 - IrisClassification - INFO - Model Accuracy: 1.0000
2024-11-25 19:18:45,715 - IrisClassification - INFO - Classification Report:
              precision    recall  f1-score   support

      setosa       1.00      1.00      1.00        10
  versicolor       1.00      1.00      1.00         9
   virginica       1.00      1.00      1.00        11

    accuracy                           1.00        30
   macro avg       1.00      1.00      1.00        30
weighted avg       1.00      1.00      1.0