<a href="https://colab.research.google.com/github/StarDylan/FlowMatching/blob/main/MNIST_Latent_Space_Visualizations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Libraries

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import normalize
from sklearn.decomposition import PCA, KernelPCA
from sklearn.manifold import TSNE
from umap import UMAP

from tensorflow.keras.datasets import mnist

from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix, classification_report

# Config

In [2]:
# Side length, in pixels, of one square MNIST image (28 * 28 = 784 values once flattened).
image_size = 28

In [3]:
def get_mnist_data():
  """Load MNIST through Keras and rescale the pixel values.

  Each image array is cast to float32, divided by 255 and then mapped
  through ``x * 2 - 1``. The label arrays are returned untouched.

  Returns:
    Tuple ``(x_train, y_train, x_test, y_test)``.
  """
  def _rescale(images):
    # Same operations, in the same order and dtype, as the step-by-step form.
    return (images.astype('float32') / 255) * 2 - 1

  (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
  return _rescale(train_images), train_labels, _rescale(test_images), test_labels

In [4]:
# Load MNIST once for the whole notebook; images come back as float32 after the
# /255 and *2-1 rescaling done in get_mnist_data, labels are passed through as-is.
X_train, y_train, X_test, y_test = get_mnist_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [5]:
# Component counts swept by method_performance_by_num_components: coarse steps
# up to 500, every 5th value from 501 to 596, then 600 and the full 784.
components = [100, 200, 300, 400, 500] + list(range(501, 600, 5)) + [600, 784]

# Only the first `n_train_samples` images are used for the reductions.
# knn_classifier_evaluation lines labels up by slicing y_train to the same
# number of leading rows, so the subset must stay a prefix of X_train.
n_train_samples = 1800
train_subset = X_train[:n_train_samples]
# One row per image, all pixels flattened into a single feature axis.
flat_X_train = train_subset.reshape(len(train_subset), -1)

# Helpers

In [6]:
def knn_classifier_evaluation(reduced_data, y_train):
  """
  Trains a KNN classifier on reduced data and outputs a classification report and confusion matrix.

  The embedding is split 80/20 (fixed seed), a 5-nearest-neighbour classifier
  is fitted on the larger part and scored on the held-out part. The report is
  printed as text; the confusion matrix is drawn as an annotated heatmap.

  Args:
    reduced_data: The dimensionality-reduced data (e.g., output of repeated_umap),
      one row per sample.
    y_train: The training labels. Only the first ``reduced_data.shape[0]``
      entries are used, so the rows of ``reduced_data`` must correspond to
      the leading rows of the label array.

  Returns:
    None. Output is printed and plotted.
  """
  # Split data into training and testing sets for the KNN classifier
  X_train_knn, X_test_knn, y_train_knn, y_test_knn = train_test_split(
      reduced_data,
      y_train[:reduced_data.shape[0]],
      test_size=0.2,
      random_state=42
  )

  # Train a KNN classifier
  knn = KNeighborsClassifier(n_neighbors=5) # You can adjust n_neighbors
  knn.fit(X_train_knn, y_train_knn)

  # Make predictions
  y_pred_knn = knn.predict(X_test_knn)

  # Evaluate the model
  print("Classification Report:")
  print(classification_report(y_test_knn, y_pred_knn))

  print("\nConfusion Matrix:")
  # sns.heatmap returns a matplotlib Axes; printing it only emits the Axes
  # repr. Draw the matrix with the counts written in each cell instead.
  cm = confusion_matrix(y_test_knn, y_pred_knn)
  ax = sns.heatmap(cm, annot=True, fmt="d")
  ax.set_xlabel("Predicted label")
  ax.set_ylabel("True label")
  ax.set_title("Confusion Matrix")
  # Flush the figure now so a later plot in the same cell starts clean.
  plt.show()

In [7]:
def method_performance_by_num_components(model_type, label:str):
  """Plot reconstruction error against the number of components for one reducer.

  For every entry of the notebook-level ``components`` list, a fresh
  ``model_type(n_components=k)`` is fitted on the notebook-level
  ``flat_X_train``, the data is projected and mapped back with
  ``inverse_transform``, and the mean squared error between the original
  and the reconstruction is recorded. The errors are then drawn as a line plot.

  Args:
    model_type: Callable returning an estimator; it must accept
      ``n_components`` and the estimator must provide ``fit_transform`` and
      ``inverse_transform``.
    label: Name used in the plot title. The exact value ``"KPCA"`` also
      makes the estimator be built with ``fit_inverse_transform=True``.

  Returns:
    None. The result is the displayed figure.
  """
  # The extra keyword depends only on the label, so decide it once.
  extra_kwargs = {"fit_inverse_transform": True} if label == "KPCA" else {}

  errors = []
  for n_components in components:
    # n_components is an explicit count here, not a variance threshold.
    reducer = model_type(n_components=n_components, **extra_kwargs)
    reduced = reducer.fit_transform(flat_X_train)
    reconstructed = reducer.inverse_transform(reduced)
    # Only the scalar error is kept; the arrays themselves are not needed
    # afterwards, so they are dropped each iteration instead of accumulated.
    errors.append(((flat_X_train - reconstructed)**2).mean())

  plt.plot(components, errors)
  plt.xlabel("Num Comp")
  plt.ylabel("Error")
  plt.title(f"{label} Num Component Comparison")
  plt.grid(True)
  plt.show()

# Dimensionality Reduction

## PCA

In [None]:
# Sweep the component grid with PCA and plot the reconstruction MSE for each setting.
method_performance_by_num_components(PCA, label="PCA")

In [None]:
# Shape of the full training array; the reductions in this notebook run on
# flat_X_train, which is a flattened slice of it.
X_train.shape

In [None]:
# Fit a 10-component PCA on the flattened subset and read off the share of
# variance captured by each component.
PCA_model = PCA(n_components=10)
PCA_model.fit(flat_X_train)
explained_variance = PCA_model.explained_variance_ratio_

# One bar per component, indexed from 0.
plt.bar(np.arange(len(explained_variance)), explained_variance)
plt.title("PCA Explained Variance")
plt.xlabel("Principal Component")
plt.ylabel("Explained Variance")
plt.grid(True)
plt.show()

In [None]:
# Refit with two components for plotting. This rebinds PCA_model, so the
# 10-component fit from the explained-variance cell is no longer reachable.
PCA_model = PCA(n_components=2).fit(flat_X_train)

In [None]:
# Project the training subset with the fitted PCA model and keep a DataFrame
# copy with named columns for convenient access.
pca_embeddings = PCA_model.transform(flat_X_train)
pca_embeddings_df = pd.DataFrame(data=pca_embeddings, columns=['PCA1', 'PCA2'])

# Quick-look scatter: one small marker per image, first component on x.
plt.scatter(x=pca_embeddings_df.PCA1, y=pca_embeddings_df.PCA2, s=5)
plt.title(label='PCA projection')
plt.show()

In [None]:
# 5-NN on the PCA coordinates: a rough measure of how well the digit classes
# separate in this embedding.
knn_classifier_evaluation(pca_embeddings, y_train)

## KPCA

In [None]:
# Same sweep for KernelPCA. The label must be exactly "KPCA": that string is
# what makes the helper pass fit_inverse_transform=True, which the
# reconstruction step relies on.
method_performance_by_num_components(KernelPCA, label="KPCA")

In [None]:
# Two-component KernelPCA for plotting. No kernel is passed, so the library default applies.
# NOTE(review): that default is believed to be a linear kernel, which would make
# this close to plain PCA — confirm, and pass kernel=... explicitly if not intended.
KPCA_model = KernelPCA(n_components=2, fit_inverse_transform=True).fit(flat_X_train)

In [None]:
# Project the training subset with the fitted KernelPCA model; the DataFrame
# copy just gives the two columns names.
kpca_embeddings = KPCA_model.transform(flat_X_train)
kpca_embeddings_df = pd.DataFrame(data=kpca_embeddings, columns=['KPCA1', 'KPCA2'])

# Quick-look scatter: one small marker per image, first component on x.
plt.scatter(x=kpca_embeddings_df.KPCA1, y=kpca_embeddings_df.KPCA2, s=5)
plt.title(label='KPCA projection')
plt.show()

In [None]:
# 5-NN on the KernelPCA coordinates, for comparison with the PCA result.
knn_classifier_evaluation(kpca_embeddings, y_train)

## UMAP

In [None]:
# Same sweep for UMAP. The helper builds each model with n_components only, so
# no random_state is set here and the curve may differ between runs.
# NOTE(review): this fits one UMAP per grid entry (up to 784 output dimensions)
# and calls inverse_transform each time — likely slow; confirm a full run is intended.
method_performance_by_num_components(UMAP, label="UMAP")

In [None]:
# Seeded two-component UMAP fitted on the training subset. The fitted model is
# reused later in the notebook to build the normalised embedding.
UMAP_model = UMAP(n_components=2, random_state=42).fit(flat_X_train)

In [None]:
# Apply the fitted UMAP model to map the training subset to 2 dimensions
umap_embeddings = UMAP_model.transform(flat_X_train)
# Wrap the array computed on the line above; the columns are named after the
# two UMAP axes.
umap_embeddings_df = pd.DataFrame(umap_embeddings, columns=['UMAP1', 'UMAP2'])

# Plot it (optional)
plt.scatter(umap_embeddings_df['UMAP1'], umap_embeddings_df['UMAP2'], s=5)
plt.title('UMAP projection')
plt.show()

In [None]:
# 5-NN on the UMAP coordinates, for comparison with the PCA and KernelPCA results.
knn_classifier_evaluation(umap_embeddings, y_train)

## Repeated UMAP

In [None]:
def repeated_umap(X, n_components:int):
  """Reduce ``X`` with a chain of UMAP fits until at most two dimensions remain.

  The first fit maps ``X`` to ``n_components`` dimensions. While the current
  dimensionality is above 2, the result is fed into another UMAP with half
  as many components (integer division; 3 goes to 2). Each stage prints the
  dimensionality it produced. The final embedding is shown as a scatter plot.

  Args:
    X: Input data, one row per sample.
    n_components: Output dimensionality of the first UMAP stage.

  Returns:
    The final embedding, with ``min(n_components, 2)`` columns for positive
    ``n_components``.
  """
  embeddings = X
  target_dim = n_components
  while True:
    # Every stage uses the same seed so the whole chain is repeatable.
    embeddings = UMAP(n_components=target_dim, random_state=42).fit_transform(embeddings)
    print(f"n_components: {target_dim}")
    if target_dim <= 2:
      break
    target_dim = target_dim // 2 if target_dim > 3 else 2

  xs = embeddings[:, 0]
  # A one-column embedding has no second axis to plot; draw it on a flat line
  # rather than failing on the missing column.
  ys = embeddings[:, 1] if embeddings.shape[1] > 1 else np.zeros_like(xs)
  plt.scatter(xs, ys, s=5)
  plt.title('UMAP projection')
  plt.show()
  return embeddings

In [None]:
# Starting at 64, the halving rule in repeated_umap gives the chain 64 -> 32 -> 16 -> 8 -> 4 -> 2.
repeated_umap_embeddings = repeated_umap(flat_X_train, 64)

In [None]:
# 5-NN on the chained-UMAP coordinates, for comparison with the single-stage UMAP result.
knn_classifier_evaluation(repeated_umap_embeddings, y_train)

## Variational autoencoder for further dimensionality reduction
Run the autoencoder to map the data into a cleaner 2D representation.

# Preprocessing

In [None]:
# Sanity check on the loaded images: print the array shape, then show the
# first five images side by side with the axes hidden.
print(X_train.shape)
n_preview = 5
for slot in range(1, n_preview + 1):
  plt.subplot(1, n_preview, slot)
  plt.imshow(X_train[slot - 1])
  plt.axis('off')
plt.show()

In [None]:
# Re-embed the training subset with the fitted 2-D UMAP model, pass the result
# through sklearn's normalize, and wrap it in a DataFrame for inspection.
# NOTE(review): normalize is called with its defaults, which are believed to
# rescale each ROW to unit L2 norm. On a 2-column embedding that would place
# every point on the unit circle and keep only its angle — confirm this is
# intended rather than per-feature scaling.
reduced_mnist = normalize(UMAP_model.transform(flat_X_train))
reduced_mnist_df = pd.DataFrame(reduced_mnist)

In [None]:
# Peek at the first rows of the normalised embedding.
reduced_mnist_df.head()

# Visualizations
Show samples from that latent space