## SNP MACE & Severity Prediction Report


In [None]:
# Load libraries
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, confusion_matrix, log_loss
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from scipy.optimize import minimize
from xgboost import  XGBClassifier
import statsmodels.api as sm
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

#### 1. Feature Selection: Lasso Bootstrapping

In [None]:
def stable_snp_selection(data_path, target_col, n_bootstrap=100):
    df = pd.read_csv(data_path)
    
    # Genetic features only
    X_cols = [c for c in df.columns if c.startswith('SNP') or c == 'Variant.Pathogene']
    X = df[X_cols]
    y = df[target_col].astype(int) 
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    X_scaled = pd.DataFrame(X_scaled, columns=X_cols)
    
    # C grid for stability selection
    C_grid = [0.1, 0.2, 0.5, 1.0]
    
    all_selections = np.zeros(len(X_cols))
    
    print(f"Running Stability Selection for {target_col} ({n_bootstrap} bootstraps)...")
    
    for b in tqdm(range(n_bootstrap)):
        # Resample with stratification to handle any class imbalance
        X_res, y_res = resample(X_scaled, y, stratify=y, random_state=b)
        
        bootstrap_selection = np.zeros(len(X_cols), dtype=bool)
        for C_val in C_grid:
    
            clf = LogisticRegression(penalty='l1', solver='liblinear', C=C_val, 
                                     random_state=b, multi_class='ovr')
            clf.fit(X_res, y_res)
            
            # Non-zero in any class
            selected_mask = np.any(np.abs(clf.coef_) > 1e-4, axis=0)
            bootstrap_selection |= selected_mask
            
        all_selections += bootstrap_selection
                    
    stability_scores = all_selections / n_bootstrap
    
    results = pd.DataFrame({
        'SNP': X_cols,
        'Stability_Score': stability_scores
    }).sort_values(by='Stability_Score', ascending=False)
    
    return results


#### 2. Genetic Feature Engineering: ADR Modeling

In [None]:
def categorize_snps(df, target_col, snp_prefix='SNP'):
    """
    Categorizes SNPs into Additive, Dominant, or Recessive models 
    based on AIC (Akaike Information Criterion) from Logistic Regression.
    
    Mapping rules:
    - Additive: 0, 1, 2 (No change)
    - Dominant: 0 -> 0, (1, 2) -> 1
    - Recessive: (0, 1) -> 0, 2 -> 1
    """
    
    # Identify SNP columns
    snp_cols = [col for col in df.columns if col.startswith(snp_prefix)]
    
    y = df[target_col]
    adr_results = {}
    
    # print(f"Categorizing {len(snp_cols)} SNPs against {target_col}...")
    
    for snp in tqdm(snp_cols):
        X_orig = df[snp]
        
        # 1. Additive Model (0, 1, 2)
        try:
            model_add = sm.Logit(y, sm.add_constant(X_orig)).fit(disp=0)
            aic_add = model_add.aic
        except:
            aic_add = np.inf
            
        # 2. Dominant Model (0 vs 1,2)
        X_dom = (X_orig >= 1).astype(int)
        if X_dom.nunique() > 1:
            try:
                model_dom = sm.Logit(y, sm.add_constant(X_dom)).fit(disp=0)
                aic_dom = model_dom.aic
            except:
                aic_dom = np.inf
            
        # 3. Recessive Model (0,1 vs 2)
        X_rec = (X_orig == 2).astype(int)
        if X_rec.nunique() > 1:
            try:
                model_rec = sm.Logit(y, sm.add_constant(X_rec)).fit(disp=0)
                aic_rec = model_rec.aic
            except:
                aic_rec = np.inf
            
        # Determine best model
        aics = {'Additive': aic_add, 'Dominant': aic_dom, 'Recessive': aic_rec}
        best_model = min(aics, key=aics.get)
        
        # If all models failed or are inf, default to Additive
        if aics[best_model] == np.inf:
            best_model = 'Additive'
            
        adr_results[snp] = best_model
        
    return adr_results

def apply_adr_encoding(df, adr_mapping):
    """
    Applies the ADR mapping to the dataframe.
    """
    df_encoded = df.copy()
    
    for snp, model in adr_mapping.items():
        if snp not in df_encoded.columns:
            continue
            
        if model == 'Dominant':
            df_encoded[snp] = (df_encoded[snp] >= 1).astype(int)
        elif model == 'Recessive':
            df_encoded[snp] = (df_encoded[snp] == 2).astype(int)
        # Additive remains as is (0, 1, 2)
        
    return df_encoded

#### 3. Feature Engineering

In [None]:
# Few new features created.
def engineer_clinical_features(df):
    """
    Extract structural and functional interactions from clinical data.
    """
    new_df = df.copy()
    # 1. Atrial Loading: young patients with large atrial diameter
    new_df['Atrial_Loading_Index'] = new_df['Diam_OG'] * new_df['Age_Baseline']
    # 2. Obstruction-Symptom Risk: Gradient interacting with syncope
    new_df['Obstruction_Symptom_Risk'] = new_df['Gradient'] * new_df['SYNCOPE']
    # 3. Hypertrophy to Size Ratio: Normalized wall thickness
    new_df['Hypertrophy_BSA_Ratio'] = new_df['Epaiss_max'] / (new_df['BSA'] + 1e-6)
    # 4. Pump Efficiency Index: FEVG vs Atrial Size
    new_df['Pump_Efficiency_Index'] = new_df['FEVG'] / (new_df['Diam_OG'] + 1e-6)
    # 5. Wall Stress Proxy (Pressure x Thickness)
    new_df['Wall_Stress_Proxy'] = new_df['Gradient'] * new_df['Epaiss_max']
    # 6. Non-linear Age Risk
    new_df['Age_Baseline_Sq'] = new_df['Age_Baseline'] ** 2

    # 7. Clinical Risk Thresholding (Categorization) - User Requested
    new_df['Massive_Hypertrophy'] = (new_df['Epaiss_max'] >= 30).astype(int)
    new_df['Obstruction_Level'] = (new_df['Gradient'] >= 30).astype(int)
    new_df['Heart_Failure_Warning'] = (new_df['FEVG'] < 50).astype(int)

    # 8. Additional Logical Features
    new_df['Disease_Duration'] = np.maximum(0, new_df['Age_Baseline'] - new_df['Age_Diag'])
    new_df['BMI_High_Risk'] = (new_df['BMI'] >= 30).astype(int)

    return new_df

#### 4. Final Hybrid Model

In [None]:
class HybridConsensusClassifier(BaseEstimator, ClassifierMixin):
    """
    A custom hybrid ensemble classifier that combines a Bootstrapped Linear model
    with a set of non-linear classifiers.
    """
    def __init__(self, stable_snps, n_iterations=250, weightage=None, pct_sample=0.7, random_state=42):
        self.stable_snps = stable_snps
        self.n_iterations = n_iterations
        self.weightage = weightage
        self.random_state = random_state
        self.pct_sample = pct_sample
        
        # Initialize base non-linear models
        self.non_linear_models = {
            'forest': RandomForestClassifier(n_estimators=10, random_state=random_state),
            'svm': SVC(kernel='linear', C=0.5, probability=True, random_state=random_state),
            'knn': KNeighborsClassifier(n_neighbors=len(stable_snps) if stable_snps else 10),
            'bayes': GaussianNB(),
            'xgb': XGBClassifier(n_estimators=10, random_state=random_state, eval_metric='logloss')
        }
        
    def _fit_linear_stability(self, X, y):
        """
        Implementation of bootstrapped beta coefficient estimation.
        """
        classes = np.sort(np.unique(y))
        num_classes = len(classes)
        
        # Determine sampling size based on minority class for balancing
        class_counts = pd.Series(y).value_counts()
        min_class_size = class_counts.min()
        sample_size = int(min_class_size * self.pct_sample)
        
        all_coefs = []
        all_intercepts = []
        
        print(f"Bootstrapping Linear Model parameters ({self.n_iterations} iterations)...")
        for i in tqdm(range(self.n_iterations)):
            # Create balanced subsample
            indices = []
            for cls in classes:
                cls_indices = np.where(y == cls)[0]
                picked = np.random.choice(cls_indices, sample_size, replace=False)
                indices.extend(picked)
            
            X_sub = X.iloc[indices][self.stable_snps]
            y_sub = y[indices]
            
            if num_classes > 2:
                model = LogisticRegression(solver='lbfgs', multi_class='multinomial', max_iter=1000)
            else:
                model = LogisticRegression(solver='liblinear', max_iter=1000)
            
            model.fit(X_sub, y_sub)
            all_coefs.append(model.coef_)
            all_intercepts.append(model.intercept_)
            
        self.lin_coefs_ = np.mean(all_coefs, axis=0)
        self.lin_intercept_ = np.mean(all_intercepts, axis=0)
        self.classes_ = classes
        return self

    def fit(self, X, y):
        """
        Fits all models in the ensemble.
        X: DataFrame or Array-like
        y: Array-like
        """
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X, columns=self.stable_snps if hasattr(self, 'stable_snps') else None)
        
        y_array = np.array(y)
        
        # 1. Fit Linear Model via Bootstrapping
        self._fit_linear_stability(X, y_array)
        
        # 2. Fit Non-Linear Models
        print("Fitting non-linear ensemble models...")
        X_selected = X[self.stable_snps]
        for name, model in self.non_linear_models.items():
            model.fit(X_selected, y_array)
            
        return self

    def predict_all_probas(self, X):
        """
        Returns a dictionary of probabilistic predictions from each internal model.
        """
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X, columns=self.stable_snps)
            
        X_selected = X[self.stable_snps]
        n_classes = len(self.classes_)
        
        all_probs = {}
        
        # 1. Linear Prediction (Sigmoid/Softmax using averaged weights)
        z = np.dot(X_selected, self.lin_coefs_.T) + self.lin_intercept_
        if n_classes == 2:
            # Binary case
            prob_1 = 1 / (1 + np.exp(-z)).flatten()
            lin_probs = np.vstack([1 - prob_1, prob_1]).T
        else:
            # Multiclass case (Softmax)
            exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
            lin_probs = exp_z / np.sum(exp_z, axis=1, keepdims=True)
        
        all_probs['linear'] = lin_probs
        
        # 2. Collect Non-Linear Probabilities
        for name, model in self.non_linear_models.items():
            all_probs[name] = model.predict_proba(X_selected)
            
        return all_probs

    def predict_proba(self, X):
        """
        Predicts weighted probabilities.
        """
        all_probs = self.predict_all_probas(X)
        
        # 3. Apply Weighting
        if self.weightage is None:
            # Default: Equal weightage
            weight_val = 1.0 / len(all_probs)
            self.final_weights_ = {m: weight_val for m in all_probs.keys()}
        else:
            # Normalized user weights
            total = sum(self.weightage.values())
            self.final_weights_ = {m: v / total for m, v in self.weightage.items()}
            # Any model not in weightage gets 0
            for m in all_probs.keys():
                if m not in self.final_weights_:
                    self.final_weights_[m] = 0.0

        # Aggregate weighted probabilities
        weighted_probs = np.zeros_like(all_probs['linear'])
        for name, p in all_probs.items():
            weighted_probs += self.final_weights_[name] * p
            
        return weighted_probs

    def predict(self, X):
        """
        Predicts the class with the highest weighted probability.
        """
        probs = self.predict_proba(X)
        return self.classes_[np.argmax(probs, axis=1)]

*Load data the fill any nan values present*

In [None]:
train_data_path = "cardihack_final-train.csv"
test_data_path = "cardihack_final-test.csv"

train_df = pd.read_csv(train_data_path)
test_df = pd.read_csv(test_data_path)
CLINICAL_COLS = ['Age_Baseline', 'Age_Diag', 'BMI', 'BSA', 'Genre', 'Epaiss_max', 'Gradient', 'TVNS', 'FEVG', 'ATCD_MS', 'SYNCOPE', 'Diam_OG']

for col in CLINICAL_COLS:
    fill_val = train_df[col].mean()
    train_df[col] = train_df[col].fillna(fill_val)
    test_df[col] = test_df[col].fillna(fill_val)

*SNP selection and model prediction for `OUTCOME SEVERITY` and `OUTCOME MACE`*

In [None]:
# 1. Select Stable SNPs for OUTCOME SEVERITY
severity_stability = stable_snp_selection(train_data_path, 'OUTCOME SEVERITY')
STABLE_SNPS_SEVERITY =  severity_stability[severity_stability['Stability_Score']>=0.9]['SNP'].tolist()

# 2. Apply ADR Encoding
snp_maps_sev = categorize_snps(train_df[STABLE_SNPS_SEVERITY+['OUTCOME SEVERITY']], 'OUTCOME SEVERITY')
mapped_train_data_sev = apply_adr_encoding(train_df, snp_maps_sev)
mapped_test_data_sev = apply_adr_encoding(test_df, snp_maps_sev)

# 5. Train SNP Hybrid Ensemble Component with the optimized weight.
weights_sev = {
    'linear': 0.4049, 
    'knn': 0.3404, 
    'bayes': 0.1789, 
    'xgb': 0.0758
}
model_sev = HybridConsensusClassifier(
    stable_snps=STABLE_SNPS_SEVERITY, 
    n_iterations=250, 
    weightage=weights_sev
)
model_sev.fit(mapped_train_data_sev[STABLE_SNPS_SEVERITY], mapped_train_data_sev['OUTCOME SEVERITY'])
outcome_severity_prediction = model_sev.predict_proba(mapped_test_data_sev[STABLE_SNPS_SEVERITY])[:,1]

In [None]:
# 1. Select Stable SNPs for OUTCOME MACE
mace_stability = stable_snp_selection(train_data_path, 'OUTCOME MACE')
STABLE_SNPS_MACE = mace_stability[mace_stability['Stability_Score']>=0.9]['SNP'].tolist()

# 2. Apply ADR Encoding
snp_maps_mace = categorize_snps(train_df[STABLE_SNPS_MACE+['OUTCOME MACE']], 'OUTCOME MACE')
mapped_train_mace = apply_adr_encoding(train_df, snp_maps_mace)
mapped_test_mace = apply_adr_encoding(test_df, snp_maps_mace)

# 3. Clinical Feature Engineering (Interactions)
train_v4 = engineer_clinical_features(train_df)
test_v4 = engineer_clinical_features(test_df)
FEAT_V4 = CLINICAL_COLS + [
    'Atrial_Loading_Index', 'Obstruction_Symptom_Risk', 
    'Hypertrophy_BSA_Ratio', 'Pump_Efficiency_Index', 
    'Wall_Stress_Proxy', 'Age_Baseline_Sq', 'Massive_Hypertrophy', 'Obstruction_Level','Heart_Failure_Warning',
    'Disease_Duration','BMI_High_Risk'
]

scaler = StandardScaler()
X_train_clm = scaler.fit_transform(train_v4[FEAT_V4])
X_test_clm = scaler.transform(test_v4[FEAT_V4])

# 4. Train Clinical Specialist High-Performance Models
print("Training Super Clinical Specialists v4 (Logistic, RF, MLP)...")
clm_log = LogisticRegression(multi_class='multinomial', max_iter=1000).fit(X_train_clm, train_df['OUTCOME MACE'])
clm_rf = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42).fit(X_train_clm, train_df['OUTCOME MACE'])
clm_mlp = MLPClassifier(hidden_layer_sizes=(32,16), max_iter=1000, random_state=42).fit(X_train_clm, train_df['OUTCOME MACE'])

# 5. Train SNP Hybrid Ensemble Component
print("Training SNP Hybrid Ensemble Component...")
snp_hybrid = HybridConsensusClassifier(stable_snps=STABLE_SNPS_MACE, n_iterations=250)
snp_hybrid.fit(mapped_train_mace, mapped_train_mace['OUTCOME MACE'])
snp_probs_dict = snp_hybrid.predict_all_probas(mapped_test_mace)

# 6. Global Weighted Blending (Optimized)
GLOBAL_WEIGHTS_V4 = {
    'Clm_Logistic': 0.0134, 
    'Clm_RF': 0.1595, 
    'Clm_MLP': 0.1032,
    'linear': 0.0943, 
    'forest': 0.2495, 
    'svm': 0.1074, 
    'knn': 0.0111, 
    'bayes': 0.2586, 
    'xgb': 0.0031
}

print("Applying Optimized Global Weights v4...")
clm_probs = {
    'Clm_Logistic': clm_log.predict_proba(X_test_clm), 
    'Clm_RF': clm_rf.predict_proba(X_test_clm), 
    'Clm_MLP': clm_mlp.predict_proba(X_test_clm)
}

mace_probs = np.zeros_like(clm_probs['Clm_Logistic'])
for name, w in GLOBAL_WEIGHTS_V4.items():
    p = clm_probs[name] if name.startswith('Clm') else snp_probs_dict[name]
    mace_probs += w * p

outcome_mace_prediction = np.argmax(mace_probs, axis=1)
print("MACE Prediction Complete.")

In [None]:
# Final prediction and submit.

test_df['OUTCOME SEVERITY'] = outcome_severity_prediction
test_df['OUTCOME MACE'] = outcome_mace_prediction

test_df.to_csv("Submission.csv", index=False)
print("Final predictions with Advanced Clinical Features saved to Submission.csv")