In [7]:
import torch
import os
from torch import nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
import torchvision.utils as vutils
import random
import matplotlib.pyplot as plt
import random
import numpy as np
import timeit

from tqdm import tqdm

In [8]:
DATA_DIR = ""
BATCH_SIZE = 1024
IMG_SIZE = 64
CHANNELS = 1
INPUT_VECTOR_DIM = 100
FEATURE_MAP_DIM = 64
LR = 2e-4
BETA1 = 0.5
EPOCHS = 2
RANDOM_SEED = 42

random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = "cuda" if torch.cuda.is_available() else "cpu"


In [9]:
class Discriminator(nn.Module):
    def __init__(self, feature_map_dim, channels):
        super(Discriminator, self).__init__()
        self.conv_1 = nn.Conv2d(channels, feature_map_dim*2,4,2,1,bias=False)
        self.conv_2 = nn.Conv2d(feature_map_dim*2, feature_map_dim*4,4,2,1,bias=False)
        self.conv_3 = nn.Conv2d(feature_map_dim*4, feature_map_dim*8,4,2,1,bias=False)
        self.conv_4 = nn.Conv2d(feature_map_dim*8, feature_map_dim*16,4,2,1,bias=False)
        self.conv_5 = nn.Conv2d(feature_map_dim*16, feature_map_dim*32,4,2,1,bias=False)
        
        self.conv_6 = nn.Conv2d(feature_map_dim*32, 1, 4, 1, 0,bias=False)

        self.leaky_relu = nn.LeakyReLU(0.2)
        self.batch_norm_1 = nn.BatchNorm2d(feature_map_dim*4)
        self.batch_norm_2 = nn.BatchNorm2d(feature_map_dim*8)
        self.batch_norm_3 = nn.BatchNorm2d(feature_map_dim*16)
        self.batch_norm_4 = nn.BatchNorm2d(feature_map_dim*32)

        self.sigmoid = nn.Sigmoid()
    
    def forward(self,inp):
        x = self.conv_1(inp)
        x = self.leaky_relu(x)

        x = self.conv_2(x)
        x = self.batch_norm_1(x)
        x = self.leaky_relu(x)

        x = self.conv_3(x)
        x = self.batch_norm_2(x)
        x = self.leaky_relu(x)
        
        x = self.conv_4(x)
        x = self.batch_norm_3(x)
        x = self.leaky_relu(x)

        x = self.conv_5(x)
        x = self.batch_norm_4(x)
        x = self.leaky_relu(x)

        x = self.conv_6(x)
        x = torch.mean(x, dim=3, keepdim=True)
        out = self.sigmoid(x)

        return out

In [10]:
discriminator = Discriminator(64,1).to(device)

x = torch.randn(10, 1, 128, 896).to(device)
print(x.shape)
dis_out = discriminator.forward(x)
print(dis_out.size())

torch.Size([10, 1, 128, 896])
torch.Size([10, 1, 1, 1])


In [11]:
class Generator(nn.Module):
    def __init__(self, input_vector_dim, feature_map_dim, channels):
        super(Generator, self).__init__()
        self.conv_1 = nn.ConvTranspose2d(input_vector_dim, feature_map_dim*32, (4,28), 1, 0, bias=False)
        
        # Progression of upsampling layers
        self.conv_2 = nn.ConvTranspose2d(feature_map_dim*32, feature_map_dim*16, 4, 2, 1, bias=False)
        self.conv_3 = nn.ConvTranspose2d(feature_map_dim*16, feature_map_dim*8, 4, 2, 1, bias=False)
        self.conv_4 = nn.ConvTranspose2d(feature_map_dim*8, feature_map_dim*4, 4, 2, 1, bias=False)
        self.conv_5 = nn.ConvTranspose2d(feature_map_dim*4, feature_map_dim*2, 4, 2, 1, bias=False)
        self.conv_6 = nn.ConvTranspose2d(feature_map_dim*2, channels, 4, 2, 1, bias=False)

        self.relu = nn.ReLU()
        self.batch_norm_1 = nn.BatchNorm2d(feature_map_dim*32)
        self.batch_norm_2 = nn.BatchNorm2d(feature_map_dim*16)
        self.batch_norm_3 = nn.BatchNorm2d(feature_map_dim*8)
        self.batch_norm_4 = nn.BatchNorm2d(feature_map_dim*4)
        self.batch_norm_5 = nn.BatchNorm2d(feature_map_dim*2)

        self.tanh = nn.Tanh()
    
    def forward(self,inp):
        # Input shape: [batch_size, input_vector_dim, 1, 1]
        x = self.conv_1(inp)       
        x = self.batch_norm_1(x)
        x = self.relu(x)
        
        x = self.conv_2(x)         
        x = self.batch_norm_2(x)
        x = self.relu(x)
        
        x = self.conv_3(x)          
        x = self.batch_norm_3(x)
        x = self.relu(x)
        
        x = self.conv_4(x)          
        x = self.batch_norm_4(x)
        x = self.relu(x)
        
        x = self.conv_5(x)          
        x = self.batch_norm_5(x)
        x = self.relu(x)
        
        x = self.conv_6(x)         
        out = self.tanh(x)
        
        return out

In [12]:
generator = Generator(INPUT_VECTOR_DIM, FEATURE_MAP_DIM, CHANNELS).to(device)

noise = torch.randn(10, 100, 1, 1, device=device)
gen_out = generator.forward(noise)

print(gen_out.size())

torch.Size([10, 1, 128, 896])


In [35]:
def load_dataset(directory_path, target_shape=(128,896), limit = None):
    all_files = os.listdir(directory_path)
    # Filter out the files ending with .npz and get their full paths
    npz_files = [os.path.join(directory_path, file) for file in all_files if file.endswith('.npz')]

    # List to accumulate data (for example, spectrogram data)
    accumulated_data = []
    types = set([])
    for i, file in enumerate(npz_files):
        if limit and i == limit:
            break 
        data = np.load(file)

        if 'spectrogram' in data:
            spec = data['spectrogram']
            h, w = spec.shape[0], spec.shape[1]
            min_h, min_w = min(target_shape[0], h), min(target_shape[1], w)
            
            
            new_spec = np.zeros(target_shape)
            new_spec[:min_h, :min_w] = spec[:min_h, :min_w] 
            accumulated_data.append(torch.from_numpy(np.expand_dims(new_spec, axis=0)).float())
            
    return np.array(accumulated_data)

In [36]:
fake_path_training = r"/Users/egatchal/Downloads/home/tianaz/audio_spectrograms/training/fake"
real_path_training = r"/Users/egatchal/Downloads/home/tianaz/audio_spectrograms/training/real"

fake_dataset = load_dataset(fake_path_training, limit=2000)
real_dataset = load_dataset(real_path_training)

In [None]:
real_labels = torch.ones(real_dataset.shape[0])
fake_labels = torch.zeros(fake_dataset.shape[0])
all_spectrograms = np.concatenate([fake_dataset, real_dataset], axis=0)
all_labels = np.concatenate([fake_labels, real_labels], axis=0)
all_labels = real_labels
combined = list(zip(all_spectrograms, all_labels))
combined = list(zip(real_dataset, real_labels))

In [38]:
print(real_dataset.shape)
print(fake_dataset.shape)
print(all_spectrograms.shape)
print(all_labels.dtype)

(14863, 1, 128, 896)
(2000, 1, 128, 896)
(16863, 1, 128, 896)
float32


In [31]:
DATA_DIR = ""
BATCH_SIZE = 1024
IMG_SIZE = 64
CHANNELS = 1
INPUT_VECTOR_DIM = 25
FEATURE_MAP_DIM = 64
LR = 2e-4
BETA1 = 0.5
EPOCHS = 2
RANDOM_SEED = 42

dataloader = DataLoader(
    combined, 
    batch_size=BATCH_SIZE,          # Number of samples in each batch
    shuffle=True,           # Shuffle the data for each epoch
    num_workers=4           # Optional: number of subprocesses to use for data loading
)

In [32]:
d = {}
for data in fake_dataset:
    k = data.shape
    if k not in d:
        d[k] = 0
    d[k] += 1
print(d)

d = {}
for data in real_dataset:
    k = data.shape
    if k not in d:
        d[k] = 0
    d[k] += 1
print(d)

{(1, 128, 896): 2000}
{(1, 128, 896): 100}


In [33]:
discriminator = Discriminator(FEATURE_MAP_DIM, CHANNELS).to(device)
generator = Generator(INPUT_VECTOR_DIM, FEATURE_MAP_DIM, CHANNELS).to(device)

In [34]:
criterion = nn.BCELoss()

discriminator_optimizer = optim.Adam(discriminator.parameters(), lr=LR, betas=(BETA1, 0.999))
generator_optimizer = optim.Adam(generator.parameters(), lr=LR, betas=(BETA1, 0.999))

generator.train()
discriminator.train()

start = timeit.default_timer()

for epoch in tqdm(range(EPOCHS), position=0, leave=True):
    generator_running_loss = 0
    discriminator_running_loss = 0
    for idx, data in enumerate(dataloader):
        img_data = data[0].to(device)
        dummy_labels = data[1]

        # real_labels = dummy_labels
        real_labels = torch.full((dummy_labels.size()), 1.0, dtype=torch.float).to(device)
        fake_labels = torch.full((dummy_labels.size()), 0., dtype=torch.float).to(device)
        noise = torch.randn(dummy_labels.size()[0], INPUT_VECTOR_DIM, 1, 1).to(device)

        discriminator_real_out = discriminator(img_data).view(-1)
        discriminator_real_loss = criterion(discriminator_real_out, real_labels)
        discriminator.zero_grad()
        discriminator_real_loss.backward()

        generator_fake_out = generator(noise)
        discriminator_fake_out = discriminator(generator_fake_out.detach()).view(-1)
        discriminator_fake_loss = criterion(discriminator_fake_out, fake_labels)
        discriminator_fake_loss.backward()
        discriminator_running_loss += discriminator_real_loss.item() + discriminator_fake_loss.item()
        discriminator_optimizer.step()

        discriminator_fake_out = discriminator(generator_fake_out).view(-1)
        generator_loss = criterion(discriminator_fake_out, real_labels)
        generator_running_loss += generator_loss.item()
        generator.zero_grad()
        generator_loss.backward()
        generator_optimizer.step()

    discriminator_loss = discriminator_running_loss / 2*(idx+1)
    generator_loss = generator_running_loss / (idx+1)

    print(f'Discriminator Loss EPOCH {epoch+1}: {discriminator_loss:.4f}')
    print(f'Generator Loss EPOCH {epoch+1}: {generator_loss:.4f}')

    plt.figure(figsize=(20,20))
    plt.subplot(1,2,1)
    plt.axis("off")
    plt.title(f"Epoch {epoch+1} Generated Images")
    grid = vutils.make_grid(generator_fake_out[:9], padding=5, normalize=True, nrow=3).cpu()

    plt.imshow(grid[0], cmap="rainbow")
    plt.show()

stop = timeit.default_timer() 
print(f"Training Time: {stop-start:.2f}s")

  0%|          | 0/2 [00:00<?, ?it/s]libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x10ed909a0>
Traceback (most recent call last):
  File "/opt/miniconda3/envs/DCGANS/lib/python3.12/site-packages/torch/utils/data/dataloader.py", line 1617, in __del__
    def __del__(self):

  File "/opt/miniconda3/envs/DCGANS/lib/python3.12/site-packages/torch/utils/data/_utils/signal_handling.py", line 73, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 45851) is killed by signal: Abort trap: 6. 
  0%|          | 0/2 [01:04<?, ?it/s]


KeyboardInterrupt: 

In [None]:
torch.cuda.empty_cache()