In [5]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import GPT2Tokenizer
from sklearn.model_selection import train_test_split
from torch.optim import Adam


def load_data(lines, output_path="dataset_dict.txt"):
    """Build a {prefix text: two-word label} mapping from raw text lines.

    Each line is split on whitespace. For every word index ``i`` with
    ``0 < i < len(words) - 2`` one pair is produced:

    * feature: words ``0..i`` joined by single spaces
    * label:   word ``i`` followed by word ``i + 1``

    Commas and periods are removed from both strings, which are then stripped
    of surrounding whitespace. Lines with three or fewer words yield no pairs.
    When the same feature text occurs more than once, the pair seen last
    replaces the earlier one.

    Args:
        lines: Iterable of strings, one text line each.
        output_path: File the pairs are dumped to, one ``"<feature> v: <label>"``
            entry per line. Pass ``None`` to skip writing the file.

    Returns:
        dict mapping feature string to label string.
    """

    def _clean(text):
        # Drop commas/periods, then trim whitespace left at either end.
        return text.replace(",", "").replace(".", "").strip()

    print("Loading data...")
    dataset_dict = {}
    for line in lines:
        # split() with no argument already ignores leading/trailing whitespace.
        words = line.split()

        # NOTE(review): the upper bound len(words) - 2 means the final word of a
        # line is never used as the second label word; confirm this is intended
        # (len(words) - 1 would still keep words[i + 1] in range).
        for i in range(1, len(words) - 2):
            feature = _clean(" ".join(words[:i + 1]))
            label = _clean(words[i] + " " + words[i + 1])
            dataset_dict[feature] = label

    if output_path is not None:
        # Explicit encoding so the dump does not depend on the platform default.
        with open(output_path, "w", encoding="utf-8") as f:
            for key, value in dataset_dict.items():
                f.write(f"{key} v: {value}\n")

    return dataset_dict

# Read the raw corpus into a list with one entry per line of the file.
# The encoding is given explicitly so the read does not depend on the
# platform's default text encoding.
with open("tinystories_100k.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()



In [6]:

class TextDataset(Dataset):
    """Map-style dataset of (feature text, label text) pairs encoded as token ids.

    Args:
        data_dict: Mapping of feature string to label string; its items are
            materialised into a list so they can be indexed.
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, padding='max_length',
            max_length=..., return_tensors='pt')`` whose result is indexed with
            ``'input_ids'``. Presumably a HuggingFace tokenizer returning a
            ``(1, max_length)`` tensor for a single string -- confirm.
        max_lenght: Length every encoding is padded/truncated to. The
            misspelled keyword is kept so existing keyword callers keep working.
    """

    def __init__(self, data_dict, tokenizer, max_lenght= 256):

        self.data = list(data_dict.items())
        self.tokenizer = tokenizer
        self.max_length = max_lenght

    def __len__(self):
        """Return the number of (feature, label) pairs."""
        return len(self.data)

    def _encode(self, text):
        """Tokenize one string and return its ``input_ids`` without the leading batch dim."""
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        # squeeze(0) removes only the leading dimension; a bare squeeze() would
        # also collapse the sequence dimension when max_length == 1.
        return encoded['input_ids'].squeeze(0)

    def __getitem__(self, idx):
        """Return ``(feature_ids, label_ids)`` for the pair at ``idx``."""
        features, label = self.data[idx]
        return self._encode(features), self._encode(label)
    
def collate_fn(batch):
    """Combine a list of (feature, label) tensor pairs into two batched tensors.

    Args:
        batch: Sequence of 2-item (feature tensor, label tensor) entries.

    Returns:
        Tuple ``(features, labels)`` where each element is the corresponding
        tensors stacked along a new leading dimension.
    """
    stacked_features = torch.stack([feature for feature, _ in batch])
    stacked_labels = torch.stack([label for _, label in batch])
    return stacked_features, stacked_labels


In [7]:
# Load the pretrained 'gpt2' tokenizer and reuse its end-of-sequence token
# as the padding token.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token = tokenizer.eos_token

# Build the (feature, label) pairs from the corpus lines and serve them in
# shuffled mini-batches of 32.
dataset = TextDataset(data_dict=load_data(lines), tokenizer=tokenizer)
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
)

Loading data...


In [None]:


# Run on CUDA when PyTorch reports it as available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Show the CUDA version recorded in torch.version.
print(torch.version.cuda)

class TextGenerationModel(nn.Module):
    """Embedding -> LSTM -> feed-forward head that scores ``output_size`` classes per position.

    The head narrows the LSTM output from ``hidden_size`` to ``hidden_size // 2``
    and then ``hidden_size // 4`` (GELU and dropout of 0.1 after each step)
    before the final projection to ``output_size``.

    Args:
        vocab_size: Number of rows in the embedding table.
        embd_size: Embedding width, also the LSTM input size.
        hidden_size: LSTM hidden size.
        output_size: Width of the final linear layer.
    """

    def __init__(self, vocab_size, embd_size, hidden_size, output_size):
        super().__init__()

        half_width = hidden_size // 2
        quarter_width = hidden_size // 4

        self.embedding_size = embd_size

        self.embedding = nn.Embedding(vocab_size, embd_size)
        self.lstm = nn.LSTM(embd_size, hidden_size, batch_first=True)

        self.f1 = nn.Linear(hidden_size, half_width)
        self.f2 = nn.Linear(half_width, quarter_width)
        self.final_fc = nn.Linear(quarter_width, output_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(0.1)

    def forward(self, text):
        """Return one score vector per input position.

        Args:
            text: Integer index tensor fed to the embedding layer; the LSTM is
                batch-first, so presumably shaped (batch, seq) -- confirm
                against the caller.

        Returns:
            Output of the final linear layer for every LSTM time step (the LSTM's
            final hidden/cell state is discarded).
        """
        embedded = self.embedding(text)
        sequence_states, _ = self.lstm(embedded)
        hidden = self.dropout(self.gelu(self.f1(sequence_states)))
        hidden = self.dropout(self.gelu(self.f2(hidden)))
        return self.final_fc(hidden)
    

# The model reads token ids and emits one score per tokenizer vocabulary entry
# at every position.
model = TextGenerationModel(
    vocab_size=tokenizer.vocab_size,
    embd_size=256,
    hidden_size=128,
    output_size=tokenizer.vocab_size,
).to(device)
# Targets equal to -100 are excluded from the loss; padded label positions are
# rewritten to -100 inside the loop below.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
optimizer = Adam(model.parameters(), lr=0.005)

num_epochs = 50

for epoch in range(num_epochs):
    model.train()

    # Accumulate the loss over the whole epoch so the log line reflects every
    # batch rather than only the last (noisy) mini-batch.
    running_loss = 0.0
    num_batches = 0

    for features, labels in dataloader:
        features, labels = features.to(device), labels.to(device)

        # Replace padding ids in the targets with the ignore index. The pad token
        # was set to the end-of-sequence token, so that id is masked as well.
        labels = labels.masked_fill(labels == tokenizer.pad_token_id, -100)

        optimizer.zero_grad()
        outputs = model(features)

        # Flatten to (positions, vocab) and (positions,) as expected by the loss.
        outputs = outputs.reshape(-1, tokenizer.vocab_size)
        labels = labels.reshape(-1)

        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # An empty dataloader reports NaN instead of failing on an undefined loss.
    epoch_loss = running_loss / num_batches if num_batches else float("nan")
    print("Epoch finished {}/{}. Loss: {:.4f}".format(epoch+1, num_epochs, epoch_loss))

In [122]:
def eval_encoding(eval_line):
    """Wrap an evaluation string in a list, provided it holds more than one space-separated piece.

    Args:
        eval_line: Text to evaluate.

    Returns:
        A one-element list containing the text (split on single spaces and
        re-joined with single spaces) when the split yields more than one
        piece; otherwise an empty list.
    """
    pieces = eval_line.split(" ")

    if len(pieces) <= 1:
        return []

    return [" ".join(pieces)]


class InputData():
    """Indexable collection of texts that tokenizes each item on access.

    Args:
        text: Sequence of strings to encode.
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, padding='max_length',
            max_length=..., return_tensors='pt')`` whose result is indexed with
            ``'input_ids'``. Presumably a HuggingFace tokenizer returning a
            ``(1, max_length)`` tensor for a single string -- confirm.
        max_length: Length every encoding is padded/truncated to.
    """

    def __init__(self, text, tokenizer, max_length=50):
        self.text = text
        self.tokenizer = tokenizer

        self.max_length = max_length

    def __len__(self):
        """Return the number of texts."""
        return len(self.text)

    def __getitem__(self, idx):
        """Return ``{'input_ids': tensor}`` for the text at ``idx``."""
        text = self.text[idx]
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        # squeeze(0) removes only the leading dimension; a bare squeeze() would
        # also collapse the sequence dimension when max_length == 1.
        encoding = encoded['input_ids'].squeeze(0)

        return {'input_ids': encoding}
    
def eval_collate_fn(batch):
    """Stack the ``'input_ids'`` tensor of every sample into one batched tensor.

    Args:
        batch: Sequence of dicts, each holding a tensor under ``'input_ids'``.

    Returns:
        The tensors stacked along a new leading dimension.
    """
    id_rows = []
    for sample in batch:
        id_rows.append(sample['input_ids'])
    return torch.stack(id_rows)



# Put the model in evaluation mode (the network contains a dropout layer).
model.eval()

# Explicit encoding so the read does not depend on the platform default.
with open("eval.txt", "r", encoding="utf-8") as file:
    eval_line = file.read()


testdataset = InputData(eval_encoding(eval_line), tokenizer)
test_loader = DataLoader(testdataset, batch_size=32, collate_fn=eval_collate_fn, shuffle=False)

with torch.no_grad():
    for features in test_loader:
        features = features.to(device)
        outputs = model(features)
        # Highest-scoring class id at every position of every sample.
        class_preds = torch.argmax(outputs, dim=2)
        # Decode the prediction at flat index 1 (second position of the first
        # sample in the batch). The tokenizer that is already loaded is reused
        # instead of calling from_pretrained again for every batch.
        print(tokenizer.decode(class_preds.reshape(-1)[1]))

 red
