In [1]:
# Install necessary libraries
!pip install deap optuna

import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression

from deap import base, creator, tools, algorithms
import optuna

import warnings
warnings.filterwarnings("ignore")

# Suppress Optuna's trial logs for cleaner output during optimization
optuna.logging.set_verbosity(optuna.logging.WARNING)

print("Libraries installed and imported successfully.")


Libraries installed and imported successfully.


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the dataset
# We'll try to load a pre-processed pickle file first for speed.
# If it doesn't exist, we load the CSV and create the pickle file.
try:
    df = pd.read_pickle('../../dataPreprocessing/medical_data.pkl')
    print("Loaded data from medical_data.pkl")
except FileNotFoundError:
    print("Info: 'medical_data.pkl' not found. Attempting to load from CSV.")

# --- Data Cleaning and Preparation ---
# Replace '?' with NaN and handle potential errors
df.replace('?', np.nan, inplace=True)

# Define target and features
TARGET = 'readmitted_ind'
X = df.drop(TARGET, axis=1)
y = df[TARGET]

# Encode the target variable to be numeric (0 and 1)
le = LabelEncoder()
y = le.fit_transform(y)

# --- Feature Definition ---
# Define features to exclude based on your example
exclude_features = [
    'patient_nbr', 'encounter_id', 'diagnosis_tuple', 'readmitted', 'dummy'
]

# Filter out excluded columns that are not in the dataframe
existing_exclude_features = [col for col in exclude_features if col in X.columns]
X = X.drop(columns=existing_exclude_features)

# Identify numeric and categorical features from the remaining columns
numeric_features = X.select_dtypes(include=np.number).columns.tolist()
object_features = X.select_dtypes(include=['object']).columns.tolist()

# --- Preprocessing Pipeline ---
# Create a column transformer for preprocessing
preprocessor = ColumnTransformer(
    transformers=[
        ('num', MinMaxScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore', drop='first', sparse_output=False), object_features)
    ],
    remainder='passthrough'
)

# --- Train-Test Split ---
# Split data into a training set and a final hold-out test set
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print("\nData loaded and preprocessed.")
print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Number of numeric features: {len(numeric_features)}")
print(f"Number of categorical features: {len(object_features)}")


Loaded data from medical_data.pkl

Data loaded and preprocessed.
Training set shape: (62439, 183)
Test set shape: (15610, 183)
Number of numeric features: 173
Number of categorical features: 10


In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
from sklearn.base import BaseEstimator, TransformerMixin

# --- Custom Transformer for Data Type Conversion ---
# This helper class ensures that all data passed to the OneHotEncoder is of string type,
# preventing errors with mixed types (e.g., columns containing both numbers and strings).
class TypeConverter(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        return X.astype(str)

# --- Genetic Algorithm Setup ---

# The number of features to select from
# We determine this by combining the numeric and object feature lists from the previous cell.
N_FEATURES = len(numeric_features) + len(object_features)

# Define the fitness and individual creation for the GA
# We want to maximize accuracy, so weights=(1.0,)
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# An individual is a list of bits (0 or 1) with a fitness attribute
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# Attribute generator: a gene is either 0 (not selected) or 1 (selected)
toolbox.register("attr_bool", random.randint, 0, 1)

# Individual initializer: creates an individual with N_FEATURES genes
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=N_FEATURES)

# Population initializer
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# --- Fitness Evaluation Function ---
# This function determines how "good" a feature subset is by training a simple model.
def evaluate_features(individual):
    # Get indices of selected features (where the gene is 1)
    selected_indices = [i for i, bit in enumerate(individual) if bit == 1]

    # If the GA creates an individual with no features, its fitness is 0.
    if not selected_indices:
        return 0.0,

    # Combine all feature names to create a master list
    all_feature_names = np.array(numeric_features + object_features)
    # Select the feature names based on the individual's genes
    selected_features = all_feature_names[selected_indices].tolist()

    # Separate the selected features back into numeric and categorical types
    sel_numeric = [f for f in selected_features if f in numeric_features]
    sel_categorical = [f for f in selected_features if f in object_features]

    # Create a nested pipeline for categorical features to first convert them to strings
    categorical_transformer = Pipeline(steps=[
        ('tostring', TypeConverter()),
        ('onehot', OneHotEncoder(handle_unknown='ignore', drop='first'))
    ])

    # Create a temporary pipeline with a preprocessor for ONLY the selected features
    # This is crucial to evaluate the specific subset.
    pipeline = Pipeline([
        ('col_transformer', ColumnTransformer(
            transformers=[
                ('num', MinMaxScaler(), sel_numeric),
                ('cat', categorical_transformer, sel_categorical)
            ],
            remainder='drop' # Drop any columns that were not selected
        )),
        # Use a fast model like Logistic Regression for quick fitness evaluation
        ('classifier', LogisticRegression(random_state=42, max_iter=1000))
    ])

    # For speed, we train and evaluate on a smaller sample of the data.
    X_sample, _, y_sample, _ = train_test_split(X_train, y_train, test_size=0.8, random_state=42, stratify=y_train)
    pipeline.fit(X_sample, y_sample)
    y_pred = pipeline.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    # DEAP requires the fitness to be a tuple
    return accuracy,

# --- Register GA Operators ---
toolbox.register("evaluate", evaluate_features)
toolbox.register("mate", tools.cxTwoPoint) # Crossover operator
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05) # Mutation operator
toolbox.register("select", tools.selTournament, tournsize=3) # Selection operator

# --- Run the Genetic Algorithm ---
print("Starting Genetic Algorithm for feature selection...")
population = toolbox.population(n=50)
NGEN = 20 # Number of generations to run
CXPB, MUTPB = 0.5, 0.2 # Crossover and mutation probabilities

# Use a Hall of Fame to keep track of the single best individual found
hof = tools.HallOfFame(1)

# Set up statistics to track during evolution
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)

# Run the evolution process
algorithms.eaSimple(population, toolbox, cxpb=CXPB, mutpb=MUTPB, ngen=NGEN,
                    stats=stats, halloffame=hof, verbose=True)

# --- Get the Best Features from the Hall of Fame ---
best_individual = hof[0]
all_feature_names = np.array(numeric_features + object_features)
selected_features_mask = np.array(best_individual).astype(bool)
selected_features = all_feature_names[selected_features_mask].tolist()

print("\nGenetic Algorithm finished.")
print(f"Number of selected features: {len(selected_features)}")
print(f"Selected features: {selected_features}")

# --- Final Preprocessing with Selected Features ---
# Create the final preprocessor that will be used to prepare data for the CNN
sel_numeric_final = [f for f in selected_features if f in numeric_features]
sel_categorical_final = [f for f in selected_features if f in object_features]

# The categorical transformer pipeline to ensure data is string before one-hot encoding
final_categorical_transformer = Pipeline(steps=[
    ('tostring', TypeConverter()),
    ('onehot', OneHotEncoder(handle_unknown='ignore', drop='first', sparse_output=False))
])

final_preprocessor = ColumnTransformer(
    transformers=[
        ('num', MinMaxScaler(), sel_numeric_final),
        ('cat', final_categorical_transformer, sel_categorical_final)
    ],
    remainder='drop' # IMPORTANT: Drop columns not selected by the GA to prevent data type errors
)

# Transform the training and test sets using the final preprocessor
X_train_selected = final_preprocessor.fit_transform(X_train)
X_test_selected = final_preprocessor.transform(X_test)

# The number of input features for our CNN is now the number of columns in the processed data
INPUT_SIZE = X_train_selected.shape[1]

print(f"\nData preprocessed with selected features. New input dimension for CNN: {INPUT_SIZE}")


Starting Genetic Algorithm for feature selection...
gen	nevals	avg     	std      	min     	max     
0  	50    	0.848018	0.0587701	0.729148	0.900256
1  	37    	0.883321	0.0326461	0.743754	0.90032 
2  	26    	0.896976	0.00415124	0.879885	0.90032 
3  	33    	0.897596	0.00776622	0.843754	0.900577
4  	21    	0.899407	0.000802464	0.897694	0.900833
5  	32    	0.896947	0.0209418  	0.750416	0.900833
6  	20    	0.900309	0.000557694	0.89795 	0.901345
7  	27    	0.900263	0.0033069  	0.87745 	0.902114
8  	30    	0.897853	0.0207673  	0.752659	0.902434
9  	26    	0.901318	0.00050881 	0.900064	0.902434
10 	29    	0.898471	0.021114   	0.750865	0.902691
11 	37    	0.898315	0.0210583  	0.752723	0.902947
12 	31    	0.898803	0.0207623  	0.753555	0.902691
13 	30    	0.901572	0.00304882 	0.880525	0.902691
14 	26    	0.901488	0.00359625 	0.876618	0.902691
15 	26    	0.902122	0.000444688	0.900512	0.902691
16 	28    	0.902228	0.000517162	0.899872	0.903075
17 	36    	0.902361	0.000637534	0.899744	0.903331
18 	29

In [5]:
class SingleShotCNN(nn.Module):
    def __init__(self, input_size, num_classes, num_filters, kernel_size, dropout_p):
        super(SingleShotCNN, self).__init__()

        # Convolutional layer
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=num_filters, kernel_size=kernel_size, padding='same')
        self.relu = nn.ReLU()
        # Global Max Pooling will reduce the dimension to (batch_size, num_filters)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.dropout = nn.Dropout(dropout_p)

        # Fully connected layers
        # The input to the linear layer is num_filters because of global max pooling
        self.fc1 = nn.Linear(num_filters, 50)
        self.fc2 = nn.Linear(50, num_classes)

    def forward(self, x):
        # Reshape input for Conv1d: (batch_size, channels, sequence_length)
        # Our features are the sequence, and we have 1 channel.
        x = x.unsqueeze(1)

        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool(x)

        # Flatten the output for the fully connected layer
        x = x.view(x.size(0), -1)
        x = self.dropout(x)

        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

print("PyTorch 1D CNN model defined.")


PyTorch 1D CNN model defined.


In [12]:
def objective(trial):
    # --- Hyperparameter Search Space ---
    lr = trial.suggest_float("lr", 1e-4, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    num_filters = trial.suggest_int("num_filters", 16, 64, step=16)
    kernel_size = trial.suggest_int("kernel_size", 3, 7, step=2)
    dropout_p = trial.suggest_float("dropout_p", 0.1, 0.5)
    epochs = 15 # Fixed number of epochs for each trial for faster tuning
    
    print(f"\n--- Starting Trial {trial.number} ---")
    # --- K-Fold Cross-Validation ---
    kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_selected, y_train)):
        # Data for this fold
        X_train_fold, X_val_fold = X_train_selected[train_idx], X_train_selected[val_idx]
        y_train_fold, y_val_fold = y_train[train_idx], y_train[val_idx]

        # Convert to PyTorch Tensors, ensuring data is float32 to prevent errors
        X_train_tensor = torch.FloatTensor(X_train_fold.astype(np.float32))
        y_train_tensor = torch.LongTensor(y_train_fold)
        X_val_tensor = torch.FloatTensor(X_val_fold.astype(np.float32))
        y_val_tensor = torch.LongTensor(y_val_fold)

        # Create DataLoaders
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

        # --- Model Training and Evaluation for the Fold ---
        model = SingleShotCNN(INPUT_SIZE, num_classes=2, num_filters=num_filters, kernel_size=kernel_size, dropout_p=dropout_p).to(device)
        criterion = nn.CrossEntropyLoss()
        optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

        # Training loop
        model.train()
        for epoch in range(epochs):
            print(f"    Epoch {epoch + 1}/{epochs}", end='\r')
            for batch_X, batch_y in train_loader:
                batch_X = batch_X.to(device)
                batch_y = batch_y.to(device)
                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
        print(f"    Fold {fold + 1} completed.")

        # Evaluation
        model.eval()
        with torch.no_grad():
            X_val_tensor = X_val_tensor.to(device)
            val_outputs = model(X_val_tensor)
            _, predicted = torch.max(val_outputs.data, 1)
            accuracy = accuracy_score(y_val_tensor.numpy(),  predicted.cpu().numpy())
            fold_accuracies.append(accuracy)
            print(f"    Fold Accuracy: {accuracy:.4f}")

    # Return the mean accuracy across all folds
    mean_accuracy = np.mean(fold_accuracies)
    print(f"--- Trial {trial.number} Finished | Mean Accuracy: {mean_accuracy:.4f} ---")
    return np.mean(fold_accuracies)

print("Optuna objective function defined.")


Optuna objective function defined.


In [13]:
# Create and run the Optuna study
study = optuna.create_study(direction="maximize")
print("Starting Optuna hyperparameter search...")

# Increase n_trials for a more thorough search, e.g., 50 or 100
study.optimize(objective, n_trials=30)

print("\nOptuna study finished.")

# --- Display Results ---
best_trial = study.best_trial
print("\n--- Best Results ---")
print(f"Best Accuracy: {best_trial.value:.4f}")
print("Best Hyperparameters:")
for key, value in best_trial.params.items():
    print(f"  {key}: {value}")

# --- Final Evaluation ---
# Now, we train the final model on the FULL training data with the best hyperparameters
# and evaluate on the unseen test set.

print("\nTraining final model with best hyperparameters on all training data...")
best_params = best_trial.params
final_model = SingleShotCNN(
    INPUT_SIZE,
    num_classes=2,
    num_filters=best_params['num_filters'],
    kernel_size=best_params['kernel_size'],
    dropout_p=best_params['dropout_p']
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = getattr(optim, best_params['optimizer'])(final_model.parameters(), lr=best_params['lr'])

# Create final dataloaders
X_train_tensor = torch.FloatTensor(X_train_selected)
y_train_tensor = torch.LongTensor(y_train)
X_test_tensor = torch.FloatTensor(X_test_selected)
y_test_tensor = torch.LongTensor(y_test)

final_train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
final_train_loader = DataLoader(final_train_dataset, batch_size=64, shuffle=True)

# Final training loop
final_model.train()
for epoch in range(25): # Train for a few more epochs on the full data
    for batch_X, batch_y in final_train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        optimizer.zero_grad()
        outputs = final_model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

# Final evaluation on the hold-out test set
final_model.eval()
with torch.no_grad():
    # 3. Move data tensor to the device
    test_outputs = final_model(X_test_tensor.to(device))
    _, predicted = torch.max(test_outputs.data, 1)
    # Move predicted tensor back to cpu for numpy conversion
    final_accuracy = accuracy_score(y_test_tensor.numpy(), predicted.cpu().numpy())

print("\n--- Final Model Performance ---")
print(f"Accuracy on unseen Test Set: {final_accuracy:.4f}")


Starting Optuna hyperparameter search...

--- Starting Trial 0 ---
    Fold 1 completed.
    Fold Accuracy: 0.7578
    Fold 2 completed.
    Fold Accuracy: 0.7497
    Fold 3 completed.
    Fold Accuracy: 0.7569
    Fold 4 completed.
    Fold Accuracy: 0.7565
    Fold 5 completed.
    Fold Accuracy: 0.7517
--- Trial 0 Finished | Mean Accuracy: 0.7545 ---

--- Starting Trial 1 ---
    Fold 1 completed.
    Fold Accuracy: 0.7490
    Fold 2 completed.
    Fold Accuracy: 0.7457
    Fold 3 completed.
    Fold Accuracy: 0.7497
    Fold 4 completed.
    Fold Accuracy: 0.7549
    Fold 5 completed.
    Fold Accuracy: 0.7436
--- Trial 1 Finished | Mean Accuracy: 0.7486 ---

--- Starting Trial 2 ---
    Fold 1 completed.
    Fold Accuracy: 0.5420
    Fold 2 completed.
    Fold Accuracy: 0.5420
    Fold 3 completed.
    Fold Accuracy: 0.5420
    Fold 4 completed.
    Fold Accuracy: 0.5420
    Fold 5 completed.
    Fold Accuracy: 0.5421
--- Trial 2 Finished | Mean Accuracy: 0.5420 ---

--- Starting 