In [None]:
import torch
import torch.nn as nn
import numpy as np
import pickle
from fastai.text.all import *

In [None]:
if torch.cuda.is_available():
  device = "cuda:0"
else:
  device = "cpu"
device

In [None]:
def scaled_dot_product_attention(query, key, value, mask=None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / np.sqrt(dim_k)
    if mask is not None:
        if scores.shape[1] == mask.shape[1]:
            scores = scores.masked_fill(mask == 0, float("-inf"))
        else:
            mask = torch.tril(torch.ones(scores.shape[1], scores.shape[1], dtype=torch.float)).unsqueeze(0).to(device)
            scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights.bmm(value)

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim, vocab_size):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)
        self.mask = torch.tril(torch.ones(head_dim, head_dim, dtype=torch.float)).unsqueeze(0).to(device)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state), self.mask)
        return attn_outputs

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim, config.vocab_size) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

class TransformerDecoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        hidden_state = self.layer_norm_1(x)
        x = x + self.attention(hidden_state)
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, 
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        if input_ids.dtype != torch.int:
            input_ids = input_ids.int()
        # Create position IDs for input sequence
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.int).unsqueeze(0).to(device)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        
        return embeddings

class TransformerDecoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerDecoderLayer(config) 
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

class ShellTransformer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.decoder = TransformerDecoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        
    def forward(self, x):
        x = self.decoder(x)#[:, 0, :] 
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [None]:
file = '/home/chris/University/gnn_project/dataset-preprocessed.txt'

# Initialize an empty list to store strings
txts = []

# Read the content of each line in the file and append it to the list
with open(file, 'r', encoding='utf-8') as f:
    for line in f:
        txts.append(line.strip())  # Strip any leading/trailing whitespace from the line

txts = L(txts)

In [None]:
class MyTokenizer(Transform):
    def setups(self, items):
        self.tok = BaseTokenizer()
        
    def encodes(self, txts):
        toks = self.tok(txts)
        tokenized_sentences = [[tok for tok in gen] for gen in toks]
        flattened_list = [item for sublist in tokenized_sentences for item in sublist if item]
        return flattened_list
    
    def decodes(self, encoded):
        decoded_values = TitledStr(''.join(encoded))
        return decoded_values
            
class MyNumerizer(Transform):
    def setups(self, items):
        self.num = Numericalize()
        self.num.setup(items)
        self.vocab = self.num.vocab
        

    def encodes(self, toks):
        return self.num(toks)
    
    def decodes(self, encoded):
        return self.num.decode(encoded)  
    
txt200 = txts[:]
tokn = MyTokenizer()
tokn.setup(txt200)

toks = txt200.map(tokn)

num = MyNumerizer()
num.setup(toks)

tfms = [[tokn, num]]
cut = int(len(txts[:limit])*0.8)
splits = [list(range(cut)), list(range(cut,len(txts[:limit])))]
dsets = Datasets(txts[:limit], tfms)
dls = dsets.dataloaders(dl_type=LMDataLoader)
dls.show_batch(max_n=2)

from transformers import AutoConfig
config = AutoConfig.from_pretrained('bert-base-uncased')

config.vocab_size = len(dls.vocab)
config.num_labels = len(dls.vocab)
config.hidden_size = 395
config.num_hidden_layers = 12
config.num_attention_heads = 5
config.max_position_embeddings = 512
config.intermediate_size = int((config.hidden_size * config.num_attention_heads) * 0.7)
transformer = ShellTransformer(config)

transformer.to(device)
dls.to(device)

learn = Learner(
    dls, 
    transformer, 
    loss_func=CrossEntropyLossFlat(), 
    metrics=[accuracy]
)

learn.fit_one_cycle(1, 1e-3)

learn.save('BaseTokenizer')

In [None]:
class MyTokenizer(Transform):
    def setups(self, items):
        self.tok = SpacyTokenizer()
        
    def encodes(self, txts):
        toks = self.tok(txts)
        tokenized_sentences = [[tok for tok in gen] for gen in toks]
        flattened_list = [item for sublist in tokenized_sentences for item in sublist if item]
        return flattened_list
    
    def decodes(self, encoded):
        decoded_values = TitledStr(''.join(encoded))
        return decoded_values
            
class MyNumerizer(Transform):
    def setups(self, items):
        self.num = Numericalize()
        self.num.setup(items)
        self.vocab = self.num.vocab
        

    def encodes(self, toks):
        return self.num(toks)
    
    def decodes(self, encoded):
        return self.num.decode(encoded)  
    
txt200 = txts[:]
tokn = MyTokenizer()
tokn.setup(txt200)

toks = txt200.map(tokn)

num = MyNumerizer()
num.setup(toks)

tfms = [[tokn, num]]
cut = int(len(txts[:limit])*0.8)
splits = [list(range(cut)), list(range(cut,len(txts[:limit])))]
dsets = Datasets(txts[:limit], tfms)
dls = dsets.dataloaders(dl_type=LMDataLoader)
dls.show_batch(max_n=2)

from transformers import AutoConfig
config = AutoConfig.from_pretrained('bert-base-uncased')

config.vocab_size = len(dls.vocab)
config.num_labels = len(dls.vocab)
config.hidden_size = 395
config.num_hidden_layers = 12
config.num_attention_heads = 5
config.max_position_embeddings = 512
config.intermediate_size = int((config.hidden_size * config.num_attention_heads) * 0.7)
transformer = ShellTransformer(config)

transformer.to(device)
dls.to(device)

learn = Learner(
    dls, 
    transformer, 
    loss_func=CrossEntropyLossFlat(), 
    metrics=[accuracy]
)

learn.fit_one_cycle(1, 1e-3)

learn.save('SpacyTokenizer')

In [None]:
class MyTokenizer(Transform):
    def setups(self, items):
        self.tok = WordTokenizer()
        
    def encodes(self, txts):
        toks = self.tok(txts)
        tokenized_sentences = [[tok for tok in gen] for gen in toks]
        flattened_list = [item for sublist in tokenized_sentences for item in sublist if item]
        return flattened_list
    
    def decodes(self, encoded):
        decoded_values = TitledStr(''.join(encoded))
        return decoded_values
            
class MyNumerizer(Transform):
    def setups(self, items):
        self.num = Numericalize()
        self.num.setup(items)
        self.vocab = self.num.vocab
        

    def encodes(self, toks):
        return self.num(toks)
    
    def decodes(self, encoded):
        return self.num.decode(encoded)  
    
txt200 = txts[:]
tokn = MyTokenizer()
tokn.setup(txt200)

toks = txt200.map(tokn)

num = MyNumerizer()
num.setup(toks)

tfms = [[tokn, num]]
cut = int(len(txts[:limit])*0.8)
splits = [list(range(cut)), list(range(cut,len(txts[:limit])))]
dsets = Datasets(txts[:limit], tfms)
dls = dsets.dataloaders(dl_type=LMDataLoader)
dls.show_batch(max_n=2)

from transformers import AutoConfig
config = AutoConfig.from_pretrained('bert-base-uncased')

config.vocab_size = len(dls.vocab)
config.num_labels = len(dls.vocab)
config.hidden_size = 395
config.num_hidden_layers = 12
config.num_attention_heads = 5
config.max_position_embeddings = 512
config.intermediate_size = int((config.hidden_size * config.num_attention_heads) * 0.7)
transformer = ShellTransformer(config)

transformer.to(device)
dls.to(device)

learn = Learner(
    dls, 
    transformer, 
    loss_func=CrossEntropyLossFlat(), 
    metrics=[accuracy]
)

learn.fit_one_cycle(1, 1e-3)

learn.save('WordTokenizer')

In [None]:
class MyTokenizer(Transform):
    def setups(self, items):
        path = untar_data(URLs.IMDB)
        self.tok = Tokenizer.from_folder(path)
        self.tok.setup(items)
        
    def encodes(self, txts):
        return self.tok(txts)
    
    def decodes(self, encoded):
        return self.tok.decode(encoded)
            
class MyNumerizer(Transform):
    def setups(self, items):
        self.num = Numericalize()
        self.num.setup(items)
        self.vocab = self.num.vocab

    def encodes(self, toks):
        return self.num(toks)
    
    def decodes(self, encoded):
        return self.num.decode(encoded) 
    
txt200 = txts[:]
tokn = MyTokenizer()
tokn.setup(txt200)

toks = txt200.map(tokn)

num = MyNumerizer()
num.setup(toks)

tfms = [[tokn, num]]
cut = int(len(txts[:limit])*0.8)
splits = [list(range(cut)), list(range(cut,len(txts[:limit])))]
dsets = Datasets(txts[:limit], tfms)
dls = dsets.dataloaders(dl_type=LMDataLoader)
dls.show_batch(max_n=2)

from transformers import AutoConfig
config = AutoConfig.from_pretrained('bert-base-uncased')

config.vocab_size = len(dls.vocab)
config.num_labels = len(dls.vocab)
config.hidden_size = 395
config.num_hidden_layers = 12
config.num_attention_heads = 5
config.max_position_embeddings = 512
config.intermediate_size = int((config.hidden_size * config.num_attention_heads) * 0.7)
transformer = ShellTransformer(config)

transformer.to(device)
dls.to(device)

learn = Learner(
    dls, 
    transformer, 
    loss_func=CrossEntropyLossFlat(), 
    metrics=[accuracy]
)

learn.fit_one_cycle(1, 1e-3)

learn.save('PreTrainedWordTokenizer')

In [None]:
class MyTokenizer(Transform):
    def setups(self, items):
        self.tok = SentencePieceTokenizer()#SubwordTokenizer(vocab_sz=300)
        self.tok.setup(items)
        
    def encodes(self, txts):
        toks = self.tok(txts)
        tokenized_sentences = [[tok for tok in gen] for gen in toks]
        flattened_list = [item for sublist in tokenized_sentences for item in sublist if item]
        return flattened_list
    
    def decodes(self, encoded):
        decoded_values = TitledStr(''.join(encoded))
        return decoded_values
            
class MyNumerizer(Transform):
    def setups(self, items):
        self.num = Numericalize()
        self.num.setup(items)
        self.vocab = self.num.vocab
        
    def encodes(self, toks):
        return self.num(toks)
    
    def decodes(self, encoded):
        return self.num.decode(encoded)  
    
txt200 = txts[:]
tokn = MyTokenizer()
tokn.setup(txt200)

toks = txt200.map(tokn)

num = MyNumerizer()
num.setup(toks)

tfms = [[tokn, num]]
cut = int(len(txts[:limit])*0.8)
splits = [list(range(cut)), list(range(cut,len(txts[:limit])))]
dsets = Datasets(txts[:limit], tfms)
dls = dsets.dataloaders(dl_type=LMDataLoader)
dls.show_batch(max_n=2)

from transformers import AutoConfig
config = AutoConfig.from_pretrained('bert-base-uncased')

config.vocab_size = len(dls.vocab)
config.num_labels = len(dls.vocab)
config.hidden_size = 395
config.num_hidden_layers = 12
config.num_attention_heads = 5
config.max_position_embeddings = 512
config.intermediate_size = int((config.hidden_size * config.num_attention_heads) * 0.7)
transformer = ShellTransformer(config)

transformer.to(device)
dls.to(device)

learn = Learner(
    dls, 
    transformer, 
    loss_func=CrossEntropyLossFlat(), 
    metrics=[accuracy]
)

learn.fit_one_cycle(1, 1e-3)

learn.save('SentencePieceTokenizer')