In [24]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from sklearn.model_selection import ParameterGrid

Models

In [26]:
class RNN(nn.Module):
    """Token-level recurrent model: embedding -> (stacked) nn.RNN -> linear decoder.

    Args:
        input_size: number of distinct input ids; also used as the embedding width.
        hidden_size: width of the recurrent hidden state.
        output_size: number of output classes (logits per time step).
        num_layers: number of stacked recurrent layers.
        dropout: dropout probability passed to nn.RNN.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, dropout=0):
        super().__init__()
        # Each id is embedded into a vector as wide as the vocabulary itself.
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=input_size)
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, dropout=dropout)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden_state):
        """Return (logits, new_hidden_state) for ids `x`, starting from `hidden_state`."""
        rnn_out, next_hidden = self.rnn(self.embedding(x), hidden_state)
        return self.decoder(rnn_out), next_hidden

Training data

In [27]:
# Read the whole corpus; the context manager closes the file handle, and the explicit
# encoding keeps the character set identical across platforms.
with open("input.txt", 'r', encoding='utf-8') as text_file:
    text = text_file.read()
chars = sorted(set(text))
data_size = len(text)
vocab_size = len(chars)
print("-------------------------------------------------------------------")
print("Preparing training data......")
print(f"Total number of characters in the text is: {data_size}")
print(f"Total number of unique characters is: {vocab_size}")
print(f"The unique characters are: {chars}")
print("-------------------------------------------------------------------")

# Bidirectional lookup tables between characters and integer ids.
chars_to_ids = {c: i for i, c in enumerate(chars)}
ids_to_chars = dict(enumerate(chars))


def encode(s):
    """Return the list of integer ids for the characters of string `s`."""
    return [chars_to_ids[c] for c in s]


def decode(l):
    """Return the list of characters for the iterable of integer ids `l`."""
    return [ids_to_chars[i] for i in l]


# Encode the corpus as a 1-D long tensor; first 90% for training, the rest for testing.
data = torch.tensor(encode(text), dtype=torch.long)
split = int(0.9*len(data))
train_data = data[:split]
test_data = data[split:]

-------------------------------------------------------------------
Preparing training data......
Total number of characters in the text is: 1115394
Total number of unique characters is: 65
The unique characters are: ['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
-------------------------------------------------------------------


In [28]:
# Model architecture
hidden_size = 256
num_layers = 3
dropout = 0

# Optimisation
lr = 1e-4
epochs = 80

# Sequence lengths (in tokens) for training windows and for generation
seq_len = 128
test_seq_len = 5000

# Prefer the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Available device: {device}")

Available device: cuda


In [32]:
# Hyperparameter search space: every key maps to the list of values to try.
# Only the number of recurrent layers varies; the other settings are held fixed.
param_dict = dict(
    hidden_size=[256],
    dropout=[0],
    lr=[1e-4],
    epoch=[100],
    num_layers=[1, 2, 3, 4, 5],
    seq_len=[128],
)
# Iterating over param_grid yields one dict per combination of the values above.
param_grid = ParameterGrid(param_dict)

In [33]:
def train(param_dict, device, train_data, vocab_size, experiment_name):
    """Train a fresh RNN on a 1-D tensor of token ids and log the loss to TensorBoard.

    Each epoch walks through `train_data` in consecutive, non-overlapping windows of
    `seq_len` tokens (starting at a random offset below 200), predicting the next token
    at every position and carrying the detached hidden state from window to window.

    Args:
        param_dict: mapping with the keys 'hidden_size', 'num_layers', 'dropout',
            'lr', 'epoch' and 'seq_len'.
        device: device on which the model and the data are placed.
        train_data: 1-D long tensor of token ids.
        vocab_size: number of distinct token ids (model input and output size).
        experiment_name: sub-directory of 'runs' under which the logs are written.

    Returns:
        The trained model, so callers can reuse it (e.g. for text generation).

    Raises:
        ValueError: if `train_data` is too short to form one input/target window.
    """
    seq_len = param_dict['seq_len']
    train_data = train_data.to(device)
    data_len = len(train_data)
    if data_len < seq_len + 1:
        raise ValueError(
            f"train_data has {data_len} tokens but at least {seq_len + 1} are needed "
            f"for one window of seq_len={seq_len}"
        )
    # The random start offset stays below 200, and is clamped for short data so that
    # the first window (input plus shifted target) always fits.
    max_start = min(200, data_len - seq_len)

    model = RNN(input_size=vocab_size,
                hidden_size=param_dict['hidden_size'],
                output_size=vocab_size,
                num_layers=param_dict['num_layers'],
                dropout=param_dict['dropout']).to(device)
    model.train()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(params=model.parameters(), lr=param_dict['lr'])

    # Record experiment
    log_dir = os.path.join('runs', experiment_name, 'num_layers', str(param_dict['num_layers']))
    writer = SummaryWriter(log_dir=log_dir)

    print("Training.....")
    try:
        for epoch in range(param_dict['epoch']):
            # Plain int index: avoids slicing with, and mutating, a 1-element tensor.
            start_idx = torch.randint(0, max_start, (1,)).item()
            n = 0
            # Accumulate detached losses so no autograd history is kept across steps.
            running_loss = torch.zeros((), device=device)
            hidden_state = None

            while True:
                input_seq = train_data[start_idx : start_idx+seq_len]
                target_seq = train_data[start_idx+1 : start_idx+seq_len+1]

                output_seq, hidden_state = model(input_seq, hidden_state)
                # Truncated backpropagation: keep the state values, drop their graph.
                hidden_state = hidden_state.detach()

                loss = loss_fn(output_seq, target_seq)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                running_loss += loss.detach()
                start_idx += seq_len
                n += 1

                if start_idx + seq_len + 1 > data_len - 1:
                    break

            mean_loss = (running_loss / n).item()
            print(f"Epoch: {epoch}")
            print(f"Loss: {mean_loss:.5f}")

            # Log the per-window mean (the same number that is printed) so curves stay
            # comparable between runs that process a different number of windows.
            writer.add_scalars(main_tag='Train Loss',
                               tag_scalar_dict={'train_loss': mean_loss},
                               global_step=epoch)
    finally:
        # Flush and close the event file even if training is interrupted.
        writer.close()

    return model

In [34]:
# Run one training job per hyperparameter combination in the grid.
for parameters in param_grid:
    train(
        param_dict=parameters,
        device=device,
        train_data=train_data,
        vocab_size=vocab_size,
        experiment_name='Model_layer_test',
    )

Training.....
Epoch: 0
Loss: 2.19266
Training.....
Epoch: 0
Loss: 2.08227
Training.....
Epoch: 0
Loss: 2.03990
Training.....
Epoch: 0
Loss: 2.03145
Training.....
Epoch: 0
Loss: 2.04331


In [None]:
def generate(model, test_seq_len, test_data):
    """Sample `test_seq_len` characters from `model`, printing them as they are drawn.

    A prompt of up to `seq_len` tokens (module-level setting) is taken from `test_data`
    at a random offset below 200 and used to warm up the hidden state. After that, only
    the newly sampled token is fed back in: the carried hidden state already summarises
    everything seen so far, so re-feeding the whole window would process it twice.

    Args:
        model: trained model called as `model(ids, hidden_state)` and returning
            `(logits, hidden_state)`.
        test_seq_len: number of characters to generate.
        test_data: 1-D long tensor of token ids to draw the prompt from.

    Returns:
        The generated text as a string (the same characters that were printed).

    Raises:
        ValueError: if no prompt tokens are available at the chosen offset.
    """
    # Run on whatever device the model lives on instead of relying on a global.
    model_device = next(model.parameters()).device
    test_data = test_data.to(model_device)
    print("Generating Texts.....")

    start_idx_gen = torch.randint(0, 200, (1,)).item()
    next_input = test_data[start_idx_gen : start_idx_gen+seq_len]
    if next_input.numel() == 0:
        raise ValueError("test_data is too short to provide a prompt")

    generated = []
    hidden_state_gen = None
    was_training = model.training
    model.eval()
    try:
        # Sampling needs no gradients; without this the autograd graph would grow
        # through the hidden state on every generated character.
        with torch.no_grad():
            for _ in range(test_seq_len):
                output, hidden_state_gen = model(next_input, hidden_state_gen)
                # Sample the next id from the logits of the last time step.
                index = Categorical(logits=output[-1]).sample()
                char = ids_to_chars[index.item()]
                print(char, end='')
                generated.append(char)
                next_input = index.unsqueeze(dim=0)
    finally:
        # Leave the model in the mode it was in when it was handed over.
        model.train(was_training)
    print('\n')
    return ''.join(generated)

In [None]:
# NOTE(review): `rnn` is not assigned in any visible cell of this notebook — presumably
# left over from an earlier version. Bind a trained model to it before running this cell.
generate(rnn, test_seq_len, test_data)

In [None]:
# train() is defined as train(param_dict, device, train_data, vocab_size, experiment_name);
# the former positional call (model, epoch count, lr, device, data) no longer matches it.
# The settings below keep the 150 epochs and the learning rate of that call and use three
# layers, as the name `rnn_3` suggests (assumption — confirm).
rnn_3_params = {'hidden_size': hidden_size,
                'dropout': dropout,
                'lr': lr,
                'epoch': 150,
                'num_layers': 3,
                'seq_len': seq_len}
# NOTE: `rnn_3` is only usable for generation if train() returns the trained model.
rnn_3 = train(param_dict=rnn_3_params,
              device=device,
              train_data=train_data,
              vocab_size=vocab_size,
              experiment_name='rnn_3')

In [None]:
# Sample text from the 3-layer model.
# NOTE(review): expects `rnn_3` to hold a trained model — confirm that the preceding
# training cell actually binds it before running this cell.
generate(rnn_3, test_seq_len, test_data)

Image Generation

Dataset

In [None]:
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.transforms import ToTensor
from torchvision.datasets import FashionMNIST, MNIST, CIFAR10
import matplotlib.pyplot as plt

In [None]:
# Dataset root is the 'Data' directory next to the notebook's parent folder. Built with
# os.path.join because the former literal '..\Data' relied on the unrecognized escape
# '\D' and only names that directory on Windows.
path = os.path.join('..', 'Data')
transform = transforms.Compose([
    transforms.ToTensor(),
    # ToTensor yields floats scaled to [0, 1]; convert back to integer intensities
    # 0..255. Round before the cast so float error from the scaling cannot truncate
    # a pixel to the next lower value.
    transforms.Lambda(lambda x: torch.round(x * 255).type(torch.long))
])
# NOTE: from here on `train_data` / `test_data` refer to the image datasets, no longer
# to the text tensors created earlier in the notebook.
train_data = FashionMNIST(root=path, train=True, download=True, transform=transform)
test_data = FashionMNIST(root=path, train=False, download=True, transform=transform)
# train_data = MNIST(root=path, train=True, download=True, transform=ToTensor())
# test_data = MNIST(root=path, train=False, download=True, transform=ToTensor()) 

In [None]:
# Preview a 3x3 grid of randomly chosen training images, titled with their class names.
fig = plt.figure(figsize=(6, 6))
for i in range(1, 10):
    idx = torch.randint(0, len(train_data), (1,)).item()
    img, label = train_data[idx]
    ax = fig.add_subplot(3, 3, i)
    ax.imshow(img.squeeze())
    ax.set_title(train_data.classes[label])
    ax.axis(False)

In [None]:
# Iterate over the training images in batches of 32, reshuffled on every pass.
train_data_loader = DataLoader(train_data, batch_size=32, shuffle=True)

In [None]:
# Flatten every batch of images into one long 1-D sequence of pixel values.
# The pieces are collected first and concatenated once: seeding the result with
# torch.empty(1) would leave an uninitialised value at the front of the sequence, and
# concatenating inside the loop copies the growing tensor on every batch.
train_data_flatten = torch.cat([x.flatten() for x, _ in train_data_loader])

In [None]:
class ImageRNN(nn.Module):
    """Recurrent model over integer ids: embedding -> (stacked) nn.RNN -> linear decoder.

    Same architecture as the `RNN` class defined earlier in this notebook.

    Args:
        input_size: number of distinct input ids; also used as the embedding width.
        hidden_size: width of the recurrent hidden state.
        output_size: number of output classes (logits per step).
        num_layers: number of stacked recurrent layers.
        dropout: dropout probability passed to nn.RNN.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, dropout=0):
        super().__init__()
        # Each id is embedded into a vector as wide as the id range itself.
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=input_size)
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, dropout=dropout)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden_state):
        """Return (logits, new_hidden_state) for ids `x`, starting from `hidden_state`."""
        embedded = self.embedding(x)
        rnn_out, next_hidden = self.rnn(embedded, hidden_state)
        logits = self.decoder(rnn_out)
        return logits, next_hidden

In [None]:
# 256 input/output classes: one per possible 8-bit pixel intensity.
ImgRNN = ImageRNN(input_size=256, hidden_size=512, output_size=256, num_layers=1, dropout=0)
# Module.to() moves the parameters in place and returns the module itself.
ImgRNN = ImgRNN.to(device)

In [None]:
# NOTE(review): the train() defined earlier in this notebook has the signature
# train(param_dict, device, train_data, vocab_size, experiment_name) and builds its own
# RNN internally. This call passes a model plus `epochs`/`lr`/`seq_len` keywords, which
# that signature does not accept — presumably written against an earlier version of
# train(); update the call (or train()) before running this cell.
train(ImgRNN, epochs=40, lr=0.0003, device=device, train_data=train_data_flatten, seq_len=128)