<a href="https://colab.research.google.com/github/CR4ZYM4D/GPT/blob/master/colabmodelNotebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import mmap
import random
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
from transformers import AutoTokenizer

from google.colab import drive

#using GPU if available
drive.mount('/content/drive')
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

tokenizer = AutoTokenizer.from_pretrained('gpt2')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
cuda


In [21]:
# important constants to be used in the model

block_size = 1026 # size of a single word or a combination of words (we will refer to this a s a block)

batch_size = 12 # no. of said blocks or words that we will handle at once

vector_dimension = 512 # dimensions of each of the alphabet or token vector

dropout = 0.5

n_heads = 12 # no of attention heads

n_layers = 6 # no of block layers used

max_sequence_length = 1026 # max no of tokens that will be generated

learning_rate = 3e-4

max_iterations = 2000

train_step_iteration = max_iterations/10

max_test_iterations = 500

test_iterations = 10

test_step_iterations = 50

model_path = './content/models/nb/colabmodel.pkl'

In [22]:
# function to read a text file and return all characters present in it

def readTextFile(path):

    with open(path, 'r', encoding = 'UTF-8') as f:

        text = f.read()

    characters = sorted(set(text))

    return characters


In [23]:
    !unzip "/content/drive/My Drive/training data.zip" -d "/content/training_data"

Archive:  /content/drive/My Drive/training data.zip
replace /content/training_data/training data.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [49]:
# function to get a chunk from memory of training/testing data

def getChunk(split: str, i):

    file_path = '/content/training_data/training data.txt' if split == 'train' else './dataset/testing data.txt'

    with open(file_path, "rb") as f:

        with mmap.mmap(f.fileno(), 0, access = mmap.ACCESS_READ) as mm:

            file_size = len(mm)

            start_pos = i

            mm.seek(start_pos)

            block = mm.read(block_size * batch_size)

            decoded_block = block.decode(encoding = "utf-8", errors = "ignore").replace("\r", "")

            data = tokenizer.encode(text = decoded_block, add_special_tokens= True, return_tensors = 'pt').aslist()

    return data

In [53]:
# getting the batch from either of the splits. Training split by default

def getBatch(split = "train", idx = 0):

    data = getChunk(split, idx)

    index = torch.randint(0, block_size -1, size = (batch_size,))

    x = torch.stack([data[i : i + block_size] for i in index])
    y = torch.stack([data[i+1: i+block_size+1] for i in index])

    # Add assertions to check the range of target indices
    assert torch.all(y >= 0), "Target indices contain negative values."
    assert torch.all(y < tokenizer.vocab_size), f"Target indices are out of vocabulary range. Max target index: {torch.max(y)}, Vocab size: {tokenizer.vocab_size}"

    x, y = x.to(device), y.to(device)

    return x, y

In [26]:

# class that performs the feed forward mechanism of the decoder block (class that normalizes the vectors, aplies Relu and again normalizes them)

class FeedForward(nn.Module):

    def __init__(self, vector_dimension):
        super().__init__()

        self.layer = nn.Sequential(
            nn.Linear(vector_dimension, vector_dimension * 4),
            nn.ReLU(),
            nn.Linear(vector_dimension * 4, vector_dimension)
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        return self.dropout(self.layer(x))

In [27]:
# class for the single head self attention

class AttentionHead(nn.Module):

    def __init__(self, dimension_head):
        super().__init__()

        self.key = nn.Linear(vector_dimension, dimension_head, bias = False)
        self.value = nn.Linear(vector_dimension, dimension_head, bias = False)
        self.query = nn.Linear(vector_dimension, dimension_head, bias = False)

        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        batch, time, channels = x.shape

        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        att = (q @ k.transpose(-2, -1)) / (k.shape[-1] ** 0.5)

        att = att.masked_fill(self.tril[:time, :time] == 0, float('-inf'))

        att = F.softmax(att, dim = -1)
        att = self.dropout(att)
        out = att @ v

        return out

In [28]:
# class for the multi head attention mechanism

class MultiHeadAttention(nn.Module):

    def __init__(self, n_heads, dimension_head):
        super().__init__()

        self.attention_heads = nn.ModuleList([AttentionHead(dimension_head) for _ in range (n_heads)])

        self.projection_layer = nn.Linear(n_heads * dimension_head, vector_dimension)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        out = torch.cat([h(x) for h in self.attention_heads], dim =-1)

        out = self.dropout(self.projection_layer(out))

        return out

In [29]:
# class of a single decoder block

class Block(nn.Module):

    def __init__(self, n_heads, vector_dimension):
        super().__init__()

        dimension_head = vector_dimension // n_heads

        self.attention = MultiHeadAttention(n_heads, dimension_head)

        self.feed_forward = FeedForward(vector_dimension)

        self.layer_norm_1 = nn.LayerNorm(vector_dimension)

        self.layer_norm_2 = nn.LayerNorm(vector_dimension)

    def forward(self, x):

        y = self.attention(x)

        x = self.layer_norm_1(x + y)

        y = self.feed_forward(x)

        x = self.layer_norm_2(x + y)

        return x

In [50]:
# Building the model class

class GPTModel(nn.Module):

    # constructor

    def __init__(self, vocab_size):
        super().__init__()

        self.vocab_size = vocab_size

        self.token_embeddings = nn.Embedding(vocab_size, vector_dimension)

        self.positional_encodings = nn.Embedding(block_size, vector_dimension)

        self.layers = nn.Sequential(*[Block(n_heads, vector_dimension) for _ in range (n_layers)])

        self.final_layer_norm = nn.LayerNorm(vector_dimension)

        self.linear = nn.Linear(vector_dimension, vocab_size)

        self.apply(self.initWeights)

    # method to initialize the weights

    def initWeights(self, module):

        if isinstance(module, nn.Linear):

            torch.nn.init.normal_(module.weight, std = 0.02)

        elif isinstance(module, nn.LayerNorm):

            torch.nn.init.normal_(module.weight, std = 0.02)

    # method to forward the next token

    def forward(self, index, targets = None):

        batch, time = index.shape

        token_embedding = self.token_embeddings(index)

        positional_encoding = self.positional_encodings(torch.arange(time, device = device))

        x = token_embedding + positional_encoding

        x = self.layers(x)

        x = self.final_layer_norm(x)

        logits = self.linear(x)

        if targets == None:

            loss = None

        else:

            batch, time, channels = logits.shape

            logits = logits.view(batch * time, channels)

            targets = targets.view(batch * time)

            loss = F.cross_entropy(logits, targets)

        return logits, loss

    # method to generate the tokens

    def generate(self, index, max_sequence_length):

        result = torch.clone(index)

        for _ in range(max_sequence_length):

            logits, loss = self.forward(index)

            logits = logits[:, -1, :]

            probabilities = F.softmax(logits, dim = -1)

            next_index = torch.multinomial(probabilities, num_samples = 1)

            index = torch.cat((index, next_index), dim = 1)

            result = torch.cat((index, next_index), dim = 1)

            a, b = index.shape

            if b >= block_size:
                index = index[:, 1: ]

        return result

In [31]:
# method to optimize and train the model

def train(model: GPTModel, index):

    optimizer = torch.optim.AdamW(model.parameters(), lr = learning_rate)

    avg_training_losses = []

    training_losses = []

    sum = 0

    for i in range (max_iterations):

        x, y = getBatch("train", index)

        logits, loss = model.forward(x, y)

        sum += loss.item()

        training_losses.append(loss.item())

        avg_training_losses.append(sum/(i+1))

        optimizer.zero_grad(set_to_none = True)

        loss.backward()

        optimizer.step()

        if (i+1) % train_step_iteration == 0 :

            print(f"training loss at step {i+1} is: {loss.item(): .5f}")
            print(f"average training loss at step {i+1} is: {avg_training_losses[-1]: .5f}")

    # plt.scatter(np.arange(0, max_iterations), avg_training_losses)
    # plt.title(" average training data loss in n loops v/s num loops")
    # plt.xticks(np.arange(0, max_iterations+1, 1000))
    # plt.yticks(np.arange(0, 2.1, 0.1))
    # plt.grid(True)
    # plt.savefig(f"../graphs/nb/avg_training loss{learning_rate}.jpeg")
    # plt.show()

    # plt.scatter(np.arange(0, max_iterations), training_losses)
    # plt.title("training data loss in n loops v/s num loops")
    # plt.xticks(np.arange(0, max_iterations+1, 1000))
    # plt.yticks(np.arange(0, 10.1, 0.5))
    # plt.grid(True)
    # plt.savefig(f"../graphs/nb/training loss{learning_rate}.jpeg")
    # plt.show()

    torch.save(model, model_path)
    print("saved model")

In [32]:
# method to calculate the loss

@torch.no_grad()

def calculateLoss(model):

    model.eval()

    out = {}

    splits = ["train", "test"]

    for split in splits:

        losses = torch.zeros(test_iterations)

        for iteration in range (test_iterations):

            x, y = getBatch(split)

            logits, loss = model.forward(x, y)

            losses[iteration] = loss.item()

        out[split] = losses.mean()

        model.train()

    return out

In [55]:
vocab_size = tokenizer.vocab_size

model = torch.load(model_path, weights_only= False) if os.path.exists(model_path) else GPTModel(vocab_size)

model = model.to(device)


with open ('/content/training_data/training data.txt', 'r', encoding = 'utf-8') as f:
  i =0
  while f.readline() :

    train(model, i)

    i = i + block_size * batch_size


train_losses = []
test_losses = []

for i in range(max_test_iterations):

    loss = calculateLoss(model)

    train_losses.append(loss['train'])
    test_losses.append(loss['test'])

    if (i+1) % test_step_iterations == 0:

        print( f"at step {i+1} training loss is: {loss['train']: .5f} and testing loss is: {loss['test']: .5f}")

# plt.scatter(np.arange(1, max_test_iterations+1), train_losses, color = "r", label = "training set")
# plt.scatter(np.arange(1, max_test_iterations+1), test_losses, color = "g", label = "testing set")

# plt.xticks(np.arange(0, max_test_iterations+1, 2 * test_iterations))
# plt.yticks(np.arange(0, 2, 0.05))

# plt.xlabel("loop num.")
# plt.ylabel("data loss")

# plt.title("loss when testing")
# plt.grid(True)
# plt.legend()
# plt.savefig(f"../graphs/nb/testing loss{learning_rate}.jpeg")

while (True):

    prompt = input("Enter a prompt")

    context = tokenizer.encode(text = prompt,add_special_tokens= True, return_tensors = 'pt', device= device )

    context = context.unsqueeze(0)

    generated_chars = tokenizer.decode(model.generate(context, max_sequence_length))[0].tolist()

    print(generated_chars)

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
