In [1]:
# %% [markdown]
# # Autoencoder and Latent Space Classification Notebook
#
# In this notebook, we will:
# 
# 1. **Build a Simple Autoencoder:**  
#    Create an autoencoder (encoder and decoder) that takes 512x512 images as input.
#
# 2. **Data Preparation:**  
#    Load images using PyTorch’s `ImageFolder`, apply the necessary transformations, and split the dataset into training (70%), validation (15%), and test (15%) sets.
#
# 3. **Training the Autoencoder:**  
#    Train the autoencoder using Mean Squared Error (MSE) as the reconstruction loss and monitor the loss per epoch.
#
# 4. **Plot the Loss:**  
#    Plot the training and validation loss per epoch and save the plot as `reconstruction_loss.png`.
#
# 5. **Latent Space Extraction and Classification:**  
#    Extract the latent representations (z) from the trained autoencoder, then train a logistic regression classifier (using scikit‑learn) on these representations.
#
# 6. **Evaluation:**  
#    Evaluate the classifier using the weighted F1 score.
#
# **Note:**  
# Make sure to update the dataset path (`dataset_path`) to point to your dataset directory.

In [None]:
# %% [code]
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score

# %% [markdown]
# ## Define the Autoencoder Model
#
# The autoencoder is composed of:
#
# - **Encoder:** A series of convolutional layers that downsample the image and finally flatten the output. A fully connected layer then maps the flattened features to the latent space vector `z`.
# - **Decoder:** A fully connected layer converts `z` back to a feature map, which is then upsampled via transposed convolution layers to reconstruct the original image.
#
# We assume the images have 3 channels (RGB) and are of size 512x512.


In [None]:
# %% [code]
class Autoencoder(nn.Module):
    """Convolutional autoencoder with a fully connected latent bottleneck.

    The encoder applies five stride-2 convolutions (each one halves both
    spatial sides), flattens the resulting 256-channel feature map and projects
    it to a ``latent_dim``-dimensional vector ``z``. The decoder mirrors this:
    a linear layer expands ``z`` back to the feature map and five stride-2
    transposed convolutions upsample it to the input resolution. A final
    sigmoid keeps every reconstructed value in [0, 1].

    Args:
        latent_dim: Size of the latent vector ``z``.
        image_size: Expected input resolution, either a single int for square
            images or a ``(height, width)`` pair. Every side must be a positive
            multiple of 32 so that the five halvings (and the five doublings in
            the decoder) are exact. Defaults to 512, the original fixed size.

    Raises:
        ValueError: If a side of ``image_size`` is not a positive multiple of 32.
    """

    # Five stride-2 stages shrink each spatial side by 2 ** 5.
    DOWNSAMPLE_FACTOR = 32
    # Channel count produced by the last encoder convolution.
    BOTTLENECK_CHANNELS = 256

    def __init__(self, latent_dim=128, image_size=512):
        super().__init__()

        if isinstance(image_size, (tuple, list)):
            height, width = image_size
        else:
            height = width = image_size
        for side in (height, width):
            if side <= 0 or side % self.DOWNSAMPLE_FACTOR != 0:
                raise ValueError(
                    f"image_size sides must be positive multiples of "
                    f"{self.DOWNSAMPLE_FACTOR}, got {image_size!r}"
                )

        # Shape of the encoder output for one sample: (channels, H/32, W/32).
        self.feature_shape = (
            self.BOTTLENECK_CHANNELS,
            height // self.DOWNSAMPLE_FACTOR,
            width // self.DOWNSAMPLE_FACTOR,
        )
        flat_features = (
            self.feature_shape[0] * self.feature_shape[1] * self.feature_shape[2]
        )

        # Encoder: downsample the input image (shapes shown for a 512x512 input).
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),     # [16, 256, 256]
            nn.ReLU(inplace=True),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),    # [32, 128, 128]
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),    # [64, 64, 64]
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),   # [128, 32, 32]
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1),  # [256, 16, 16]
            nn.ReLU(inplace=True),
        )

        # Fully connected layers between the flattened feature map and the latent vector.
        self.fc1 = nn.Linear(flat_features, latent_dim)  # Encode to latent vector.
        self.fc2 = nn.Linear(latent_dim, flat_features)  # Decode from latent vector.

        # Decoder: upsample the feature map back to the input resolution.
        # kernel 3 / stride 2 / padding 1 / output_padding 1 doubles each side exactly.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # [128, 32, 32]
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # [64, 64, 64]
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # [32, 128, 128]
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # [16, 256, 256]
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(16, 3, kernel_size=3, stride=2,
                               padding=1, output_padding=1),  # [3, 512, 512]
            nn.Sigmoid(),  # Squash the output into [0, 1].
        )

    def encode(self, x):
        """Map a batch of images ``(N, 3, H, W)`` to latent vectors ``(N, latent_dim)``."""
        features = self.encoder(x)
        # flatten(1) keeps the batch axis and also works on non-contiguous tensors.
        return self.fc1(features.flatten(1))

    def decode(self, z):
        """Map latent vectors ``(N, latent_dim)`` back to images ``(N, 3, H, W)``."""
        features = self.fc2(z).view(-1, *self.feature_shape)
        return self.decoder(features)

    def forward(self, x):
        """Return ``(x_recon, z)``: the reconstruction and the latent vector of ``x``."""
        z = self.encode(x)
        x_recon = self.decode(z)
        return x_recon, z


# %% [markdown]
# ## Data Loading and Splitting
#
# We load our dataset using `torchvision.datasets.ImageFolder`. The images are resized to 512x512 and converted to tensors.
# The dataset is then split into:
#
# - **Training Set:** 70%
# - **Validation Set:** 15%
# - **Test Set:** 15%
#
# **Important:** Update the `dataset_path` variable to the location of your dataset.

In [None]:
# %% [code]
# Data transformations: resize images to 512x512 and convert them to tensors.
# ToTensor is expected to yield float tensors in [0, 1] for standard 8-bit
# images, which matches the range of the autoencoder's sigmoid output.
transform = transforms.Compose([
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
])

# Set the dataset path (update this path accordingly).
dataset_path = 'path/to/dataset'
dataset = datasets.ImageFolder(root=dataset_path, transform=transform)

# Compute dataset splits: 70% train, 15% validation, remainder (~15%) test.
total_size = len(dataset)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - train_size - val_size

# An empty split would only surface later as a division by zero or an empty
# concatenation, so fail here with an explicit message instead.
if min(train_size, val_size, test_size) <= 0:
    raise ValueError(
        f"Dataset at {dataset_path!r} has only {total_size} images, which is too "
        "few for a 70/15/15 train/validation/test split."
    )

# Seed the split so the same images land in train/val/test on every run;
# otherwise the test score is measured on a different subset each time.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=split_generator,
)

# Create DataLoaders for each subset. Only the training loader is shuffled.
batch_size = 8  # Adjust based on available hardware.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# %% [markdown]
# ## Training the Autoencoder
#
# We now train our autoencoder using MSE loss to measure the difference between the input images and their reconstructions.
# After each epoch, we evaluate the model on the validation set and log both the training and validation losses.

In [None]:
# %% [code]
# Seed PyTorch's global RNG so that weight initialisation and the shuffling
# order of the training loader repeat across "Restart & Run All" executions.
# NOTE: this does not by itself force bit-for-bit deterministic CUDA kernels.
TRAIN_SEED = 42
torch.manual_seed(TRAIN_SEED)

# Set device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

latent_dim = 128
model = Autoencoder(latent_dim=latent_dim).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 20
train_losses = []
val_losses = []


def _run_epoch(net, loader, loss_fn, run_device, opt=None):
    """Run one pass over ``loader`` and return the mean per-sample loss.

    Args:
        net: Model whose forward pass returns ``(reconstruction, latent)``.
        loader: DataLoader yielding ``(images, labels)`` batches; labels are ignored.
        loss_fn: Reconstruction criterion called as ``loss_fn(reconstruction, images)``.
        run_device: Device the image batches are moved to.
        opt: Optimizer. When given, the pass trains the model; when ``None``,
            the pass only evaluates (eval mode, gradients disabled).

    Returns:
        Sum of batch losses weighted by batch size, divided by the dataset size.
    """
    is_training = opt is not None
    net.train(is_training)
    weighted_loss_sum = 0.0
    with torch.set_grad_enabled(is_training):
        for images, _ in loader:
            images = images.to(run_device)
            reconstructions, _ = net(images)
            loss = loss_fn(reconstructions, images)
            if is_training:
                opt.zero_grad()
                loss.backward()
                opt.step()
            # The criterion averages over the batch; re-weight by batch size so
            # a smaller final batch does not skew the epoch mean.
            weighted_loss_sum += loss.item() * images.size(0)
    return weighted_loss_sum / len(loader.dataset)


for epoch in range(num_epochs):
    epoch_train_loss = _run_epoch(model, train_loader, criterion, device, opt=optimizer)
    train_losses.append(epoch_train_loss)

    # Validation
    epoch_val_loss = _run_epoch(model, val_loader, criterion, device)
    val_losses.append(epoch_val_loss)

    print(f"Epoch [{epoch+1}/{num_epochs}]  Train Loss: {epoch_train_loss:.4f}  |  Val Loss: {epoch_val_loss:.4f}")


# %% [markdown]
# ## Plotting and Saving the Reconstruction Loss
#
# We plot the training and validation reconstruction loss per epoch. The plot is saved as `reconstruction_loss.png` and displayed below.

In [None]:
# %% [code]
# Plot the per-epoch reconstruction losses recorded during training and save
# the figure next to the notebook.
epochs_axis = range(1, len(train_losses) + 1)
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(epochs_axis, train_losses, marker="o", label="Training loss")
ax.plot(epochs_axis, val_losses, marker="o", label="Validation loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("MSE reconstruction loss")
ax.set_title("Autoencoder Reconstruction Loss")
ax.legend()
ax.grid(True)
fig.tight_layout()
fig.savefig("reconstruction_loss.png")
plt.show()

# %% [markdown]
# ## Latent Space Extraction
#
# We pass the training and test images through the trained autoencoder and collect
# the latent vectors `z` together with their class labels.

# %% [code]
def extract_latents(dataloader, net=None):
    """Collect latent vectors and labels for every batch in ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` batches of tensors.
        net: Model whose forward pass returns ``(reconstruction, latent)``.
            Defaults to the notebook's trained ``model``.

    Returns:
        A ``(latents, labels)`` pair of NumPy arrays, concatenated along the
        batch axis in the order the batches were yielded.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    net = model if net is None else net
    # Run on whatever device the network's weights live on.
    first_param = next(net.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    net.eval()
    latent_batches = []
    label_batches = []
    with torch.no_grad():
        for imgs, lbls in dataloader:
            _, z = net(imgs.to(run_device))
            latent_batches.append(z.cpu().numpy())
            label_batches.append(lbls.cpu().numpy())

    if not latent_batches:
        raise ValueError("extract_latents received a dataloader with no batches")
    return np.concatenate(latent_batches, axis=0), np.concatenate(label_batches, axis=0)

# Labels are gathered alongside the latents, so the shuffled order of
# train_loader does not break the latent/label pairing.
train_latents, train_labels = extract_latents(train_loader)
test_latents, test_labels = extract_latents(test_loader)

# %% [markdown]
# ## Logistic Regression Classification on the Latent Space
#
# We now train a logistic regression classifier using the latent representations from the training set.
# The classifier is then evaluated on the test set using the weighted F1 score as the performance metric.


In [None]:
# %% [code]
# Fit the classifier on the training latents (fit returns the estimator itself),
# then score its test-set predictions with the support-weighted F1.
clf = LogisticRegression(max_iter=1000).fit(train_latents, train_labels)
test_preds = clf.predict(test_latents)
f1 = f1_score(
    y_true=test_labels,
    y_pred=test_preds,
    average='weighted',
)
print(f"Logistic Regression Classification F1 Score: {f1:.4f}")

# %% [markdown]
# ## Conclusion
#
# In this notebook, we:
#
# - Built and trained an autoencoder on 512x512 images.
# - Monitored and plotted the reconstruction loss over epochs.
# - Extracted latent representations from the autoencoder.
# - Performed a classification task using logistic regression on the latent space.
# - Evaluated the classifier using the weighted F1 score.
#
# This pipeline can serve as a foundation for more complex models or additional downstream tasks.
