Feature Engineering Pipeline Master
====================================
Sistema principal para el procesamiento de features multimodales para Alzheimer

Autor: [Abraham Tartalos](https://www.linkedin.com/in/abrahamtartalos "Ir al perfil de LinkedIn de Abraham Tartalos")

Fecha: Mayo 2025

Fase: 3 - Feature Engineering y Selección

---

## Importar Librerías Necesarias

In [None]:
import pandas as pd
import numpy as np
import logging
from pathlib import Path
import warnings
from typing import Dict, List, Tuple, Optional
import json
from datetime import datetime

# Importar módulos específicos de feature engineering
from fe_demographics import DemographicsFeatureEngineering
from fe_genetics import GeneticsFeatureEngineering
from fe_neuroimaging import NeuroimagingFeatureEngineering
from fe_biomarkers import BiomarkersFeatureEngineering
from fe_clinical import ClinicalFeatureEngineering
from fe_synthetic_activity_sleep import ActivitySleepFeatureEngineering

warnings.filterwarnings('ignore')

## Clase FeatureEngineeringPipeline:

In [None]:
class FeatureEngineeringPipeline:
    """
    Pipeline maestro para feature engineering multimodal en Alzheimer
    
    Integra todas las modalidades de datos y genera features finales
    incluyendo combinaciones inter-modalidad para score de riesgo compuesto.
    """
    
    def __init__(self, config_path: Optional[str] = None):
        """
        Build the pipeline: logging, configuration, state containers, processors.

        Args:
            config_path: Optional path to a JSON configuration file; it is
                forwarded unchanged to ``_load_config``.
        """
        # Logging is configured first so ``self.logger`` exists for the
        # later pipeline stages that log through it.
        self.setup_logging()
        self.config = self._load_config(config_path)

        # State containers filled in by later pipeline stages.
        self.feature_processors, self.feature_stats, self.temporal_features = {}, {}, {}

        # Replace the empty placeholder with one processor per modality.
        self._initialize_processors()
        
    def setup_logging(self):
        """Configurar sistema de logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('feature_engineering.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
    def _load_config(self, config_path: Optional[str]) -> Dict:
        """Cargar configuración del pipeline"""
        default_config = {
            'input_path': '../data/processed/integrated/multimodal_alzheimer_dataset.csv',
            'metadata_path': '../data/processed/integrated/dataset_metadata.json',
            'output_path': '../data/processed/features/',
            'temporal_window_days': 365,
            'missing_threshold': 0.8,
            'correlation_threshold': 0.95,
            'feature_selection_methods': ['univariate', 'recursive', 'clinical_relevance'],
            'risk_score_components': {
                'cognitive': 0.3,
                'biomarker': 0.25,
                'neuroimaging': 0.2,
                'genetic': 0.15,
                'activity_sleep': 0.1
            }
        }
        
        if config_path and Path(config_path).exists():
            with open(config_path, 'r') as f:
                user_config = json.load(f)
                default_config.update(user_config)
                
        return default_config
        
    def _initialize_processors(self):
        """Instantiate one feature-engineering processor per data modality."""
        # (modality key, processor class) pairs; the order fixes the order in
        # which the processors are built and stored.
        processor_classes = (
            ('demographics', DemographicsFeatureEngineering),
            ('genetics', GeneticsFeatureEngineering),
            ('neuroimaging', NeuroimagingFeatureEngineering),
            ('biomarkers', BiomarkersFeatureEngineering),
            ('clinical', ClinicalFeatureEngineering),
            ('activity_sleep', ActivitySleepFeatureEngineering),
        )
        self.feature_processors = {
            modality: processor_cls() for modality, processor_cls in processor_classes
        }
        
    def load_data(self) -> pd.DataFrame:
        """
        Cargar dataset integrado
        
        Returns:
            DataFrame con datos multimodales integrados
        """
        self.logger.info("🔄 Cargando dataset integrado...")
        
        try:
            df = pd.read_csv(self.config['input_path'])
            self.logger.info(f"✅ Dataset cargado: {df.shape[0]} registros, {df.shape[1]} variables")
            
            # Cargar metadatos
            if Path(self.config['metadata_path']).exists():
                with open(self.config['metadata_path'], 'r') as f:
                    self.metadata = json.load(f)
                    
            return df
            
        except Exception as e:
            self.logger.error(f"❌ Error cargando datos: {str(e)}")
            raise
            
    def preprocess_temporal_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Preprocesar información temporal para features longitudinales
        
        Args:
            df: DataFrame con datos originales
            
        Returns:
            DataFrame con features temporales agregadas
        """
        self.logger.info("⏰ Procesando features temporales...")
        
        temporal_cols = ['VISIT_DATE', 'VISDATE', 'DAYS_SINCE_BASELINE']
        df_temporal = df.copy()
        
        # Convertir fechas
        for col in temporal_cols:
            if col in df_temporal.columns:
                df_temporal[col] = pd.to_datetime(df_temporal[col], errors='coerce')
                
        # Crear features temporales por sujeto
        if 'PTID' in df_temporal.columns:
            temporal_features = []
            
            for ptid in df_temporal['PTID'].unique():
                subject_data = df_temporal[df_temporal['PTID'] == ptid].copy()
                
                if len(subject_data) > 1:
                    # Ordenar por fecha de visita
                    if 'VISIT_DATE' in subject_data.columns:
                        subject_data = subject_data.sort_values('VISIT_DATE')
                        
                        # Features de tendencia temporal
                        subject_data['visit_number'] = range(1, len(subject_data) + 1)
                        subject_data['days_between_visits'] = subject_data['VISIT_DATE'].diff().dt.days
                        subject_data['total_follow_up_days'] = (
                            subject_data['VISIT_DATE'].max() - subject_data['VISIT_DATE'].min()
                        ).days
                        
                temporal_features.append(subject_data)
                
            if temporal_features:
                df_temporal = pd.concat(temporal_features, ignore_index=True)
                
        self.temporal_features = {
            'visit_frequency': df_temporal.groupby('PTID').size().to_dict(),
            'follow_up_duration': df_temporal.groupby('PTID')['total_follow_up_days'].first().to_dict()
        }
        
        return df_temporal
        
    def process_features_by_modality(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Procesar features específicas por modalidad
        
        Args:
            df: DataFrame con datos temporales procesados
            
        Returns:
            DataFrame con features procesadas por modalidad
        """
        self.logger.info("🔬 Procesando features por modalidad...")
        
        processed_dfs = []
        base_df = df[['RID', 'PTID']].drop_duplicates()  # Mantener identificadores
        
        for modality, processor in self.feature_processors.items():
            self.logger.info(f"   📊 Procesando modalidad: {modality}")
            
            try:
                # Procesar features específicas de la modalidad
                modality_features = processor.process_features(df)
                
                if modality_features is not None and not modality_features.empty:
                    # Guardar estadísticas
                    self.feature_stats[modality] = {
                        'original_features': len([col for col in df.columns if processor.identify_modality_columns(col)]),
                        'engineered_features': len(modality_features.columns) - 2,  # Excluyendo RID, PTID
                        'missing_percentage': modality_features.isnull().mean().mean() * 100
                    }
                    
                    processed_dfs.append(modality_features)
                    self.logger.info(f"   ✅ {modality}: {len(modality_features.columns)-2} features generadas")
                else:
                    self.logger.warning(f"   ⚠️ {modality}: No se generaron features")
                    
            except Exception as e:
                self.logger.error(f"   ❌ Error procesando {modality}: {str(e)}")
                continue
                
        # Integrar todas las modalidades procesadas
        result_df = base_df
        for modal_df in processed_dfs:
            result_df = result_df.merge(modal_df, on=['RID', 'PTID'], how='left')
            
        return result_df
        
    def create_composite_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Crear features compuestas inter-modalidad
        
        Args:
            df: DataFrame con features por modalidad
            
        Returns:
            DataFrame con features compuestas agregadas
        """
        self.logger.info("🔗 Creando features compuestas inter-modalidad...")
        
        df_composite = df.copy()
        
        # 1. Ratios clínicamente relevantes
        self._create_biomarker_ratios(df_composite)
        
        # 2. Scores compuestos por dominio
        self._create_domain_scores(df_composite)
        
        # 3. Features de interacción genética-clínica
        self._create_genetic_clinical_interactions(df_composite)
        
        # 4. Índices de actividad-sueño
        self._create_activity_sleep_indices(df_composite)
        
        # 5. Score de riesgo compuesto final
        self._create_composite_risk_score(df_composite)
        
        return df_composite
        
    def _create_biomarker_ratios(self, df: pd.DataFrame):
        """Crear ratios de biomarcadores clínicamente relevantes"""
        # Ratios típicos en investigación de Alzheimer
        if 'ABETA_level' in df.columns and 'TAU_level' in df.columns:
            df['abeta_tau_ratio'] = df['ABETA_level'] / (df['TAU_level'] + 1e-6)
            
        # Agregar más ratios según biomarcadores disponibles
        
    def _create_domain_scores(self, df: pd.DataFrame):
        """Crear scores compuestos por dominio cognitivo"""
        # Score cognitivo compuesto
        cognitive_cols = [col for col in df.columns if 'cognitive' in col.lower() or 'mmse' in col.lower()]
        if cognitive_cols:
            df['composite_cognitive_score'] = df[cognitive_cols].mean(axis=1, skipna=True)
            
    def _create_genetic_clinical_interactions(self, df: pd.DataFrame):
        """Crear features de interacción genética-clínica"""
        # Interacciones APOE con otros factores
        if 'APOE4_status' in df.columns:
            clinical_cols = [col for col in df.columns if 'clinical' in col.lower()]
            for col in clinical_cols[:3]:  # Limitar para evitar explosión de features
                if col in df.columns:
                    df[f'APOE4_{col}_interaction'] = df['APOE4_status'] * df[col]
                    
    def _create_activity_sleep_indices(self, df: pd.DataFrame):
        """Crear índices compuestos de actividad y sueño"""
        # Índice de calidad de sueño
        sleep_cols = [col for col in df.columns if 'sleep' in col.lower()]
        if sleep_cols:
            df['sleep_quality_index'] = df[sleep_cols].mean(axis=1, skipna=True)
            
        # Índice de actividad física
        activity_cols = [col for col in df.columns if 'activity' in col.lower() or 'step' in col.lower()]
        if activity_cols:
            df['physical_activity_index'] = df[activity_cols].mean(axis=1, skipna=True)
            
    def _create_composite_risk_score(self, df: pd.DataFrame):
        """
        Crear score de riesgo compuesto final
        
        Combina múltiples modalidades con pesos clínicamente validados
        """
        self.logger.info("🎯 Calculando Score de Riesgo Compuesto...")
        
        risk_components = {}
        weights = self.config['risk_score_components']
        
        # Componente cognitivo
        if 'composite_cognitive_score' in df.columns:
            risk_components['cognitive'] = df['composite_cognitive_score'].fillna(df['composite_cognitive_score'].median())
            
        # Componente de biomarcadores
        biomarker_cols = [col for col in df.columns if 'biomarker' in col.lower() or 'abeta' in col.lower() or 'tau' in col.lower()]
        if biomarker_cols:
            risk_components['biomarker'] = df[biomarker_cols].mean(axis=1, skipna=True).fillna(0)
            
        # Componente de neuroimagen
        neuroimaging_cols = [col for col in df.columns if 'mri' in col.lower() or 'pet' in col.lower() or 'brain' in col.lower()]
        if neuroimaging_cols:
            risk_components['neuroimaging'] = df[neuroimaging_cols].mean(axis=1, skipna=True).fillna(0)
            
        # Componente genético
        genetic_cols = [col for col in df.columns if 'genetic' in col.lower() or 'apoe' in col.lower()]
        if genetic_cols:
            risk_components['genetic'] = df[genetic_cols].mean(axis=1, skipna=True).fillna(0)
            
        # Componente actividad-sueño
        if 'sleep_quality_index' in df.columns and 'physical_activity_index' in df.columns:
            risk_components['activity_sleep'] = (df['sleep_quality_index'] + df['physical_activity_index']) / 2
            
        # Calcular score compuesto
        composite_score = pd.Series(0, index=df.index)
        
        for component, values in risk_components.items():
            if component in weights:
                # Normalizar componente a escala 0-1
                normalized_values = (values - values.min()) / (values.max() - values.min() + 1e-6)
                composite_score += weights[component] * normalized_values
                
        df['composite_risk_score'] = composite_score
        df['risk_category'] = pd.cut(composite_score, 
                                   bins=[0, 0.3, 0.6, 1.0], 
                                   labels=['Low', 'Medium', 'High'])
        
        self.logger.info(f"✅ Score de riesgo calculado - Distribución: {df['risk_category'].value_counts().to_dict()}")
        
    def feature_selection(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Selección de features relevantes
        
        Args:
            df: DataFrame con todas las features generadas
            
        Returns:
            DataFrame con features seleccionadas
        """
        self.logger.info("🎯 Iniciando selección de features...")
        
        # Remover features con muchos valores faltantes
        missing_threshold = self.config['missing_threshold']
        missing_rates = df.isnull().mean()
        features_to_keep = missing_rates[missing_rates <= missing_threshold].index.tolist()
        
        df_selected = df[features_to_keep]
        self.logger.info(f"   📊 Features removidas por valores faltantes: {len(df.columns) - len(features_to_keep)}")
        
        # Remover features altamente correlacionadas
        numeric_cols = df_selected.select_dtypes(include=[np.number]).columns
        corr_matrix = df_selected[numeric_cols].corr().abs()
        
        # Encontrar pares altamente correlacionados
        high_corr_pairs = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                if corr_matrix.iloc[i, j] >= self.config['correlation_threshold']:
                    high_corr_pairs.append((corr_matrix.columns[i], corr_matrix.columns[j]))
                    
        # Remover una feature de cada par correlacionado
        features_to_remove = set()
        for pair in high_corr_pairs:
            # Mantener la feature con menor cantidad de valores faltantes
            missing_1 = df_selected[pair[0]].isnull().mean()
            missing_2 = df_selected[pair[1]].isnull().mean()
            
            if missing_1 > missing_2:
                features_to_remove.add(pair[0])
            else:
                features_to_remove.add(pair[1])
                
        df_selected = df_selected.drop(columns=list(features_to_remove))
        self.logger.info(f"   🔗 Features removidas por alta correlación: {len(features_to_remove)}")
        
        # Selección basada en relevancia clínica (mantener features clave)
        clinical_priority_features = [
            'composite_risk_score', 'risk_category',
            'composite_cognitive_score', 'abeta_tau_ratio',
            'sleep_quality_index', 'physical_activity_index'
        ]
        
        priority_features = [col for col in clinical_priority_features if col in df_selected.columns]
        other_features = [col for col in df_selected.columns if col not in priority_features]
        
        # Reorganizar columnas priorizando features clínicamente relevantes
        final_columns = ['RID', 'PTID'] + priority_features + other_features
        final_columns = [col for col in final_columns if col in df_selected.columns]
        
        df_final = df_selected[final_columns]
        
        self.logger.info(f"✅ Selección completada: {len(df_final.columns)} features finales")
        
        return df_final
        
    def generate_feature_report(self, df_final: pd.DataFrame) -> Dict:
        """
        Generar reporte comprehensivo de feature engineering
        
        Args:
            df_final: DataFrame con features finales
            
        Returns:
            Diccionario con estadísticas del proceso
        """
        report = {
            'timestamp': datetime.now().isoformat(),
            'dataset_info': {
                'total_records': len(df_final),
                'total_features': len(df_final.columns),
                'unique_subjects': df_final['PTID'].nunique() if 'PTID' in df_final.columns else 'N/A'
            },
            'modality_stats': self.feature_stats,
            'temporal_info': self.temporal_features,
            'composite_features': {
                'risk_score_distribution': df_final['composite_risk_score'].describe().to_dict() if 'composite_risk_score' in df_final.columns else {},
                'risk_categories': df_final['risk_category'].value_counts().to_dict() if 'risk_category' in df_final.columns else {}
            },
            'data_quality': {
                'missing_data_percentage': df_final.isnull().mean().mean() * 100,
                'complete_records': len(df_final.dropna()),
                'completeness_by_modality': {
                    modality: (1 - stats['missing_percentage']/100) * 100 
                    for modality, stats in self.feature_stats.items()
                }
            }
        }
        
        return report
        
    def save_results(self, df_final: pd.DataFrame, report: Dict):
        """
        Guardar resultados del feature engineering
        
        Args:
            df_final: DataFrame con features finales
            report: Reporte del proceso
        """
        output_path = Path(self.config['output_path'])
        output_path.mkdir(parents=True, exist_ok=True)
        
        # Guardar dataset final
        output_file = output_path / 'multimodal_features_final.csv'
        df_final.to_csv(output_file, index=False)
        self.logger.info(f"💾 Dataset final guardado: {output_file}")
        
        # Guardar reporte
        report_file = output_path / 'feature_engineering_report.json'
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        self.logger.info(f"📋 Reporte guardado: {report_file}")
        
        # Guardar dataset de muestra para análisis
        sample_size = min(1000, len(df_final))
        sample_df = df_final.sample(n=sample_size, random_state=42)
        sample_file = output_path / 'multimodal_features_sample.csv'
        sample_df.to_csv(sample_file, index=False)
        self.logger.info(f"🔬 Muestra guardada: {sample_file}")
        
    def run_pipeline(self) -> Tuple[pd.DataFrame, Dict]:
        """
        Ejecutar pipeline completo de feature engineering
        
        Returns:
            Tuple con DataFrame final y reporte del proceso
        """
        self.logger.info("🚀 INICIANDO PIPELINE DE FEATURE ENGINEERING")
        self.logger.info("=" * 60)
        
        # Paso 1: Cargar datos
        df = self.load_data()
        
        # Paso 2: Procesar información temporal
        df_temporal = self.preprocess_temporal_data(df)
        
        # Paso 3: Procesar features por modalidad
        df_modality = self.process_features_by_modality(df_temporal)
        
        # Paso 4: Crear features compuestas
        df_composite = self.create_composite_features(df_modality)
        
        # Paso 5: Selección de features
        df_final = self.feature_selection(df_composite)
        
        # Paso 6: Generar reporte
        report = self.generate_feature_report(df_final)
        
        # Paso 7: Guardar resultados
        self.save_results(df_final, report)
        
        self.logger.info("=" * 60)
        self.logger.info("✅ PIPELINE DE FEATURE ENGINEERING COMPLETADO")
        self.logger.info(f"📊 Resultado: {len(df_final)} registros × {len(df_final.columns)} features")
        
        return df_final, report

## Función para Ejecutar el Pipeline

In [None]:
def main():
    """
    Run the pipeline with its default configuration and print a summary.

    Returns:
        Tuple of the final DataFrame and the report dictionary.

    Raises:
        Exception: Any pipeline error is logged and re-raised unchanged.
    """
    try:
        # Build the pipeline with defaults and run every stage.
        df_final, report = FeatureEngineeringPipeline().run_pipeline()

        print("\n🎯 RESUMEN EJECUTIVO:")
        print("=" * 50)

        info = report['dataset_info']
        print(f"📊 Registros procesados: {info['total_records']:,}")
        print(f"🔬 Features generadas: {info['total_features']}")
        print(f"👥 Sujetos únicos: {info['unique_subjects']}")
        print(f"📈 Completitud promedio: {100 - report['data_quality']['missing_data_percentage']:.1f}%")

        # The risk score is only summarised when it survived feature selection.
        if 'composite_risk_score' in df_final.columns:
            print(f"🎯 Score de riesgo - Media: {df_final['composite_risk_score'].mean():.3f}")
            print(f"🏷️ Categorías de riesgo: {report['composite_features']['risk_categories']}")

        return df_final, report

    except Exception as e:
        logging.error(f"❌ Error en pipeline principal: {str(e)}")
        raise


## Ejecución del Pipeline

In [None]:
# Run the pipeline only when this file is executed as a script, binding the
# returned DataFrame and report to module-level names.
if __name__ == "__main__":
    df_final, report = main()

---

__Abraham Tartalos__