# Direct Speech To Image

Please run this file in Kaggle and import the [dataset](https://www.kaggle.com/datasets/lokeshbolisetty/speech-to-image-dataset).

The GitHub repo can be found [here](https://github.com/LokeshBolisetty/Direct-Speech-To-Image).

### IMPORTING REQUIRED LIBRARIES

In [1]:
import librosa

import numpy as np
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

import matplotlib.pyplot as plt
%matplotlib inline

from IPython import display
from IPython.display import clear_output

import torch
import torchvision
import torchvision.transforms as T
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
torch.manual_seed(0)
from torchvision.utils import make_grid
from torchvision.utils import save_image

import numpy as np
import torch.nn as nn
import torch.nn.functional as F

from tqdm.notebook import tqdm
import statistics

import IPython.display as ipd
from IPython.core.display import display
from IPython.display import Image
import warnings

In [2]:
#Making the requirements file
!pip freeze > requirements.txt

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## Add dataset from [here](https://www.kaggle.com/datasets/lokeshbolisetty/speech-to-image-dataset)

# Speech captions generation

The dataset only has text captions, so we use Google Text-to-Speech (referred to as gTTS from here on) to generate audio captions. However, the API limits the number of requests you can send, so we had to throttle the generation, and as a result the code takes a very long time to run (>12 hrs). In any case, working with the entire dataset is not computationally feasible, so we randomly sampled part of it and added the sample to our [dataset](https://www.kaggle.com/datasets/lokeshbolisetty/speech-to-image-dataset). We recommend using that dataset rather than regenerating the audio. The code used to generate this data can be found [here](https://github.com/LokeshBolisetty/DL-Project/tree/main/TTS).
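
As a rough illustration of the throttling idea described above (this is not the generation code from the linked repository), the sketch below spaces out calls to a text-to-speech function. The `synthesize` callable, the file names, and the caption strings are all hypothetical placeholders; in practice `synthesize` would wrap whichever TTS client is used.

```python
import time


def synthesize_with_delay(captions, synthesize, delay_s=1.0, sleep=time.sleep):
    """Call synthesize(text, out_path) for each caption, pausing between calls.

    captions maps output file paths to caption text. synthesize is a
    hypothetical callable supplied by the caller, for example a thin wrapper
    around a text-to-speech client.
    """
    written = []
    for i, (out_path, text) in enumerate(captions.items()):
        if i > 0:
            sleep(delay_s)  # spread requests out to stay under the rate limit
        synthesize(text, out_path)
        written.append(out_path)
    return written


# Stand-in synthesizer so the sketch runs offline; it only records the calls.
calls = []
paths = synthesize_with_delay(
    {"cat_0.wav": "a cat sitting on a couch", "dog_0.wav": "a dog running on grass"},
    synthesize=lambda text, path: calls.append((text, path)),
    delay_s=0.0,
)
print(paths)
```

A longer `delay_s` trades total run time for fewer rejected requests, which is consistent with the long generation time mentioned above.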

# AUDIO ENCODING

### PREPROCESSING DATA FOR AUDIO ENCODER

In [4]:
BASE_PATH = '../input/speech-to-image-dataset/Sound'
TRAIN_PATH = BASE_PATH  

# Sampling rate per sec
sr = 22050
# Load duration- 2s
load_duration = 2
# Samples for corresponding load duration
TOTAL_SAMPLES = load_duration * sr
# Number of slices to be created of the load duration
NUM_SLICES = 1

SAMPLES_PER_SLICE = int(TOTAL_SAMPLES / NUM_SLICES)
DATA_POINTS = TOTAL_SAMPLES

The project needs audio inputs, but the captions are only available as text, so we use Google Text-to-Speech to convert them into speech. The generated clips vary in length, but the MLP needs inputs of a fixed size. Therefore, each clip is loaded for at most `load_duration` seconds and padded to exactly `sr * load_duration` samples, i.e. ```22050*2``` with the values set above, where ```22050``` is the sampling rate per second and 2 is the number of seconds we keep. We observed that most audio samples are approximately 4 sec long, so longer clips are truncated; padding beyond what is required would only decrease the share of true data in each input.

This is computationally expensive because we need to load a lot of samples. Using the entire dataset is not computationally feasible here, so we use only part of the data. Decreasing the sampling rate resulted in poor accuracy, so we reduced the number of samples instead. We settled on this after trying different dataset sizes.
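
The pad-or-truncate step can be sketched without any audio library. The example below is a minimal plain-Python illustration, assuming the `sr` and `load_duration` values from the configuration cell; `fix_length` and the toy waveforms are hypothetical names and data introduced only for this sketch.

```python
def fix_length(samples, target_len):
    """Return samples padded with their mean, or truncated, to target_len items."""
    samples = list(samples)
    if len(samples) >= target_len:
        return samples[:target_len]
    mean_value = sum(samples) / len(samples) if samples else 0.0
    return samples + [mean_value] * (target_len - len(samples))


sr = 22050          # samples per second
load_duration = 2   # seconds kept per clip
target_len = sr * load_duration

short_clip = [0.0, 0.5, 1.0]            # toy waveform, far shorter than target_len
long_clip = [0.1] * (target_len + 10)   # toy waveform, slightly too long

print(target_len,
      len(fix_length(short_clip, target_len)),
      len(fix_length(long_clip, target_len)))
```

Every clip ends up with the same number of samples, which is what lets the clips be stacked into one 2-D array for the MLP.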

In [5]:
# Loading the audio dataset for cats and dogs

#We are ignoring warnings here because every time we load a file, we get a warning saying PySoundFile failed and that it is falling back to a different loader.
#But since the warning comes from inside librosa, there isn't much we can do about it, and it is not a problem because the file still loads correctly
#This is happening because the wav format gTTS gives is not the exact wav format that librosa uses. However, this is nothing to worry about. 
import warnings
warnings.filterwarnings("ignore")

#This holds the values of the pre-trained input audio
x_train_pre = []
#This holds whether the input is corresponding to a cat or a dog
labels = []

#The path has two folders, Cats and Dogs. Each of them is processed in turn
#This takes a couple of minutes to run because we are appending mean value to the numpy arrays and there are a lot of files
for i, class_type in enumerate(sorted(os.listdir(TRAIN_PATH))):
    print(class_type+" processing has started")
    animal_list = np.array(sorted(os.listdir(TRAIN_PATH+'/'+class_type)))

    for animal in animal_list:
        filename = TRAIN_PATH+'/'+class_type+'/'+animal
        audio, sr = librosa.load(filename, offset=0.0, duration=load_duration) #Load timeseries data into audio using librosa. The sampling rate is the default value, 22050, and at most load_duration seconds are loaded
        
        meanValue = audio.mean() #Finding the mean value of audio

        target_len = sr*load_duration #Every row must have exactly this many samples
        if audio.size < target_len: #Forcing audio to be of same length in every row
            audio = np.pad(audio, (0, target_len-audio.size), constant_values=meanValue) #by padding with the mean value if the clip is shorter
        x_train_pre.append(audio[:SAMPLES_PER_SLICE]) #If the clip is longer than load_duration, we just ignore the additional portion. 

        #Making the labels
        if(class_type=='Cats'):
          labels.append(0) #considering 0s represent Cats and 
        else:
          labels.append(1) #1s represent Dogs

#Converting the lists to numpy arrays for further use
x_train_pre = np.array(x_train_pre) 
labels = np.array(labels)

#This will take around 10 minutes to load

In [6]:
#Checking the shape of x_train_pre to make sure that the length of each row is the same (sr*load_duration = 22050*2)
print(x_train_pre.shape)

In [7]:
#Total samples in an audio file is sampling rate per sec times number of seconds
DATA_POINTS = x_train_pre.shape[1]

In [8]:
print(f"Shape of x_train_pre: {x_train_pre.shape}")

In [9]:
# Showing a sample audio clip
ipd.Audio(x_train_pre[300],rate=22050)

In [10]:
print("The audio heard earlier belongs to class ", labels[300])

### DATASET AND DATA LOADER FOR AUDIO ENCODER

In [11]:
#Using a batch size of 20
batch_size = 20

In [12]:
#Making the dataset
from torch.utils.data import DataLoader, Dataset
kwargs = {'num_workers': 1, 'pin_memory': True} if device.type=='cuda' else {} #device is a torch.device, so compare its type rather than the object itself

class AnimalDataset(Dataset):
    def __init__(self, animal_data):
        self.animal_data = animal_data

    def __len__(self):
        return len(self.animal_data)

    #Method to get each item
    def __getitem__(self, idx):
        data = self.animal_data[idx]
        data = torch.from_numpy(data)
        return (data,idx) #Returning the index so that we know which class the given audio belongs to later using this index and labels array

#Making the train test split in 70-30
train_size = int(0.7*x_train_pre.shape[0])
test_size = x_train_pre.shape[0]-train_size

#Splitting the data randomly
train_set, test_set = torch.utils.data.random_split(AnimalDataset(x_train_pre), [train_size, test_size], generator=torch.Generator().manual_seed(401))

#Making the train and test loader
data_train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, **kwargs)
data_test_loader = DataLoader(test_set, batch_size = batch_size, shuffle=True, **kwargs)
print("The train set is ",len(train_set),"long and the test set is ",len(test_set)," long")

### MODEL OF AUDIO ENCODER

In [13]:
# model architecture

class Encoder(nn.Module):
    def __init__(self, featureDim=DATA_POINTS):
        super(Encoder, self).__init__()

        #Creating a vanilla MLP
        self.fc1 = nn.Linear(featureDim, 16384)
        self.fc2 = nn.Linear(16384, 4096)
        self.fc3 = nn.Linear(4096, 2048)
        self.fc4 = nn.Linear(2048, 512)
        self.fc5 = nn.Linear(512, 128)
        self.fc6 = nn.Linear(128, 32)
        self.fc7 = nn.Linear(32, 8)
        self.fc8 = nn.Linear(8,1)
        self.act = nn.LeakyReLU(0.1)

    def encoder(self, x):
        x = self.act(self.fc1(x))
        x = self.act(self.fc2(x))
        x = self.act(self.fc3(x))
        x = self.act(self.fc4(x))
        x = self.act(self.fc5(x))
        x = self.act(self.fc6(x))
        x = self.act(self.fc7(x))
        x = torch.sigmoid(self.fc8(x))
        return x

    def forward(self, x):
        x = self.encoder(x)
        return x


### LOSS FUNCTION FOR AUDIO ENCODER

In [14]:
#Using binary cross entropy loss because we are doing a binary classification problem
def loss_function(y_, y):
    BCE = nn.BCELoss()(y_, y) #Instantiate the loss module first, then apply it to predictions and targets
    return BCE
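
For reference, binary cross entropy for a predicted probability `p` and a label `y` is the mean of `-(y*log(p) + (1-y)*log(1-p))`. The plain-Python sketch below only illustrates that formula on made-up predictions; it is not a replacement for `nn.BCELoss`, and the function and variable names are introduced here for illustration.

```python
import math


def binary_cross_entropy(probs, targets):
    """Mean of -(y*log(p) + (1-y)*log(1-p)) over paired predictions and labels."""
    losses = [-(y * math.log(p) + (1 - y) * math.log(1 - p))
              for p, y in zip(probs, targets)]
    return sum(losses) / len(losses)


# Labels follow the notebook's convention: 0 for Cats, 1 for Dogs.
confident_right = binary_cross_entropy([0.9, 0.1], [1, 0])
confident_wrong = binary_cross_entropy([0.1, 0.9], [1, 0])
print(round(confident_right, 4), round(confident_wrong, 4))
```

Confident correct predictions give a small loss, while confident wrong ones are penalised heavily, which is why this loss suits a sigmoid output.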

### TRAINING AUDIO ENCODER

In [15]:
# model instantiation
model = Encoder()
model.to(device)
print(model)

In [16]:
# Optimizer
#After testing a couple of options for lr, we realised that lr should be as large as 0.1
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.2)

In [17]:
epochs = 160
loss_values = []
#labels = torch.from_numpy(labels).float().to(device)
for epoch in range(1, epochs+1):
    # monitor training loss
    train_loss = 0.0

    #Training
    for data in data_train_loader:
        #Extracting the actual label
        label = data[1]
        data = data[0]

        data = data.to(device).float()

        #Making the tensor for the labels of all samples in this batch
        thisLabel = torch.Tensor(labels[label])
        thisLabel = thisLabel.to(device)
        optimizer.zero_grad()
        outputs = model(data)

        outputs = outputs.reshape(-1) #Flatten (batch, 1) to (batch,); this also handles the smaller last batch

        loss = torch.nn.BCELoss()
        thisLoss = loss(outputs, thisLabel)
        thisLoss.backward()
        optimizer.step()
        train_loss += thisLoss.item()*data.size(0)

    train_loss = train_loss/len(train_set) #The sum above is weighted by batch size, so divide by the number of samples
    loss_values.append(train_loss)
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(epoch, train_loss))
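
The loop above accumulates `thisLoss.item()*data.size(0)`, that is, each batch's mean loss weighted by the batch size. Dividing such a running total by the number of samples gives a per-sample mean that is not skewed by a smaller last batch. A minimal sketch with made-up numbers (the function name and values are illustrative only):

```python
def epoch_mean_loss(batch_means, batch_sizes):
    """Per-sample mean loss from per-batch mean losses and batch sizes."""
    total = sum(loss * n for loss, n in zip(batch_means, batch_sizes))
    return total / sum(batch_sizes)


# Toy numbers: two full batches of 20 and a final batch of 10.
batch_means = [0.70, 0.60, 0.30]
batch_sizes = [20, 20, 10]
print(epoch_mean_loss(batch_means, batch_sizes))
```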


In [18]:
#Accuracy check

#Holds the number of correct predictions
correct = 0

model.eval()
with torch.no_grad(): #Gradients are not needed for evaluation
    for data in data_test_loader:
        label = data[1] #Indices used to look up the actual labels

        data = data[0] #Data is the audio clip as a tensor
        data = data.to(device).float() #Sending data to the device

        #thisLabel is the labels of all the elements in the current batch
        thisLabel = torch.Tensor(labels[label])
        thisLabel = thisLabel.to(device)
        outputs = model(data).reshape(-1) #Flatten (batch, 1) to (batch,); works for the last batch too

        #Setting the threshold as 0.5 and counting the number of correct outputs
        preds = (outputs >= 0.5).float()
        correct += (preds == thisLabel).sum().item()
    
print("The accuracy on test data is ",correct/test_size)

In [19]:
plt.plot(np.array(loss_values), color="blue")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.grid(True)

In [20]:
#Saving the model for future use
torch.save(model.state_dict(),'Classify.pth')

Run these snippets only if you are using the Classify.pth saved in the cell above. Otherwise, do not run them.

In [21]:
model = Encoder()
model.load_state_dict(torch.load('./Classify.pth', map_location=device)) #map_location lets the weights load on CPU-only machines too
model.eval()
model.to(device)

In [22]:
audio, sr = librosa.load('../input/speech-to-image-dataset/Sound/Cats/000000032816_3.wav', offset=0.0, duration=load_duration)
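target_len = sr*load_duration #44100 samples, the input size the model was trained on
if audio.size < target_len:
    audio = np.pad(audio, (0, target_len-audio.size)) #Zero-padding clips that are shorter than load_duration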
audio = torch.Tensor(audio).float().to(device)
model(audio)

# GAN

In [23]:
# to normalize the pixel values, we choose a mean and standard deviation of 0.5 for each channel
# this will ensure that the pixel values are in the range of (-1, 1)
# as it's very convenient to train the discriminator when the pixel values are in the range of (-1, 1)
image_size = 64
batch_size = 128 #used to create a data loader
stats = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5) # means, standard deviations

In [24]:
# as we normalized the pixel values into (-1, 1) 
# this denormalization brings the pixel values back 
# into the range of (0, 1) we use this while we view images
def denorm(img_tensors):
    return img_tensors * stats[1][0] + stats[0][0]
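
To make the normalization arithmetic concrete, the plain-Python sketch below applies the same mean and standard deviation of 0.5 to single pixel values. The helper names `normalize` and `denormalize` are introduced here for illustration; in the notebook these steps are performed on tensors by `T.Normalize` and `denorm`.

```python
mean, std = 0.5, 0.5  # same per-channel statistics as stats


def normalize(pixel):
    """Map a pixel from [0, 1] to [-1, 1]: (pixel - mean) / std."""
    return (pixel - mean) / std


def denormalize(value):
    """Inverse mapping, matching denorm: value * std + mean."""
    return value * std + mean


for pixel in (0.0, 0.25, 1.0):
    print(pixel, normalize(pixel), denormalize(normalize(pixel)))
```

The round trip returns the original pixel value, which is why `denorm` is applied before displaying or saving images.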

In [25]:
# show_images takes image tensors and maximum number of images it should show and plots them in a grid

def show_images(images, nmax=64):
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xticks([]); ax.set_yticks([])
    ax.imshow(make_grid(denorm(images.detach()[:nmax]), nrow=8).permute(1, 2, 0))

# show_batch takes the data loader so as get the batch of images from dataloader and show the images 

def show_batch(dl, nmax=64):
    for images, _ in dl:
        show_images(images, nmax)
        break

def show_images_2x2(images, nmax=64):
    fig, ax = plt.subplots(figsize=(2, 2))
    ax.set_xticks([]); ax.set_yticks([])
    ax.imshow(make_grid(denorm(images.detach()[:nmax]), nrow=8).permute(1, 2, 0))

In [26]:
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():   # for this to return True, 3 conditions should hold, 
                                    # Execution environment should be connected to an Nvidia GPU
                                    # CUDA drivers installed
                                    # A PyTorch build that is compatible with the GPU
                                    # all these are ensured in colab/kaggle 
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
# to_device takes data and move it onto a target device 
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [27]:
# this cell is just to make sure we are using gpu
# it outputs 'cuda' in case we are using gpu, else it outputs 'cpu'

device = get_default_device()
device

In [28]:
DATA_DIR_dog = '../input/speech-to-image-dataset/Photos/DogImages'
DATA_DIR_cat = '../input/speech-to-image-dataset/Photos/CatImages'
latent_size = 128

In [29]:
generator_dog = nn.Sequential(
    # in: latent_size x 1 x 1

    nn.ConvTranspose2d(latent_size, 512, kernel_size=4, stride=1, padding=0, bias=False),
    nn.BatchNorm2d(512),
    nn.ReLU(True), #Activation Function
    # out: 512 x 4 x 4

    nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.ReLU(True), #Activation Function
    # out: 256 x 8 x 8

    nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.ReLU(True), #Activation Function
    # out: 128 x 16 x 16

    nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(True), #Activation Function
    # out: 64 x 32 x 32

    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh()
    # out: 3 x 64 x 64
)
# So the outputs of generator are pixel values in the range of (-1, 1) and are of the shape 3*64*64
# which is same as the Images picked from the dataset after normalization 

In [30]:
generator_cat = nn.Sequential(
    # in: latent_size x 1 x 1

    nn.ConvTranspose2d(latent_size, 512, kernel_size=4, stride=1, padding=0, bias=False),
    nn.BatchNorm2d(512),
    nn.ReLU(True), #Activation Function
    # out: 512 x 4 x 4

    nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.ReLU(True), #Activation Function
    # out: 256 x 8 x 8

    nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.ReLU(True), #Activation Function
    # out: 128 x 16 x 16

    nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.ReLU(True), #Activation Function
    # out: 64 x 32 x 32

    nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1, bias=False),
    nn.Tanh()
    # out: 3 x 64 x 64
)
# So the outputs of generator are pixel values in the range of (-1, 1) and are of the shape 3*64*64
# which is same as the Images picked from the dataset after normalization 

In [31]:
fixed_latent = torch.randn(64, latent_size, 1, 1, device=device)

In [32]:
#cat_generator = generator_cat()
generator_cat.load_state_dict(torch.load('../input/speech-to-image-dataset/SavedModels/G_cat.pth', map_location=device))
generator_cat.to(device)
generator_cat.eval()

In [33]:
def show_one_image(images, nmax=64):
    fig, ax = plt.subplots(figsize=(2, 2))
    ax.set_xticks([]); ax.set_yticks([])
    ax.imshow(make_grid(denorm(images.detach()[:nmax]), nrow=8).permute(1, 2, 0))

In [34]:
generator_dog.load_state_dict(torch.load('../input/speech-to-image-dataset/SavedModels/G_dog.pth', map_location=device))
generator_dog.to(device)
generator_dog.eval()

In [35]:
xb = torch.randn(1, latent_size, 1, 1) # random latent tensors
xb = xb.to(device)
fake_images = generator_dog(xb)
fake_images = fake_images.cpu()
print(fake_images.shape)
show_one_image(fake_images)

In [36]:
xb = torch.randn(1, latent_size, 1, 1) # random latent tensors
xb = xb.to(device)
fake_images = generator_cat(xb)
fake_images = fake_images.cpu()
print("Image of a cat")
print(fake_images.shape)
show_one_image(fake_images)

In [37]:
def print_dog():
    xb = torch.randn(1, latent_size, 1, 1) # random latent tensors
    xb = xb.to(device)
    fake_images = generator_dog(xb)
    fake_images = fake_images.cpu()
    print("Image of a dog: ")
    show_one_image(fake_images)

In [38]:
def print_cat():
    xb = torch.randn(1, latent_size, 1, 1) # random latent tensors
    xb = xb.to(device)
    fake_images = generator_cat(xb)
    fake_images = fake_images.cpu()
    print("Image of a cat: ")
    show_one_image(fake_images)

# Working Examples

In [39]:
def play_sound(tensor):
    display(ipd.Audio(tensor,rate = sr, autoplay=True))

In [40]:
def generateAudio(audio_path):
    sample_audio1, sr = librosa.load(audio_path, offset=0.0, duration=load_duration)
    target_len = sr*load_duration #44100 samples, the input size the model was trained on
    if sample_audio1.size < target_len:
        sample_audio1 = np.pad(sample_audio1, (0, target_len-sample_audio1.size)) #Zero-padding short clips
    sample_audio1 = torch.Tensor(sample_audio1).float()
    print("The said statement is ")
    #ipd.Audio(sample_audio1,rate=22050)
    play_sound(sample_audio1)
    sample_audio1 = sample_audio1.to(device)
    output = model(sample_audio1)
    #print_cat and print_dog draw their own latent vectors, so none is needed here
    if(output<0.5):
        print("The model categorised this as a cat.\nNow generating Cat image")
        print_cat()
    else:
        print("The model categorised this as a dog. Now generating Dog image")
        print_dog()

In [41]:
audio_path = '../input/speech-to-image-dataset/Sound/Cats/000000032816_3.wav'
generateAudio(audio_path)

In [42]:
audio_path = '../input/speech-to-image-dataset/Sound/Dogs/000000024664_0.wav'
generateAudio(audio_path)

# Note
Run the following code only to check that it executes without errors. The full training takes about 6 hours in total. The earlier cells use the saved states of the trained models.

### For Dogs

Load the dataset using the ImageFolder class from torchvision. Resize the images to 64x64 and normalize the pixels so that all the pixels are in the range (-1,1). 

In [43]:
train_ds_dog = ImageFolder(DATA_DIR_dog, transform=T.Compose([
    T.Resize(image_size),
    T.CenterCrop(image_size),
    T.ToTensor(),
    T.Normalize(*stats)]))

train_dl_dog = DataLoader(train_ds_dog, batch_size, shuffle=True, num_workers=3, pin_memory=True)
#Ignore the warning here

In [44]:
show_batch(train_dl_dog)

We can now wrap our training data loader in `DeviceDataLoader` so that batches of data are automatically transferred to the GPU (if available).

In [45]:
# we are converting training data loader to a device data loader
train_dl_dog = DeviceDataLoader(train_dl_dog, device)

### For Cats

In [46]:
train_ds_cat = ImageFolder(DATA_DIR_cat, transform=T.Compose([
    T.Resize(image_size),
    T.CenterCrop(image_size),
    T.ToTensor(),
    T.Normalize(*stats)]))

train_dl_cat = DataLoader(train_ds_cat, batch_size, shuffle=True, num_workers=3, pin_memory=True)
#Ignore the warning here

In [47]:
show_batch(train_dl_cat)

In [48]:
train_dl_cat = DeviceDataLoader(train_dl_cat, device)

## Discriminator Network

In [49]:
discriminator_dog = nn.Sequential(
    # in: 3 x 64 x 64

    nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 64 x 32 x 32

    nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 128 x 16 x 16

    nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 256 x 8 x 8

    nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 512 x 4 x 4

    nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
    # out: 1 x 1 x 1

    nn.Flatten(), # to flatten it out into a single vector
    nn.Sigmoid()) # as we have a single class we are using Sigmoid()

In [50]:
discriminator_cat = nn.Sequential(
    # in: 3 x 64 x 64

    nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(64),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 64 x 32 x 32

    nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(128),
    nn.LeakyReLU(0.2, inplace=True),#Activation Function
    # out: 128 x 16 x 16

    nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(256),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 256 x 8 x 8

    nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1, bias=False),
    nn.BatchNorm2d(512),
    nn.LeakyReLU(0.2, inplace=True), #Activation Function
    # out: 512 x 4 x 4

    nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0, bias=False),
    # out: 1 x 1 x 1

    nn.Flatten(), # to flatten it out into a single vector
    nn.Sigmoid()) # as we have a single class we are using Sigmoid()

Note that we're using the Leaky ReLU activation for the discriminator.

Just like any other binary classification model, the output of the discriminator is a single number between 0 and 1, which can be interpreted as the probability of the input image being real i.e. picked from the original dataset.
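
The `# out:` shape comments in the discriminator cells can be checked with the standard output-size formula for a convolution with dilation 1: `floor((size + 2*padding - kernel_size) / stride) + 1`. The sketch below is a small plain-Python check; the helper name `conv2d_out` is introduced here for illustration.

```python
def conv2d_out(size, kernel_size, stride, padding):
    """Output height/width of a 2-D convolution with dilation=1."""
    return (size + 2 * padding - kernel_size) // stride + 1


size = 64
sizes = [size]
# (kernel_size, stride, padding) for the five Conv2d layers of the discriminator
for k, s, p in [(4, 2, 1)] * 4 + [(4, 1, 0)]:
    size = conv2d_out(size, k, s, p)
    sizes.append(size)
print(sizes)  # [64, 32, 16, 8, 4, 1]
```

Each stride-2 layer halves the spatial size, and the final 4x4 kernel collapses the 4x4 feature map to the single value that `nn.Sigmoid()` turns into a probability.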

In [51]:
#moving the discriminator model to device
discriminator_dog = to_device(discriminator_dog, device)
discriminator_cat = to_device(discriminator_cat, device)

## Generator Network

We use `ConvTranspose2d` to perform *transposed convolution*. The stacked layers convert a latent tensor of shape (128,1,1) into an image tensor of shape (3,64,64).
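
The spatial sizes listed in the generator cells' `# out:` comments follow from the transposed-convolution size formula with dilation 1 and no output padding: `(size - 1) * stride - 2 * padding + kernel_size`. A short plain-Python check (the helper name is introduced here for illustration):

```python
def conv_transpose2d_out(size, kernel_size, stride, padding):
    """Output height/width of a transposed convolution (dilation=1, output_padding=0)."""
    return (size - 1) * stride - 2 * padding + kernel_size


size = 1
sizes = [size]
# (kernel_size, stride, padding) for the five ConvTranspose2d layers of the generator
for k, s, p in [(4, 1, 0)] + [(4, 2, 1)] * 4:
    size = conv_transpose2d_out(size, k, s, p)
    sizes.append(size)
print(sizes)  # [1, 4, 8, 16, 32, 64]
```

The first layer expands the 1x1 latent to 4x4, and each later stride-2 layer doubles the size until the 64x64 output is reached.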

In [52]:
latent_size = 128

We use the TanH activation function for the output layer of the generator.

In [53]:
xb = torch.randn(batch_size, latent_size, 1, 1) # random latent tensors
xb = xb.to(device)

In [54]:
fake_images_dog = generator_dog(xb)
fake_images_dog = fake_images_dog.cpu()
show_images(fake_images_dog)

In [55]:
fake_images_cat = generator_cat(xb)
print(fake_images_cat.shape)
show_images(fake_images_cat.cpu())

In [56]:
# moving the generator model to the device
generator_dog = to_device(generator_dog, device)
generator_cat = to_device(generator_cat, device)

In [57]:
def train_discriminator_dog(real_images_dog, opt_d):
    # Clear discriminator gradients
    opt_d.zero_grad()

    # Pass real images through discriminator
    # targets are set to ones for all the real images
    real_preds_dog = discriminator_dog(real_images_dog)
    real_targets_dog = torch.ones(real_images_dog.size(0), 1, device=device)
    real_loss_dog = F.binary_cross_entropy(real_preds_dog, real_targets_dog)
    real_score_dog = torch.mean(real_preds_dog).item()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images_dog = generator_dog(latent)

    # Pass fake images through discriminator
    # targets are set to zero for all the fake images
    fake_targets_dog = torch.zeros(fake_images_dog.size(0), 1, device=device)
    fake_preds_dog = discriminator_dog(fake_images_dog)
    fake_loss_dog = F.binary_cross_entropy(fake_preds_dog, fake_targets_dog)
    fake_score_dog = torch.mean(fake_preds_dog).item()

    # Update discriminator weights
    loss_dog = real_loss_dog + fake_loss_dog
    loss_dog.backward()
    opt_d.step()
    return loss_dog.item(), real_score_dog, fake_score_dog

In [58]:
def train_discriminator_cat(real_images_cat, opt_d):
    # Clear discriminator gradients
    opt_d.zero_grad()

    # Pass real images through discriminator
    # targets are set to ones for all the real images
    real_preds_cat = discriminator_cat(real_images_cat)
    real_targets_cat = torch.ones(real_images_cat.size(0), 1, device=device)
    real_loss_cat = F.binary_cross_entropy(real_preds_cat, real_targets_cat)
    real_score_cat = torch.mean(real_preds_cat).item()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images_cat = generator_cat(latent)

    # Pass fake images through discriminator
    # targets are set to zero for all the fake images
    fake_targets_cat = torch.zeros(fake_images_cat.size(0), 1, device=device)
    fake_preds_cat = discriminator_cat(fake_images_cat)
    fake_loss_cat = F.binary_cross_entropy(fake_preds_cat, fake_targets_cat)
    fake_score_cat = torch.mean(fake_preds_cat).item()

    # Update discriminator weights
    loss_cat = real_loss_cat + fake_loss_cat
    loss_cat.backward()
    opt_d.step()
    return loss_cat.item(), real_score_cat, fake_score_cat

## Generator Training

- We generate a batch of images using the generator and pass them into the discriminator.

- We calculate the loss by setting the target labels to 1 i.e. real. We do this because the generator's objective is to "fool" the discriminator. 

- We use the loss to perform gradient descent i.e. change the weights of the generator, so it gets better at generating real-like images to "fool" the discriminator.
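
In symbols, the steps above amount to minimizing binary cross entropy against all-ones targets. Writing $D$ for the discriminator, $G$ for the generator, $z_i$ for the latent vectors and $m$ for the batch size (notation introduced here for illustration), the general loss with targets $y_i$ and its special case $y_i = 1$ are:

$$
\mathcal{L}_{\mathrm{BCE}} = -\frac{1}{m}\sum_{i=1}^{m}\Big[\,y_i \log D(G(z_i)) + (1 - y_i)\log\big(1 - D(G(z_i))\big)\Big]
$$

$$
y_i = 1 \;\Rightarrow\; \mathcal{L}_G = -\frac{1}{m}\sum_{i=1}^{m}\log D(G(z_i))
$$

The loss shrinks as the discriminator's score $D(G(z_i))$ for generated images approaches 1, which is exactly what "fooling" the discriminator means here.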


In [59]:
def train_generator_dog(opt_g):
    # Clear generator gradients
    opt_g.zero_grad()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images_dog = generator_dog(latent)
    
    # Try to fool the discriminator
    preds_dog = discriminator_dog(fake_images_dog)
    targets_dog = torch.ones(batch_size, 1, device=device)
    loss_dog = F.binary_cross_entropy(preds_dog, targets_dog)
    
    # Update generator weights
    loss_dog.backward()
    opt_g.step()
    
    return loss_dog.item()

In [60]:
def train_generator_cat(opt_g):
    # Clear generator gradients
    opt_g.zero_grad()
    
    # Generate fake images
    latent = torch.randn(batch_size, latent_size, 1, 1, device=device)
    fake_images_cat = generator_cat(latent)
    
    # Try to fool the discriminator
    preds_cat = discriminator_cat(fake_images_cat)
    targets_cat = torch.ones(batch_size, 1, device=device)
    loss_cat = F.binary_cross_entropy(preds_cat, targets_cat)
    
    # Update generator weights
    loss_cat.backward()
    opt_g.step()
    
    return loss_cat.item()

We save intermediate outputs from the generator so that we can follow how training progresses.

In [61]:
sample_dir_dog = 'generated_dog'
os.makedirs(sample_dir_dog, exist_ok=True)
sample_dir_cat = 'generated_cat'
os.makedirs(sample_dir_cat, exist_ok=True)

In [62]:
def save_samples_dog(index, latent_tensors, show=True):
    fake_images_dog = generator_dog(latent_tensors)
    fake_fname_dog = 'generated-images-{0:0=4d}.png'.format(index)
    save_image(denorm(fake_images_dog), os.path.join(sample_dir_dog, fake_fname_dog), nrow=8)
    print('Saving', fake_fname_dog)
    if show:
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(denorm(fake_images_dog.cpu().detach()), nrow=8).permute(1, 2, 0)) # denorm maps (-1, 1) back to (0, 1) for display

In [63]:
def save_samples_cat(index, latent_tensors, show=True):
    fake_images_cat = generator_cat(latent_tensors)
    fake_fname_cat = 'generated-images-{0:0=4d}.png'.format(index)
    save_image(denorm(fake_images_cat), os.path.join(sample_dir_cat, fake_fname_cat), nrow=8)
    print('Saving', fake_fname_cat)
    if show:
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_xticks([]); ax.set_yticks([])
        ax.imshow(make_grid(denorm(fake_images_cat.cpu().detach()), nrow=8).permute(1, 2, 0)) # denorm maps (-1, 1) back to (0, 1) for display

We'll use a fixed set of input vectors to the generator to see how the individual generated images evolve over time as we train the model. Let's save one set of images before we start training our model.

In [64]:
# creating a set of latent tensors that we can use after each epoch
fixed_latent = torch.randn(64, latent_size, 1, 1, device=device)

In [65]:
#saving the samples before training
save_samples_dog(0, fixed_latent)
save_samples_cat(0, fixed_latent)

## Training

In [66]:
def fit_dog(epochs, lr, start_idx=1):
    torch.cuda.empty_cache() #to remove unused data from GPU  
    
    # Losses & scores
    losses_g_dog = [] # generator losses
    losses_d_dog = [] # discriminator losses
    real_scores_dog = []
    fake_scores_dog = []
    
    # Create optimizers
    opt_d = torch.optim.Adam(discriminator_dog.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_g = torch.optim.Adam(generator_dog.parameters(), lr=lr, betas=(0.5, 0.999))
    
    for epoch in range(epochs):
        for real_images, _ in tqdm(train_dl_dog):
            # Train discriminator
            loss_d, real_score, fake_score = train_discriminator_dog(real_images, opt_d)
            # Train generator
            loss_g = train_generator_dog(opt_g)
            
        # Record losses & scores
        losses_g_dog.append(loss_g)
        losses_d_dog.append(loss_d)
        real_scores_dog.append(real_score)
        fake_scores_dog.append(fake_score)
        
        # Log losses & scores (last batch)
        print("Epoch [{}/{}], loss_g: {:.4f}, loss_d: {:.4f}, real_score: {:.4f}, fake_score: {:.4f}".format(
            epoch+1, epochs, loss_g, loss_d, real_score, fake_score))
    
        # Save generated images
        save_samples_dog(epoch+start_idx, fixed_latent, show=False)
    
    return losses_g_dog, losses_d_dog, real_scores_dog, fake_scores_dog

In [67]:
def fit_cat(epochs, lr, start_idx=1):
    torch.cuda.empty_cache() #to remove unused data from GPU  
    
    # Losses & scores
    losses_g_cat = [] # generator losses
    losses_d_cat = [] # discriminator losses
    real_scores_cat = []
    fake_scores_cat = []
    
    # Create optimizers
    opt_d = torch.optim.Adam(discriminator_cat.parameters(), lr=lr, betas=(0.5, 0.999))
    opt_g = torch.optim.Adam(generator_cat.parameters(), lr=lr, betas=(0.5, 0.999))
    
    for epoch in range(epochs):
        for real_images, _ in tqdm(train_dl_cat):
            # Train discriminator
            loss_d, real_score, fake_score = train_discriminator_cat(real_images, opt_d)
            # Train generator
            loss_g = train_generator_cat(opt_g)
            
        # Record losses & scores
        losses_g_cat.append(loss_g)
        losses_d_cat.append(loss_d)
        real_scores_cat.append(real_score)
        fake_scores_cat.append(fake_score)
        
        # Log losses & scores (last batch)
        print("Epoch [{}/{}], loss_g: {:.4f}, loss_d: {:.4f}, real_score: {:.4f}, fake_score: {:.4f}".format(
            epoch+1, epochs, loss_g, loss_d, real_score, fake_score))
    
        # Save generated images
        save_samples_cat(epoch+start_idx, fixed_latent, show=False)
    
    return losses_g_cat, losses_d_cat, real_scores_cat, fake_scores_cat

In [68]:
lr = 0.0002
epochs = 1000

In [69]:
history_dog = fit_dog(epochs, lr)

In [None]:
history_cat = fit_cat(epochs, lr)

In [None]:
losses_g_dog, losses_d_dog, real_scores_dog, fake_scores_dog = history_dog

In [None]:
losses_g_cat, losses_d_cat, real_scores_cat, fake_scores_cat = history_cat

## Saving the checkpoints

In [None]:
# Save the model checkpoints 
torch.save(generator_dog.state_dict(), 'G_dog.pth')
torch.save(discriminator_dog.state_dict(), 'D_dog.pth')

In [None]:
# Save the model checkpoints 
torch.save(generator_cat.state_dict(), 'G_cat.pth')
torch.save(discriminator_cat.state_dict(), 'D_cat.pth')