In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.decomposition import PCA
from sklearn.preprocessing import FunctionTransformer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPClassifier

from time import time

# Show floating point numbers with three decimal places in pandas output
pd.set_option('display.precision', 3)

# One RandomState instance that is passed as `random_state` to the estimators below.
# NOTE(review): the instance is shared, so estimators that draw from it presumably
# advance the same stream and results may depend on cell execution order — confirm
# this is intended.
prng = np.random.RandomState(20250324)

# Introduction to Neural networks

## Recap from last class: MNIST digits

In [None]:
# Load MNIST dataset from Keras
from tensorflow.keras.datasets import mnist

# Load data with predefined train/test split
(X_train_full, y_train_full), (X_test, y_test) = mnist.load_data()

# Look at the dimensions; each label is the name of the variable being printed
print(f"X_train_full: {X_train_full.shape}")
print(f"y_train_full: {y_train_full.shape}")
print(f"X_test:       {X_test.shape}")
print(f"y_test:       {y_test.shape}")

# Preview the first 25 training images in a 5x5 grid, titled with their labels
fig, axs = plt.subplots(5, 5, figsize=(10,10))
for i, ax in enumerate(axs.flatten()):
    ax.imshow(X_train_full[i], cmap="binary")
    ax.axis("off")
    ax.set_title(f"Label: {y_train_full[i]}")
plt.tight_layout()
plt.show()

In [3]:
# Define preprocessing step: rescale and flatten
def preprocessImageData(X):
    """Flatten every sample into one row and divide all values by 255.

    X is an array whose first axis indexes the samples; the remaining axes
    are collapsed into a single feature axis.
    """
    n_samples = X.shape[0]
    flattened = X.reshape(n_samples, -1)
    return flattened / 255

## Benchmark: logistic regression

[Note that regularization is applied by default.](https://arc.net/l/quote/ndorcdmu)

In [None]:
# Benchmark 1: logistic regression on the rescaled, flattened pixels
pipe_lr_full = Pipeline([
    ("preprocess", FunctionTransformer(preprocessImageData)),
    ("lr", LogisticRegression(max_iter=1000, random_state=prng))
])

# Measure the wall-clock time of the fit on the full training set
time0 = time()
pipe_lr_full.fit(X_train_full, y_train_full)
time_full = time() - time0
print(f"Logistic regression on full data completed in {time_full:.2f} seconds")

# Benchmark 2: same model with a PCA step in between. A fractional n_components
# asks PCA to keep as many components as are needed to explain 95% of the variance.
pipe_lr_pca = Pipeline([
    ("preprocess", FunctionTransformer(preprocessImageData)),
    ("pca", PCA(n_components=0.95)),
    ("lr", LogisticRegression(max_iter=1000, random_state=prng))
])

# Measure the wall-clock time of the fit, PCA step included
time0 = time()
pipe_lr_pca.fit(X_train_full, y_train_full)
time_pca = time() - time0
print(f"Logistic regression on PCA data completed in {time_pca:.2f} seconds")

In [None]:
# Compare performance metrics of the two logistic regression pipelines.
# The last row is scored on (X_test, y_test), the test split loaded together
# with the training data, so it is labelled as test accuracy.
performance_summary_data = {
    'Metric': ['Dimensions', 'Fit time', 'Accuracy (training)', 'Accuracy (test)'],
    'Full data': [
        pipe_lr_full['lr'].n_features_in_,  # number of features the classifier was fitted on
        time_full,
        pipe_lr_full.score(X_train_full, y_train_full),
        pipe_lr_full.score(X_test, y_test),
    ],
    'PCA': [
        pipe_lr_pca['lr'].n_features_in_,  # number of components kept by the PCA step
        time_pca,
        pipe_lr_pca.score(X_train_full, y_train_full),
        pipe_lr_pca.score(X_test, y_test),
    ],
}
performance_summary_df = pd.DataFrame(performance_summary_data)
print(performance_summary_df)

### Combine unsupervised and supervised learning methods: impact of PC components on performance metrics

In [6]:
# Work with a stratified sample of 10,000 training images to speed up computation;
# the remaining images form the validation set. The seed is fixed so that the
# split is preserved across notebooks.
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full,
    y_train_full,
    train_size=10000,
    random_state=20250324,
    stratify=y_train_full,
)

In [7]:
# Sweep over the number of PCA components and record, for every setting,
# the fit time together with the training and validation accuracy
pca_components = [5, 10, 20, 30, 50, 75, 100, 150, 200, 400, 784]
accuracies = []
val_accuracies = []
times = []

# Rescale/flatten once up front so this step is not repeated in every iteration
X_scaled = pipe_lr_pca['preprocess'].transform(X_train)
X_scaled_val = pipe_lr_pca['preprocess'].transform(X_val)

for n_components in pca_components:

    pipe = Pipeline(steps=[
        ('pca', PCA(n_components=n_components)),
        ('lr', LogisticRegression(max_iter=1000, random_state=prng)),
    ])

    # Only the fit itself is timed
    t0 = time()
    pipe.fit(X_scaled, y_train)
    time_fit = time() - t0
    times.append(time_fit)

    # Accuracy on the data the model was fitted on ...
    acc = accuracy_score(y_train, pipe.predict(X_scaled))
    accuracies.append(acc)

    # ... and on the held-out validation data
    validation_acc = accuracy_score(y_val, pipe.predict(X_scaled_val))
    val_accuracies.append(validation_acc)

In [None]:
# Position of the highest validation accuracy (first one in case of ties)
best_idx = np.argmax(val_accuracies)
best_n_pca = pca_components[best_idx]
print(f"Best validation accuracy is achieved with {best_n_pca} components.")

In [None]:
# Visualize the results: fit time on top, accuracies below

fig, ax = plt.subplots(2, 1, figsize=(10, 10))
time_ax, acc_ax = ax

# Upper panel: fit time against the number of PCA components
time_ax.plot(pca_components, times, marker='o', color='darkblue')
time_ax.set(
    title='Time to Fit Model by Number of PCA Components',
    xlabel='Number of PCA Components',
    ylabel='Time (seconds)',
)

# Lower panel: training and validation accuracy against the number of PCA components
acc_ax.plot(pca_components, accuracies, marker='o', color='darkgreen', label='Training Accuracy')
acc_ax.plot(pca_components, val_accuracies, marker='o', color='darkred', label='Validation Accuracy')
acc_ax.set(
    title='Accuracy by Number of PCA Components',
    xlabel='Number of PCA Components',
    ylabel='Accuracy',
)
acc_ax.legend()
# Dashed line marks the component count with the best validation accuracy
acc_ax.axvline(x=best_n_pca, color='gray', linestyle='--', linewidth=1)


plt.tight_layout()
plt.show()

### Store results (smaller dataset)

In [10]:
class ResultCollector:
    """Collect train/test accuracies per model and show them as one comparison table."""

    def __init__(self):
        # Maps model name -> {'Train Accuracy': ..., 'Test Accuracy': ...}
        self.results = {}

    def add_model(self, name, train_acc, test_acc):
        """Add or update a model's results.

        Parameters
        ----------
        name : label of the model; an existing entry with the same name is overwritten
        train_acc : accuracy to store under 'Train Accuracy'
        test_acc : accuracy to store under 'Test Accuracy'

        Returns
        -------
        The styled results table (see `get_table`), so that a call placed at the
        end of a notebook cell displays the updated comparison.
        """
        self.results[name] = {
            'Train Accuracy': train_acc,
            'Test Accuracy': test_acc
        }
        return self.get_table()

    def get_table(self, styled=True):
        """Get the results table with optional styling.

        Parameters
        ----------
        styled : if True (default), return a pandas Styler with three-decimal
            formatting and a red-to-green background gradient computed over the
            whole table; if False, return the plain DataFrame.

        Returns
        -------
        A table with one row per model and one column per stored accuracy.
        """
        # The dict of dicts yields one column per model; transpose to one row per model
        df = pd.DataFrame(self.results).T
        if not styled:
            return df
        return df.style.format("{:.3f}").background_gradient(cmap='RdYlGn', axis=None)

In [11]:
# Refit the full Logistic Regression pipeline on the sampled data.
# This refits `pipe_lr_full` in place, so from here on it holds the model
# trained on the sample rather than on the full training set.
pipe_lr_full.fit(X_train, y_train)
# accuracy_score takes the true labels first and the predictions second
train_accuracy = accuracy_score(y_train, pipe_lr_full.predict(X_train))
# "test" accuracy in the results table is measured on the validation split
test_accuracy = accuracy_score(y_val, pipe_lr_full.predict(X_val))

In [None]:
# Start the comparison table with the first benchmark; the table returned by
# add_model is the output of this cell
results = ResultCollector()
results.add_model(
    name="Logistic Regression",
    train_acc=train_accuracy,
    test_acc=test_accuracy,
)

In [None]:
# Logistic regression on the number of principal components that achieved the
# best validation accuracy in the sweep above
pipe_pca = Pipeline([
    ("preprocess", FunctionTransformer(preprocessImageData)),
    ("pca", PCA(n_components=best_n_pca)),
    ("lr", LogisticRegression(max_iter=1000, random_state=prng))
])
pipe_pca.fit(X_train, y_train)
# accuracy_score takes the true labels first and the predictions second
train_accuracy = accuracy_score(y_train, pipe_pca.predict(X_train))
test_accuracy = accuracy_score(y_val, pipe_pca.predict(X_val))

results.add_model("Logistic Regression with PCA", train_accuracy, test_accuracy)


## Benchmark #2: Random Forest (state-of-the-art)

**TODO**: Train a default Random Forest model as another benchmark. Name the resulting pipeline `pipe_rf` and call its classifier step `"rf"` (the cells below access it as `pipe_rf["rf"]`).

In [None]:
def plotConfusionMatrix(true_y, predicted_labels, title="Confusion Matrix: Predicted vs True Labels"):
    """Draw the confusion matrix of a classifier as an annotated heatmap.

    Parameters
    ----------
    true_y : true class labels
    predicted_labels : labels predicted by the model, in the same order as `true_y`
    title : heading of the figure

    Rows of the heatmap correspond to the true labels and columns to the
    predicted labels. A new figure is created; nothing is returned.
    """
    conf_matrix = confusion_matrix(true_y, predicted_labels)

    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")

# `pipe_rf` has to be created as part of the TODO above before this line can run
plotConfusionMatrix(y_val, pipe_rf.predict(X_val))


### Evaluation metrics for classification problems

In [None]:
# Look up which split criterion the random forest step is configured with
rf_params = pipe_rf["rf"].get_params()
rf_params['criterion']

#### Gini impurity

Gini impurity is a measure of how often a randomly chosen element from the set would be incorrectly labeled if it were randomly labeled according to the distribution of labels in the subset. It's a measure of the "purity" of a node.

$$
Gini = 1 - \sum p_i^2
$$

where $p_i$ is the probability (proportion) of samples that belong to class $i$. For completely pure nodes, Gini impurity is 0. The maximum score, $1 - 1/K$ for $K$ classes, is achieved when a node contains samples from all classes in equal proportions. Note the similarity to the Gini index, a measure of economic inequality that also ranges between 0 and 1 and reaches 0 at perfect equality, when everyone has the same income.


#### Log-loss

An alternative measure is **log-loss (or cross-entropy)**. It can be understood intuitively as a measure of "surprise" about predictions. E.g., if you predict rain with 90% confidence, you feel minimal surprise if it indeed rains (low loss) while you would be very surprised if it doesn't rain (high loss). If you predict rain with 50%, either outcome comes with some surprise (medium loss).

$$
-\sum y_i \log(p_i)
$$

## Sklearn MLP

[This model optimizes the log-loss function using (...) stochastic gradient descent.](https://arc.net/l/quote/knlslrjn)

In [None]:
# Multi-layer perceptron with scikit-learn's default settings on the rescaled, flattened pixels
pipe_mlp = Pipeline([
    ("preprocess", FunctionTransformer(preprocessImageData)),
    ("mlp", MLPClassifier(random_state=prng))
])

pipe_mlp.fit(X_train, y_train)

# accuracy_score takes the true labels first and the predictions second
train_accuracy = accuracy_score(y_train, pipe_mlp.predict(X_train))
test_accuracy = accuracy_score(y_val, pipe_mlp.predict(X_val))

results.add_model("Sklearn MLP", train_accuracy, test_accuracy)

In [None]:
# Inspect the hyperparameters the MLP step was created with
mlp_step = pipe_mlp["mlp"]
mlp_step.get_params()

## Keras

While `scikit-learn` provides a wide range of machine learning algorithms, `keras` is specifically designed for building and training neural networks and deep learning models, making it more suitable for tasks involving complex patterns and large datasets. It offers several advanced features that `sklearn` does not, such as training networks with complex architectures (such as convolutional neural networks), or applying pre-processing techniques common to deep learning (such as scaling and flattening), or the ability to transfer learning from pre-trained networks.

Keras is a high-level neural network API that provides a simple and intuitive interface for building and training deep learning models. Keras can run on multiple backends, with TensorFlow being the default. TensorFlow is a standalone, low-level deep learning library developed by Google.

In [None]:
# Print the installed Keras version
from keras import __version__ as keras_version
print(keras_version)

In [None]:
from keras.utils import to_categorical

print(f"Dimension of y before transformation: {y_train.shape}")

# Convert target variables to categorical (one-hot) vectors
num_classes = 10


def _to_one_hot(y):
    """One-hot encode a 1-D label vector; return labels that are already 2-D unchanged."""
    # This cell overwrites y_train / y_test / y_val. Without the guard, running
    # it a second time would encode the already encoded labels once more.
    if np.ndim(y) > 1:
        return y
    return to_categorical(y, num_classes=num_classes)


y_sets = [y_train, y_test, y_val]
y_train, y_test, y_val = [_to_one_hot(y) for y in y_sets]
print(f"Dimension of y after transformation: {y_train.shape}")

### Simple model

In [None]:
from keras.models import Sequential
from keras.layers import Input, Flatten, Rescaling, Dense
from keras.utils import set_random_seed


def _rescale_once(x):
    """Scale raw pixel values by 1/255; return data that was already rescaled unchanged."""
    # This cell overwrites X_train / X_test / X_val. The raw images are NumPy
    # arrays, while the output of the Rescaling layer is assumed to be a backend
    # tensor (true for the TensorFlow backend; confirm for other backends).
    # Checking the type prevents a second division by 255 when the cell is re-run.
    if isinstance(x, np.ndarray):
        return Rescaling(1./255)(x)
    return x


x_sets = [X_train, X_test, X_val]
X_train, X_test, X_val = [_rescale_once(x) for x in x_sets]  # we will always need this from now on

# Seed before the layers are created so that the initial weights are reproducible too
set_random_seed(20250324)

# Build the model
model = Sequential([
    Input(shape=X_train.shape[1:]),
    Flatten(),
    Dense(100, activation='relu'),
    Dense(num_classes, activation='softmax')
])
# summary() prints the table itself and returns None, so it is not wrapped in print()
model.summary()
# Expected parameter counts:
# hidden layer: 784*100+100
# output layer: 100*10+10

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

The core of the backend is indeed constructed around **tensors**. Tensors are akin to NumPy arrays, as they hold numerical values of different dimensions. What sets them apart is their specialized role in deep learning: they are equipped with built-in gradient computation, seamlessly integrate within a computational graph, and are capable of leveraging hardware accelerators like TPUs and GPUs.

In [None]:
# Show the symbolic output tensor of the third entry in model.layers
# NOTE(review): presumably this is the final softmax layer, assuming the Input
# entry is not counted in model.layers — confirm
model.layers[2].output

In [None]:
# Fit the model
from keras.utils import set_random_seed
# NOTE(review): the seed is set here, after `model` was already constructed in an
# earlier cell, so it presumably does not cover the initial weights — confirm
set_random_seed(20250324)  # for reproducibility
# Train for 10 epochs in mini-batches of 512; the validation data is only used for monitoring
history = model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=10, batch_size=512)

In [None]:
# Score the model on the training sample and on the validation set.
# evaluate() returns the loss followed by the metric set in compile() (accuracy).
train_loss, train_accuracy = model.evaluate(X_train, y_train)
test_loss, test_accuracy = model.evaluate(X_val, y_val)
results.add_model(
    name="Keras MLP",
    train_acc=train_accuracy,
    test_acc=test_accuracy,
)


In [None]:
def plot_history(fit_history, metric="accuracy"):
    """Plot the training and validation curves of `metric` over the epochs.

    Parameters
    ----------
    fit_history : mapping that holds one sequence of per-epoch values under the
        key `metric` and one under `val_<metric>`
    metric : name of the metric to plot
    """
    curves = (
        (metric, 'Training'),
        (f'val_{metric}', 'Validation'),
    )
    for key, label in curves:
        plt.plot(fit_history[key], label=label)
    plt.xlabel('Epoch')
    plt.ylabel(metric)
    plt.title(f'{metric} history on training and validation sets')
    plt.legend()
    plt.show()

plot_history(history.history)

It seems that the maximum is not yet found. Let's train the network a little bit longer. Note that unless we recreate the model, the process starts from the point where it previously ended.

In [None]:
# Continue training the same model for another 50 epochs; the result is kept
# in history_longer_train so both runs can be combined afterwards
history_longer_train = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=512,
)

In [29]:
# Concatenate the per-epoch values of the first run and the longer run
total_history = {
    key: history.history[key] + history_longer_train.history[key]
    for key in ("accuracy", "val_accuracy")
}

In [None]:
# Accuracy curves over both training runs combined
plot_history(total_history)

In [None]:
# Score the model again after the additional epochs and log it as a separate entry
train_loss, train_accuracy = model.evaluate(X_train, y_train)
test_loss, test_accuracy = model.evaluate(X_val, y_val)
results.add_model(
    name="Keras MLP (longer training)",
    train_acc=train_accuracy,
    test_acc=test_accuracy,
)

### Regularization in neural networks

#### Early stopping

Keras' `EarlyStopping` callback has a `patience` parameter, which defines the number of epochs with no improvement after which training will be stopped. It defaults to 0.

In [None]:
from keras.callbacks import EarlyStopping
from keras.models import clone_model

# To make sure the process starts over, a new model instance is created from
# the existing one and compiled before it is fitted
cloned_model = clone_model(model)
cloned_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Watch the validation accuracy; patience is left at its default
early_stopping = EarlyStopping(monitor='val_accuracy')

# Fit with early stopping
history_with_early_stopping = cloned_model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=512,
    callbacks=[early_stopping],
)

In [None]:
# Accuracy curves of the run that used early stopping
plot_history(history_with_early_stopping.history)

#### Dropout

The Dropout layer randomly sets input units to 0 with a frequency of `rate` at each step during training time, which helps prevent overfitting. Inputs not set to 0 are scaled up by 1 / (1 - `rate`) such that the sum over all inputs is unchanged.

In [None]:
from keras.layers import Dropout

# Build the model: same architecture as the simple model, with a Dropout layer
# between the hidden layer and the output layer
regularized_model = Sequential([
    Input(shape=X_train.shape[1:]),
    Flatten(),
    Dense(100, activation='relu'),
    Dropout(rate=0.5),
    Dense(num_classes, activation='softmax')
])

# Compile the model
regularized_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# summary() prints the table itself and returns None, so it is not wrapped in print()
regularized_model.summary()

In [None]:
# Train the dropout model for at most 50 epochs, with early stopping on validation accuracy
history_regularized = regularized_model.fit(
    X_train, y_train, validation_data=(X_val, y_val), epochs=50, batch_size=512,
    callbacks=[EarlyStopping(monitor='val_accuracy', patience=5)] # training stops after five epochs without any improvement
)

In [None]:
# Accuracy curves of the dropout model
plot_history(history_regularized.history)

In [None]:
# Score the dropout model on the training sample and the validation set, then log it
train_loss, train_accuracy = regularized_model.evaluate(X_train, y_train)
test_loss, test_accuracy = regularized_model.evaluate(X_val, y_val)
results.add_model(
    name="Keras MLP (regularized)",
    train_acc=train_accuracy,
    test_acc=test_accuracy,
)

### Deep network

**TODO**: Build a deeper network with at least two hidden layers. Train it and measure its performance. (Don't forget regularization!)

In [None]:
# TODO(exercise): replace the placeholder with a compiled model that has at
# least two hidden layers; the cell cannot run until this is filled in
deep_model = #TBA

In [None]:
# TODO(exercise): fill in the fit arguments; the next cell reads history_deep.history
history_deep = deep_model.fit(#TBA)

In [None]:
# Accuracy curves of the deep model
plot_history(history_deep.history)

In [None]:
# Score the deep model on the training sample and the validation set, then log it
train_loss, train_accuracy = deep_model.evaluate(X_train, y_train)
test_loss, test_accuracy = deep_model.evaluate(X_val, y_val)
results.add_model(
    name="Keras MLP (deep model)",
    train_acc=train_accuracy,
    test_acc=test_accuracy,
)