In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import librosa
import matplotlib.pyplot as plt
from google.colab import drive
import glob
import os
import cv2

drive.mount('/content/drive')

# Define paths
real_dataset_path = '/content/drive/MyDrive/dataset/TeamDeepwave/dataset/KaggleDataset/real'
fake_dataset_path = '/content/drive/MyDrive/dataset/TeamDeepwave/dataset/KaggleDataset/fake'

# Function to load audio and create spectrograms
# Function to load audio and create padded spectrograms
def load_audio(file_path, sr=22050, max_length=128):
    y, _ = librosa.load(file_path, sr=sr)
    stft = librosa.stft(y)
    spectrogram = librosa.amplitude_to_db(np.abs(stft), ref=np.max)

    # Pad spectrograms to have the same length
    if spectrogram.shape[1] < max_length:
        padding = max_length - spectrogram.shape[1]
        spectrogram = np.pad(spectrogram, pad_width=((0, 0), (0, padding)), mode='constant')
    else:
        spectrogram = spectrogram[:, :max_length]
    return spectrogram

# Load real spectrograms with padding
real_spectrograms = []
for file in glob.glob(real_dataset_path + '/*.wav'):
    real_spectrograms.append(load_audio(file))
real_spectrograms = np.array(real_spectrograms)

# Load real spectrograms
real_spectrograms = []
for file in glob.glob(real_dataset_path + '/*.wav'):
    real_spectrograms.append(load_audio(file))
real_spectrograms = np.array(real_spectrograms)
real_labels = np.ones(len(real_spectrograms))  # Labels for real spectrograms

# Load fake spectrograms
fake_spectrograms = []
for file in glob.glob(fake_dataset_path + '/*.wav'):
    fake_spectrograms.append(load_audio(file))
fake_spectrograms = np.array(fake_spectrograms)
fake_labels = np.zeros(len(fake_spectrograms))  # Labels for fake spectrograms

# Combine and shuffle data
all_spectrograms = np.concatenate([real_spectrograms, fake_spectrograms])
all_labels = np.concatenate([real_labels, fake_labels])

# Shuffle data and labels together
shuffle_indices = np.random.permutation(len(all_spectrograms))
all_spectrograms = all_spectrograms[shuffle_indices]
all_labels = all_labels[shuffle_indices]

# Normalize and resize spectrograms (adjust size as needed)
all_spectrograms = (all_spectrograms - np.min(all_spectrograms)) / (np.max(all_spectrograms) - np.min(all_spectrograms))
all_spectrograms = np.array([cv2.resize(spec, (128, 128)) for spec in all_spectrograms])

# Convert to PyTorch tensors and create DataLoader
all_spectrograms_tensor = torch.tensor(all_spectrograms, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
all_labels_tensor = torch.tensor(all_labels, dtype=torch.float32).unsqueeze(1)
dataset = torch.utils.data.TensorDataset(all_spectrograms_tensor, all_labels_tensor)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

# Define GAN Architecture
class Generator(nn.Module):
    def __init__(self, input_dim=100, output_channels=1, image_size=128):
        super(Generator, self).__init__()
        self.output_channels = output_channels  # Store output_channels as an attribute
        self.main = nn.Sequential(
            # input is Z, going into a convolution
            nn.ConvTranspose2d(input_dim, 512, 4, 1, 0, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            # state size. (ngf*8) x 4 x 4
            nn.ConvTranspose2d(512, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            # state size. (ngf*4) x 8 x 8
            nn.ConvTranspose2d(256, 128, 4, 2, 1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            # state size. (ngf*2) x 16 x 16
            nn.ConvTranspose2d(128, 64, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            # state size. (ngf) x 32 x 32
            nn.ConvTranspose2d(64, self.output_channels, 4, 2, 1, bias=False), # Use self.output_channels
            nn.Tanh()
            # state size. (nc) x 64 x 64
        )

    def forward(self, input):
        output = self.main(input)
        return output

class Discriminator(nn.Module):
    def __init__(self, input_channels=1, image_size=128):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            # input is (nc) x 64 x 64
            nn.Conv2d(input_channels, 64, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf) x 32 x 32
            nn.Conv2d(64, 128, 4, 2, 1, bias=False),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*2) x 16 x 16
            nn.Conv2d(128, 256, 4, 2, 1, bias=False),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*4) x 8 x 8
            nn.Conv2d(256, 512, 4, 2, 1, bias=False),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*8) x 4 x 4
            nn.Flatten(),
            nn.Linear(512 * 8 * 8, 1024),  # Hidden layer      #nn.Linear(512 * 4 * 4, 1024)
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, 1),              # Output layer
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.main(input)

# Training Loop
def train_gan(generator, discriminator, dataloader, num_epochs=100, lr=0.0002, device="cuda"):  # Default to 'cuda'
    criterion = nn.BCELoss()
    optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(0.5, 0.999))
    optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(0.5, 0.999))

    # Fixed noise for visualization
    fixed_noise = torch.randn(64, 100, 1, 1, device=device)

    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(dataloader):
            # Ensure models and data are on the correct device
            images = images.to(device)
            labels = labels.to(device)
            generator.to(device)
            discriminator.to(device)

            # Train Discriminator with all-real batch
            discriminator.zero_grad()
            output = discriminator(images).view(-1)
            errD_real = criterion(output, labels)
            errD_real.backward()

            # Train Discriminator with all-fake batch
            noise = torch.randn(images.size(0), 100, 1, 1, device=device)
            fake = generator(noise)
            output = discriminator(fake.detach()).view(-1)
            errD_fake = criterion(output, torch.zeros_like(labels))
            errD_fake.backward()

            errD = errD_real + errD_fake
            optimizer_D.step()

            # Train Generator
            generator.zero_grad()
            output = discriminator(fake).view(-1)
            errG = criterion(output, labels)
            errG.backward()
            optimizer_G.step()

            # Output training stats
            if i % 50 == 0:
                print(f'[{epoch}/{num_epochs}][{i}/{len(dataloader)}]\tLoss_D: {errD.item():.4f}\tLoss_G: {errG.item():.4f}')
    return generator

# Initialize models and determine device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
generator = Generator().to(device)
discriminator = Discriminator().to(device)
print(f"Using device: {device}")

# Move the creation of dataloader outside the train_gan function
# Convert to PyTorch tensors and create DataLoader
all_spectrograms_tensor = torch.tensor(all_spectrograms, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
all_labels_tensor = torch.tensor(all_labels, dtype=torch.float32).unsqueeze(1)
dataset = torch.utils.data.TensorDataset(all_spectrograms_tensor, all_labels_tensor)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

# Now, call train_gan function with dataloader
generator = train_gan(generator, discriminator, dataloader, device=device)  # Pass device to train_gan


# Generate spectrograms
generated_spectrograms = generator(fixed_noise).detach().cpu()

# Visualize generated spectrograms (only first 9 for demonstration)
plt.figure(figsize=(10, 10))
for i in range(9):
    plt.subplot(3, 3, i + 1)
    plt.imshow(generated_spectrograms[i].squeeze(0), cmap='viridis', origin='lower', aspect='auto')  # Assuming the spectrogram is the first channel
    plt.title(f'Generated Spectrogram {i + 1}')
    plt.axis('off')

plt.show()
# Move the creation of dataloader outside the train_gan function
# Convert to PyTorch tensors and create DataLoader
all_spectrograms_tensor = torch.tensor(all_spectrograms, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
all_labels_tensor = torch.tensor(all_labels, dtype=torch.float32).unsqueeze(1)
dataset = torch.utils.data.TensorDataset(all_spectrograms_tensor, all_labels_tensor)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

# Now, call train_gan function with dataloader
train_gan(generator, discriminator, dataloader, device=device)  # Pass device to train_gan


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using device: cpu


ValueError: Using a target size (torch.Size([64, 1])) that is different to the input size (torch.Size([64])) is deprecated. Please ensure they have the same size.