In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import copy # For deep copying weights in early stopping
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support
from sklearn.metrics import roc_curve, auc, classification_report
from sklearn.preprocessing import StandardScaler, MinMaxScaler # Added MinMaxScaler for experimentation suggestion
from minisom import MiniSom
# Seed NumPy's global random generator so code that draws from np.random
# (e.g. np.random.randn / np.random.permutation) is repeatable across runs.
np.random.seed(42)

# Apply the 'seaborn-v0_8-whitegrid' Matplotlib style sheet to figures drawn after this point.
# NOTE(review): this style name presumably needs a Matplotlib release that ships
# the 'seaborn-v0_8-*' styles — confirm against the pinned environment.
plt.style.use('seaborn-v0_8-whitegrid')
print("Imports and basic setup completed.")

Imports and basic setup completed.


## SLFN for Titanic Classification - Data Loading and Preprocessing

Load the Titanic dataset (downloading if necessary) and define/apply the preprocessing steps (feature engineering, imputation, encoding, scaling). Split into final train, validation, and test sets.

In [2]:

print("\n--- Task 3: SLFN for Titanic Classification ---")
# --- Load Titanic Data (Robust Loading) ---
# Strategy: prefer a local CSV; if it is absent or unreadable, fall back to the
# hosted copy at `data_source_url`. `df_titanic` stays None only if both fail,
# which downstream cells treat as "data not available".
titanic_file = 'titanic.csv'
df_titanic = None
data_source_url = 'https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv'

# Try loading locally first
if os.path.exists(titanic_file):
    try:
        df_titanic = pd.read_csv(titanic_file)
        print(f"Titanic dataset loaded successfully from local file: '{titanic_file}'.")
    except Exception as e:
        print(f"Error reading local titanic.csv: {e}. Will attempt download.")
        df_titanic = None # Ensure it's None if local read fails
else:
    print(f"Local file '{titanic_file}' not found. Will attempt download.")

# Download fallback: previously announced in the messages above but never performed,
# so a missing/corrupt local file always ended in "data loading failure".
if df_titanic is None:
    try:
        df_titanic = pd.read_csv(data_source_url)
        print(f"Titanic dataset downloaded successfully from: {data_source_url}")
    except Exception as e:
        print(f"Error downloading Titanic dataset from {data_source_url}: {e}")
        df_titanic = None # Leave as None so later cells skip the Titanic task
# Titanic Preprocessing Function
def preprocess_titanic(df):
    """Turn the raw Titanic passenger table into a scaled feature matrix and labels.

    Pipeline:
      1. Derive ``Title`` (from ``Name``), ``FamilySize`` and ``IsAlone``.
      2. Impute ``Age`` (median per title, then global median), ``Embarked``
         (mode) and ``Fare`` (median).
      3. Drop identifier-like / superseded columns.
      4. Integer-encode ``Sex`` and ``Embarked``.
      5. Standardise all remaining feature columns with ``StandardScaler``.

    Parameters
    ----------
    df : pandas.DataFrame or None
        Raw data. Must provide ``Name``, ``Sex``, ``Age``, ``SibSp``, ``Parch``,
        ``Fare``, ``Embarked`` and the target ``Survived``. Any other column that
        is not dropped in step 3 (e.g. ``Pclass``) is passed through as a feature.
        The input frame is not modified.

    Returns
    -------
    tuple
        ``(X_scaled, y, feature_scaler, feature_names)`` where ``X_scaled`` is the
        standardised feature array, ``y`` the ``Survived`` values as a NumPy
        array, ``feature_scaler`` the fitted ``StandardScaler`` and
        ``feature_names`` the column order of ``X_scaled``.
        ``(None, None, None, None)`` is returned (after printing an error) when
        ``df`` is None, a required column is missing, or ``Sex``/``Embarked``
        contain values outside the known categories.

    Notes
    -----
    The scaler is fit on every row of ``df``. A caller that splits the result
    into train/validation/test afterwards lets held-out rows influence the
    scaling statistics; re-fit the scaler on the training split if that matters.
    """
    print("\nPreprocessing Titanic data...")
    failure = (None, None, None, None)
    if df is None:
        print("Error: Input DataFrame is None. Cannot preprocess.")
        return failure

    # Validate the schema up front so a malformed frame is reported in the same
    # print-and-return-None style as the other failure modes instead of
    # surfacing as a KeyError halfway through feature engineering.
    required_columns = ['Name', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        print(f"Error: Required columns missing from the DataFrame: {missing_columns}")
        return failure
    if 'Survived' not in df.columns:
        print("Error: 'Survived' column not found in the DataFrame.")
        return failure

    data = df.copy() # Work on a copy

    # Extract Title from Name (the word immediately before a period, e.g. "Mr.")
    data['Title'] = data['Name'].str.extract(r' ([A-Za-z]+)\.', expand=False)
    # Consolidate rare titles and normalise French/alternate spellings
    data['Title'] = data['Title'].replace(['Lady', 'Countess','Capt', 'Col','Don', 'Dr', 'Major', 'Rev', 'Sir', 'Jonkheer', 'Dona'], 'Rare')
    data['Title'] = data['Title'].replace(['Mlle', 'Ms'], 'Miss')
    data['Title'] = data['Title'].replace('Mme', 'Mrs')
    # Map Title to numerical values; anything unmapped (or not extracted) becomes 0
    title_mapping = {"Mr": 1, "Miss": 2, "Mrs": 3, "Master": 4, "Rare": 5}
    data['Title'] = data['Title'].map(title_mapping)
    data['Title'] = data['Title'].fillna(0)

    # Family-derived features
    data['FamilySize'] = data['SibSp'] + data['Parch'] + 1
    data['IsAlone'] = (data['FamilySize'] == 1).astype(int)

    # Impute Age with the median age of passengers sharing the same Title
    data['Age'] = data.groupby('Title')['Age'].transform(lambda ages: ages.fillna(ages.median()))
    # A Title group whose ages are all missing has no median; fall back to the global median
    if data['Age'].isnull().any():
        data['Age'] = data['Age'].fillna(data['Age'].median())

    # Impute Embarked with the mode
    if data['Embarked'].isnull().any():
        mode_embarked = data['Embarked'].mode()[0]
        data['Embarked'] = data['Embarked'].fillna(mode_embarked)
        print(f"Filled missing Embarked values with mode: '{mode_embarked}'")

    # Impute Fare with the median Fare
    if data['Fare'].isnull().any():
        median_fare = data['Fare'].median()
        data['Fare'] = data['Fare'].fillna(median_fare)
        print(f"Filled missing Fare values with median: {median_fare:.2f}")

    # Drop identifier-like columns and the ones folded into FamilySize
    columns_to_drop = ['Name', 'Ticket', 'Cabin', 'PassengerId', 'SibSp', 'Parch']
    columns_exist = [col for col in columns_to_drop if col in data.columns]
    if columns_exist:
        data = data.drop(columns_exist, axis=1)
        print(f"Dropped columns: {columns_exist}")

    # Convert Categorical Features to Numerical: Sex, Embarked.
    # Series.map yields NaN for values absent from the mapping; casting those to
    # int would raise an opaque error, so report the offending values instead.
    categorical_codes = {
        'Sex': data['Sex'].map({'male': 0, 'female': 1}),
        'Embarked': data['Embarked'].map({'S': 0, 'C': 1, 'Q': 2}),
    }
    for column, codes in categorical_codes.items():
        unmapped = codes.isnull()
        if unmapped.any():
            bad_values = sorted(set(data.loc[unmapped, column].astype(str)))
            print(f"Error: Column '{column}' contains values that cannot be encoded: {bad_values}")
            return failure
    for column, codes in categorical_codes.items():
        data[column] = codes.astype(int)

    # Final safety net for NaNs in any pass-through column
    if data.isnull().sum().sum() > 0:
        print(f"Warning: Found {data.isnull().sum().sum()} remaining NaN values after initial imputation. Filling with column medians.")
        data = data.fillna(data.median())

    # Separate features (X) and target (y)
    X = data.drop('Survived', axis=1)
    y = data['Survived']
    feature_names = list(X.columns)
    print(f"Features used for modeling: {feature_names}")

    # Standardise features to zero mean / unit variance
    feature_scaler = StandardScaler()
    X_scaled = feature_scaler.fit_transform(X)

    # Convert y to NumPy array
    y = y.values

    print("Preprocessing complete.")
    print(f"Scaled features shape (X): {X_scaled.shape}")
    print(f"Target labels shape (y): {y.shape}")
    return X_scaled, y, feature_scaler, feature_names

# Run preprocessing on the loaded frame, then carve out train / validation / test partitions.
# `titanic_data_available` is the flag later cells check before touching the Titanic task.
titanic_data_available = False
if df_titanic is None:
    print("\nSkipping Titanic preprocessing and SLFN task due to data loading failure.")
else:
    X_titanic_scaled, y_titanic, titanic_scaler, titanic_features = preprocess_titanic(df_titanic)

    if X_titanic_scaled is None or y_titanic is None:
        print("\nSkipping Titanic SLFN task due to preprocessing failure.")
    else:
        titanic_data_available = True

        # Stage 1: hold out 20% of all rows as the test set (stratified on the label).
        (X_titanic_train_init, X_titanic_test,
         y_titanic_train_init, y_titanic_test) = train_test_split(
            X_titanic_scaled, y_titanic,
            test_size=0.2, stratify=y_titanic, random_state=42
        )

        # Stage 2: a quarter of the remaining 80% becomes validation
        # (0.25 * 0.8 = 0.2), giving an overall 60/20/20 split.
        (X_titanic_train_final, X_titanic_val,
         y_titanic_train_final, y_titanic_val) = train_test_split(
            X_titanic_train_init, y_titanic_train_init,
            test_size=0.25, stratify=y_titanic_train_init, random_state=42
        )

        print("\nTitanic data split:")
        print(f"  Final Train set: {len(X_titanic_train_final)} samples")
        print(f"  Validation set:  {len(X_titanic_val)} samples")
        print(f"  Test set:        {len(X_titanic_test)} samples")
        print("-" * 30)


--- Task 3: SLFN for Titanic Classification ---
Titanic dataset loaded successfully from local file: 'titanic.csv'.

Preprocessing Titanic data...
Filled missing Embarked values with mode: 'S'
Dropped columns: ['Name', 'Ticket', 'Cabin', 'PassengerId', 'SibSp', 'Parch']
Features used for modeling: ['Pclass', 'Sex', 'Age', 'Fare', 'Embarked', 'Title', 'FamilySize', 'IsAlone']
Preprocessing complete.
Scaled features shape (X): (891, 8)
Target labels shape (y): (891,)

Titanic data split:
  Final Train set: 534 samples
  Validation set:  178 samples
  Test set:        179 samples
------------------------------


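## SLFN Implementation (From Scratch - Early Stopping)

Define the Single Layer Feedforward Network (SLFN) class using only NumPy for core operations. Training uses mini-batch gradient descent with momentum and L2 regularization. It runs for at most `epochs` epochs, evaluates the validation loss after every epoch, stops early once that loss has not improved for `patience` consecutive epochs, and finally restores the weights that achieved the best validation loss.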

In [3]:
# Implement SLFN from scratch
if titanic_data_available:
    class SLFN:
        """Single-hidden-layer feedforward network for binary classification (NumPy only).

        Architecture: input -> dense + ReLU -> dense + sigmoid. Training is
        mini-batch gradient descent with momentum on the objective

            mean binary cross-entropy + (l2_lambda / (2 * m)) * (||W1||^2 + ||W2||^2)

        where ``m`` is the number of samples in the batch. Biases are not regularised.

        Attributes set by ``__init__``: ``W1``, ``b1``, ``W2``, ``b2`` (parameters),
        ``v_W1``, ``v_b1``, ``v_W2``, ``v_b2`` (momentum velocities), ``z1``, ``a1``,
        ``z2``, ``a2`` (caches of the most recent forward pass) and
        ``train_losses`` / ``val_losses`` (per-epoch history filled by ``train``).
        """

        # Names of every array captured when checkpointing the best model.
        _STATE_NAMES = ('W1', 'b1', 'W2', 'b2', 'v_W1', 'v_b1', 'v_W2', 'v_b2')

        def __init__(self, input_size, hidden_size, output_size=1, learning_rate=0.01,
                     momentum=0.9, l2_lambda=0.01):
            """Create the network and draw its initial weights from ``np.random``.

            Args:
                input_size: number of input features.
                hidden_size: number of hidden ReLU units.
                output_size: number of sigmoid outputs (1 for binary classification).
                learning_rate: step size applied to every gradient.
                momentum: decay factor of the velocity terms.
                l2_lambda: L2 regularisation strength applied to W1 and W2.
            """
            self.input_size = input_size
            self.hidden_size = hidden_size
            self.output_size = output_size
            self.learning_rate = learning_rate
            self.momentum = momentum
            self.l2_lambda = l2_lambda # L2 regularization strength

            # Hidden layer: normal draws scaled by sqrt(2 / fan_in); biases start at zero
            self.W1 = np.random.randn(input_size, hidden_size) * np.sqrt(2. / input_size)
            self.b1 = np.zeros((1, hidden_size))

            # Output layer: normal draws scaled by sqrt(1 / fan_in)
            self.W2 = np.random.randn(hidden_size, output_size) * np.sqrt(1. / hidden_size)
            self.b2 = np.zeros((1, output_size))

            # Velocities for the momentum update, one per parameter array
            self.v_W1 = np.zeros_like(self.W1)
            self.v_b1 = np.zeros_like(self.b1)
            self.v_W2 = np.zeros_like(self.W2)
            self.v_b2 = np.zeros_like(self.b2)

            # Caches written by forward() and read by backward()
            self.z1, self.a1, self.z2, self.a2 = None, None, None, None

            # Per-epoch loss history, reset at the start of every train() call
            self.train_losses = []
            self.val_losses = []
            print(f"SLFN initialized:")
            print(f"  Layers: Input({input_size}) -> Hidden({hidden_size}) -> Output({output_size})")
            print(f"  Hyperparameters: LR={learning_rate}, Momentum={momentum}, L2 Lambda={l2_lambda}")

        # --- Activation Functions -----------------
        def relu(self, x):
            """ReLU activation: element-wise max(0, x)."""
            return np.maximum(0, x)

        def relu_derivative(self, x):
            """Derivative of ReLU: 1 where x > 0, otherwise 0."""
            return np.where(x > 0, 1, 0)

        def sigmoid(self, x):
            """Logistic sigmoid; the input is clipped to [-500, 500] so np.exp cannot overflow."""
            clipped_x = np.clip(x, -500, 500)
            return 1 / (1 + np.exp(-clipped_x))

        def forward(self, X):
            """Run a forward pass and cache z1/a1/z2/a2 for a following backward().

            Args:
                X: array-like of shape (n_samples, n_features); a 1-D input is
                   treated as a single sample.

            Returns:
                Array of shape (n_samples, output_size) with sigmoid probabilities.
            """
            X = np.asarray(X)
            if X.ndim == 1:
                X = X.reshape(1, -1) # Handle single sample prediction

            # Input -> hidden (ReLU)
            self.z1 = np.dot(X, self.W1) + self.b1
            self.a1 = self.relu(self.z1)

            # Hidden -> output (sigmoid probabilities)
            self.z2 = np.dot(self.a1, self.W2) + self.b2
            self.a2 = self.sigmoid(self.z2)

            return self.a2

        def compute_loss(self, y_true, y_pred, include_l2=False):
            """Mean binary cross-entropy, optionally plus the L2 weight penalty.

            Args:
                y_true: array-like of labels, reshaped to ``y_pred.shape``.
                y_pred: NumPy array of predicted probabilities.
                include_l2: when True (and ``l2_lambda > 0``) add
                    ``l2_lambda / (2 * m) * (sum(W1**2) + sum(W2**2))`` with
                    ``m = y_pred.shape[0]``.

            Returns:
                The scalar loss.
            """
            y_true = np.asarray(y_true).reshape(y_pred.shape)
            m = y_true.shape[0] # Number of samples in the batch
            epsilon = 1e-9 # Keeps log() away from 0

            y_pred_clipped = np.clip(y_pred, epsilon, 1 - epsilon)

            cross_entropy_term = - (y_true * np.log(y_pred_clipped) + (1 - y_true) * np.log(1 - y_pred_clipped))
            cross_entropy_loss = np.mean(cross_entropy_term)

            l2_penalty = 0
            if include_l2 and self.l2_lambda > 0:
                # The 1/(2m) factor makes this penalty the antiderivative of the
                # (l2_lambda / m) * W term that backward() adds to the weight
                # gradients, so the reported loss is the objective being minimised.
                squared_weights = np.sum(np.square(self.W1)) + np.sum(np.square(self.W2))
                l2_penalty = (self.l2_lambda / (2 * m)) * squared_weights

            return cross_entropy_loss + l2_penalty

        def backward(self, X, y):
            """Backpropagate the last forward pass and apply one momentum update.

            Must be called right after ``forward(X)`` on the same batch, because it
            reads the cached ``z1``, ``a1`` and ``a2``. Updates the velocities and
            then W1, b1, W2, b2 in place; returns nothing.

            Args:
                X: array-like batch of shape (m, n_features) (1-D means one sample).
                y: array-like labels, reshaped to the shape of the cached output.
            """
            X = np.asarray(X)
            if X.ndim == 1:
                X = X.reshape(1, -1)
            y = np.asarray(y).reshape(self.a2.shape)
            m = X.shape[0] # Batch size

            # Sigmoid + cross-entropy: dL/dz2 = (a2 - y), averaged over the batch
            dz2 = (self.a2 - y) / m

            dW2 = np.dot(self.a1.T, dz2) + (self.l2_lambda / m) * self.W2
            db2 = np.sum(dz2, axis=0, keepdims=True)

            # Propagate through the output weights and the ReLU gate
            da1 = np.dot(dz2, self.W2.T)
            dz1 = da1 * self.relu_derivative(self.z1)

            dW1 = np.dot(X.T, dz1) + (self.l2_lambda / m) * self.W1
            db1 = np.sum(dz1, axis=0, keepdims=True)

            # Momentum: v <- momentum * v + lr * grad ; param <- param - v
            self.v_W1 = self.momentum * self.v_W1 + self.learning_rate * dW1
            self.v_b1 = self.momentum * self.v_b1 + self.learning_rate * db1
            self.v_W2 = self.momentum * self.v_W2 + self.learning_rate * dW2
            self.v_b2 = self.momentum * self.v_b2 + self.learning_rate * db2

            self.W1 -= self.v_W1
            self.b1 -= self.v_b1
            self.W2 -= self.v_W2
            self.b2 -= self.v_b2

        def train(self, X_train, y_train, X_val, y_val, epochs=1000, batch_size=32,
                  patience=10, verbose=True, print_every=100, seed=None):
            """Fit the network with mini-batch momentum SGD and validation monitoring.

            After every epoch the validation loss (without the L2 term) is recorded.
            The parameters and velocities with the lowest validation loss are
            checkpointed and restored when training ends.

            Args:
                X_train, y_train: training features / labels (array-like).
                X_val, y_val: validation features / labels (array-like).
                epochs: maximum number of passes over the training data.
                batch_size: mini-batch size; the last batch may be smaller.
                patience: stop once the validation loss has failed to improve for
                    this many consecutive epochs. ``None`` disables early stopping
                    so exactly ``epochs`` epochs run (best weights are still restored).
                verbose: print a progress line every ``print_every`` epochs and on
                    the last epoch, plus improvement notices after the first
                    ``patience`` epochs.
                print_every: progress-line interval in epochs.
                seed: if given, re-seeds NumPy's *global* RNG so the shuffling is
                    reproducible (this affects later np.random users as well).

            Returns:
                None. ``train_losses`` and ``val_losses`` hold the per-epoch history.
            """
            if seed is not None:
                np.random.seed(seed)

            print(f"\nStarting SLFN training:")
            print(f"  Epochs: {epochs}, Batch Size: {batch_size}, Early Stopping Patience: {patience}, Seed: {seed}")
            # Accept lists / pandas objects for all four inputs, not just the training pair
            X_train, y_train = np.asarray(X_train), np.asarray(y_train)
            X_val, y_val = np.asarray(X_val), np.asarray(y_val)
            m = X_train.shape[0]
            self.train_losses = []
            self.val_losses = []

            early_stopping = patience is not None
            best_val_loss = np.inf
            epochs_no_improve = 0
            best_weights = None

            for epoch in range(epochs):
                epoch_train_losses = []
                # Fresh shuffle every epoch so batches differ between epochs
                indices = np.random.permutation(m)
                X_shuffled = X_train[indices]
                y_shuffled = y_train[indices]

                for i in range(0, m, batch_size):
                    X_batch = X_shuffled[i:i + batch_size]
                    y_batch = y_shuffled[i:i + batch_size]

                    y_pred_batch = self.forward(X_batch)
                    batch_loss = self.compute_loss(y_batch, y_pred_batch, include_l2=True)
                    epoch_train_losses.append(batch_loss)

                    self.backward(X_batch, y_batch)

                avg_epoch_train_loss = np.mean(epoch_train_losses)
                self.train_losses.append(avg_epoch_train_loss)

                # Validation loss excludes the L2 penalty so it measures fit only
                y_pred_val = self.forward(X_val)
                current_val_loss = self.compute_loss(y_val, y_pred_val, include_l2=False)
                self.val_losses.append(current_val_loss)

                if verbose and (epoch % print_every == 0 or epoch == epochs - 1):
                    train_preds = (self.predict_proba(X_train) >= 0.5).astype(int).flatten()
                    val_preds = (y_pred_val >= 0.5).astype(int).flatten()
                    train_acc = np.mean(train_preds == y_train.flatten())
                    val_acc = np.mean(val_preds == y_val.flatten())
                    print(f"Epoch {epoch+1:5d}/{epochs} | Train Loss: {avg_epoch_train_loss:.6f} | Val Loss: {current_val_loss:.6f} | Train Acc: {train_acc:.4f} | Val Acc: {val_acc:.4f}")

                if current_val_loss < best_val_loss:
                    best_val_loss = current_val_loss
                    epochs_no_improve = 0
                    # Snapshot copies: the live arrays keep being updated in place
                    best_weights = {name: getattr(self, name).copy() for name in self._STATE_NAMES}
                    # Improvement notices are skipped during the initial phase, and
                    # entirely when early stopping is disabled, to limit output
                    if verbose and early_stopping and epoch > patience:
                        print(f"  (Improvement found! Best Val Loss: {best_val_loss:.6f})")
                else:
                    epochs_no_improve += 1

                if early_stopping and epochs_no_improve >= patience:
                    print(f"\nEarly stopping triggered after {epoch+1} epochs.")
                    print(f"No improvement in validation loss for {patience} consecutive epochs.")
                    print(f"Best Validation Loss achieved: {best_val_loss:.6f}")
                    break

            print("Training complete.")
            if best_weights is not None:
                print("Restoring best weights based on validation performance.")
                for name, value in best_weights.items():
                    setattr(self, name, value)
            else:
                # Happens when no epoch ran or the validation loss was never finite
                print("Warning: No best weights saved (training might have been too short or parameters unstable). Using final weights.")

        # --- Prediction Functions ---
        def predict_proba(self, X):
            """Predicts class probabilities (output of sigmoid)."""
            return self.forward(X)

        def predict(self, X, threshold=0.5):
            """Predicts class labels: 1 where probability >= threshold, else 0.

            Returns an integer array of shape (n_samples, output_size).
            """
            probabilities = self.predict_proba(X)
            return (probabilities >= threshold).astype(int)

    print("SLFN class defined successfully.")
else:
    print("SLFN class definition skipped as Titanic data is not available.")

SLFN class defined successfully.
