In [None]:
# Embeddings/Autoencoder.ipynb

import os
import glob
import random
from collections import defaultdict
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from tqdm.auto import tqdm

# Load Dataset

In [None]:
# Parameters
data_root = "../Results/Dev"
pattern = os.path.join(data_root, "*", "aligned_spectrogram_*.png")
test_split = 0.2     # 20% test, 80% train
seed = 42            # random seed for reproducibility

# Seed torch as well as `random` (seeded below, right before the shuffle) so
# that weight initialisation and DataLoader shuffling repeat across runs.
torch.manual_seed(seed)

# Gather all spectrograms (sorted so the file order is stable across runs)
filenames = sorted(glob.glob(pattern))
print(f"Found {len(filenames)} spectrogram images")
if not filenames:
    # Stop here with an actionable message instead of an IndexError on
    # filenames[0] or an empty-dataset failure further down the notebook.
    raise FileNotFoundError(f"No spectrogram images match {pattern!r}")
print(f"Example: {filenames[0]}")

# Group by song folder so every image of a song ends up in the same split
song_to_files = defaultdict(list)
for f in filenames:
    song = os.path.basename(os.path.dirname(f))  # parent folder name
    song_to_files[song].append(f)

all_songs = sorted(song_to_files.keys())
print(f"Found {len(all_songs)} songs")

# Reproducible shuffle
random.seed(seed)
random.shuffle(all_songs)

# Split songs into train/test. The ordered lists drive the file lists below;
# the sets are kept for membership tests and for the counts that are printed.
num_test = int(len(all_songs) * test_split)
test_song_list = all_songs[:num_test]
train_song_list = all_songs[num_test:]
test_songs = set(test_song_list)
train_songs = set(train_song_list)

# Flatten into filename lists. Iterate the ordered lists, not the sets:
# set iteration order for strings changes between interpreter sessions,
# which would make the order of train_files/test_files unrepeatable.
train_files = [f for song in train_song_list for f in song_to_files[song]]
test_files  = [f for song in test_song_list  for f in song_to_files[song]]

print(f"Train: {len(train_files)} files from {len(train_songs)} songs")
print(f"Test:  {len(test_files)} files from {len(test_songs)} songs")


In [None]:
class SpectrogramDataset(Dataset):
    """Dataset of spectrogram images served as grayscale tensors.

    Each item is the image at ``self.files[idx]`` converted to single-channel
    ("L") mode, resized to ``(img_size, img_size)`` and passed through
    ``T.ToTensor()``.
    """

    def __init__(self, filenames=None, root_dir=None, img_size=128):
        """
        Args:
            filenames (list[str]): Explicit list of image paths to load.
                Takes precedence over ``root_dir`` when both are given.
            root_dir (str): Directory containing song folders with spectrograms.
                            If provided, will auto-discover ``.png`` files whose
                            name contains ``aligned_spectrogram`` one level down.
            img_size (int): Final resize to (img_size, img_size).

        Raises:
            ValueError: If neither ``filenames`` nor ``root_dir`` is given.
        """
        if filenames is not None:
            # Copy so later mutation of the caller's list cannot change the dataset.
            self.files = list(filenames)
        elif root_dir is not None:
            # The notebook does `import glob`, so the callable is glob.glob;
            # calling the module itself raises TypeError. Sorting makes the
            # index -> file mapping independent of filesystem listing order.
            discovery_pattern = os.path.join(root_dir, "*", "*aligned_spectrogram*.png")
            self.files = sorted(glob.glob(discovery_pattern))
        else:
            raise ValueError("Must provide either filenames or root_dir")

        self.transform = T.Compose([
            T.Resize((img_size, img_size)),
            T.ToTensor(),  # scales to [0,1]
        ])

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load, convert to grayscale and transform the image at position ``idx``."""
        # The context manager releases the file handle as soon as the
        # converted copy has been produced.
        with Image.open(self.files[idx]) as img:
            gray = img.convert("L")  # grayscale
        return self.transform(gray)


# Parameters
img_size = 128
batch_size = 32

# Training data: reshuffled every epoch.
train_dataset = SpectrogramDataset(train_files, img_size=img_size)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Generic names kept for the cells that still refer to `dataset` / `dataloader`.
# They alias the training objects instead of building a second identical copy.
dataset = train_dataset
dataloader = train_dataloader

# Held-out data: a fixed order keeps evaluation passes repeatable; shuffling
# adds nothing when the loader is only used to measure loss.
test_dataset = SpectrogramDataset(test_files, img_size=img_size)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Every discovered spectrogram (train + test songs together).
full_dataset = SpectrogramDataset(filenames, img_size=img_size)
full_dataloader = DataLoader(full_dataset, batch_size=batch_size, shuffle=True)

print("Train dataset:", len(train_dataset), "images")
print("Test dataset:", len(test_dataset), "images")
print("Full dataset:", len(full_dataset), "images")

# Simple Linear Autoencoder

In [None]:

class SimpleAutoencoder(nn.Module):
    """Fully connected autoencoder for square, single-channel images.

    The encoder flattens the input and maps it through a 512-unit hidden layer
    to an ``embedding_dim``-sized code; the decoder mirrors that path and ends
    in a sigmoid so reconstructions lie in [0, 1].
    """

    def __init__(self, embedding_dim=128, input_size=None):
        """
        Args:
            embedding_dim (int): Size of the bottleneck vector.
            input_size (int | None): Side length of the square input images.
                When omitted, the notebook-level ``img_size`` is read once,
                at construction time.
        """
        super().__init__()
        # Capture the side length once. Reading the notebook global again in
        # forward() would break the reshape if `img_size` were changed after
        # the layers had already been sized.
        self.input_size = img_size if input_size is None else int(input_size)
        n_pixels = self.input_size * self.input_size

        # Encoder
        self.encoder = nn.Sequential(
            nn.Flatten(),
            nn.Linear(n_pixels, 512),
            nn.ReLU(),
            nn.Linear(512, embedding_dim)
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(embedding_dim, 512),
            nn.ReLU(),
            nn.Linear(512, n_pixels),
            nn.Sigmoid(),  # outputs in [0,1]
        )

    def forward(self, x):
        """Encode and reconstruct ``x``.

        Returns:
            tuple: ``(recon, z)`` where ``recon`` is reshaped to
            ``(-1, 1, input_size, input_size)`` and ``z`` is the encoder output.
        """
        z = self.encoder(x)
        recon = self.decoder(z)
        recon = recon.view(-1, 1, self.input_size, self.input_size)
        return recon, z

# Use the GPU when torch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Linear autoencoder with a 128-dimensional bottleneck, placed on that device.
ae_model = SimpleAutoencoder(embedding_dim=128)
ae_model = ae_model.to(device)

# Reconstruction error is mean squared error; Adam updates every model parameter.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(ae_model.parameters(), lr=1e-3)

print("Model ready on", device)

In [None]:
epochs = 10  # adjust as needed

for epoch in range(epochs):
    # Set the mode explicitly: a later cell switches ae_model to eval(), so
    # re-running this cell afterwards must not silently train in eval mode.
    ae_model.train()
    epoch_loss = 0.0
    for imgs in train_dataloader:
        imgs = imgs.to(device)
        recon, z = ae_model(imgs)
        loss = criterion(recon, imgs)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Average over the batches of the loader that was actually iterated.
    # `dataloader` is a different name that a later cell rebinds to the
    # full-dataset loader, which would skew this average on a re-run.
    print(f"Epoch {epoch+1}/{epochs} - Loss: {epoch_loss/len(train_dataloader):.6f}")

In [None]:
# Train the linear autoencoder and measure the held-out loss after every epoch.
# Uses the same ae_model / optimizer objects as the previous cell, so running
# both cells continues training rather than starting over.
n_epochs = 10
for epoch in range(n_epochs):
    # --- Training phase ---
    ae_model.train()
    running_loss = 0.0
    for imgs in tqdm(train_dataloader, desc=f"Train Epoch {epoch+1}/{n_epochs}"):
        imgs = imgs.to(device)
        recon, _ = ae_model(imgs)  # forward returns (recon, z); the code z is not needed here
        loss = criterion(recon, imgs)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion (nn.MSELoss, created in an earlier cell) yields a batch mean;
        # weighting by the batch size makes a smaller final batch count proportionally.
        running_loss += loss.item() * imgs.size(0)

    # Dividing by the number of samples gives a per-image average.
    train_loss = running_loss / len(train_dataloader.dataset)

    # --- Evaluation phase ---
    ae_model.eval()
    test_loss = 0.0
    # No gradients are needed for evaluation; no optimizer step is taken here.
    with torch.no_grad():
        for imgs in tqdm(test_dataloader, desc=f"Test Epoch {epoch+1}/{n_epochs}"):
            imgs = imgs.to(device)
            recon, _ = ae_model(imgs)
            loss = criterion(recon, imgs)
            test_loss += loss.item() * imgs.size(0)

    # NOTE(review): raises ZeroDivisionError if the test split is empty.
    test_loss = test_loss / len(test_dataloader.dataset)

    # --- Logging ---
    print(f"Epoch [{epoch+1}/{n_epochs}] "
          f"Train Loss: {train_loss:.6f} | Test Loss: {test_loss:.6f}")

### Save model weights

In [None]:
# Write only the learned parameters (the state dict) to disk,
# creating the destination folder first if it is missing.
save_path = "../Results/Models/autoencoder.pth"
models_dir = os.path.dirname(save_path)
os.makedirs(models_dir, exist_ok=True)
torch.save(ae_model.state_dict(), save_path)

# Restore: build a fresh model of the same architecture on the target device,
# copy the saved parameters into it, and switch it to inference mode.
loaded_model = SimpleAutoencoder(embedding_dim=128)
loaded_model = loaded_model.to(device)
loaded_model.load_state_dict(torch.load(save_path, map_location=device))
loaded_model.eval()

### Generate Embeddings

In [None]:
# Embed the training images with the restored model.
# `train_dataset` is named explicitly: the generic `dataset` holds the same
# training files at this point, but a later cell rebinds it to the full set,
# so relying on it makes the result depend on cell execution order.
all_embeddings = []

loaded_model.eval()
# Fixed order: row i of the saved tensor corresponds to train_dataset.files[i].
embedding_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

with torch.no_grad():
    for imgs in embedding_loader:
        imgs = imgs.to(device)
        _, z = loaded_model(imgs)  # forward returns (recon, z); keep only the code
        all_embeddings.append(z.cpu())

embeddings_tensor = torch.cat(all_embeddings, dim=0)  # shape (N, embedding_dim)
print("Final embeddings shape:", embeddings_tensor.shape)

# Save
save_path = "../Results/EmbeddingData/autoencoder_embeddings.pt"
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(embeddings_tensor, save_path)
print(f"Saved embeddings to {save_path}")

# Convolutional Autoencoder

In [None]:
# --------------------------
# Conv Autoencoder
# --------------------------
class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder with a dense bottleneck.

    Four stride-2 convolutions reduce a 1x128x128 input to 128x8x8, a linear
    layer maps that to ``latent_dim`` values, and the decoder mirrors the path
    with transposed convolutions, ending in a sigmoid.
    """

    def __init__(self, latent_dim=128):
        super().__init__()
        # Channel widths along the encoder; the decoder walks them backwards.
        widths = (1, 16, 32, 64, 128)

        # Encoder: 1x128x128 -> 16x64x64 -> 32x32x32 -> 64x16x16 -> 128x8x8
        down = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            down.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1))
            down.append(nn.ReLU(inplace=True))
        self.encoder = nn.Sequential(*down)
        self.flatten = nn.Flatten()
        self.fc_enc = nn.Linear(128 * 8 * 8, latent_dim)

        # Decoder: 128x8x8 -> 64x16x16 -> 32x32x32 -> 16x64x64 -> 1x128x128
        self.fc_dec = nn.Linear(latent_dim, 128 * 8 * 8)
        reversed_widths = widths[::-1]
        n_stages = len(reversed_widths) - 1
        up = []
        for stage, (c_in, c_out) in enumerate(zip(reversed_widths[:-1], reversed_widths[1:])):
            up.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2, padding=1, output_padding=1)
            )
            if stage == n_stages - 1:
                up.append(nn.Sigmoid())  # keep values in [0,1]
            else:
                up.append(nn.ReLU(inplace=True))
        self.decoder = nn.Sequential(*up)

    def encode(self, x):
        """Map an image batch to its latent vectors."""
        features = self.flatten(self.encoder(x))
        return self.fc_enc(features)

    def decode(self, z):
        """Map latent vectors back to image space."""
        # Undo the flatten: the dense output becomes a 128-channel 8x8 map.
        feature_map = self.fc_dec(z).view(-1, 128, 8, 8)
        return self.decoder(feature_map)

    def forward(self, x):
        """Return the reconstruction of ``x`` (the latent code is not returned)."""
        return self.decode(self.encode(x))

# --------------------------
# Training Setup
# --------------------------
# From here on `device` is a torch.device object rather than a plain string.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


# Parameters
img_size = 128
batch_size = 32

# The generic `dataset` / `dataloader` names are rebound to every discovered
# spectrogram (`filenames`), not just the training songs.
dataset = SpectrogramDataset(filenames, img_size=img_size)
dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

# Fresh conv model with its own loss and optimizer; `criterion` and `optimizer`
# no longer refer to the objects used for the linear autoencoder.
model = ConvAutoencoder(latent_dim=128)
model = model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# --------------------------
# Training Loop
# --------------------------
n_epochs = 1
for epoch in range(n_epochs):
    model.train()
    running_loss = 0.0
    progress = tqdm(dataloader, desc=f"Epoch {epoch+1}/{n_epochs}")
    for imgs in progress:
        imgs = imgs.to(device)

        # Clear old gradients, then forward pass, backward pass, parameter update.
        optimizer.zero_grad()
        recon = model(imgs)
        loss = criterion(recon, imgs)
        loss.backward()
        optimizer.step()

        # The loss is a batch mean; weight it by the batch size so the epoch
        # figure below is a per-image average.
        running_loss += loss.item() * imgs.size(0)

    epoch_loss = running_loss / len(dataset)
    print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {epoch_loss:.6f}")

# Convolutional Variational Autoencoder

In [None]:
class ConvVAE(nn.Module):
    """Convolutional variational autoencoder.

    The encoder reduces a 1x128x128 input to 128x8x8 and two linear heads
    produce the mean and log-variance of the latent distribution. A sample
    drawn with the reparameterisation trick is decoded through transposed
    convolutions ending in a sigmoid.
    """

    def __init__(self, latent_dim=128):
        super().__init__()
        # Channel widths along the encoder; the decoder walks them backwards.
        widths = (1, 16, 32, 64, 128)

        # Encoder: 1x128x128 -> 16x64x64 -> 32x32x32 -> 64x16x16 -> 128x8x8
        down = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            down.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1))
            down.append(nn.ReLU(inplace=True))
        self.encoder = nn.Sequential(*down)
        self.flatten = nn.Flatten()
        hidden_dim = 128 * 8 * 8
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

        # Decoder: 128x8x8 -> 64x16x16 -> 32x32x32 -> 16x64x64 -> 1x128x128
        self.fc_dec = nn.Linear(latent_dim, hidden_dim)
        reversed_widths = widths[::-1]
        n_stages = len(reversed_widths) - 1
        up = []
        for stage, (c_in, c_out) in enumerate(zip(reversed_widths[:-1], reversed_widths[1:])):
            up.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2, padding=1, output_padding=1)
            )
            if stage == n_stages - 1:
                up.append(nn.Sigmoid())
            else:
                up.append(nn.ReLU(inplace=True))
        self.decoder = nn.Sequential(*up)

    def encode(self, x):
        """Return ``(mu, logvar)`` of the latent distribution for each image."""
        features = self.flatten(self.encoder(x))
        return self.fc_mu(features), self.fc_logvar(features)

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps`` sampled from a standard normal."""
        std = torch.exp(0.5 * logvar)  # log-variance -> standard deviation
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map latent vectors back to image space."""
        # Undo the flatten: the dense output becomes a 128-channel 8x8 map.
        feature_map = self.fc_dec(z).view(-1, 128, 8, 8)
        return self.decoder(feature_map)

    def forward(self, x):
        """Return ``(x_recon, mu, logvar)`` for the batch ``x``."""
        mu, logvar = self.encode(x)
        sample = self.reparameterize(mu, logvar)
        return self.decode(sample), mu, logvar

def vae_loss(recon_x, x, mu, logvar, beta=1.0):
    """Per-sample VAE objective: summed squared error plus weighted KL term.

    Args:
        recon_x: Reconstructed batch.
        x: Target batch; its first dimension is used as the batch size.
        mu: Latent means.
        logvar: Latent log-variances.
        beta: Weight applied to the KL term.

    Returns:
        Scalar tensor: ``(reconstruction + beta * KL) / x.size(0)``.
    """
    n_samples = x.size(0)

    # Squared reconstruction error, summed over every element of the batch
    sq_error = nn.functional.mse_loss(recon_x, x, reduction='sum')

    # KL divergence to a standard normal, summed over batch and latent dims
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_div = -0.5 * torch.sum(kl_terms)

    total = sq_error + beta * kl_div
    return total / n_samples

# `model` and `optimizer` are rebound here: a fresh VAE on the current device,
# with a new Adam optimizer over its parameters.
model = ConvVAE(latent_dim=128)
model = model.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
n_epochs = 1
for epoch in range(n_epochs):
    model.train()
    running_loss = 0.0
    progress = tqdm(dataloader, desc=f"Epoch {epoch+1}/{n_epochs}")
    for imgs in progress:
        imgs = imgs.to(device)

        # Clear old gradients, then forward pass, backward pass, parameter update.
        optimizer.zero_grad()
        recon, mu, logvar = model(imgs)
        loss = vae_loss(recon, imgs, mu, logvar)
        loss.backward()
        optimizer.step()

        # vae_loss is already divided by the batch size; weight it back so the
        # epoch figure below is an average over all images.
        running_loss += loss.item() * imgs.size(0)

    epoch_loss = running_loss / len(dataset)
    print(f"Epoch [{epoch+1}/{n_epochs}], Loss: {epoch_loss:.6f}")