In [1]:
# Import required libraries
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.calibration import CalibratedClassifierCV
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import time
import joblib
import os
from typing import Tuple, Optional

class ModelPipeline:
    def __init__(self, target_size: int = 320):
        self.scaler = StandardScaler()
        self.target_size = target_size
        self.models = {
            'elf': None,
            'mag': None
        }
        
    def load_data(self, data_dir: str = 'data') -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
        """Load and validate data from directory structure"""
        try:
            # Cargar todos los archivos CSV de ELF
            elf_files = os.listdir(os.path.join(data_dir, 'ELF'))
            elf_data = []
            for file in elf_files:
                if file.endswith('.csv'):
                    df = pd.read_csv(os.path.join(data_dir, 'ELF', file))
                    elf_data.append(df)
            data_elf = pd.concat(elf_data, ignore_index=True) if elf_data else None

            # Cargar todos los archivos CSV de MAG
            mag_files = os.listdir(os.path.join(data_dir, 'MAG'))
            mag_data = []
            for file in mag_files:
                if file.endswith('.csv'):
                    df = pd.read_csv(os.path.join(data_dir, 'MAG', file))
                    mag_data.append(df)
            data_mag = pd.concat(mag_data, ignore_index=True) if mag_data else None

            if data_elf is not None and data_mag is not None:
                print(f"Data loaded - ELF shape: {data_elf.shape}, MAG shape: {data_mag.shape}")
            return data_elf, data_mag
        except Exception as e:
            print(f"Error loading data: {e}")
            return None, None

    def standardize_samples(self, data: pd.DataFrame) -> pd.DataFrame:
        """Standardize data to target size using padding or truncation"""
        features = data.select_dtypes(include=[np.number]).columns
        data_features = data[features]
        
        if data_features.shape[1] < self.target_size:
            padding = pd.DataFrame(0, index=data_features.index, 
                                 columns=[f'padded_{i}' for i in range(data_features.shape[1], self.target_size)])
            return pd.concat([data_features, padding], axis=1)
        elif data_features.shape[1] > self.target_size:
            return data_features.iloc[:, :self.target_size]
        return data_features

    def add_features(self, X: pd.DataFrame) -> pd.DataFrame:
        """Add statistical features to the dataset"""
        print("\nAdding statistical features...")
        features = pd.DataFrame()
        features['mean'] = X.mean(axis=1)
        features['std'] = X.std(axis=1)
        features['max'] = X.max(axis=1)
        features['min'] = X.min(axis=1)
        features['median'] = X.median(axis=1)
        features['skew'] = X.skew(axis=1)
        features['kurtosis'] = X.kurtosis(axis=1)
        return pd.concat([X, features], axis=1)

    def preprocess_data(self, data: pd.DataFrame, is_training: bool = True) -> pd.DataFrame:
        """Preprocess the data including normalization and feature engineering"""
        # Separate features and labels
        if 'Label' in data.columns:
            X = data.drop('Label', axis=1)
            y = data['Label']
        else:
            X = data
            y = None

        # Standardize sample size
        X = self.standardize_samples(X)
        
        # Normalize
        if is_training:
            X = pd.DataFrame(self.scaler.fit_transform(X), columns=X.columns)
        else:
            X = pd.DataFrame(self.scaler.transform(X), columns=X.columns)

        # Add features
        X = self.add_features(X)

        if y is not None:
            return X, y
        return X

    def train_model(self, X: pd.DataFrame, y: pd.DataFrame, sensor_type: str):
        """Train the model with grid search and cross validation"""
        # Create ensemble
        clf1 = RandomForestClassifier(random_state=42)
        clf2 = GradientBoostingClassifier(random_state=42)
        clf3 = SVC(probability=True, random_state=42)

        ensemble = VotingClassifier(
            estimators=[('rf', clf1), ('gb', clf2), ('svc', clf3)],
            voting='soft'
        )

        # Parameters for grid search
        param_grid = {
            'rf__n_estimators': [100, 200],
            'rf__max_depth': [10, 20, None],
            'rf__min_samples_split': [2, 5]
        }

        # Perform grid search
        print(f"\nPerforming grid search for {sensor_type}...")
        grid_search = GridSearchCV(ensemble, param_grid, cv=5, n_jobs=-1)
        with tqdm(total=100) as pbar:
            grid_search.fit(X, y)
            for i in range(100):
                time.sleep(0.1)
                pbar.update(1)

        self.models[sensor_type] = grid_search
        return grid_search

    def evaluate_model(self, X_test: pd.DataFrame, y_test: pd.DataFrame, sensor_type: str):
        """Evaluate the model and display results"""
        model = self.models[sensor_type]
        y_pred = model.predict(X_test)
        print(f"\nEvaluation results for {sensor_type}:")
        print(classification_report(y_test, y_pred))
        
        # Crear directorio para los plots
        plots_dir = os.path.join('train_plots', sensor_type.upper())
        os.makedirs(plots_dir, exist_ok=True)
        
        # Confusion matrix
        plt.figure(figsize=(8, 6))
        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d')
        plt.title(f'Confusion Matrix - {sensor_type}')
        plt.ylabel('True')
        plt.xlabel('Predicted')
        plt.savefig(os.path.join(plots_dir, f'confusion_matrix_{sensor_type}.png'))
        plt.close()
        
        # Guardar métricas en archivo de texto
        metrics_file = os.path.join(plots_dir, f'metrics_{sensor_type}.txt')
        with open(metrics_file, 'w') as f:
            f.write(f"Evaluation results for {sensor_type}:\n")
            f.write("\nClassification Report:\n")
            f.write(classification_report(y_test, y_pred))
            f.write(f"\nAccuracy: {(y_pred == y_test).mean():.4f}")

    def save_models(self, base_dir: str = 'model'):
        """Save trained models and scaler in appropriate directories"""
        # Guardar modelo ELF
        if self.models['elf'] is not None:
            elf_dir = os.path.join(base_dir, 'ELF')
            os.makedirs(elf_dir, exist_ok=True)
            joblib.dump(self.models['elf'], os.path.join(elf_dir, 'model_elf.joblib'))
            joblib.dump(self.scaler, os.path.join(elf_dir, 'scaler_elf.joblib'))

        # Guardar modelo MAG
        if self.models['mag'] is not None:
            mag_dir = os.path.join(base_dir, 'MAG')
            os.makedirs(mag_dir, exist_ok=True)
            joblib.dump(self.models['mag'], os.path.join(mag_dir, 'model_mag.joblib'))
            joblib.dump(self.scaler, os.path.join(mag_dir, 'scaler_mag.joblib'))

    def load_models(self, base_dir: str = 'model'):
        """Load trained models and scalers from appropriate directories"""
        # Cargar modelo ELF
        elf_model_path = os.path.join(base_dir, 'ELF', 'model_elf.joblib')
        elf_scaler_path = os.path.join(base_dir, 'ELF', 'scaler_elf.joblib')
        if os.path.exists(elf_model_path):
            self.models['elf'] = joblib.load(elf_model_path)
            if os.path.exists(elf_scaler_path):
                self.scaler = joblib.load(elf_scaler_path)

        # Cargar modelo MAG
        mag_model_path = os.path.join(base_dir, 'MAG', 'model_mag.joblib')
        mag_scaler_path = os.path.join(base_dir, 'MAG', 'scaler_mag.joblib')
        if os.path.exists(mag_model_path):
            self.models['mag'] = joblib.load(mag_model_path)
            if os.path.exists(mag_scaler_path):
                self.scaler = joblib.load(mag_scaler_path)

def main():
    # Initialize pipeline
    pipeline = ModelPipeline()

    # Load data from directory structure
    data_elf, data_mag = pipeline.load_data('data')

    if data_elf is None or data_mag is None:
        print("Error loading data. Exiting...")
        return

    # Process ELF data
    X_elf, y_elf = pipeline.preprocess_data(data_elf)
    
    # Process MAG data
    X_mag, y_mag = pipeline.preprocess_data(data_mag)

    # Apply SMOTEENN
    smote_enn = SMOTEENN(random_state=42)
    X_elf_balanced, y_elf_balanced = smote_enn.fit_resample(X_elf, y_elf)
    X_mag_balanced, y_mag_balanced = smote_enn.fit_resample(X_mag, y_mag)

    # Split data
    X_train_elf, X_test_elf, y_train_elf, y_test_elf = train_test_split(
        X_elf_balanced, y_elf_balanced, test_size=0.2, random_state=42)
    X_train_mag, X_test_mag, y_train_mag, y_test_mag = train_test_split(
        X_mag_balanced, y_mag_balanced, test_size=0.2, random_state=42)

    # Train models
    pipeline.train_model(X_train_elf, y_train_elf, 'elf')
    pipeline.train_model(X_train_mag, y_train_mag, 'mag')

    # Evaluate models
    pipeline.evaluate_model(X_test_elf, y_test_elf, 'elf')
    pipeline.evaluate_model(X_test_mag, y_test_mag, 'mag')

    # Save models in appropriate directories
    pipeline.save_models('model')

if __name__ == "__main__":
    main()

Data loaded - ELF shape: (453, 322), MAG shape: (938, 322)

Adding statistical features...

Adding statistical features...

Performing grid search for elf...


100%|██████████| 100/100 [00:26<00:00,  3.77it/s]



Performing grid search for mag...


100%|██████████| 100/100 [00:58<00:00,  1.71it/s]



Evaluation results for elf:
              precision    recall  f1-score   support

           0       0.99      0.99      0.99        76
           1       0.98      0.98      0.98        65

    accuracy                           0.99       141
   macro avg       0.99      0.99      0.99       141
weighted avg       0.99      0.99      0.99       141


Evaluation results for mag:
              precision    recall  f1-score   support

           0       0.99      1.00      0.99       137
           1       1.00      0.99      0.99       167

    accuracy                           0.99       304
   macro avg       0.99      0.99      0.99       304
weighted avg       0.99      0.99      0.99       304

