In [None]:

import os
import sys
from six.moves import urllib
from scipy.io import loadmat

import time
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader 
from torchvision.transforms.functional import convert_image_dtype
import matplotlib.pyplot as plt
import tqdm 

## Utility functions

In [None]:
def log_sum_exp(x):
    """Row-wise log-sum-exp of a 2-D tensor.

    The per-row maximum is subtracted before exponentiating and added back
    afterwards, which keeps the computation stable for large logits.

    Args:
        x: tensor indexed as (row, column); the reduction runs over dim 1.

    Returns:
        1-D tensor with one value per row: log(sum_j exp(x[i, j])).
    """
    row_max, _ = x.max(dim=1, keepdim=True)
    shifted = torch.logsumexp(x - row_max, dim=1)
    return shifted + row_max.squeeze(1)

In [None]:
## Utility functions for loading SVHN dataset

def maybe_download(data_dir):
    """Make sure both SVHN .mat files exist under ``<data_dir>/svhn``.

    Each file is checked individually, so a run that was interrupted after
    creating the directory (or after fetching only one file) is repaired on
    the next call instead of being skipped because the directory exists.

    Every download is written to a temporary ``.part`` file and renamed into
    place only once it completes, so a truncated transfer is never mistaken
    for a finished one.

    Args:
        data_dir: parent directory; the ``svhn`` sub-directory is created
            inside it when missing.

    Raises:
        Whatever ``urllib.request.urlretrieve`` raises on network failure.
    """
    base_url = 'http://ufldl.stanford.edu/housenumbers/'
    new_data_dir = os.path.join(data_dir, 'svhn')
    os.makedirs(new_data_dir, exist_ok=True)

    def _progress(count, block_size, total_size):
        # total_size is non-positive when the server does not report a
        # length; skip the percentage rather than divide by it.
        if total_size > 0:
            sys.stdout.write('\r>> Downloading %.1f%%' % (float(count * block_size) / float(total_size) * 100.0))
            sys.stdout.flush()

    for filename in ('train_32x32.mat', 'test_32x32.mat'):
        filepath = os.path.join(new_data_dir, filename)
        if os.path.exists(filepath):
            continue
        partial_path = filepath + '.part'
        try:
            urllib.request.urlretrieve(base_url + filename, partial_path, _progress)
            os.replace(partial_path, filepath)
        finally:
            # Only present here if the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)

def svhn_data_load(data_dir, subset='train'):
    """Load one SVHN split from ``<data_dir>/svhn``, downloading it if needed.

    Args:
        data_dir: parent directory that holds (or will hold) the ``svhn``
            folder with the ``*_32x32.mat`` files.
        subset: ``'train'`` or ``'test'``.

    Returns:
        ``(x, y)`` where ``x`` is the ``'X'`` array exactly as stored in the
        .mat file and ``y`` is the flattened ``'y'`` array with every label
        10 rewritten to 0 (presumably SVHN's encoding of the digit zero).

    Raises:
        NotImplementedError: if ``subset`` is neither ``'train'`` nor
            ``'test'``. This is checked first, so an invalid argument never
            triggers a download.
    """
    if subset not in ('train', 'test'):
        raise NotImplementedError('subset should be either train or test')

    maybe_download(data_dir)

    mat = loadmat(os.path.join(data_dir, 'svhn', subset + '_32x32.mat'))
    images = mat['X']
    labels = mat['y'].flatten()  # flatten() copies, so the edit below is local
    labels[labels == 10] = 0
    return images, labels


### Data preprocessing

This section is for reproducibility of results.

In [None]:
# Load both SVHN splits (fetched into ./data/svhn on first use).
trainx, trainy = svhn_data_load('data', subset='train')
testx, testy = svhn_data_load('data', subset='test')

# Debugging aid: uncomment to work on the first 10,000 samples only while the
# training loop and GPU usage are still being sorted out.

#trainx = trainx[:,:,:,:10000]
#trainy = trainy[:10000]

#testx= testx[:,:,:,:10000]
#testy = testy[:10000]

# Move the last axis to the front and the third axis to second place.
# Assumes the .mat layout is (H, W, C, N), giving (N, C, H, W) — TODO confirm.
trainx = trainx.transpose(3, 2, 0, 1)
testx = testx.transpose(3, 2, 0, 1)


Creating the dataset class and the data loaders 

In [None]:
class SvhnDataset(Dataset):
    """Dataset yielding a labelled image plus two independently shuffled
    "unlabelled" images drawn from the same array.

    Args:
        x: image array indexed by sample along axis 0.
        y: label array with one entry per sample in ``x``.
        rng: optional ``numpy.random.RandomState`` used for the two
            permutations. When omitted, the notebook-level ``rng_data`` is
            used, so that global must already exist at construction time.

    Raises:
        ValueError: if ``x`` and ``y`` do not have the same length.
    """

    def __init__(self, x, y, rng=None):
        super(SvhnDataset, self).__init__()

        if len(x) != len(y):
            raise ValueError(
                'x and y must have the same length, got %d and %d' % (len(x), len(y)))

        if rng is None:
            # Backward-compatible default: the generator defined in the
            # notebook's configuration cell.
            rng = rng_data

        self.x = x
        self.y = y
        # Two separately permuted copies of the images act as the streams of
        # unlabelled data (one per item returned by __getitem__).
        num_samples = x.shape[0]
        self.unl1 = x[rng.permutation(num_samples)]
        self.unl2 = x[rng.permutation(num_samples)]

    def __len__(self):
        """Number of samples (length of ``x`` along axis 0)."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return ``(x_lab, lab, x_ul1, x_ul2)`` for position ``idx``.

        The three images go through ``convert_image_dtype`` with its default
        target dtype; ``lab`` is an int64 tensor.
        NOTE(review): if that conversion yields values in [0, 1], the real
        images do not share the Tanh range of ``Generator`` — confirm intended.
        """
        x_lab = convert_image_dtype(torch.tensor(self.x[idx]))
        lab = torch.tensor(self.y[idx], dtype=torch.int64)
        x_ul1 = convert_image_dtype(torch.tensor(self.unl1[idx]))
        x_ul2 = convert_image_dtype(torch.tensor(self.unl2[idx]))

        return x_lab, lab, x_ul1, x_ul2



## Generator model 

In [None]:

class View(nn.Module):
    """Layer that reshapes its input to ``(batch, *shape)``.

    Lets a ``view`` call sit inside an ``nn.Sequential`` pipeline; the batch
    dimension is taken from the incoming tensor and kept as-is.
    """

    def __init__(self, *shape):
        super().__init__()
        self.shape = shape

    def forward(self, input):
        target_shape = (input.shape[0],) + tuple(self.shape)
        return input.view(target_shape)

class Generator(nn.Module):
    """Maps a 100-dimensional noise vector to a 3x32x32 image in [-1, 1].

    A linear projection to 512x4x4 is followed by three stride-2 transposed
    convolutions (4 -> 8 -> 16 -> 32 pixels); the last one is weight-normalised
    and followed by Tanh.
    """

    def __init__(self):
        super().__init__()

        layers = [
            # Project the noise and reshape it into a 512x4x4 feature map.
            nn.Linear(100, 4 * 4 * 512),
            nn.ReLU(),
            nn.BatchNorm1d(4 * 4 * 512),
            View(512, 4, 4),

            # 512x4x4 -> 256x8x8
            nn.ConvTranspose2d(512, 256, 4, 2, 1),
            nn.BatchNorm2d(256),
            nn.ReLU(),

            # 256x8x8 -> 128x16x16
            nn.ConvTranspose2d(256, 128, 4, 2, 1),
            nn.BatchNorm2d(128),
            nn.ReLU(),

            # 128x16x16 -> 3x32x32, squashed into [-1, 1]
            nn.utils.weight_norm(nn.ConvTranspose2d(128, 3, 4, 2, 1)),
            nn.Tanh(),
        ]
        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run a (batch, 100) noise tensor through the stack."""
        return self.main(input)
    


Quick sanity check: run the generator on random noise to confirm that the output has the expected shape, and print a summary of the layers in the model.

In [None]:
# Batch of 32 uniform-random noise vectors of length 100, matching the
# generator's first Linear(100, ...) layer.
sample_noise = torch.rand(32,100)

# Throw-away generator used only for this shape check.
test_gen = Generator()
sample_out =  test_gen(sample_noise)

print("Model Summary:")
print(test_gen,'\n')
# The final transposed convolution has 3 output channels, so the expected
# shape is (32, 3, 32, 32).
print(f'Shape of generator output is {sample_out.shape}')


## Discriminator model

In [None]:
class Discriminator(nn.Module):
    """Weight-normalised convolutional classifier with 10 output logits.

    ``main`` is the feature extractor: nine weight-normalised convolutions
    with LeakyReLU(0.2) activations and dropout, ending in average pooling
    and a flatten to 128 features. ``last_layer`` maps those features to 10
    class logits.

    Every activation uses a negative slope of 0.2. The first one previously
    fell back to PyTorch's default of 0.01, unlike all the others.
    """

    def __init__(self):
        super(Discriminator,self).__init__()

        slope = 0.2  # shared LeakyReLU negative slope

        # Spatial sizes in the comments assume a 3x32x32 input.
        self.main = nn.Sequential(
            nn.Dropout(p=0.2),

            # 32x32, 64 channels
            nn.utils.weight_norm(nn.Conv2d(3,64,3,padding=1)),
            nn.LeakyReLU(slope),

            nn.utils.weight_norm(nn.Conv2d(64,64,3,padding=1)),
            nn.LeakyReLU(slope),

            # stride 2: 32x32 -> 16x16
            nn.utils.weight_norm(nn.Conv2d(64,64,3,stride=2,padding=1)),
            nn.LeakyReLU(slope),

            nn.Dropout2d(p=0.5),

            # 16x16, 128 channels
            nn.utils.weight_norm(nn.Conv2d(64,128,3,padding=1)),
            nn.LeakyReLU(slope),

            nn.utils.weight_norm(nn.Conv2d(128,128,3,padding=1)),
            nn.LeakyReLU(slope),

            # stride 2: 16x16 -> 8x8
            nn.utils.weight_norm(nn.Conv2d(128,128,3,stride=2,padding=1)),
            nn.LeakyReLU(slope),

            nn.Dropout2d(p=0.5),

            # no padding: 8x8 -> 6x6
            nn.utils.weight_norm(nn.Conv2d(128,128,3,padding=0)),
            nn.LeakyReLU(slope),

            # two 1x1 convolutions keep the 6x6 resolution
            nn.utils.weight_norm(nn.Conv2d(128,128,1)),
            nn.LeakyReLU(slope),
            nn.utils.weight_norm(nn.Conv2d(128,128,1)),
            nn.LeakyReLU(slope),

            # average over the 6x6 map and flatten to (batch, 128)
            nn.AvgPool2d(6),
            View(128))

        self.last_layer = nn.utils.weight_norm(nn.Linear(128,10))

    def forward(self,input,feature=False):
        """Return class logits, or the 128-d features when ``feature=True``.

        Args:
            input: image batch fed to the convolutional stack.
            feature: if True, skip ``last_layer`` and return the pooled
                features (used for feature matching in the generator step).
        """
        features = self.main(input)
        if feature:
            return features
        return self.last_layer(features)
            
    

Testing the discriminator with a random sample

In [None]:
# Random stand-in for a batch of 64 three-channel 32x32 images.
sample_noise = torch.rand(64,3,32,32)
test_disc = Discriminator()

# Default forward pass (feature=False) returns the logits of the final
# Linear(128, 10) layer, so the displayed shape should be (64, 10).
disc_out = test_disc(sample_noise)
disc_out.shape

Weight initialisation function, adapted from the DCGAN tutorial on PyTorch. **TODO:** change the init values to match the original code.

In [None]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.05)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.05)
        nn.init.constant_(m.bias.data, 0)       
        

Testing the weight initialisation

In [None]:
# Fresh models, built in the same order as before (generator first).
gen, disc = Generator(), Discriminator()

# apply() walks every sub-module and calls weights_init on each one.
gen.apply(weights_init)
disc.apply(weights_init)

## Training functions 

In [None]:
def Dstep(x_lab, lab, x_ul, noise):
    """Run one discriminator update and return its training statistics.

    Uses the notebook-level ``disc``, ``gen``, ``Doptim``,
    ``unlabelled_weight`` and ``log_sum_exp``.

    Args:
        x_lab: labelled image batch.
        lab: int64 class labels for ``x_lab`` (one per row).
        x_ul: unlabelled image batch.
        noise: noise batch fed to the generator.

    Returns:
        ``(loss_lab, loss_unl, train_err)`` as Python floats: the supervised
        loss, the unsupervised (real-vs-generated) loss, and the
        misclassification rate on ``x_lab``.
    """
    # The generator is not updated here, so its graph is not needed. Sampling
    # under no_grad avoids building it and back-propagating through it.
    with torch.no_grad():
        x_gen = gen(noise)

    output_before_softmax_lab = disc(x_lab)
    output_before_softmax_unl = disc(x_ul)
    output_before_softmax_gen = disc(x_gen)

    # Logit of the true class for every labelled sample. The index is sized
    # from the batch itself (the last batch may be short) and lives on the
    # same device as the labels.
    rows = torch.arange(lab.shape[0], device=lab.device)
    l_lab = output_before_softmax_lab[rows, lab]

    l_unl = log_sum_exp(output_before_softmax_unl)
    l_gen = log_sum_exp(output_before_softmax_gen)

    # Supervised term: mean log-sum-exp minus mean true-class logit.
    loss_lab = -torch.mean(l_lab) + torch.mean(log_sum_exp(output_before_softmax_lab))
    # Unsupervised term built from the log-sum-exp of real and generated logits.
    loss_unl = -0.5*torch.mean(l_unl) + 0.5*torch.mean(nn.functional.softplus(l_unl)) + 0.5*torch.mean(nn.functional.softplus(l_gen))
    train_err = torch.mean(torch.ne(torch.argmax(output_before_softmax_lab,dim=1),lab).float())

    total_loss = loss_lab + unlabelled_weight*loss_unl

    Doptim.zero_grad()
    total_loss.backward()
    Doptim.step()

    return loss_lab.item(), loss_unl.item(), train_err.item()


def Gstep(x_ul,noise):
    """Run one generator update with a feature-matching loss.

    The loss is the mean absolute difference between the batch-averaged
    discriminator features of real unlabelled images and of generated ones.
    Uses the notebook-level ``disc``, ``gen`` and ``Goptim``.

    Returns:
        The loss value as a Python float.
    """
    real_features = disc(x_ul, feature=True)
    fake_features = disc(gen(noise), feature=True)

    feature_gap = real_features.mean(dim=0) - fake_features.mean(dim=0)
    loss_gen = feature_gap.abs().mean()

    Goptim.zero_grad()
    loss_gen.backward()
    Goptim.step()

    return loss_gen.item()


def test(x_lab,lab):
    """Return the misclassification rate of the notebook-level ``disc`` on a
    labelled batch, as a Python float in [0, 1]."""
    logits = disc(x_lab)
    predicted = logits.argmax(dim=1)
    error_rate = (predicted != lab).float().mean()

    return error_rate.item()

In [None]:
def trainGAN(disc,gen,Doptim,Goptim,
             dataloader_train,dataloader_test,noise_dim,
             lr, num_epochs=1, device = 'cuda'):
    """Train the GAN and report per-epoch statistics.

    Each epoch alternates ``Dstep`` and ``Gstep`` over ``dataloader_train``.
    It then evaluates the discriminator on ``dataloader_test`` with an
    exponential moving average of its parameters, restores the trained
    parameters, and prints the epoch summary.

    NOTE: ``Dstep``, ``Gstep`` and ``test`` take no model arguments and work
    on the notebook-level ``disc``/``gen``/optimizers, so the objects passed
    here must be those same objects.

    Args:
        disc, gen: discriminator and generator modules.
        Doptim, Goptim: their optimizers.
        dataloader_train, dataloader_test: loaders yielding
            ``(x_lab, lab, x_ul1, x_ul2)`` batches.
        noise_dim: shape passed to ``torch.rand`` for each noise batch.
        lr: base learning rate. The epoch rate is
            ``lr * min(3 - epoch/400, 1)`` clamped at 0: constant for the
            first 800 epochs, then linear decay reaching 0 at epoch 1200.
        num_epochs: number of epochs to run.
        device: device used for the models and every batch.
    """
    base_lr = lr

    disc.to(device)
    gen.to(device)

    # Moving average of the discriminator parameters. It is kept across
    # epochs and stored as independent copies, never as views of the live
    # parameters.
    disc_avg_params = None

    for epoch in range(num_epochs):

        # The schedule is always derived from the base rate, so the decay does
        # not compound from epoch to epoch; the clamp keeps it non-negative.
        epoch_lr = base_lr*max(min(3-epoch/400,1),0)
        for optimizer in (Doptim, Goptim):
            for param_group in optimizer.param_groups:
                param_group['lr'] = epoch_lr

        disc.train()
        gen.train()

        loss_lab = 0
        loss_unl = 0
        train_err = 0
        loss_gen = 0
        num_batches = 0

        for x_lab,lab,x_ul1,x_ul2 in tqdm.tqdm(dataloader_train, desc="Training GAN"):

            noise = torch.rand(noise_dim).to(device)
            x_lab = x_lab.to(device)
            lab = lab.to(device)
            x_ul1 = x_ul1.to(device)
            x_ul2 = x_ul2.to(device)

            ll,lu,te = Dstep(x_lab, lab, x_ul1, noise)

            loss_lab+=ll
            loss_unl+=lu
            train_err+=te

            ## Historical averaging: avg <- avg + 0.0001*(param - avg)
            with torch.no_grad():
                if disc_avg_params is None:
                    disc_avg_params = [param.detach().clone() for param in disc.parameters()]
                else:
                    for avg,param in zip(disc_avg_params,disc.parameters()):
                        avg.add_(param.detach()-avg, alpha=0.0001)

            noise = torch.rand(noise_dim).to(device)

            lg = Gstep(x_ul2,noise)
            loss_gen+=lg  # accumulated but not part of the printed summary

            num_batches+=1

        # max(..., 1) keeps an empty loader from dividing by zero.
        loss_lab/= max(num_batches,1)
        loss_unl/= max(num_batches,1)
        train_err/= max(num_batches,1)
        loss_gen/= max(num_batches,1)

        # Shifting the averaged parameters to test the model. The trained
        # values are cloned first; plain references would be overwritten by
        # the copy below and could not be restored.
        orig_param = [param.detach().clone() for param in disc.parameters()]
        if disc_avg_params is not None:
            with torch.no_grad():
                for param,avg in zip(disc.parameters(),disc_avg_params):
                    param.copy_(avg)

        ## Testing the model

        disc.eval()
        gen.eval()

        test_err = 0
        num_test_batches = 0
        try:
            with torch.no_grad():
                for tx,lab,_tx_ul1,_tx_ul2 in tqdm.tqdm(dataloader_test,desc = " Testing GAN"):
                    tx = tx.to(device)
                    lab = lab.to(device)
                    test_err+=test(tx,lab)
                    num_test_batches+=1
        finally:
            #Shifting the original parameters back to the model, even if
            #evaluation failed part-way.
            with torch.no_grad():
                for param,orig in zip(disc.parameters(),orig_param):
                    param.copy_(orig)

        test_err/= max(num_test_batches,1)

        print(f"Epoch {epoch}, loss_lab = {loss_lab}, loss_unl = {loss_unl}, train err = {train_err}, test err = {test_err}")
        sys.stdout.flush()


        

## Training the model 

This section deals with the training of the model. All dataloaders and models are expected to be instantiated here, for ease of tuning hyperparameters.

In [None]:
## All data related variables

seed_data = 1   # seed for the data permutations drawn from rng_data
seed = 1        # seed for everything else (numpy rng and torch)
count = 50      # not referenced elsewhere in the visible notebook
batch_size = 24
unlabelled_weight = 1   # weight of the unsupervised term in the discriminator loss

rng_data = np.random.RandomState(seed_data)
rng = np.random.RandomState(seed)
# Seed torch as well: model initialisation, the generator noise and the
# DataLoader shuffling all draw from torch's generator, so without this the
# run is not reproducible despite the numpy seeds above.
torch.manual_seed(seed)

# Shape of each noise batch: one 100-dimensional vector per sample.
noise_dim = (batch_size,100)

dataloader_train = DataLoader(SvhnDataset(trainx, trainy), batch_size=batch_size, shuffle=True, num_workers=8)
dataloader_test = DataLoader(SvhnDataset(testx, testy), batch_size=batch_size, shuffle=True, num_workers=8)

In [None]:
# All model related variables

# Build the discriminator first, then the generator, and re-initialise both.
disc = Discriminator()
gen = Generator()
for network in (disc, gen):
    network.apply(weights_init)

# Adam settings shared by both optimizers.
lr = 0.001
betas = (0.9,0.999)

Goptim = torch.optim.Adam(gen.parameters(), lr=lr, betas=betas)
Doptim = torch.optim.Adam(disc.parameters(), lr=lr, betas=betas)


In [None]:
# Commencing training

# Single epoch on the first CUDA device, using the models, optimizers and
# loaders created in the cells above.
trainGAN(
    disc,
    gen,
    Doptim,
    Goptim,
    dataloader_train,
    dataloader_test,
    noise_dim,
    lr,
    num_epochs=1,
    device='cuda:0',
)