In [None]:
import os 
import json 

import torch 
import torch.nn as nn 
import torch.optim as optim 
from torch.utils.data.dataloader import Dataset , DataLoader

import torchvision 
import torchvision.transforms as transforms
import torchvision.datasets as datasets 

import torch.autograd as tag
import matplotlib.pyplot as plt 

import numpy as np 
import pandas as pd 

from PIL import Image


In [None]:
# Pick the compute device once for the whole notebook:
# the first CUDA GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(" Starting, device =", device)

 Starting, device = cuda:0


In [None]:
# Class-folder name -> integer class index (0..99). CustomTrainLoader looks up each
# class directory name in this table to produce the training label.
# NOTE(review): keys look like ImageNet/WordNet synset ids — confirm against Labels.json.
dic = {'n01968897': 0, 'n01770081': 1, 'n01818515': 2, 'n02011460': 3, 'n01496331': 4, 'n01847000': 5, 'n01687978': 6, 'n01740131': 7, 'n01537544': 8, 'n01491361': 9, 'n02007558': 10, 'n01735189': 11, 'n01630670': 12, 'n01440764': 13, 'n01819313': 14, 'n02002556': 15, 'n01667778': 16, 'n01755581': 17, 'n01924916': 18, 'n01751748': 19, 'n01984695': 20, 'n01729977': 21, 'n01614925': 22, 'n01608432': 23, 'n01443537': 24, 'n01770393': 25, 'n01855672': 26, 'n01560419': 27, 'n01592084': 28, 'n01914609': 29, 'n01582220': 30, 'n01667114': 31, 'n01985128': 32, 'n01820546': 33, 'n01773797': 34, 'n02006656': 35, 'n01986214': 36, 'n01484850': 37, 'n01749939': 38, 'n01828970': 39, 'n02018795': 40, 'n01695060': 41, 'n01729322': 42, 'n01677366': 43, 'n01734418': 44, 'n01843383': 45, 'n01806143': 46, 'n01773549': 47, 'n01775062': 48, 'n01728572': 49, 'n01601694': 50, 'n01978287': 51, 'n01930112': 52, 'n01739381': 53, 'n01883070': 54, 'n01774384': 55, 'n02037110': 56, 'n01795545': 57, 'n02027492': 58, 'n01531178': 59, 'n01944390': 60, 'n01494475': 61, 'n01632458': 62, 'n01698640': 63, 'n01675722': 64, 'n01877812': 65, 'n01622779': 66, 'n01910747': 67, 'n01860187': 68, 'n01796340': 69, 'n01833805': 70, 'n01685808': 71, 'n01756291': 72, 'n01514859': 73, 'n01753488': 74, 'n02058221': 75, 'n01632777': 76, 'n01644900': 77, 'n02018207': 78, 'n01664065': 79, 'n02028035': 80, 'n02012849': 81, 'n01776313': 82, 'n02077923': 83, 'n01774750': 84, 'n01742172': 85, 'n01943899': 86, 'n01798484': 87, 'n02051845': 88, 'n01824575': 89, 'n02013706': 90, 'n01955084': 91, 'n01773157': 92, 'n01665541': 93, 'n01498041': 94, 'n01978455': 95, 'n01693334': 96, 'n01950731': 97, 'n01829413': 98, 'n01514668': 99}

In [None]:
class CustomTrainLoader( Dataset):
    """Map-style dataset over the ``train.X<i>`` folders found under ``data_dir``.

    Layout read by ``__init__``::

        data_dir/Labels.json
        data_dir/train.X<i>/<class folder>/<image file>

    Only image paths and integer labels are collected up front; the images
    themselves are opened lazily in ``__getitem__``.
    """

    def __init__(self , data_dir , transform = None , splits = (1,) , class_to_idx = None ):
        """Scan the requested training splits and record (path, label) pairs.

        Args:
            data_dir: root folder holding ``Labels.json`` and the ``train.X<i>`` folders.
            transform: optional callable applied to each RGB PIL image in ``__getitem__``.
            splits: indices ``i`` of the ``train.X<i>`` folders to read. The default
                ``(1,)`` keeps the previous behaviour (only ``train.X1``); pass
                ``range(1, 5)`` to read ``train.X1`` through ``train.X4``.
            class_to_idx: mapping from class-folder name to integer label. When
                omitted, the notebook-level ``dic`` table is used.

        Raises:
            KeyError: if a class folder name is missing from ``class_to_idx``.
        """
        super().__init__()
        self.data_dir = data_dir
        self.transform = transform

        # Raw contents of Labels.json, kept available for callers.
        with open( os.path.join( data_dir , 'Labels.json')) as f :
            self.class_labels = json.load(f)

        if class_to_idx is None:
            class_to_idx = dic
        self.class_to_idx = class_to_idx

        # Parallel lists: images[k] is the file path whose label is labels[k].
        self.images : list = []
        self.labels : list = []

        for i in splits:
            xi_dir = os.path.join( self.data_dir , f'train.X{i}')

            # Sorted so the sample order does not depend on the file system.
            for classname in sorted( os.listdir( xi_dir )):
                class_dir = os.path.join( xi_dir , classname)
                if not os.path.isdir( class_dir):
                    continue  # skip stray files that sit next to the class folders

                label = class_to_idx[ classname]

                for image_name in sorted( os.listdir( class_dir)):
                    self.images.append( os.path.join( class_dir , image_name))
                    self.labels.append( label)

    def __len__(self) :
        """Return the number of collected samples."""
        return len( self.labels)

    def __getitem__(self , idx ) :
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened as RGB and passed through ``transform`` when one was given.
        """
        image_path = self.images[idx]
        label = self.labels[idx]

        image = Image.open( image_path).convert('RGB')

        if self.transform :
            image = self.transform(image)

        return image , label

class CustomTestLoader(Dataset):
    """Map-style dataset over the validation images in ``data_dir/val.X``.

    Layout read by ``__init__``::

        data_dir/Labels.json
        data_dir/val.X/<class folder>/<image file>

    Labels are integer class indices so they line up with the training labels.
    """

    def __init__(self , data_dir , transform= None , class_to_idx = None ):
        """Scan ``val.X`` and record (path, label) pairs.

        Args:
            data_dir: root folder holding ``Labels.json`` and the ``val.X`` folder.
            transform: optional callable applied to each RGB PIL image in ``__getitem__``.
            class_to_idx: mapping from class-folder name to integer label. When
                omitted, the notebook-level ``dic`` table is used.

        Raises:
            KeyError: if a class folder name is missing from ``class_to_idx``.
        """
        super().__init__()
        self.root_dir = data_dir
        self.transform = transform

        # Read the labels file from the dataset root rather than a fixed path.
        with open( os.path.join( data_dir , 'Labels.json')) as f:
            self.class_labels = json.load(f)

        if class_to_idx is None:
            class_to_idx = dic
        self.class_to_idx = class_to_idx

        # Parallel lists: images[k] is the file path whose label is labels[k].
        self.images :list = []
        self.labels :list = []

        val_dir = os.path.join( self.root_dir , 'val.X')

        for class_name in sorted( os.listdir( val_dir)):
            # Class folders live inside val.X, not directly under the root.
            class_dir = os.path.join( val_dir , class_name)
            if not os.path.isdir( class_dir):
                continue  # skip stray files that sit next to the class folders

            label = class_to_idx[class_name]

            for image_name in sorted( os.listdir( class_dir)):
                self.images.append( os.path.join( class_dir , image_name))
                self.labels.append( label)

    def __len__(self):
        """Return the number of collected samples."""
        return len(self.labels)

    def __getitem__(self, idx ):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened as RGB and passed through ``transform`` when one was given.
        """
        image_path = self.images[idx]
        label = self.labels[idx]

        image = Image.open(image_path).convert('RGB')

        if self.transform :
            image = self.transform(image)

        return image , label
        
        


In [None]:
class QuantizeFunction(tag.Function):
    """Round a tensor to multiples of ``qp`` while letting gradients pass through.

    Forward returns ``qp * round(tensor / qp)`` when ``normalize`` is True and
    the bare ``round(tensor / qp)`` otherwise. Backward ignores the rounding:
    it returns a copy of the incoming gradient, divided by ``qp`` only in the
    non-normalized case. No gradient is produced for ``qp`` or ``normalize``.
    """

    @staticmethod
    def forward(ctx, tensor, qp=0.5, normalize=True):
        # Keep the settings around for backward().
        ctx.qp, ctx.normalize = qp, normalize
        levels = torch.round(tensor / qp)
        return qp * levels if normalize == True else levels

    @staticmethod
    def backward(ctx, grad_output):
        upstream = grad_output.clone()
        if ctx.normalize == True:
            return upstream, None, None
        return upstream / ctx.qp, None, None

class QuantizeLayer(torch.nn.Module):
    """Module wrapper around ``QuantizeFunction`` with a fixed ``qp`` and ``normalize``."""

    def __init__(self, qp=0.5, normalize=True):
        super().__init__()
        self.normalize = normalize
        self.qp = qp

    def forward(self, input):
        """Quantize ``input`` using the settings stored on this layer."""
        quantize = QuantizeFunction.apply
        return quantize(input, self.qp, self.normalize)

Helper Functions 

In [None]:
def accuracy(net, dataloader):
  """Return the fraction of samples in ``dataloader`` that ``net`` classifies correctly.

  The model is moved to the notebook-level ``device`` and evaluated in eval
  mode with gradients disabled. Its previous train/eval mode is restored
  before returning, so calling this in the middle of training does not leave
  the model stuck in eval mode.

  Args:
    net: model mapping a batch of inputs to per-class scores.
    dataloader: iterable of batches where ``batch[0]`` holds the inputs and
      ``batch[1]`` the integer labels.

  Returns:
    ``correct / total`` as a float, or ``0.0`` when the loader yields no samples.
  """
  net.to(device)
  was_training = net.training
  net.eval()
  correct = 0
  total = 0
  try:
    with torch.no_grad():
      for batch in dataloader:
        images, labels = batch[0].to(device), batch[1].to(device)
        outputs = net(images)
        # Predicted class = index of the largest score along the class axis.
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
  finally:
    # Hand the model back in the mode the caller had it in.
    net.train(was_training)
  return correct / total if total else 0.0


def smooth(x, size):
  """Moving average of ``x`` over a window of ``size`` samples.

  Only fully overlapping windows are kept (``mode='valid'``), so the result
  has ``len(x) - size + 1`` entries when ``size <= len(x)``.
  """
  window = np.ones(size) / size
  return np.convolve(x, window, mode='valid')

Training Function 

In [None]:

def Train(net, dataloader, testLoader=None ,  epochs=1, start_epoch=0, lr=0.01, momentum=0.9, decay=0.0005,
          verbose=1, print_every=100, state=None, schedule=None, checkpoint_path=None):
  """Train ``net`` as a classifier with SGD and cross-entropy loss.

  Args:
    net: model mapping a batch of inputs to class logits.
    dataloader: iterable of ``(inputs, labels)`` training batches.
    testLoader: optional iterable of evaluation batches; when given, its
      accuracy is printed after every epoch.
    epochs: epoch index at which training stops (exclusive).
    start_epoch: first epoch index to run; overridden by ``state['epoch']``.
    lr: SGD learning rate.
    momentum: SGD momentum.
    decay: SGD weight decay.
    verbose: when truthy, print the running mean loss every ``print_every`` batches.
    print_every: number of mini-batches between loss reports.
    state: optional dict with keys 'net', 'optimizer', 'epoch' and 'losses'
      (the format saved to ``checkpoint_path``) to resume from.
    schedule: optional dict ``{epoch: learning_rate}``; the rate is applied at
      the start of that epoch. ``None`` means no scheduled changes.
    checkpoint_path: optional path prefix; after every epoch the state is saved
      to ``checkpoint_path + 'checkpoint-<epoch>.pkl'``.

  Returns:
    List with the loss of every mini-batch (including any restored from ``state``).
  """
  if schedule is None:
    schedule = {}

  net.to(device)

  # Per-batch losses, kept for plotting.
  losses = []

  task_criterion = nn.CrossEntropyLoss()  # expects logits and integer class targets

  # Stochastic gradient descent
  optimizer = optim.SGD(net.parameters(), lr=lr, momentum=momentum, weight_decay=decay)

  # Resume from a previous training state when one is supplied.
  if state:
      net.load_state_dict(state['net'])
      optimizer.load_state_dict(state['optimizer'])
      start_epoch = state['epoch']
      losses = state['losses']

  # Fast forward lr schedule through already trained epochs
  for epoch in range(start_epoch):
    if epoch in schedule:
      print ("Learning rate: %f"% schedule[epoch])
      for g in optimizer.param_groups:
        g['lr'] = schedule[epoch]

  # TRAINING START
  for epoch in range(start_epoch, epochs):
    # accuracy() at the end of each epoch switches the model to eval mode, so
    # train mode (dropout / batch-norm updates) must be re-enabled every epoch.
    net.train()
    sum_loss = 0.0

    # Update learning rate when scheduled
    if epoch in schedule:
      print ("Learning rate: %f"% schedule[epoch])
      for g in optimizer.param_groups:
        g['lr'] = schedule[epoch]

    for i, batch in enumerate(dataloader, 0):

        inputs = batch[0].to(device)
        # CrossEntropyLoss needs integer (long) class indices as targets.
        labels = torch.as_tensor(batch[1], dtype=torch.long).to(device)

        optimizer.zero_grad()

        outputs = net(inputs)

        # TASK LOSS: classification with categorical cross entropy.
        task_loss = task_criterion(outputs, labels)

        task_loss.backward()

        optimizer.step()

        batch_loss = task_loss.item()
        losses.append(batch_loss)
        sum_loss += batch_loss

        if i % print_every == print_every-1:    # report every `print_every` mini-batches
            if verbose:
              print('[%d, %5d] loss: %.5f' % (epoch, i + 1, sum_loss / print_every))
            sum_loss = 0.0

    print("EPOCH ", epoch)
    print(" Training Accuracy is: " , accuracy(net ,  dataloader) , end= "|")

    if testLoader is not None:
      print( "Testing Accuracy is: ", accuracy(net , testLoader), end = "\n\n")

    if checkpoint_path:
      state = {'epoch': epoch+1, 'net': net.state_dict(), 'optimizer': optimizer.state_dict(), 'losses': losses}
      torch.save(state, checkpoint_path + 'checkpoint-%d.pkl'%(epoch+1))

  return losses

In [None]:
       

def getData( root_dir = './archive/' , batch_size = 128 ):
    """Build the data loaders for the dataset stored under ``root_dir``.

    Training images are resized to 128x128, flipped horizontally with
    probability 0.20, converted to tensors and normalized with mean 0.5 and
    std 0.5 per channel.

    Args:
        root_dir: dataset root passed to ``CustomTrainLoader``.
        batch_size: mini-batch size of the training loader (default 128).

    Returns:
        Dict with the shuffled training ``DataLoader`` under ``'train'`` and
        ``None`` under ``'test'`` (no evaluation loader is built here).
    """
    transform_train = transforms.Compose([
        transforms.Resize( (128 , 128 )) ,
        # Augment on the PIL image, before the conversion to a tensor.
        transforms.RandomHorizontalFlip( p=0.20) ,
        transforms.ToTensor() ,
        transforms.Normalize( mean = (0.5 , 0.5 , 0.5) , std = (0.5 , 0.5 , 0.5)) ,
    ])

    train_data = CustomTrainLoader( data_dir= root_dir  , transform = transform_train)
    train_loader = DataLoader( train_data , batch_size = batch_size , shuffle = True )

    # The evaluation split is not loaded; when it is, it should use the same
    # pipeline without the random flip.
    return {'train': train_loader , 'test': None }
    

In [None]:
class Reshape(nn.Module):
    """Layer that returns a view of its input with the shape given at construction."""

    def __init__(self, *args):
        super(Reshape, self).__init__()
        # Target shape, stored as the tuple of positional arguments.
        self.shape = args

    def forward(self, x):
        target_shape = self.shape
        return x.view(target_shape)

class VAE(nn.Module):
    """Convolutional variational autoencoder with optional latent quantization.

    The encoder halves the spatial size four times and flattens to 4096
    features (64 channels x 8 x 8), which matches e.g. 3x128x128 inputs. Two
    linear heads produce the latent mean and log-variance; the decoder maps a
    latent sample back to a 3x128x128 image through four 2x upsampling stages
    and a final sigmoid.

    Args:
        qp: quantization step for the latent mean / log-variance. ``None``
            (default) disables quantization.
        embed_dim: size of the latent vector.
    """

    def __init__(self , qp= None , embed_dim = 200 ):
        super().__init__()
        self.qp = qp
        self.embed_dim = embed_dim

        self.encoder = nn.Sequential(
                nn.Conv2d(3, 32, stride=2, kernel_size=3, bias=False, padding=1),
                nn.BatchNorm2d(32),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.Conv2d(32, 64, stride=2, kernel_size=3, bias=False, padding=1),
                nn.BatchNorm2d(64),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.Conv2d(64, 64, stride=2, kernel_size=3, bias=False, padding=1),
                nn.BatchNorm2d(64),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.Conv2d(64, 64, stride=2, kernel_size=3, bias=False, padding=1),
                nn.BatchNorm2d(64),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.Flatten(),
        )

        # Latent heads: 4096 encoder features -> embed_dim values each.
        self.z_mean = torch.nn.Linear(2048*2, self.embed_dim)
        self.z_log_var = torch.nn.Linear(2048*2, self.embed_dim)

        if qp is not None:
            self.quantize = QuantizeLayer(qp = qp , normalize= True )
        else:
            self.quantize = None

        self.decoder = nn.Sequential(
                torch.nn.Linear(self.embed_dim, 2048*2),
                # (batch, 4096) -> (batch, 64, 8, 8)
                nn.Unflatten(1, (64, 8, 8)),
                #
                nn.UpsamplingNearest2d(scale_factor=2),
                nn.Conv2d(64, 64, stride=1, kernel_size=3, padding=1),
                nn.BatchNorm2d(64),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.UpsamplingNearest2d(scale_factor=2),
                nn.Conv2d(64, 64, stride=1, kernel_size=3, padding=1),
                nn.BatchNorm2d(64),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.UpsamplingNearest2d(scale_factor=2),
                nn.Conv2d(64, 32, stride=1, kernel_size=3, padding=1),
                nn.BatchNorm2d(32),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout2d(0.25),
                #
                nn.UpsamplingNearest2d(scale_factor=2),
                nn.Conv2d(32, 3, stride=1, kernel_size=3, padding=1),
                nn.Sigmoid()
                )

    def _latent_params(self, x):
        """Encode ``x`` and return ``(z_mean, z_log_var)``, quantized when a quantizer is set."""
        features = self.encoder(x)
        z_mean, z_log_var = self.z_mean(features), self.z_log_var(features)
        if self.quantize is not None:
            z_mean, z_log_var = self.quantize(z_mean), self.quantize(z_log_var)
        return z_mean, z_log_var

    def encoding_fn(self, x):
        """Return one latent sample for ``x`` (no decoding)."""
        z_mean, z_log_var = self._latent_params(x)
        return self.reparameterize(z_mean, z_log_var)

    def reparameterize(self, z_mu, z_log_var):
        """Sample ``z = z_mu + eps * exp(z_log_var / 2)`` with ``eps ~ N(0, I)``."""
        # randn_like draws the noise on z_mu's device and dtype, so this works
        # on CPU as well as on GPU.
        eps = torch.randn_like(z_mu)
        return z_mu + eps * torch.exp(z_log_var/2.)

    def forward(self, x):
        """Return ``(encoded, z_mean, z_log_var, decoded)`` for the batch ``x``."""
        z_mean, z_log_var = self._latent_params(x)
        encoded = self.reparameterize(z_mean, z_log_var)
        decoded = self.decoder(encoded)
        return encoded, z_mean, z_log_var, decoded

In [None]:
# Build the loaders once: data['train'] is the training DataLoader, data['test'] is None.
data = getData(root_dir='./archive/')

Model: ResNet-18

In [None]:
# Alternative: resnet_model = torchvision.models.resnet18(pretrained = True)

# Load ResNet-18 through torch.hub from the torchvision repo pinned at v0.10.0,
# requesting the pretrained weights.
resnet_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)


Using cache found in /home/vihan/.cache/torch/hub/pytorch_vision_v0.10.0


In [None]:
# Swap the final fully connected layer for a bias-free 100-output linear head
# (one output per entry in `dic`), keeping the original layer's input width.
# print(resnet_model)
resnet_model.fc = nn.Linear( in_features= resnet_model.fc.in_features , out_features=100 , bias = False)

Variational Autoencoder 

In [None]:
# VAE with a 200-dimensional latent; qp=None means no quantization layer is created.
vae_model = VAE( qp = None , embed_dim = 200 )

In [None]:
class combined_model( nn.Module ):
    """Chain a VAE and a classifier: the classifier sees the VAE's reconstruction.

    Args:
        resnet: classifier module applied to the reconstructed images.
        vae: autoencoder module. It may return either the reconstruction alone
            or a tuple/list whose last element is the reconstruction, as
            ``VAE.forward`` does with ``(encoded, z_mean, z_log_var, decoded)``.
    """

    def __init__( self , resnet , vae ):
        super().__init__()

        self.resnet = resnet
        self.vae = vae

    def forward( self , x ):
        """Return the classifier output for the VAE reconstruction of ``x``.

        Only the classifier output is returned; the latent tensors produced by
        the VAE are discarded.
        """
        vae_out = self.vae(x)
        # The classifier needs an image tensor, not the VAE's full output tuple.
        if isinstance( vae_out , (tuple , list)):
            decoded = vae_out[-1]
        else:
            decoded = vae_out
        return self.resnet( decoded)

In [None]:
# Wrap both networks in one module; it holds references to (not copies of)
# resnet_model and vae_model.
model = combined_model( resnet_model , vae_model )

In [None]:
# Run 7 epochs (0..6); the schedule sets lr to 0.01 at epoch 0 and 0.001 at epoch 3.
# NOTE(review): this trains `resnet_model` directly — the combined `model` built
# above is not used here. Confirm that is intended.
losses= Train( resnet_model , data['train'] , epochs = 7 , schedule= {0:0.01 , 3:0.001 })

Learning rate: 0.010000
EPOCH  0
 Training Accuracy is:  0.9571446063425982|EPOCH  1
 Training Accuracy is:  0.0|EPOCH  2
 Training Accuracy is:  0.0|Learning rate: 0.001000
EPOCH  3
 Training Accuracy is:  0.0|EPOCH  4
 Training Accuracy is:  0.0|EPOCH  5
 Training Accuracy is:  0.0|EPOCH  6
 Training Accuracy is:  0.0|

In [None]:
# Number of mini-batches the training loader yields per epoch.
len( data['train'] )

64

In [None]:
# Peek at the first mini-batch only: print the image tensor shape and the label count.
batch = next( iter( data['train'] ) , None )
if batch is not None:
    print( batch[0].shape )
    print( len( batch[1] ) )
    

torch.Size([128, 3, 128, 128])
128
