## Y PREDICTION MODEL

In [12]:
#creating model
import time
import torch
from torch import nn

#creating dataset
from torch.utils.data import TensorDataset, DataLoader

#warnings
import warnings

#default settings
warnings.filterwarnings("ignore")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [13]:
LENGTH_SEQUENCE = 10
LEARNING_RATE = 0.1
NUM_EPOCHS = 500
BATCH_SIZE = 512
INPUT_DIM = 10

# Data preprocessing

In [14]:
# Generating Sequence 
def generateSequence(length = LENGTH_SEQUENCE):
    x = torch.randint(0, 10, (length,))
    y = torch.cat((x[0].unsqueeze(dim=0), torch.add(x[0], x[1:])))

    for i in y:
        if i >= 10:
            i -= 10

    return x.unsqueeze(dim=0), y.unsqueeze(dim=0)

In [15]:
X,y = generateSequence()

In [16]:
# Creating dataset and make train test spliting

def createDataset(number_sequences = 10000, length_sequence = LENGTH_SEQUENCE, test_size = 0.2, val_size = 0.1, batch_size = BATCH_SIZE):
    x_train, y_train = generateSequence(length_sequence)
    for _ in range(number_sequences - 1):
        x, y = generateSequence(length_sequence)
        x_train = torch.cat((x_train, x))
        y_train = torch.cat((y_train, y))

    x_test, y_test = generateSequence(length_sequence)
    for _ in range(round(number_sequences * test_size) - 1):
        x, y = generateSequence(length_sequence)
        x_test = torch.cat((x_test, x))
        y_test = torch.cat((y_test, y))

    x_val, y_val = generateSequence(length_sequence)
    for _ in range(round(number_sequences * val_size) - 1):
        x, y = generateSequence(length_sequence)
        x_val = torch.cat((x_val, x))
        y_val = torch.cat((y_val, y))

    train_dataset = TensorDataset(x_train, y_train)
    test_dataset = TensorDataset(x_test, y_test)
    val_dataset = TensorDataset(x_val, y_val)

    train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
    val = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

    return train, test, val

# Creating model

In [17]:
# Simple RNN model with LSTM,GRU or RNN cell

class Model(torch.nn.Module):

    def __init__(self, model, embed_dim, hidden_dim, layer_dim):
        super().__init__()
        self.embed = nn.Embedding(INPUT_DIM, embed_dim)
        self.model = model(embed_dim, hidden_dim, layer_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, INPUT_DIM)

    def forward(self, sentence):
        embed = self.embed(sentence)
        output, hidden = self.model(embed)
        return self.linear(output)

# Creating training class

In [18]:
# Creating training class 

class Training:
    def __init__(self, model):
        self.model = model
        self.loss_fn = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=LEARNING_RATE)

    def train(self, train, test):
        for epoch in range(1, NUM_EPOCHS + 1):
            train_loss, train_accuracy, iter_num = .0, .0, .0
            start_epoch_time = time.time()
            self.model.train()
            for x, y in train:
                x = x.to(device)
                y = y.view(1, -1).squeeze().to(device)

                self.optimizer.zero_grad()

                out = self.model.forward(x).view(-1, INPUT_DIM)

                loss = self.loss_fn(out, y)
                train_loss += loss.item()

                batch_accuracy = (out.argmax(dim=1) == y)
                train_accuracy += batch_accuracy.sum().item() / batch_accuracy.shape[0]

                loss.backward()
                self.optimizer.step()
                iter_num += 1
            if (epoch < 2) | (epoch % 50 == 0):
                print(f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: " f"{train_accuracy / iter_num:.4f}", end=" | ")

            test_loss, test_accuracy, iter_num = .0, .0, .0
            self.model.eval().to(device)
            for x, y in test:
                x = x.to(device)
                y = y.view(1, -1).squeeze().to(device)

                out = self.model.forward(x).view(-1, INPUT_DIM)

                loss = self.loss_fn(out, y)
                test_loss += loss.item()

                batch_accuracy = (out.argmax(dim=1) == y)
                test_accuracy += batch_accuracy.sum().item() / batch_accuracy.shape[0]
                iter_num += 1
            if (epoch < 2) | (epoch % 50 == 0):
                print(f"test loss: {test_loss:.4f}, test acc: {test_accuracy / iter_num:.4f} | " f"{time.time() - start_epoch_time:.2f} sec.")

# Training model

In [19]:
train, test, val = createDataset()

In [20]:
models = {
        "RNN": [nn.RNN, 32, 128, 5],
        "LSTM": [nn.LSTM, 32, 64, 1],
        "GRU": [nn.GRU, 32, 64, 1]
        }
model_RNN = Model(*models["RNN"]).to(device)
model_GRU = Model(*models["GRU"]).to(device)
model_LSTM = Model(*models["LSTM"]).to(device)

In [21]:
# Training RNN with rnn cell

training_RNN = Training(model_RNN)
training_RNN.train(train, test)

Epoch: 1, loss: 46.0642, acc: 0.0980 | test loss: 9.2085, test acc: 0.0979 | 0.46 sec.
Epoch: 50, loss: 39.7972, acc: 0.2340 | test loss: 7.9934, test acc: 0.2277 | 0.45 sec.
Epoch: 100, loss: 16.4282, acc: 0.5483 | test loss: 3.4211, test acc: 0.5341 | 0.41 sec.
Epoch: 150, loss: 0.1364, acc: 1.0000 | test loss: 0.0273, test acc: 1.0000 | 0.43 sec.
Epoch: 200, loss: 0.0512, acc: 1.0000 | test loss: 0.0103, test acc: 1.0000 | 0.23 sec.
Epoch: 250, loss: 0.0308, acc: 1.0000 | test loss: 0.0063, test acc: 1.0000 | 0.24 sec.
Epoch: 300, loss: 0.0218, acc: 1.0000 | test loss: 0.0044, test acc: 1.0000 | 0.21 sec.
Epoch: 350, loss: 0.0168, acc: 1.0000 | test loss: 0.0034, test acc: 1.0000 | 0.28 sec.
Epoch: 400, loss: 0.0136, acc: 1.0000 | test loss: 0.0028, test acc: 1.0000 | 0.28 sec.
Epoch: 450, loss: 0.0114, acc: 1.0000 | test loss: 0.0023, test acc: 1.0000 | 0.23 sec.
Epoch: 500, loss: 0.0098, acc: 1.0000 | test loss: 0.0020, test acc: 1.0000 | 0.23 sec.


In [22]:
# Training RNN with gru cell

training_GRU = Training(model_GRU)
training_GRU.train(train, test)

Epoch: 1, loss: 46.0690, acc: 0.1308 | test loss: 9.1985, test acc: 0.1266 | 0.14 sec.
Epoch: 50, loss: 45.1465, acc: 0.1933 | test loss: 9.0574, test acc: 0.1841 | 0.14 sec.
Epoch: 100, loss: 43.5866, acc: 0.1979 | test loss: 8.7343, test acc: 0.1896 | 0.13 sec.
Epoch: 150, loss: 39.6761, acc: 0.2556 | test loss: 7.9394, test acc: 0.2501 | 0.13 sec.
Epoch: 200, loss: 15.1622, acc: 0.8953 | test loss: 2.9665, test acc: 0.9001 | 0.11 sec.
Epoch: 250, loss: 1.5494, acc: 0.9999 | test loss: 0.3111, test acc: 0.9999 | 0.13 sec.
Epoch: 300, loss: 0.6326, acc: 1.0000 | test loss: 0.1278, test acc: 0.9999 | 0.12 sec.
Epoch: 350, loss: 0.3789, acc: 1.0000 | test loss: 0.0766, test acc: 1.0000 | 0.17 sec.
Epoch: 400, loss: 0.2651, acc: 1.0000 | test loss: 0.0536, test acc: 1.0000 | 0.13 sec.
Epoch: 450, loss: 0.2022, acc: 1.0000 | test loss: 0.0409, test acc: 1.0000 | 0.12 sec.
Epoch: 500, loss: 0.1626, acc: 1.0000 | test loss: 0.0328, test acc: 1.0000 | 0.11 sec.


In [23]:
# Training RNN with LSTM cell

training_LSTM = Training(model_LSTM)
training_LSTM.train(train, test)

Epoch: 1, loss: 46.0786, acc: 0.1023 | test loss: 9.2136, test acc: 0.0984 | 0.13 sec.
Epoch: 50, loss: 45.2861, acc: 0.1928 | test loss: 9.0751, test acc: 0.1845 | 0.14 sec.
Epoch: 100, loss: 44.5541, acc: 0.1944 | test loss: 8.9298, test acc: 0.1868 | 0.14 sec.
Epoch: 150, loss: 38.7255, acc: 0.2973 | test loss: 7.7640, test acc: 0.2894 | 0.16 sec.
Epoch: 200, loss: 24.0121, acc: 0.6104 | test loss: 4.8069, test acc: 0.6124 | 0.12 sec.
Epoch: 250, loss: 3.5890, acc: 0.9989 | test loss: 0.7207, test acc: 0.9987 | 0.13 sec.
Epoch: 300, loss: 1.0765, acc: 1.0000 | test loss: 0.2194, test acc: 0.9999 | 0.19 sec.
Epoch: 350, loss: 0.5496, acc: 1.0000 | test loss: 0.1134, test acc: 1.0000 | 0.13 sec.
Epoch: 400, loss: 0.3534, acc: 1.0000 | test loss: 0.0734, test acc: 1.0000 | 0.13 sec.
Epoch: 450, loss: 0.2554, acc: 1.0000 | test loss: 0.0534, test acc: 1.0000 | 0.13 sec.
Epoch: 500, loss: 0.1986, acc: 1.0000 | test loss: 0.0417, test acc: 1.0000 | 0.13 sec.


# Calculating score

In [24]:
# Calculating accuracy score on validation dataset

def calculatingScore(model):
    val_loss, val_accuracy, iter_num = .0, .0, 0
    loss_fn = nn.CrossEntropyLoss()
    model.eval()
    
    for x, y in val:
      x = x.to(device)
      y = y.view(1, -1).squeeze().to(device)
    
      out = model.forward(x).view(-1, INPUT_DIM)
    
      loss = loss_fn(out, y)
      val_loss += loss.item()
    
      batch_accuracy = (out.argmax(dim=1) == y)
      val_accuracy += batch_accuracy.sum().item() / batch_accuracy.shape[0]
      iter_num += 1
    
    print(f"val loss: {val_loss:.4f} | val acc: {val_accuracy / iter_num:.4f}")

# Results

In [25]:
print("RNN model: ")
calculatingScore(model_RNN)
print("GRU model: ")
calculatingScore(model_GRU)
print("LSTM model: ")
calculatingScore(model_LSTM)

RNN model: 
val loss: 0.0010 | val acc: 1.0000
GRU model: 
val loss: 0.0170 | val acc: 1.0000
LSTM model: 
val loss: 0.0205 | val acc: 1.0000
