In [9]:
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import os

import numpy as np
from sklearn.preprocessing import MinMaxScaler
import torch.nn.functional as F
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split
import torch.optim as optim

In [10]:
scaler = MinMaxScaler()

def get_spectrograms(file_name):
    audio, sr = librosa.load(file_name, sr=None, mono=None)
    audio = audio.mean(axis=0)
    magnitude_spectrogram = np.abs(librosa.stft(audio, n_fft=1024, hop_length=512))
    magnitude_spectrogram = magnitude_spectrogram[:, 0:800] # cropping the time dimension
    log_spectrogram = librosa.amplitude_to_db(magnitude_spectrogram)
    log_spectrogram_normalized = scaler.fit_transform(log_spectrogram)
    log_spectrogram_tensor = torch.tensor(log_spectrogram_normalized, dtype=torch.float32)
    return log_spectrogram_tensor

In [11]:
# Getting 300 spectrograms to train the diffusion autoencoder
spectrograms = []
i = 0
for file in os.listdir('./train_data'):

    spectrograms.append(get_spectrograms(os.path.join('./train_data',file)))
    i += 1

spectrograms = np.array(spectrograms)
print(spectrograms.shape)


(299, 513, 800)


In [None]:
class SpectrogramAutoencoder(nn.Module):
    def __init__(self, freq_bins, time_stamps, latent_dim):
        super(SpectrogramAutoencoder, self).__init__()
        
        self.freq_bins = freq_bins
        self.time_stamps = time_stamps
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),  # (1, F, T) -> (32, F, T)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),      # -> (32, F/2, T/2)
            
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), # (32, F/2, T/2) -> (64, F/2, T/2)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),      # -> (64, F/4, T/4)
            
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),# (64, F/4, T/4) -> (128, F/4, T/4)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0)       # -> (128, F/8, T/8)
        )


        self.encoder_output_size=819200 # found out after checking the dimensions of input after self.flatten
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(self.encoder_output_size, latent_dim)
        


        # Decoder
        self.fc2 = nn.Linear(latent_dim, self.encoder_output_size)
        self.decoder = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1), # (128, F/8, T/8) -> (128, F/8, T/8)
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='nearest'),             # -> (128, F/4, T/4)
            
            nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1),  # (128, F/4, T/4) -> (64, F/4, T/4)
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='nearest'),             # -> (64, F/2, T/2)
            
            nn.Conv2d(64, 32, kernel_size=3, stride=1, padding=1),   # (64, F/2, T/2) -> (32, F/2, T/2)
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='nearest'),             # -> (32, F, T)
            
            nn.Conv2d(32, 1, kernel_size=3, stride=1, padding=(1, 0)),    # (32, F, T) -> (1, F, T)
            nn.Sigmoid()
        )


    
    def forward(self, x):
        # Reshape input for convolutional layers
        x = x.unsqueeze(1)  # Add channel dimension: (N, 1, F, T)
        
        # Encoder
        x = self.encoder(x)
        # print(x.shape, "after encoder")
        x = self.flatten(x)
        # print(x.shape, "after flattening")
        latent = self.fc1(x)
        
        # Decoder
        x = self.fc2(latent)
        x = x.view(-1, 128, self.freq_bins // 8, self.time_stamps // 8)

        x = self.decoder(x)
        # print(x.shape, 'shape after decoding')
        x = x.squeeze(1)
        # print(x.shape, 'shape after squeezing')
        x = F.pad(x, (0, 2, 0, 1))
        print(x.shape)
        return x , latent


In [None]:
# Hyperparameters
freq_bins = 513
time_stamps = 800
latent_dim = 32
epochs = 5
batch_size = 32
learning_rate = 0.001

# Instantiate the model
model = SpectrogramAutoencoder(freq_bins=freq_bins, time_stamps=time_stamps, latent_dim=latent_dim)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

train_size = int(0.8 * len(spectrograms))
val_size = len(spectrograms) - train_size
train_data, val_data = random_split(spectrograms, [train_size, val_size])

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size)



criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    model.train()
    total_train_loss = 0
    for batch in train_loader:
        batch = batch.to(device)
        outputs, latent_space = model(batch)
        loss = criterion(outputs, batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_train_loss += loss.item()

    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            batch = batch.to(device)
            outputs, _ = model(batch)
            val_loss = criterion(outputs, batch)
            total_val_loss += val_loss.item()

    # Average losses
    avg_train_loss = total_train_loss / len(train_loader)
    avg_val_loss = total_val_loss / len(val_loader)
    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([15, 513, 800])
torch.Size([32, 513, 800])
torch.Size([28, 513, 800])
Epoch 1/5, Train Loss: 0.0749, Val Loss: 0.0324
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])
torch.Size([32, 513, 800])


In [None]:
# Switch the model to evaluation mode
model.eval()

# Create a DataLoader for the spectrograms
full_loader = DataLoader(spectrograms, batch_size=batch_size, shuffle=False)

# Initialize a list to store latent representations
latent_representations = []

# Generate latent representations
with torch.no_grad():
    for batch in full_loader:
        batch = batch.to(device)
        _, latent_space = model(batch)  # Get the latent representations
        latent_representations.append(latent_space.cpu().numpy())  # Move to CPU and convert to numpy array

# Concatenate all latent representations into a single array
latent_representations = np.concatenate(latent_representations, axis=0)

# Print the shape of the latent representations
print(f"Latent representations shape: {latent_representations.shape}")

# Save the latent representations to a file if needed
np.save("latent_representations.npy", latent_representations)

print("Latent representations saved to 'latent_representations.npy'")
