In [21]:
# ------------ RNN -------------------------------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------

In [2]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# Imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as DataLoader
import torchvision.datasets as datase
import torchvision.transforms as transforms

In [5]:
# Create Fully Connected Network
class NN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(NN, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, num_classes)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [36]:
# Set Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [37]:
# Hyperparamteres
input_size  = 784
num_classes = 10
learning_rate = .001
batch_size = 64
epoch_size = 1

In [202]:
# Load Data
train_set = datasets.MNIST(root = 'dataset/', train = True, transform = transforms.ToTensor(), download = True)
train_loader = DataLoader(dataset = train_set, batch_size = batch_size, shuffle = True)

test_set = datasets.MNIST(root = 'dataset/', train = False, transform = transforms.ToTensor(), download = True)
test_loader = DataLoader(dataset = test_set, batch_size = batch_size, shuffle = True)

In [39]:
# Initialize Network
model = NN(input_size = input_size, num_classes = num_classes).to(device)

In [40]:
# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

In [41]:
# Train Network

for epoch in range(num_epochs):
    for batch_ix , (data, targets) in enumerate(train_loader):
        
        # Get data to cuda if possible
        data = data.to(device = device)
        targets = targets.to(device = device)
        
        # Get to correct shape
        data = data.reshape(data.shape[0], -1)
        
        # Forward
        scores = model(data)
        loss = criterion(scores, targets)
        
        # Backward    
        optimizer.zero_grad()
        loss.backward()
        
        # Gradient Descent
        optimizer.step()

In [42]:
# Check accuracy on training and test how good our model is
def check_accuracy(loader, model):
    num_correct = 0
    num_samples = 0
    model.eval()
    
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device = device)
            y = y.to(device = device)
            x = x.reshape(x.shape[0], -1)
            
            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)
            
        print(f'Got{num_correct}/ {num_samples} with accuracy {float(num_correct) / float(num_samples) * 100 : .2f}')

check_accuracy(train_loader, model)
check_accuracy(test_loader, model)

Got57108/ 60000 with accuracy  95.18
Got9469/ 10000 with accuracy  94.69


In [52]:
input_size = 28
sequence_length = 28
num_layers = 2
hidden_size = 256
num_classes = 10
learning_rate = .0001
batch_size = 64
num_epochs = 2

In [53]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first = True)
        self.fc  = nn.Linear(hidden_size * sequence_length, num_classes) 
        
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.rnn(x, h0)
        out    = out.reshape(out.shape[0], -1)
        out = self.fc(out)
        return out

In [56]:
class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(GRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first = True)
        self.fc  = nn.Linear(hidden_size * sequence_length, num_classes) 
        
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.gru(x, h0)
        out    = out.reshape(out.shape[0], -1)
        out = self.fc(out)
        return out

In [73]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first = True)
        self.fc  = nn.Linear(hidden_size, num_classes) 
        
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.lstm(x, (h0, c0))
        # out    = out.reshape(out.shape[0], -1)
        out = self.fc(out[:,-1,:])
        return out

In [76]:
class BiLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(BiLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.blstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first = True, bidirectional = True)
        self.fc  = nn.Linear(hidden_size * 2, num_classes) 
        
    def forward(self, x):
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.blstm(x, (h0, c0))
        # out    = out.reshape(out.shape[0], -1)
        out = self.fc(out[:,-1,:])
        return out

In [77]:
model = BiLSTM(input_size, hidden_size, num_layers, num_classes).to(device)

In [78]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

for epoch in range(num_epochs):
    for batch_ix , (data, targets) in enumerate(train_loader):
        
        # Get data to cuda if possible
        data = data.to(device = device).squeeze(1)
        targets = targets.to(device = device)
        
        # Forward
        scores = model(data)
        loss = criterion(scores, targets)
        
        # Backward    
        optimizer.zero_grad()
        loss.backward()
        
        # Gradient Descent
        optimizer.step()

def check_accuracy(loader, model):
    num_correct = 0
    num_samples = 0
    model.eval()
    
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device = device).squeeze(1)
            y = y.to(device = device)
            
            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)
            
        print(f'Got{num_correct}/ {num_samples} with accuracy {float(num_correct) / float(num_samples) * 100 : .2f}')
        
check_accuracy(train_loader, model)
check_accuracy(test_loader, model)

Got57114/ 60000 with accuracy  95.19
Got9525/ 10000 with accuracy  95.25


In [22]:
# Seq2Seq Model
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------

In [126]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k
from torchtext.legacy.data import Field, BucketIterator
import numpy as np
import spacy
import random
from torch.utils.tensorboard import SummaryWriter
import en_core_web_sm
import de_core_news_sm
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

In [127]:
spacy_ger = de_core_news_sm .load()
spacy_eng = en_core_web_sm .load()

In [128]:
def tokenizer_ger(text):
    return [tok.text for tok in spacy_ger.tokenizer(text)]

In [129]:
def tokenizer_eng(text):
    return [tok.text for tok in spacy_eng.tokenizer(text)]

In [130]:
german = Field(tokenize = tokenizer_ger, lower = True, init_token = '<sos>' , eos_token = '<eos>')

In [131]:
english = Field(tokenize = tokenizer_eng, lower = True, init_token = '<sos>' , eos_token = '<eos>')

In [132]:
train_iter, valid_iter, test_iter = Multi30k(split=('train', 'valid','test'))

In [133]:
german.build_vocab(train_data, max_size = 10000, min_freq = 2)

In [134]:
english.build_vocab(train_data, max_size = 10000, min_freq = 2)

In [135]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers  = num_layers
        
        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout = p)
        
    def forward(self, x):
        # x shape : (seq_length, N)
        
        embedding = self.dropout(self.embedding(x))
        # embedding shape : (seq_length, N, embedding_size)         
        
        outputs, (hidden, cell) = self.rnn(embedding)
        
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, p):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout = p)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden, cell):
        # shape of x : (N) but we want (1, N)
        x = x.unsqueeze(0)
        
        embedding = nn.Embedding(self.embedding(x))
        # embedding shape : (1, N, embedding_size)
        
        outputs, (hidden, cell) = self.rnn(embedding, (hidden, cell))
        # shape of outputs : (1, N, hidden_size)
        
        predictions = self.fc(outputs)
        # shape of predictions : (1, N, length_of_vocab)
        
        predictions = predictions.squeeze(0)
        
        return predctions, hidden, cell

class Seq2Seq(nn.Module):
        def __init__(self, encoder, decoder):
            super(Seq2Seq, self).__init__()
            self.encoder = encoder
            self.decoder = decoder
        
        def forward(self, source, target, teacher_force_ratio = 0.5):
            batch_size = source.shape[1]
            target_len = target.shape[0]
            target_vocab_size = len(english.vocab)
            
            outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)
            
            hidden, cell = self.encoder(source)
            
            # Grab start token 
            x = target[0]
            
            for t in range(1, target_len):
                output, hidden, cell = self.decoder(x, hidden, cell)
                
                outputs[t] = output
                
                
                # output shape : (N, english_vocab_size)
                best_guess = output.argmax(1)
                
                x = target[t] if random.random() < teacher_force_ratio else best_guess
            
            return ouputs

In [136]:
# Training Hyperparamters
num_epochs = 20
learning_rate = .001
batch_size = 64

# Model hyperparameters
load_model = False
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size_encoder = len(german.vocab)
input_size_decoder = len(english.vocab)
output_size = len(english.vocab)
encoder_embedding_size = 300
decoder_embedding_size = 300
hidden_size = 1024
num_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5

# Tensorboard
writer = SummaryWriter(f'runs/Loss_plot')
step = 0

# train_iterator, validation_iterator, test_iterator = BucketIterator.splits((train_data, validation_data, test_data), 
#                                                                             batch_size = batch_size,
#                                                                             sort_within_batch = True,
#                                                                             sort_key = lambda x: len(x.src),
#                                                                             device = device)

In [137]:
def collate_batch(batch):
    label_list, text_list = [], []
    for (_label, _text) in batch:
        label_list.append(label_transform(_label))
        processed_text = torch.tensor(text_transform(_text))
        text_list.append(processed_text)
    return torch.tensor(label_list), pad_sequence(text_list, padding_value=3.0)

In [142]:
train_list = list(train_iter)
def batch_sampler():
    indices = [(i, len(tokenizer(s[1]))) for i, s in enumerate(train_list)]
    random.shuffle(indices)
    pooled_indices = []
    # create pool of indices with similar lengths 
    for i in range(0, len(indices), batch_size * 100):
        pooled_indices.extend(sorted(indices[i:i + batch_size * 100], key=lambda x: x[1]))

    pooled_indices = [x[0] for x in pooled_indices]

    # yield indices for current batch
    for i in range(0, len(pooled_indices), batch_size):
        yield pooled_indices[i:i + batch_size]

In [143]:
bucket_dataloader = DataLoader(list(train_iter), batch_sampler=batch_sampler(),
                               collate_fn=collate_batch)

In [144]:
encoder_net = Encoder(input_size_encoder, encoder_embedding_size, hidden_size, num_layers, enc_dropout).to(device)
decoder_net = Decoder(input_size_decoder, decoder_embedding_size, hidden_size, output_size, num_layers, dec_dropout).to(device)
model = Seq2Seq(encoder_net, decoder_net).to(device)

In [145]:
pad_idx = english.vocab.stoi['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index = pad_idx)
optimizer = optim.Adam(model.parameters(), lr = learning_rate)

for epoch in range(num_epochs):
    print(f'Epoch[{epoch} / {num_epochs}]')
    checkpoint = {'state_dict' : model.state_dict(), 'optimizer' : optimizer.state_dict()}
    
    for batch_idx, batch in enumerate(bucket_dataloader):
        inp_data = batch.src.to(device)
        target = batch.trg.to(device)
        
        output = model(inp_data, target)
        # output shape : (trg_len, batch_size, output_dim)
        
        output = output[1:].reshape(-1, output.shape[2])
        target = target[1:].reshape(-1)
        
        optimizer.zero_grad()
        loss = criterion(output, target)
        
        loss.backward()
        
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm = 1)
        optimizer.step()
        
        writer.add_scalar('Training Loss', loss, global_step = step)
        step+=1
        

Epoch[0 / 20]
Epoch[1 / 20]
Epoch[2 / 20]
Epoch[3 / 20]
Epoch[4 / 20]
Epoch[5 / 20]
Epoch[6 / 20]
Epoch[7 / 20]
Epoch[8 / 20]
Epoch[9 / 20]
Epoch[10 / 20]
Epoch[11 / 20]
Epoch[12 / 20]
Epoch[13 / 20]
Epoch[14 / 20]
Epoch[15 / 20]
Epoch[16 / 20]
Epoch[17 / 20]
Epoch[18 / 20]
Epoch[19 / 20]


In [20]:
# Vision Transformer
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------

In [1]:
import torch
import torch.nn as nn

In [2]:
class PatchEmbed(nn.Module):
    ''' Split images into patches and then embed them
    
    Parameters
    ----------
    
    img_size : int
        Size of the image (it is a square)
        
    patch_size : int
        Size of the patch (it is a square)
        
    in_chans : int
        Number of input channels
        
    embed_dim : int
        The embedding dimension
    
    
    Attributes
    ----------
    
    num_patches : int
        Number of patches inside of our image
    
    proj : nn.Conv2d
        Convolutional layer that does both the splitting into patches and their embedding
    '''
    def __init__(self, img_size, patch_size, in_chans = 3, embed_dim = 768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        
        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size = patch_size, stride = patch_size)
    
    def forward(self, x):
        '''Run forward pass
        
        Parameters
        ----------
        
        x : torch.Tensor
            Shape '(n_samples, in_chans, img_size, img_size)'
        
        Returns
        -------
        
        torch.Tensor
            Shape '(n_samples, n_patches, embed_dim)'
        
        '''
        
        x = self.proj(x) 
        # (n_samples, embed_dim, num_patches ** 0.5, num_patches ** 0.5)         

        x = x.flatten(2)
        # (n_samples, embed_dim, num_patches)
        
        x = x.transpose(1, 2)
        # (n_samples, num_patches, embed_dim)

In [4]:
class Attention(nn.Module):
    '''Attention mechanism
    
    Parameters
    ----------
    
    dim : int
        The input and output dimension of per token feature.
    
    n_heads : int
        Number of attention heads
    
    qkv_bias : bool
        If True then we include bias to the query, key and value operations
    
    attn_p : float
        Dropout probability applied to the query, key and value tensors
    
    proj_p : float
        Dropout probability applied to the output tensor
        
    Attributes 
    ----------
    
    scale : float
        Normalizing constant for the dot product
    
    qkv : nn.Linear 
        Linear projection for the query, key and values
    
    proj : nn.Linear
        Linear mapping that takes in the concatenated output of all attention heads and maps it into a new space
        
    attn_drop, proj_drop : nn.Dropout
        Dropout layers
    '''
    
    def __init__(self, dim, n_heads = 12, qkv_bias = True, attn_p = 0, proj_p = 0):
        super().__init__()
        self.n_heads = n_heads
        self.dim = dim
        self.head_dim = dim // n_heads
        self.scale = self.head_dim ** -0.5
        
        self.qkv = nn.Linear(dim, dim * 3, bias = qkv_bias)
        self.attn_drop = nn.Dropout(attn_p)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_p)
        
    def forward(self, x):
        '''Run forward pass
        
        Parameters
        ----------
        
        x : torch.Tensor
            Shape '(n_samples, n_patches + 1, dim)'
            
        Returns
        -------
        
        x : torch.Tensor
            Shape '(n_samples, n_patches + 1, dim)'
        '''
        
        n_samples, n_tokens, dim = x.shape
        
        if (dim != self.dim):
            raise ValueError
        
        qkv = self.qkv(x)
        # (n_samples, n_patches + 1, 3 * dim)
        
        qkv = qkv.reshape(n_samples, n_tokens, 3, self.n_heads, self.head_dim)
        # (n_samples, n_patches + 1, 3, n_heads, head_dim)
        
        qkv = qkv.permute(2, 0, 3, 1, 4)
        # (3, n_samples, n_heads, n_patches + 1, head_dim)
        
        q, k, v = qkv[0], qkv[1], qkv[2]
        
        k_t = k.transpose(-2, -1) 
        #(n_samples, n_heads, head_dim, n_patches + 1)
        
        dp = (q @ k_t) * self.scale
        #(n_samples, n_heads, n_patches + 1, n_patches + 1)
        
        attn = dp.softmax(dim = -1)
        #(n_samples, n_heads, n_patches + 1, n_patches + 1)
        
        weighted_avg = attn @ v
        #(n_samples, n_heads, n_patches + 1, head_dim)
        
        weighted_avg = weighted_avg.transpose(1, 2)
        #(n_samples, n_patches + 1, n_heads, head_dim)
        
        weighted_avg = weighted_avg.flatten(2)
        #(n_samples, n_patches + 1, dim)
        
        x = self.proj(weighted_avg)
        #(n_samples, n_patches + 1, dim)
        
        x = self.proj_drop(x)
        #(n_samples, n_patches + 1, dim)
        
        return x

In [8]:
class MLP(nn.Module):
    '''Multilayer Perceptron
    
    Parameters
    ----------
    
    in_features : int
        Number of input features
    
    hidden_features : int
        Number of nodes in the hidden layer
        
    out_features : int
        Number of output features
        
    p : float
        Dropout probabilty
        
        
    Attribute
    ---------
    
    fc : nn.Linear
         The first linear layer
    
    act : nn.GELU
        GELU activation function
        
    fc2 : nn.Linear 
        The second linear layer
        
    drop : nn.Dropout
        Dropout layer
    '''
    
    def __init__(self, in_features, hidden_features, out_features, p=0.):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(p)
    
    def forward(self, x):
        '''Run forward pass
        
        Parameters
        ----------
        
        x : torch.Tensor
            Shape `(n_samples, n_patches+1, in_features)
        
        Returns
        -------
        
        x : torch.Tensor
            Shape `(n_samples, n_patches+1, out_features)
        '''
        
        x = self.fc1(x)  # (n_samples, n_patches+1, hidden_features)
        x = self.act(x)  # (n_samples, n_patches+1, hidden_features)
        x = self.drop(x) # (n_samples, n_patches+1, hidden_features)
        x = self.fc2(x)  # (n_samples, n_patches+1, out_features)
        x = self.drop(x) # (n_samples, n_patches+1, out_features)
        
        return x

In [16]:
class Block(nn.Module):
    '''Transformer Block
    
    Parameters
    ----------
    
    dim : int
        Embedding dimension
    
    n_heads : int
        Number of attention heads
        
    mlp_ratio : float
        Determines the hidden dimension of the `MLP' module with respect to `dim'
        
    qkv_bias : bool
        If True then we include bias to query, key and value projections.
        
    p, attn_p : float
        Dropout probabilty
        
        
    Attributes
    ---------
    
    norm1, norm2 : LayerNorm
         Layer Normalization
    
    attn : Attention
         Attention module
        
    mlp : MLP 
         MLP module
    '''
    
    def __init__(self, dim, n_heads, mlp_rtio = 4.0, qkv_bias = True, p = 0., attn_p = 0.):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, eps = 1e-6)
        self.attn  = Attention(dim, n_heads = n_heads, qkv_bias = qkv_bias, attn_p = attn_p, proj_p = p)
        self.norm2 = nn.LayerNorm(dim, eps = 1e-6)
        hidden_features = int(dim * mlp_ratio)
        self.mlp = MLP(in_features = dim, hidden_features = hidden_features, out_features = dim)
        
    def forward(self, x):
        '''
        Run forward pass
        
        Parameters
        ----------
        
        x : torch.Tensor
            Shape `(n_samples, n_patches+1, dim)
        
        Returns
        -------
        
        x : torch.Tensor
            Shape `(n_samples, n_patches+1, dim) 
        '''
        
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        
        return x
    

In [17]:
class VisionTransformer(nn.Module):
    '''Simplified implementation of Vision Transformer
    
    Parameters
    ----------
    
    img_size : int
        Both height and width of an image (it's a square)
        
    patch_size : int
        Both height and width of a patch (it's a square)
        
    in_chans : int
        Number of input channels
        
    n_classes : int
        Number of classes
        
    embed_dim : int
        Dimensionality of the token/patch embeddings
        
    depth : int
        Number of blocks
    
    n_heads : int
        Number of attention heads
    
    mlp_ratio : float
        Determines the hidden dimension of the `MLP' module.
    
    qkv_bias : bool
        If True then we include bias to query, key and value projections.
        
    p, attn_p : float
        Dropout probabilty
        
        
    Attributes
    ---------
    
    patch_embed : PatchEmbed
        Instance of `PatchEmbed' Layer
        
    cls_token   : nn.Parameter
         Learnable parameter that will represent the first token in the sequence. It has `embed_dim' elements.
        
    pos_emb     : nn.Parameter
         Positional embedding of the cls token + all the patches.
         It has `(n_patches + 1) * embed_dim' elements
    
    pos_drop    : nn.Dropout
        Dropout layer
    
    blocks      : nn.ModuleList
        List of `Block' modules.
    
    norm        : nn.LayerNorm
        Layer Normalization
    '''
    
    def __init__(self,
                img_size = 384,
                patch_size = 16,
                in_chans = 3,
                n_classes = 1000,
                embed_dim = 786,
                depth = 12,
                n_heads = 12,
                mlp_ratio = 4,
                qkv_bias = True,
                p = 0.,
                attn_p = 0.
                ):
        super().__init__()
        
        self.patch_embed = PatchEmbed(img_size = img_size, patch_size = patch_size, in_chans = in_chans, embed_dim = embed_dim)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1, 1 + self.patch_embed.n_patches, embed_dim))
        self.pos_drop = nn.Dropout(p=p)
        self.blocks = nn.ModuleList(
            [
                Block
                (
                    dim = embed_dim,
                    n_heads = n_heads,
                    mlp_ratio = mlp_ratio,
                    qkv_bias = qkv_bias,
                    p = p,
                    attn_p = attn_p                    
                )
                
                for _ in range(depth)
            ]
        )
        
        self.norm = nn.LayerNorm(embed_dim, eps = 1e-6)
        self.out = nn.Linear(embed_dim, n_classes)
        
    def forward(self, x):
        '''
        Run forward pass
        
        Parameters
        ----------
        
        x : torch.Tensor
            Shape `(n_samples, in_chans, img_size, img_size)'
        
        Returns
        -------
        
        logits : torch.Tensor
            Logits over all the classes - `(n_samples, n_classes)' 
        '''
        
        n_samples = x.shape[0]
        x = self.patch_embed(x)
        
        cls_token = self.cls_token.expand(n_samples, -1, -1) # (n_samples, 1, embed_dim)
        x = torch.cat((cls_token, x), dim = 1) # (n_samples, 1 + n_patches, embed_dim)
        
        x = x + self.pos_embed # (n_samples, 1 + n_patches, embed_dim)
        x = self.pos_drop(x)
        
        for block in self.blocks :
            x = block(x)
            
        x = self.norm(x)
        cls_token_final = x[:, 0]
        x = self.out(cls_token_final)
        
        return x

In [23]:
# VGG
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------
# ------------------

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

In [29]:
VGG16 = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M']
# Then flatten and 4096x4096x1000 linear layers

In [33]:
class VGG_net(nn.Module):
    def __init__(self, in_channels = 3, num_classes = 1000):
        super(VGG_net, self).__init__()
        self.in_channels = in_channels
        self.conv_layers = self.create_conv_layers(VGG16)
        
        self.fcs = nn.Sequential(nn.Linear(512 * 7 * 7, 4096),
                                 nn.ReLU(),
                                 nn.Dropout(p=0.5),
                                 nn.Linear(4096, 4096),
                                 nn.ReLU(),
                                 nn.Dropout(p=0.5),
                                 nn.Linear(4096, num_classes))
    
    def forward(self, x):
        x = self.conv_layers(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fcs(x)
        return x
    
    def create_conv_layers(self, architecture):
        layers = []
        in_channels = self.in_channels
        
        for x in architecture:
            if(type(x) == int):
                out_channels = x
                
                layers += [nn.Conv2d(in_channels = in_channels, out_channels = out_channels, kernel_size = (3, 3), stride = (1, 1), padding = (1, 1)), nn.BatchNorm2d(out_channels), nn.ReLU()]
                in_channels = x
            
            elif x == 'M':
                layers += [nn.MaxPool2d(kernel_size = (2, 2), stride = (2, 2))]
            
        return nn.Sequential(*layers)

In [36]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = VGG_net(in_channels = 3, num_classes = 1000).to(device)
x = torch.rand(1, 3, 224, 224).to(device)
print(model(x).shape)

torch.Size([1, 1000])


In [37]:
# CNN-LSTM Image Captioning
# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------

In [38]:
import torch
import torch.nn as nn
import torchvision.models as models

In [40]:
class EncoderCNN(nn.Module):
    def __init__(self, embed_size, train_CNN = False):
        super(EncoderCNN, self).__init__()
        self.train_CNN = train_CNN
        self.inception = models.inception_v3(pretrained = True, aux_logits = False)
        self.inception.fc = nn.Linear(self.inception.fc.in_features, embed_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, images):
        features = self.inception(images)
        
        for name , param in self.inception.named_parameters():
            if "fc.weight" in name or "fc.bias" in name:
                param.requires_grad = True
            else:
                param.requires_grad = train_CNN
        
        return self.dropout(self.relu(features))        

In [41]:
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm  = nn.LSTM(embed_size, hidden_size, num_layers)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)
    
    def forward(self, features, captions):
        embeddings = self.dropout(self.embed(captions))
        embeddings = torch.cat((features.unsqueeze(0), embeddings), dim = 0)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.linear(hiddens)
        return outputs


In [44]:
class CNNtoRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        super(CNNtoRNN, self).__init__()
        self.encoderCNN = EncoderCNN(embed_size)
        self.decoderRNN = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers)
    
    def forward(self, images, captions):
        features = self.encoderCNN(images)
        outputs  = self.decoderRNN(features, captions)
        return outputs
    
    def caption_image(self, image, vocabulary, max_length = 50):
        result_caption = []
        
        with torch.no_grad():
            x = self.encoderCNN(image).unsqueeze(0)
            states = None
            
            for _ in range(max_length):
                hiddens, states = self.decoderRNN.lstm(x, states)
                output = self.decoderRNN.linear(hiddens.unsqueeze(0))
                predicted = output.argmax(1)
                
                result_caption.append(predicted.item())
                x = self.decoderRNN.embed(predicted).unsqueeze(0)
                
                if(vocabulary.itos[predicted.item()] == "<EOS>"):
                    break
                    
            return [vocabulary.itos[idx] for idx in result_caption]

In [48]:
# No Training Loop for the architecure yet. Will soon come back to this
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------

In [10]:
# Simple GAN
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter

In [12]:
class Discriminator(nn.Module):
    def __init__(self, img_dim):
        super().__init__()
        self.disc = nn.Sequential(
                                    nn.Linear(img_dim, 128),
                                    nn.LeakyReLU(0.1),
                                    nn.Linear(128, 1),
                                    nn.Sigmoid()
                                 )
    def forward(self, x):
            return self.disc(x)

In [13]:
class Generator(nn.Module):
    def __init__(self, z_dim, img_dim):
        super().__init__()
        self.gen = nn.Sequential(
                                    nn.Linear(z_dim, 256),
                                    nn.LeakyReLU(0.1),
                                    nn.Linear(256, img_dim),
                                    nn.Tanh()
                                )
        
    def forward(self, x):
            return self.gen(x)

In [14]:
device = "cuda" if torch.cuda.is_available() else "cpu"
lr = 3e-4
z_dim = 64
image_dim = 784
batch_size = 32
num_epochs = 50

disc = Discriminator(image_dim).to(device)
gen =  Generator(z_dim, image_dim).to(device)

In [15]:
fixed_noise = torch.randn((batch_size, z_dim)).to(device)
transforms  = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,) , (0.5,))]) 

In [16]:
dataset  = datasets.MNIST(root="dataset/" , transform = transforms, download = True)
loader   = DataLoader(dataset, batch_size = batch_size, shuffle = True)
opt_disc = optim.Adam(disc.parameters(), lr = lr)
opt_gen  = optim.Adam(gen.parameters(), lr = lr)
criterion = nn.BCELoss()
writer_fake = SummaryWriter(f"runs/GAN_MNIST/fake")
writer_real = SummaryWriter(f"runs/GAN_MNIST/real")
step = 0

In [18]:
for epoch in range(num_epochs):
    for batch_idx , (real , _) in enumerate(loader):
        real = real.view(-1, 784).to(device)
        batch_size = real.shape[0]
        
        # Train Discriminator : max log(D(real)) +  log(1 - D(G(z)))
        noise = torch.randn(batch_size, z_dim).to(device)
        fake  = gen(noise)
        disc_real  = disc(real).view(-1)
        disc_loss_real = criterion(disc_real , torch.ones_like(disc_real)) #  log(D(real)) {min of negative of this}
        
        disc_fake  = disc(fake).view(-1)
        disc_loss_fake = criterion(disc_fake, torch.zeros_like(disc_fake)) #  log((1 - D(G(z)))) {min of negative of this}
        
        loss_disc = (disc_loss_real + disc_loss_fake) / 2
        opt_disc.zero_grad()
        loss_disc.backward(retain_graph = True)
        opt_disc.step()
        
        # Train Generator : min log(1 - D(G(z)))  <-->  max log(D(G(z)))
        output = disc(fake).view(-1)
        gen_loss = criterion(output , torch.ones_like(output))
        
        opt_gen.zero_grad()
        gen_loss.backward()
        opt_gen.step()
        
        if batch_idx == 0:
            print(
                    f"Epoch[{epoch} / {num_epochs}] \n"
                    f"Loss D : {loss_disc : .4f} , Loss G : {gen_loss : .4f}" 
                 )
            
            with torch.no_grad():
                fake = gen(fixed_noise).reshape(-1, 1, 28, 28)
                data = real.reshape(-1, 1, 28, 28)
                img_grid_fake = torchvision.utils.make_grid(fake, normalize = True)
                img_grid_real = torchvision.utils.make_grid(data, normalize = True)
                
                writer_fake.add_image("Mnist Fake Images", img_grid_fake, global_step = step)
                writer_real.add_image("Mnist Real Images", img_grid_real, global_step = step)
                
                step += 1

Epoch[0 / 50] \ Loss D :  0.6131 , Loss G :  0.7316
Epoch[1 / 50] \ Loss D :  0.6359 , Loss G :  0.8937
Epoch[2 / 50] \ Loss D :  0.3602 , Loss G :  1.3473
Epoch[3 / 50] \ Loss D :  0.9751 , Loss G :  0.6499
Epoch[4 / 50] \ Loss D :  0.5774 , Loss G :  0.9267
Epoch[5 / 50] \ Loss D :  0.6263 , Loss G :  0.9647
Epoch[6 / 50] \ Loss D :  0.6542 , Loss G :  0.9407
Epoch[7 / 50] \ Loss D :  0.4121 , Loss G :  1.5104
Epoch[8 / 50] \ Loss D :  0.5918 , Loss G :  1.4942
Epoch[9 / 50] \ Loss D :  0.8102 , Loss G :  0.7931
Epoch[10 / 50] \ Loss D :  0.5829 , Loss G :  0.8360
Epoch[11 / 50] \ Loss D :  0.5085 , Loss G :  1.2615
Epoch[12 / 50] \ Loss D :  0.7817 , Loss G :  0.9677
Epoch[13 / 50] \ Loss D :  0.7596 , Loss G :  0.9585
Epoch[14 / 50] \ Loss D :  0.4953 , Loss G :  1.0973
Epoch[15 / 50] \ Loss D :  0.8616 , Loss G :  0.7013
Epoch[16 / 50] \ Loss D :  0.5627 , Loss G :  1.1396
Epoch[17 / 50] \ Loss D :  0.8596 , Loss G :  0.8698
Epoch[18 / 50] \ Loss D :  0.4357 , Loss G :  1.1929
Epo

In [1]:
# DCGAN
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------

In [2]:
import torch
import torch.nn as nn

In [4]:
class Discriminator(nn.Module):
    def __init__(self, channels_img, features_d):
        # input shape : N x channels_img x 64 x 64
        
        super(Discriminator, self).__init__()
        self.disc = nn.Sequential(
                                     nn.Conv2d(channels_img, features_d, kernel_size = 4, stride = 2, padding = 1), # 32 x 32 
                                     nn.LeakyReLU(0.2),
                                     self._block(features_d, features_d * 2, 4, 2, 1),     # 16 x 16
                                     self._block(features_d * 2, features_d * 4, 4, 2, 1), # 8 x 8
                                     self._block(features_d * 4, features_d * 8, 4, 2, 1),  # 4 x 4
                                     nn.Conv2d(features_d * 8, 1, kernel_size = 4, stride = 2, padding = 0), # 1 x 1
                                     nn.Sigmoid()
                                 )
        
    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
                             nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias = False), 
                             nn.BatchNorm2d(out_channels), 
                             nn.LeakyReLU(0.2)
                            )
    def forward(self, x):
        return self.disc(x)

In [11]:
class Generator(nn.Module):
    def __init__(self, z_dim, channels_img, features_g):
        # input shape : N x z_dim x 1 x 1
        
        '''
        ------------------------------
        nn.ConvTranspose2d output shape ---> s * (n-1) + f - 2p
        '''
        
        super(Generator, self).__init__()
        self.gen = nn.Sequential(
                                     self._block(z_dim, features_g * 16, 4, 1, 0),     # N x features_g * 16 x 4 x 4
                                     self._block(features_g * 16, features_g * 8, 4, 2, 1), # 8 x 8
                                     self._block(features_g * 8, features_g * 4, 4, 2, 1),  # 16 x 16
                                     self._block(features_g * 4, features_g * 2, 4, 2, 1),  # 32 x 32
                                     nn.ConvTranspose2d(features_g * 2, channels_img, kernel_size = 4, stride = 2, padding = 1), # 64 x 64
                                     nn.Tanh()
                                 )
        
    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
                             nn.ConvTranspose2d(in_channels, out_channels, kernel_size, stride, padding, bias = False), 
                             nn.BatchNorm2d(out_channels), 
                             nn.ReLU()
                            )
    def forward(self, x):
        return self.gen(x)

In [12]:
def initialize_weights(model):
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d, nn.BatchNorm2d)):
            nn.init.normal_(m.weight.data, 0.0, 0.02)

In [13]:
N, in_channels, H, W = 8, 3, 64, 64
z_dim = 100
x = torch.randn((N, in_channels, H, W))
disc = Discriminator(in_channels, 8)
initialize_weights(disc)
assert disc(x).shape == (N, 1, 1, 1)

gen = Generator(z_dim, in_channels, 8)
initialize_weights(gen)
z = torch.randn((N, z_dim, 1, 1))
assert gen(z).shape == (N, in_channels, H, W)

In [14]:
# --------------------Training-------------------------------------------------------------

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

In [17]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

learning_rate = 2e-4
batch_size = 128
image_size = 64
channels_img = 1
z_dim = 100
num_epochs = 5
features_disc = 64
features_gen  = 64

In [18]:
transforms = transforms.Compose(
    [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize([0.5 for _ in range(channels_img)], [0.5 for _ in range(channels_img)])
    ]
)

In [19]:
dataset = datasets.MNIST(root = "dataset/", train = True, transform = transforms, download = True)
loader = DataLoader(dataset, batch_size = batch_size, shuffle = True)
gen = Generator(z_dim, channels_img, features_gen).to(device)
disc = Discriminator(channels_img, features_disc).to(device)
initialize_weights(gen)
initialize_weights(disc)

In [21]:
opt_gen = optim.Adam(gen.parameters(), lr = learning_rate, betas = (0.5, 0.999))
opt_disc = optim.Adam(disc.parameters(), lr = learning_rate, betas = (0.5, 0.999))
criterion = nn.BCELoss()

In [22]:
fixed_noise = torch.rand(32, z_dim, 1, 1).to(device)
writer_fake = SummaryWriter(f"runs/DCGAN_MNIST/fake")
writer_real = SummaryWriter(f"runs/DCGAN_MNIST/real")
step = 0

In [28]:
for epoch in range(num_epochs):
    for batch_idx, (real, _) in enumerate(loader):
        real = real.to(device)
        noise = torch.randn((batch_size, z_dim, 1, 1)).to(device)
        fake = gen(noise)
        
#       Train Discriminator max log(D(x)) + log(1 - D(G(z)))  
        disc_real = disc(real).reshape(-1)
        disc_real_loss = criterion(disc_real, torch.ones_like(disc_real))
        disc_fake = disc(fake).reshape(-1)
        disc_fake_loss = criterion(disc_fake, torch.zeros_like(disc_fake))
        disc_loss = (disc_fake_loss + disc_real_loss) / 2
        
        opt_disc.zero_grad()
        disc_loss.backward(retain_graph = True)
        opt_disc.step()
        
#       Train Generator min log(1 - D(G(z))) <---> max log(D(G(z))) 
        disc_fake = disc(fake).reshape(-1)
        gen_fake_loss = criterion(disc_fake, torch.ones_like(disc_fake))
        
        opt_gen.zero_grad()
        gen_fake_loss.backward()
        opt_gen. step()
        
        if(batch_idx % 100 == 0):
            print(f"Epoch [{epoch} / {num_epochs}] Batch {batch_idx} / {len(loader)} \ Loss D : {disc_loss : .4f} , Loss G : {gen_fake_loss : .4f}")
            
            with torch.no_grad():
                fake = gen(fixed_noise)
                
                img_grid_real = torchvision.utils.make_grid(real[:32] , normalize = True)
                img_grid_fake = torchvision.utils.make_grid(fake[:32] , normalize = True)
                
                writer_real.add_image("Real", img_grid_real, global_step = step)
                writer_fake.add_image("Fake", img_grid_fake, global_step = step)
                
            step += 1

Epoch [0 / 5] Batch 0 / 469 \ Loss D :  0.5612 , Loss G :  0.9401
Epoch [0 / 5] Batch 100 / 469 \ Loss D :  0.0144 , Loss G :  4.1656
Epoch [0 / 5] Batch 200 / 469 \ Loss D :  0.6813 , Loss G :  0.7287
Epoch [0 / 5] Batch 300 / 469 \ Loss D :  0.6353 , Loss G :  0.2488
Epoch [0 / 5] Batch 400 / 469 \ Loss D :  0.4466 , Loss G :  1.2437
Epoch [1 / 5] Batch 0 / 469 \ Loss D :  0.5299 , Loss G :  1.5852
Epoch [1 / 5] Batch 100 / 469 \ Loss D :  0.6916 , Loss G :  0.6671
Epoch [1 / 5] Batch 200 / 469 \ Loss D :  0.8239 , Loss G :  1.3765
Epoch [1 / 5] Batch 300 / 469 \ Loss D :  0.5727 , Loss G :  0.9257
Epoch [1 / 5] Batch 400 / 469 \ Loss D :  0.5700 , Loss G :  0.9649
Epoch [2 / 5] Batch 0 / 469 \ Loss D :  0.5810 , Loss G :  1.1754
Epoch [2 / 5] Batch 100 / 469 \ Loss D :  0.7114 , Loss G :  0.8233
Epoch [2 / 5] Batch 200 / 469 \ Loss D :  0.6026 , Loss G :  1.5578
Epoch [2 / 5] Batch 300 / 469 \ Loss D :  0.5099 , Loss G :  2.0573
Epoch [2 / 5] Batch 400 / 469 \ Loss D :  0.4635 , Los

In [29]:
# ---------------------------WGAN------------------------------------------------------
# Using the same model as DCGAN, just a different training loop
# -------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------

In [40]:
# WGAN without gradient penalty

class Critic(nn.Module):
    def __init__(self, channels_img, features_d):
        # input shape : N x channels_img x 64 x 64
        
        super(Critic, self).__init__()
        self.disc = nn.Sequential(
                                     nn.Conv2d(channels_img, features_d, kernel_size = 4, stride = 2, padding = 1), # 32 x 32 
                                     nn.LeakyReLU(0.2),
                                     self._block(features_d, features_d * 2, 4, 2, 1),     # 16 x 16
                                     self._block(features_d * 2, features_d * 4, 4, 2, 1), # 8 x 8
                                     self._block(features_d * 4, features_d * 8, 4, 2, 1),  # 4 x 4
                                     nn.Conv2d(features_d * 8, 1, kernel_size = 4, stride = 2, padding = 0) # 1 x 1
                                 )
        
    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
                             nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias = False), 
                             nn.BatchNorm2d(out_channels), 
                             nn.LeakyReLU(0.2)
                            )
    def forward(self, x):
        return self.disc(x)

In [41]:
disc = Critic(channels_img, features_disc).to(device)

In [42]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
learning_rate = 5e-5
batch_size = 64
image_size = 64
channels_img = 1
z_dim = 100
num_epochs = 5
features_disc = 64
features_gen = 64
critic_iterations = 5
weight_clip = 0.01

In [43]:
opt_gen = optim.RMSprop(gen.parameters(), lr = learning_rate)
opt_disc = optim.RMSprop(disc.parameters(), lr = learning_rate)

In [48]:
fixed_noise = torch.rand(32, z_dim, 1, 1).to(device)
writer_fake = SummaryWriter(f"runs/WGAN_MNIST/fake")
writer_real = SummaryWriter(f"runs/WGAN_MNIST/real")
step = 0

In [49]:
# Training WGAN without gradient penalty

for epoch in range(num_epochs):
    for batch_idx, (real, _) in enumerate(loader):
        real = real.to(device)
        
        for _ in range(critic_iterations):
            noise = torch.randn((batch_size, z_dim, 1, 1)).to(device)
            fake = gen(noise)
        
    #       Train Critic max E[critic(x)] - E[critic(gen_fake)]  
            disc_real = disc(real).reshape(-1)
            disc_fake = disc(fake).reshape(-1)
            disc_loss = -(torch.mean(disc_real) - torch.mean(disc_fake))

            opt_disc.zero_grad()
            disc_loss.backward(retain_graph = True)
            opt_disc.step()
            
            for p in disc.parameters():
                p.data.clamp_(-weight_clip, weight_clip)

#       Train Generator min -E[critic(gen_fake)] 
        disc_fake = disc(fake).reshape(-1)
        gen_fake_loss = -torch.mean(disc_fake)

        opt_gen.zero_grad()
        gen_fake_loss.backward()
        opt_gen. step()

        if(batch_idx % 100 == 0):
            print(f"Epoch [{epoch} / {num_epochs}] Batch {batch_idx} / {len(loader)} \ Loss D : {disc_loss : .4f} , Loss G : {gen_fake_loss : .4f}")

            with torch.no_grad():
                fake = gen(fixed_noise)

                img_grid_real = torchvision.utils.make_grid(real[:32] , normalize = True)
                img_grid_fake = torchvision.utils.make_grid(fake[:32] , normalize = True)

                writer_real.add_image("Real", img_grid_real, global_step = step)
                writer_fake.add_image("Fake", img_grid_fake, global_step = step)

            step += 1

Epoch [0 / 5] Batch 0 / 469 \ Loss D : -0.3524 , Loss G :  0.4251
Epoch [0 / 5] Batch 100 / 469 \ Loss D : -0.3282 , Loss G :  0.0815
Epoch [0 / 5] Batch 200 / 469 \ Loss D : -0.3025 , Loss G :  0.3802
Epoch [0 / 5] Batch 300 / 469 \ Loss D : -0.3584 , Loss G :  0.3346
Epoch [0 / 5] Batch 400 / 469 \ Loss D : -0.3593 , Loss G :  0.4416
Epoch [1 / 5] Batch 0 / 469 \ Loss D : -0.3954 , Loss G :  0.1303
Epoch [1 / 5] Batch 100 / 469 \ Loss D : -0.3537 , Loss G :  0.2726
Epoch [1 / 5] Batch 200 / 469 \ Loss D : -0.3252 , Loss G :  0.3859
Epoch [1 / 5] Batch 300 / 469 \ Loss D : -0.4189 , Loss G :  0.0387
Epoch [1 / 5] Batch 400 / 469 \ Loss D : -0.4580 , Loss G :  0.2559
Epoch [2 / 5] Batch 0 / 469 \ Loss D : -0.4156 , Loss G :  0.0529
Epoch [2 / 5] Batch 100 / 469 \ Loss D : -0.4690 , Loss G :  0.0851
Epoch [2 / 5] Batch 200 / 469 \ Loss D : -0.4553 , Loss G :  0.1036
Epoch [2 / 5] Batch 300 / 469 \ Loss D : -0.4039 , Loss G :  0.4420
Epoch [2 / 5] Batch 400 / 469 \ Loss D : -0.5519 , Los

In [57]:
# WGAN with gradient penalty

class Critic(nn.Module):
    def __init__(self, channels_img, features_d):
        # input shape : N x channels_img x 64 x 64
        
        super(Critic, self).__init__()
        self.disc = nn.Sequential(
                                     nn.Conv2d(channels_img, features_d, kernel_size = 4, stride = 2, padding = 1), # 32 x 32 
                                     nn.LeakyReLU(0.2),
                                     self._block(features_d, features_d * 2, 4, 2, 1),     # 16 x 16
                                     self._block(features_d * 2, features_d * 4, 4, 2, 1), # 8 x 8
                                     self._block(features_d * 4, features_d * 8, 4, 2, 1),  # 4 x 4
                                     nn.Conv2d(features_d * 8, 1, kernel_size = 4, stride = 2, padding = 0) # 1 x 1
                                 )
        
    def _block(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
                             nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias = False), 
                             nn.InstanceNorm2d(out_channels, affine = True), 
                             nn.LeakyReLU(0.2)
                            )
    def forward(self, x):
        return self.disc(x)

In [64]:
loader = DataLoader(dataset, batch_size = batch_size, shuffle = True)

gen = Generator(z_dim, channels_img, features_gen).to(device)
disc = Discriminator(channels_img, features_disc).to(device)

initialize_weights(gen)
initialize_weights(disc)

In [65]:
# gradient penalty

def gradient_penalty(disc, real, fake, device = "cpu"):
    
    batch_size, c, h, w = real.shape
    epsilon = torch.rand((batch_size, 1, 1, 1)).repeat(1, c, h, w).to(device)
    interpolated_images = real * epsilon + fake * (1 - epsilon)
    
#   calculate critic scores
    mixed_scores = disc(interpolated_images)
    
    gradient = torch.autograd.grad(
               inputs = interpolated_images,
               outputs = mixed_scores,
               grad_outputs = torch.ones_like(mixed_scores),
               create_graph = True,
               retain_graph = True
               )[0]
    
    gradient = gradient.view(gradient.shape[0], -1)
    gradient_norm = gradient.norm(2, dim = 1)
    gradient_penalty = torch.mean((gradient_norm - 1) ** 2)
    return gradient_penalty

In [66]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
learning_rate = 1e-4
batch_size = 64
image_size = 64
channels_img = 1
z_dim = 100
num_epochs = 5
features_disc = 64
features_gen = 64
critic_iterations = 5
lambda_gp = 100

In [67]:
opt_gen = optim.Adam(gen.parameters(), lr = learning_rate, betas = (0.0, 0.9))
opt_disc = optim.Adam(disc.parameters(), lr = learning_rate, betas = (0.0, 0.9))

In [71]:
fixed_noise = torch.rand(64, z_dim, 1, 1).to(device)
writer_fake = SummaryWriter(f"runs/WGANGP_MNIST/fake")
writer_real = SummaryWriter(f"runs/WGANGP_MNIST/real")
step = 0

In [72]:
# Training WGAN with gradient penalty

for epoch in range(num_epochs):
    for batch_idx, (real, _) in enumerate(loader):
        real = real.to(device)
        
        for _ in range(critic_iterations):
            noise = torch.randn((batch_size, z_dim, 1, 1)).to(device)
            fake = gen(noise)
        
    #       Train Critic max E[critic(x)] - E[critic(gen_fake)]  
            disc_real = disc(real).reshape(-1)
            disc_fake = disc(fake).reshape(-1)
            gp = gradient_penalty(disc, real, fake, device = device)
            disc_loss = -(torch.mean(disc_real) - torch.mean(disc_fake)) + lambda_gp * gp

            opt_disc.zero_grad()
            disc_loss.backward(retain_graph = True)
            opt_disc.step()
            

#       Train Generator min -E[critic(gen_fake)] 
        disc_fake = disc(fake).reshape(-1)
        gen_fake_loss = -torch.mean(disc_fake)

        opt_gen.zero_grad()
        gen_fake_loss.backward()
        opt_gen.step()

        if(batch_idx % 100 == 0):
            print(f"Epoch [{epoch} / {num_epochs}] Batch {batch_idx} / {len(loader)} \ Loss D : {disc_loss : .4f} , Loss G : {gen_fake_loss : .4f}")

            with torch.no_grad():
                fake = gen(fixed_noise)

                img_grid_real = torchvision.utils.make_grid(real[:32] , normalize = True)
                img_grid_fake = torchvision.utils.make_grid(fake[:32] , normalize = True)

                writer_real.add_image("Real", img_grid_real, global_step = step)
                writer_fake.add_image("Fake", img_grid_fake, global_step = step)

            step += 1

Epoch [0 / 5] Batch 0 / 938 \ Loss D : -0.1797 , Loss G : -0.3483
Epoch [0 / 5] Batch 100 / 938 \ Loss D : -0.3282 , Loss G : -0.5686
Epoch [0 / 5] Batch 200 / 938 \ Loss D : -0.2980 , Loss G : -0.4959
Epoch [0 / 5] Batch 300 / 938 \ Loss D :  0.6585 , Loss G : -0.5405
Epoch [0 / 5] Batch 400 / 938 \ Loss D : -0.0356 , Loss G : -0.6155
Epoch [0 / 5] Batch 500 / 938 \ Loss D :  4.0277 , Loss G : -0.3212
Epoch [0 / 5] Batch 600 / 938 \ Loss D : -0.5443 , Loss G : -0.4086
Epoch [0 / 5] Batch 700 / 938 \ Loss D : -0.1648 , Loss G : -0.4809
Epoch [0 / 5] Batch 800 / 938 \ Loss D : -0.3586 , Loss G : -0.4794
Epoch [0 / 5] Batch 900 / 938 \ Loss D : -0.4290 , Loss G : -0.4260


RuntimeError: The size of tensor a (64) must match the size of tensor b (32) at non-singleton dimension 0

In [166]:
# ---------------------------------- Neural Style Transfer ---------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------

In [74]:
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.utils import save_image

In [77]:
model = models.vgg19(pretrained = True).features
model

Sequential(
  (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): ReLU(inplace=True)
  (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (3): ReLU(inplace=True)
  (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (6): ReLU(inplace=True)
  (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (8): ReLU(inplace=True)
  (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (11): ReLU(inplace=True)
  (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (13): ReLU(inplace=True)
  (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (15): ReLU(inplace=True)
  (16): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (17): ReLU(inplace=True)
  (18): MaxPoo

In [78]:
class VGG(nn.Module):
    def __init__(self):
        super(VGG, self).__init__()

        self.chosen_features = ['0', '5', '10', '19', '28']
        self.model = models.vgg19(pretrained = True).features[:29]
    
    def forward(self, x):
        features = []
        
        for layer_num, layer in enumerate(self.model):
            x = layer(x)
            
            if str(layer_num) in self.chosen_features:
                features.append(x)
        
        return features

In [158]:
model = VGG()
model.to(device)

VGG(
  (model): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding

In [152]:
def load_image(image_name, loader, device):
    image = Image.open(image_name)
#     image.convert('L')
    print(image.getbands())
    image = loader(image).unsqueeze(0)
    return image.to(device)

In [153]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_size = 356
loader = transforms.Compose([transforms.Resize((image_size, image_size)), transforms.ToTensor()])    

In [161]:
original_image = load_image("goth-batman.jpg", loader, device)
style_image = load_image("picasso2.jpg", loader, device)
generated_image = original_image.clone().requires_grad_(True)

('R', 'G', 'B')
('R', 'G', 'B')


In [162]:
original_image.shape

torch.Size([1, 3, 356, 356])

In [163]:
total_steps = 6000
learning_rate = 0.001
alpha = 1
beta = 0.01
optimizer = optim.Adam([generated_image], lr = learning_rate)

In [164]:
for step in range(total_steps):
    generated_features = model(generated_image)
    original_features  = model(original_image)
    style_features     = model(style_image)
    
    style_loss = original_loss = 0
    
    for g_feat, o_feat, s_feat in zip(generated_features, original_features, style_features):
        batch_size, channels_img, height, width = g_feat.shape
        
        original_loss += torch.mean((g_feat - o_feat) ** 2)
        
#       Compute Gram Matrix
        
        G = g_feat.view(channels_img, height * width).mm(g_feat.view(channels_img, height * width).t())
        S = s_feat.view(channels_img, height * width).mm(s_feat.view(channels_img, height * width).t())
        
        style_loss += torch.mean((G - S) ** 2)
        
    total_loss = (alpha * original_loss) + (beta * style_loss)
    
    optimizer.zero_grad()
    total_loss.backward()
    optimizer.step()
    
    if(step % 200 == 0):
        print(total_loss)
        save_image(generated_image, "generated_2.png")

tensor(2482648.7500, device='cuda:0', grad_fn=<AddBackward0>)
tensor(491829.0312, device='cuda:0', grad_fn=<AddBackward0>)
tensor(165773.3750, device='cuda:0', grad_fn=<AddBackward0>)
tensor(74086.3281, device='cuda:0', grad_fn=<AddBackward0>)
tensor(37368.5820, device='cuda:0', grad_fn=<AddBackward0>)
tensor(20537.1309, device='cuda:0', grad_fn=<AddBackward0>)
tensor(12738.3027, device='cuda:0', grad_fn=<AddBackward0>)
tensor(9002.5391, device='cuda:0', grad_fn=<AddBackward0>)
tensor(7043.1821, device='cuda:0', grad_fn=<AddBackward0>)
tensor(5879.6885, device='cuda:0', grad_fn=<AddBackward0>)
tensor(5104.6313, device='cuda:0', grad_fn=<AddBackward0>)
tensor(4540.7573, device='cuda:0', grad_fn=<AddBackward0>)
tensor(4107.9355, device='cuda:0', grad_fn=<AddBackward0>)
tensor(3760.5364, device='cuda:0', grad_fn=<AddBackward0>)
tensor(3472.5889, device='cuda:0', grad_fn=<AddBackward0>)
tensor(3228.2664, device='cuda:0', grad_fn=<AddBackward0>)
tensor(3017.2554, device='cuda:0', grad_fn=<A

In [167]:
# ---------------------------------- LeNet ---------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------

In [168]:
import torch
import torch.nn as nn

In [171]:
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.relu = nn.ReLU()
        self.pool = nn.AvgPool2d(kernel_size = (2,2) , stride = (2,2))
        self.conv1 = nn.Conv2d(in_channels = 1, out_channels = 6, kernel_size = (5,5), stride = (1,1), padding = (0,0))
        self.conv2 = nn.Conv2d(in_channels = 6, out_channels = 16, kernel_size = (5,5), stride = (1,1), padding = (0,0))
        self.conv3 = nn.Conv2d(in_channels = 16, out_channels = 120, kernel_size = (5,5), stride = (1,1), padding = (0,0))
        self.linear1 = nn.Linear(120 , 84)
        self.linear2 = nn.Linear(84 , 10)
    
    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.relu(self.conv3(x))
        x = x.reshape(x.shape[0], -1)
        x = self.relu(self.linear1(x))
        x = self.linear2(x)
        return x

In [172]:
x = torch.randn((64, 1, 32, 32))
model = LeNet()
print(model(x).shape)

torch.Size([64, 10])


In [173]:
# ---------------------------------- GoogleNet ---------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------------------

In [174]:
import torch
import torch.nn as nn

In [179]:
class conv_block(nn.Module):
    def __init__(self, in_channels, out_channels, **kwargs):
        super(conv_block, self).__init__()
        self.relu = nn.ReLU()
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
    
    def forward(self, x):
        return self.relu(self.batchnorm(self.conv(x)))

In [199]:
class Inception_Block(nn.Module):
    def __init__(self, in_channels, out_1x1, red_3x3, out_3x3, red_5x5, out_5x5, out_1x1pool):
        super(Inception_Block, self).__init__()
        
        self.branch1 = conv_block(in_channels, out_1x1, kernel_size = 1)
        
        self.branch2 = nn.Sequential(
                                     conv_block(in_channels, red_3x3, kernel_size = 1),
                                     conv_block(red_3x3, out_3x3, kernel_size = 3, stride = 1, padding = 1)
                                    )
        
        self.branch3 = nn.Sequential(
                                     conv_block(in_channels, red_5x5, kernel_size = 1),
                                     conv_block(red_5x5, out_5x5, kernel_size = 5, stride = 1, padding = 2)
                                    )
        
        self.branch4 = nn.Sequential(
                                     nn.MaxPool2d(kernel_size = 3, stride = 1, padding = 1),
                                     conv_block(in_channels, out_1x1pool, kernel_size = 1)
                                    )
        
    def forward(self, x):
        return torch.cat([self.branch1(x), self.branch2(x), self.branch3(x), self.branch4(x)], dim = 1)

In [200]:
class GoogleNet(nn.Module):
    def __init__(self, in_channels = 3, num_channels = 1000):
        super(GoogleNet, self).__init__()
        
        self.conv1 = conv_block(in_channels = in_channels, out_channels = 64, kernel_size = (7,7), stride = (2,2), padding = (3,3))
        self.maxpool1 = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.conv2 = conv_block(64, 192, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool2 = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)

        #  in_channels, out_1x1, red_3x3, out_3x3, red_5x5, out_5x5, out_1x1pool
        self.inception3a = Inception_Block(192, 64, 96, 128, 16, 32, 32)
        self.inception3b = Inception_Block(256, 128, 128, 192, 32, 96, 64)
        self.maxpool3 = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.inception4a = Inception_Block(480, 192, 96, 208, 16, 48, 64)
        self.inception4b = Inception_Block(512, 160, 112, 224, 24, 64, 64)
        self.inception4c = Inception_Block(512, 128, 128, 256, 24, 64, 64)
        self.inception4d = Inception_Block(512, 112, 144, 288, 32, 64, 64)
        self.inception4e = Inception_Block(528, 256, 160, 320, 32, 128, 128)
        self.maxpool4 = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        
        self.inception5a = Inception_Block(832, 256, 160, 320, 32, 128, 128)
        self.inception5b = Inception_Block(832, 384, 192, 384, 48, 128, 128)
        
        self.avgpool = nn.AvgPool2d(kernel_size = 7, stride = 1)
        self.dropout = nn.Dropout(p = 0.4)
        self.fc1 = nn.Linear(1024, num_channels)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.maxpool2(x)  
        
        x = self.inception3a(x)
        x = self.inception3b(x)
        x = self.maxpool3(x)
        
        x = self.inception4a(x)
        x = self.inception4b(x)
        x = self.inception4c(x)
        x = self.inception4d(x)
        x = self.inception4e(x)
        x = self.maxpool4(x)
        
        x = self.inception5a(x)
        x = self.inception5b(x)
        
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.dropout(x)
        x = self.fc1(x)
        return x

In [201]:
x = torch.randn(3, 3, 224, 224)
model = GoogleNet()
print(model(x).shape)

torch.Size([3, 1000])
