In [1]:
"""
Preprocessing Pipeline for Predictive Maintenance
Handles normalization, feature selection, and sliding window generation

Author: Fatima Khadija Benzine
Week 1, Day 3
"""

'\nPreprocessing Pipeline for Predictive Maintenance\nHandles normalization, feature selection, and sliding window generation\n\nAuthor: Fatima Khadija Benzine\nWeek 1, Day 3\n'

In [2]:

import numpy as np
import pandas as pd
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import warnings
warnings.filterwarnings('ignore')

In [3]:
class DataNormalizer:
    """
    Column-wise normalizer that can work globally or per group.

    In global mode (``group_col=None``) one scaler is fitted on all rows and
    stored under ``self.scalers['global']``. In per-group mode one scaler is
    fitted for every distinct value of ``group_col`` and stored under that
    value; in addition a scaler fitted on all rows is kept in
    ``self.fallback_scaler`` so that groups that were not present during
    ``fit`` can still be transformed.
    """

    def __init__(self, method: str = 'minmax'):
        """
        Initialize normalizer

        Args:
            method: 'minmax' or 'standard'. The value is validated when
                ``fit`` is called, not here.
        """
        self.method = method
        self.scalers = {}
        # Scaler fitted on all rows; only populated in per-group mode.
        self.fallback_scaler = None
        self.columns = None
        self.group_col = None
        self.fitted = False

    def _make_scaler(self):
        """Return a new, unfitted scaler for ``self.method``.

        Raises:
            ValueError: if ``self.method`` is neither 'minmax' nor 'standard'.
        """
        if self.method == 'minmax':
            return MinMaxScaler()
        if self.method == 'standard':
            return StandardScaler()
        raise ValueError(f"Unknown method: {self.method}")

    def _scaler_for_group(self, group_id):
        """Return the scaler for ``group_id``, falling back to the all-rows scaler.

        Prints a warning when the fallback is used.

        Raises:
            ValueError: if the group is unknown and no fallback scaler exists.
        """
        if group_id in self.scalers:
            return self.scalers[group_id]

        print(f"Warning: Group {group_id} not seen during fit, using global scaler")
        fallback = getattr(self, 'fallback_scaler', None)
        if fallback is None:
            fallback = self.scalers.get('global')
        if fallback is None:
            raise ValueError(f"No scaler available for unseen group {group_id}")
        return fallback

    def _ensure_float(self, data: pd.DataFrame) -> None:
        """Cast the normalized columns of ``data`` to float, in place.

        Scaled values are floats; writing them into a row subset of an
        integer column through ``.loc`` is a dtype-incompatible assignment,
        so the columns are converted up front. Columns that already have a
        float dtype are left untouched.
        """
        for col in self.columns:
            if not pd.api.types.is_float_dtype(data[col]):
                data[col] = data[col].astype(float)

    def fit(self, data: pd.DataFrame, columns: List[str],
            group_col: Optional[str] = None) -> 'DataNormalizer':
        """
        Fit normalization parameters

        Any scalers from a previous ``fit`` call are discarded. The new state
        is assembled in local variables first, so a failure part-way through
        leaves the previously fitted state untouched.

        Args:
            data: DataFrame to fit on
            columns: Columns to normalize
            group_col: Optional column to group by (e.g., 'unit' for per-unit normalization)

        Returns:
            self (for chaining)

        Raises:
            ValueError: if ``self.method`` is not a known method.
        """
        columns = list(columns)
        scalers = {}
        fallback = None

        if group_col is None:
            # Global normalization
            scaler = self._make_scaler()
            scaler.fit(data[columns])
            scalers['global'] = scaler
        else:
            # All-rows scaler used by transform() for groups unseen here.
            fallback = self._make_scaler()
            fallback.fit(data[columns])

            # Per-group normalization
            for group_id in data[group_col].unique():
                group_data = data[data[group_col] == group_id]
                scaler = self._make_scaler()
                scaler.fit(group_data[columns])
                scalers[group_id] = scaler

        self.scalers = scalers
        self.fallback_scaler = fallback
        self.columns = columns
        self.group_col = group_col
        self.fitted = True

        return self

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Transform data using fitted parameters

        The input frame is not modified; a copy is returned. In per-group
        mode, rows whose group was not seen during ``fit`` are scaled with
        the all-rows fallback scaler and a warning is printed.

        Args:
            data: DataFrame to transform

        Returns:
            Transformed DataFrame

        Raises:
            ValueError: if called before ``fit``, or if an unseen group is
                encountered and no fallback scaler is available.
        """
        if not self.fitted:
            raise ValueError("Normalizer must be fitted before transform")

        data = data.copy()

        if self.group_col is None:
            # Global normalization
            data[self.columns] = self.scalers['global'].transform(data[self.columns])
            return data

        # Per-group normalization
        self._ensure_float(data)
        for group_id in data[self.group_col].unique():
            mask = data[self.group_col] == group_id
            scaler = self._scaler_for_group(group_id)
            data.loc[mask, self.columns] = scaler.transform(
                data.loc[mask, self.columns]
            )

        return data

    def fit_transform(self, data: pd.DataFrame, columns: List[str],
                     group_col: Optional[str] = None) -> pd.DataFrame:
        """
        Fit and transform in one step

        Args:
            data: DataFrame to fit and transform
            columns: Columns to normalize
            group_col: Optional column to group by

        Returns:
            Transformed DataFrame
        """
        self.fit(data, columns, group_col)
        return self.transform(data)

    def inverse_transform(self, data: pd.DataFrame,
                         group_col_values: Optional[pd.Series] = None) -> pd.DataFrame:
        """
        Inverse transform normalized data back to original scale

        Mirrors ``transform``: groups that were not seen during ``fit`` are
        mapped back with the same all-rows fallback scaler that ``transform``
        applies to them, instead of being left in normalized scale.

        Args:
            data: Normalized DataFrame
            group_col_values: Group identifiers if using per-group normalization.
                The boolean mask built from it is applied through ``.loc``,
                so it is aligned with ``data`` by index.

        Returns:
            Data in original scale

        Raises:
            ValueError: if called before ``fit``, if ``group_col_values`` is
                missing in per-group mode, or if an unseen group is
                encountered and no fallback scaler is available.
        """
        if not self.fitted:
            raise ValueError("Normalizer must be fitted before inverse transform")

        data = data.copy()

        if self.group_col is None:
            # Global inverse transform
            data[self.columns] = self.scalers['global'].inverse_transform(data[self.columns])
            return data

        # Per-group inverse transform
        if group_col_values is None:
            raise ValueError("group_col_values required for per-group inverse transform")

        self._ensure_float(data)
        for group_id in group_col_values.unique():
            mask = group_col_values == group_id
            scaler = self._scaler_for_group(group_id)
            data.loc[mask, self.columns] = scaler.inverse_transform(
                data.loc[mask, self.columns]
            )

        return data
        

In [5]:
class FeatureSelector:
    """
    Handles feature selection based on variance and correlation

    Selection happens in ``select_features``; ``transform`` then projects a
    frame onto the features chosen by the most recent selection.
    """

    def __init__(self, variance_threshold: float = 0.01,
                 correlation_threshold: float = 0.95):
        """
        Initialize feature selector

        Args:
            variance_threshold: Minimum variance to keep a feature
            correlation_threshold: Maximum correlation before removing redundant features
        """
        self.variance_threshold = variance_threshold
        self.correlation_threshold = correlation_threshold
        self.selected_features = None

    def select_features(self, data: pd.DataFrame,
                       exclude_cols: Optional[List[str]] = None) -> List[str]:
        """
        Select features based on variance and correlation

        Steps:
          1. Every column not listed in ``exclude_cols`` is a candidate.
          2. Non-numeric candidates are skipped (variance and correlation are
             not defined for them) and reported via ``print``.
          3. Candidates whose variance is not strictly greater than
             ``variance_threshold`` are removed.
          4. For every pair of remaining features whose absolute correlation
             exceeds ``correlation_threshold``, the one that appears later in
             column order is removed.

        The result is also stored in ``self.selected_features``.

        Args:
            data: DataFrame to analyze
            exclude_cols: Columns to exclude from selection (e.g., 'unit', 'cycle', 'rul')

        Returns:
            List of selected feature names
        """
        if exclude_cols is None:
            exclude_cols = []

        # Get candidate features
        candidate_features = [col for col in data.columns if col not in exclude_cols]

        print(f"Starting with {len(candidate_features)} candidate features")

        # var()/corr() raise on object columns in recent pandas, so only
        # numeric (and boolean) candidates take part in the selection.
        numeric_cols = set(
            data[candidate_features].select_dtypes(include=[np.number, 'bool']).columns
        )
        numeric_features = [col for col in candidate_features if col in numeric_cols]
        non_numeric = [col for col in candidate_features if col not in numeric_cols]
        if non_numeric:
            print(f"Skipping non-numeric columns: {non_numeric}")

        # Step 1: Remove low variance features
        variances = data[numeric_features].var()
        high_variance_features = variances[variances > self.variance_threshold].index.tolist()

        print(f"After variance filtering: {len(high_variance_features)} features")
        print(f"Removed low variance: {set(numeric_features) - set(high_variance_features)}")

        # Step 2: Remove highly correlated features
        if len(high_variance_features) > 1:
            corr_matrix = data[high_variance_features].corr().abs()

            # Keep only the strict upper triangle so each pair is inspected
            # once and the diagonal (self-correlation) is ignored.
            upper_mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
            upper_triangle = corr_matrix.where(upper_mask)

            # A column is dropped when it correlates too strongly with any
            # column that precedes it.
            too_correlated = (upper_triangle > self.correlation_threshold).any(axis=0)
            to_drop = too_correlated[too_correlated].index.tolist()

            dropped = set(to_drop)
            selected_features = [f for f in high_variance_features if f not in dropped]

            print(f"After correlation filtering: {len(selected_features)} features")
            print(f"Removed highly correlated: {to_drop}")
        else:
            selected_features = high_variance_features

        self.selected_features = selected_features
        return selected_features

    def transform(self, data: pd.DataFrame,
                 keep_cols: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Transform data by keeping only selected features

        Columns are returned in the order ``keep_cols`` followed by the
        selected features. A column that appears in both lists is emitted
        only once, at its first position.

        Args:
            data: DataFrame to transform
            keep_cols: Additional columns to keep (e.g., 'unit', 'cycle', 'rul')

        Returns:
            DataFrame with selected features

        Raises:
            ValueError: if ``select_features`` has not been called yet.
            KeyError: if a requested column is missing from ``data``.
        """
        if self.selected_features is None:
            raise ValueError("Must call select_features before transform")

        if keep_cols is None:
            keep_cols = []

        # dict.fromkeys de-duplicates while preserving first-seen order.
        ordered_cols = list(dict.fromkeys(list(keep_cols) + list(self.selected_features)))
        return data[ordered_cols]


In [6]:
class PreprocessingPipeline:
    """
    Complete preprocessing pipeline for predictive maintenance data

    Chains a ``DataNormalizer`` and a ``FeatureSelector``: features are
    normalized first, then filtered by variance and correlation.
    """

    def __init__(self,
                 normalization_method: str = 'minmax',
                 variance_threshold: float = 0.01,
                 correlation_threshold: float = 0.95):
        """
        Initialize preprocessing pipeline

        Args:
            normalization_method: 'minmax' or 'standard'
            variance_threshold: Minimum variance for feature selection
            correlation_threshold: Maximum correlation for feature selection
        """
        self.normalizer = DataNormalizer(method=normalization_method)
        self.feature_selector = FeatureSelector(
            variance_threshold=variance_threshold,
            correlation_threshold=correlation_threshold
        )
        # Populated by fit().
        self.selected_features = None
        self.exclude_cols = None
        self.group_col = None
        self.fitted = False

    def fit(self, train_data: pd.DataFrame,
           feature_cols: List[str],
           exclude_cols: List[str],
           group_col: Optional[str] = None) -> 'PreprocessingPipeline':
        """
        Fit the preprocessing pipeline

        Only columns listed in ``feature_cols`` (and not in ``exclude_cols``)
        are offered to the feature selector. Any other column of
        ``train_data`` was not normalized and is therefore kept out of the
        selection.

        Args:
            train_data: Training data
            feature_cols: Columns to consider as features
            exclude_cols: Columns to exclude (e.g., 'unit', 'cycle', 'rul').
                These are passed through unchanged by ``transform``.
            group_col: Optional grouping column for per-group normalization

        Returns:
            self (for chaining)
        """
        feature_cols = list(feature_cols)
        # Copy so later mutation of the caller's list cannot change the pipeline.
        exclude_cols = list(exclude_cols)

        print("=== Fitting Preprocessing Pipeline ===")

        # Step 1: Normalize
        print("\n1. Normalizing features...")
        normalized_data = self.normalizer.fit_transform(
            train_data, feature_cols, group_col
        )

        # Step 2: Select features
        print("\n2. Selecting features...")
        feature_set = set(feature_cols)
        # The selector treats every non-excluded column as a candidate, so
        # columns outside feature_cols are added to its exclusion list.
        non_candidates = exclude_cols + [
            col for col in normalized_data.columns
            if col not in feature_set and col not in exclude_cols
        ]
        self.selected_features = self.feature_selector.select_features(
            normalized_data, non_candidates
        )

        self.fitted = True
        self.exclude_cols = exclude_cols
        self.group_col = group_col

        print(f"\n✓ Pipeline fitted with {len(self.selected_features)} selected features")
        return self

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Transform data using fitted pipeline

        The output contains the pass-through columns given as ``exclude_cols``
        at fit time followed by the selected features. Pass-through columns
        that are absent from ``data`` (for example a target column on
        unlabeled data) are skipped instead of raising ``KeyError``.

        Args:
            data: Data to transform

        Returns:
            Preprocessed DataFrame

        Raises:
            ValueError: if the pipeline has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Pipeline must be fitted before transform")

        # Step 1: Normalize
        normalized_data = self.normalizer.transform(data)

        # Step 2: Select features
        available_keep_cols = [
            col for col in self.exclude_cols if col in normalized_data.columns
        ]
        processed_data = self.feature_selector.transform(
            normalized_data, keep_cols=available_keep_cols
        )

        return processed_data

    def fit_transform(self, train_data: pd.DataFrame,
                     feature_cols: List[str],
                     exclude_cols: List[str],
                     group_col: Optional[str] = None) -> pd.DataFrame:
        """
        Fit and transform in one step

        Args:
            train_data: Training data
            feature_cols: Columns to consider as features
            exclude_cols: Columns to exclude
            group_col: Optional grouping column

        Returns:
            Preprocessed DataFrame
        """
        self.fit(train_data, feature_cols, exclude_cols, group_col)
        return self.transform(train_data)



In [7]:
# Helper function for dataset-specific preprocessing
def preprocess_cmapss(train_df: pd.DataFrame, test_df: pd.DataFrame,
                     normalization_method: str = 'minmax',
                     per_unit_normalization: bool = True,
                     variance_threshold: float = 0.01,
                     correlation_threshold: float = 0.95) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Preprocess C-MAPSS dataset

    Builds a ``PreprocessingPipeline``, fits it on ``train_df`` and applies
    the fitted pipeline to both frames. Feature columns are those whose name
    starts with ``sensor_`` or ``setting_``; ``unit``, ``cycle``, ``rul`` and,
    when present in ``train_df``, ``operating_condition`` are passed to the
    pipeline as excluded columns.

    Args:
        train_df: Training DataFrame
        test_df: Test DataFrame
        normalization_method: 'minmax' or 'standard'
        per_unit_normalization: Whether to normalize per unit
        variance_threshold: Minimum variance for a feature to be kept
        correlation_threshold: Maximum absolute correlation before a feature
            is treated as redundant

    Returns:
        Tuple of (processed_train, processed_test)

    Raises:
        ValueError: if ``train_df`` has no ``sensor_*`` / ``setting_*`` columns.
    """
    # Define feature columns (sensors and settings)
    feature_cols = [col for col in train_df.columns
                    if str(col).startswith(('sensor_', 'setting_'))]
    if not feature_cols:
        raise ValueError(
            "train_df has no 'sensor_*' or 'setting_*' columns to preprocess"
        )

    exclude_cols = ['unit', 'cycle', 'rul']
    if 'operating_condition' in train_df.columns:
        exclude_cols.append('operating_condition')

    # Create pipeline
    pipeline = PreprocessingPipeline(
        normalization_method=normalization_method,
        variance_threshold=variance_threshold,
        correlation_threshold=correlation_threshold
    )

    # Fit on training data
    # NOTE(review): with per-unit normalization the scalers are keyed by the
    # training 'unit' values, so test rows are scaled with the scaler of the
    # training unit that has the same id — confirm this is intended.
    group_col = 'unit' if per_unit_normalization else None
    train_processed = pipeline.fit_transform(
        train_df, feature_cols, exclude_cols, group_col
    )

    # Transform test data with the parameters learned on the training data
    test_processed = pipeline.transform(test_df)

    return train_processed, test_processed


def preprocess_milling(train_df: pd.DataFrame, test_df: pd.DataFrame,
                      normalization_method: str = 'minmax',
                      per_case_normalization: bool = True,
                      variance_threshold: float = 0.01,
                      correlation_threshold: float = 0.95) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Preprocess NASA Milling dataset

    Builds a ``PreprocessingPipeline``, fits it on ``train_df`` and applies
    the fitted pipeline to both frames. ``case``, ``run``, ``VB`` and ``rul``
    are passed to the pipeline as excluded columns; every other column is
    treated as a feature.

    Columns named ``Unnamed: ...`` (the name pandas gives to an unlabeled
    CSV column, typically a saved row index) are dropped before fitting so
    that a row counter is not normalized and offered as a feature. The
    caller's DataFrames are not modified.

    Args:
        train_df: Training DataFrame
        test_df: Test DataFrame
        normalization_method: 'minmax' or 'standard'
        per_case_normalization: Whether to normalize per case
        variance_threshold: Minimum variance for a feature to be kept
        correlation_threshold: Maximum absolute correlation before a feature
            is treated as redundant

    Returns:
        Tuple of (processed_train, processed_test)
    """
    # Define feature columns (exclude identifiers and target)
    exclude_cols = ['case', 'run', 'VB', 'rul']

    # Remove CSV index artifacts; drop() returns new frames, so the inputs
    # passed by the caller stay untouched.
    artifact_cols = [col for col in train_df.columns
                     if str(col).startswith('Unnamed:')]
    if artifact_cols:
        train_df = train_df.drop(columns=artifact_cols)
        test_df = test_df.drop(columns=artifact_cols, errors='ignore')

    feature_cols = [col for col in train_df.columns if col not in exclude_cols]

    # Create pipeline
    pipeline = PreprocessingPipeline(
        normalization_method=normalization_method,
        variance_threshold=variance_threshold,
        correlation_threshold=correlation_threshold
    )

    # Fit on training data
    group_col = 'case' if per_case_normalization else None
    train_processed = pipeline.fit_transform(
        train_df, feature_cols, exclude_cols, group_col
    )

    # Transform test data with the parameters learned on the training data
    test_processed = pipeline.transform(test_df)

    return train_processed, test_processed

In [8]:
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [10]:
# Add project root to path.
# The root is taken as two directory levels above the current working
# directory, so this cell depends on where the kernel was started.
project_root = Path().resolve().parent.parent
# Guard the append so re-running this cell does not pile up duplicate entries.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from src.data_loader import MultiDatasetLoader

In [11]:
# Create the dataset loader with its default arguments; the cells below call
# load_fd001() and load_nasa_milling() on this instance.
loader = MultiDatasetLoader()

In [12]:
print("Loading FD001...")
fd001 = loader.load_fd001()
# The loader result is indexed by split name; unpack both splits at once.
train_fd001, test_fd001 = fd001['train'], fd001['test']

# Report the (rows, columns) shape of each split.
print(f"FD001 - Train: {train_fd001.shape}, Test: {test_fd001.shape}")


Loading FD001...
Loading FD001 dataset...
  Files: train=True, test=True, rul=True
  - Training data shape: (20631, 26)
  - Training units: 100
  - Training RUL range: [0, 361]
  - Test data shape: (13096, 26)
  - RUL values shape: (100, 1)
  - Test units found: 100 (units: [np.int64(1), np.int64(2), np.int64(3), np.int64(4), np.int64(5)]...)
  - RUL values provided: 100
    Unit 1: max_cycle=31, base_RUL=112
    Unit 2: max_cycle=49, base_RUL=98
    Unit 3: max_cycle=126, base_RUL=69
✓ FD001 loaded: 20631 train, 13096 test samples
FD001 - Train: (20631, 27), Test: (13096, 27)


In [13]:
print("\nLoading NASA Milling...")
milling = loader.load_nasa_milling()
# The loader result is indexed by split name; unpack both splits at once.
train_milling, test_milling = milling['train'], milling['test']

# Report the (rows, columns) shape of each split.
print(f"Milling - Train: {train_milling.shape}, Test: {test_milling.shape}")



Loading NASA Milling...
Loading NASA Milling dataset...
Loaded data shape: (167, 14)
Columns: ['Unnamed: 0', 'case', 'run', 'VB', 'time', 'DOC', 'feed', 'material', 'smcAC', 'smcDC', 'vib_table', 'vib_spindle', 'AE_table', 'AE_spindle']
  Using existing column names: ['Unnamed: 0', 'case', 'run', 'VB', 'time', 'DOC', 'feed', 'material', 'smcAC', 'smcDC', 'vib_table', 'vib_spindle', 'AE_table', 'AE_spindle']
  Removed 21 rows with invalid VB values
✓ NASA Milling loaded: 110 train, 36 test samples
Milling - Train: (110, 15), Test: (36, 15)
