In [None]:
from collections import defaultdict
import math
import os
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchtext
import wandb
from torchtext.data import RawField, ReversibleField, LabelField
from torchtext.datasets import WikiText2

In [None]:
# Hyperparameters and runtime settings read by the cells below.
config = dict(
    embedding_dim=8,               # length of every token vector
    attention_heads=4,             # heads per attention layer
    device='cuda',
    datafile='./city_names.txt',   # text file the vocabulary is built from
    learning_rate=0.1,
    encoder_blocks=2,
    decoder_blocks=2,
    max_tokens=50,                 # sequences are truncated/padded to this length

    # Candidate settings that are not wired in yet:
    # dropout=0.1,
    # batch_size=400,
    # dataset='imagenette2-320',
    # init_gain=5,
    # initializer=None,
    # load_workers=os.cpu_count(),
    # max_epochs=1000,
    # optimizer='SGD',
    # random_seed=1,
    # training_loops=4,
    # cuda_device_ids=[0, 1, 2],
    # num_hidden_nodes=300,
)

# Device handle derived from the configuration above.
device = torch.device(config['device'])


Pulling list of cities from: https://www.britannica.com/topic/list-of-cities-and-towns-in-the-United-States-2023068


In [None]:
class SelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention for one unbatched sequence.

    Every head owns a query, key and value projection of shape
    (embedding_dim, k_d) with k_d = embedding_dim // attention_heads.  The
    per-head outputs are concatenated along the feature axis, so the output
    has embedding_dim features only when embedding_dim is divisible by
    attention_heads.

    Args:
        embedding_dim: Length of each input vector.
        attention_heads: Number of attention heads.
    """

    def __init__(self, embedding_dim=config['embedding_dim'], attention_heads=config['attention_heads']):
        super().__init__()
        k_d = embedding_dim // attention_heads
        # Wrapping the projections in nn.Parameter registers them with the
        # module: they appear in parameters()/state_dict(), follow .to(device)
        # and receive gradients.  Bare tensors would do none of these.
        self.Wq = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        self.Wk = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        self.Wv = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        # Normalise over the key axis of the (heads, queries, keys) scores.
        self.softmax = nn.Softmax(dim=2)

    def forward(self, in_vectors):
        """Attend every position of ``in_vectors`` to every other position.

        Args:
            in_vectors: Tensor of shape (max_tokens, embedding_dim).

        Returns:
            Tensor of shape (max_tokens, attention_heads * k_d).
        """
        # (tokens, emb) @ (heads, emb, k_d) broadcasts to (heads, tokens, k_d).
        queries = torch.matmul(in_vectors, self.Wq)
        keys = torch.matmul(in_vectors, self.Wk)
        values = torch.matmul(in_vectors, self.Wv)
        k_d = keys.shape[2]

        # shape = (heads, tokens, tokens)
        scores = torch.matmul(queries, torch.transpose(keys, 1, 2))
        normalized_scores = self.softmax(scores / math.sqrt(k_d))
        Zi = torch.matmul(normalized_scores, values)  # shape = (heads, tokens, k_d)

        # Concatenate the heads feature-wise.  An explicit permute/reshape keeps
        # the token axis even when there is a single token; a dimension-less
        # squeeze would silently drop it.
        heads, tokens, head_dim = Zi.shape
        Z = Zi.permute(1, 0, 2).reshape(tokens, heads * head_dim)

        return Z  # shape = (max_tokens, embedding_dim)

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention, then a linear layer.

    Each of the two sub-layers is followed by a residual connection and a
    LayerNorm.  Input and output both have shape (max_tokens, embedding_dim).
    """

    def __init__(self, embedding_dim=config['embedding_dim'], attention_heads=config['attention_heads']):
        super().__init__()

        # Sub-layer 1: multi-head self-attention and its normalisation.
        self.attention = SelfAttention(embedding_dim=embedding_dim, attention_heads=attention_heads)
        self.attn_norm = nn.LayerNorm(embedding_dim)
        # Sub-layer 2: position-wise linear map and its normalisation.
        self.ffnn = nn.Linear(embedding_dim, embedding_dim)
        self.ffnn_norm = nn.LayerNorm(embedding_dim)

    def forward(self, in_vectors):
        """Run both sub-layers on ``in_vectors`` and return the result."""
        attended = self.attn_norm(in_vectors + self.attention(in_vectors))
        projected = self.ffnn(attended)
        return self.ffnn_norm(attended + projected)


In [None]:
class Encoder(nn.Module):
    """A stack of ``num_blocks`` EncoderBlock layers applied one after another."""

    def __init__(self,
                 embedding_dim=config['embedding_dim'],
                 attention_heads=config['attention_heads'],
                 num_blocks=config['encoder_blocks']):
        super().__init__()

        layers = [
            EncoderBlock(embedding_dim=embedding_dim, attention_heads=attention_heads)
            for _ in range(num_blocks)
        ]
        self.blocks = nn.Sequential(*layers)

    def forward(self, in_vectors):
        """Pass ``in_vectors`` through every encoder block in order."""
        # FIXME: positional offsets
        encoded = self.blocks(in_vectors)
        return encoded
    

In [None]:
class EncoderDecoderAttention(nn.Module):
    """Multi-head scaled dot-product attention from decoder states to encoder states.

    Queries are projected from the decoder-side vectors; keys and values are
    projected from the encoder output.  Each head uses projections of shape
    (embedding_dim, k_d) with k_d = embedding_dim // attention_heads, and the
    per-head outputs are concatenated along the feature axis.

    Args:
        embedding_dim: Length of each input vector.
        attention_heads: Number of attention heads.
    """

    def __init__(self, embedding_dim=config['embedding_dim'], attention_heads=config['attention_heads']):
        super().__init__()
        k_d = embedding_dim // attention_heads
        # Wrapping the projections in nn.Parameter registers them with the
        # module: they appear in parameters()/state_dict(), follow .to(device)
        # and receive gradients.  Bare tensors would do none of these.
        self.Wq = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        self.Wk = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        self.Wv = nn.Parameter(torch.randn((attention_heads, embedding_dim, k_d)))
        # Normalise over the key axis of the (heads, queries, keys) scores.
        self.softmax = nn.Softmax(dim=2)

    def forward(self, in_vectors, encoder_vectors):
        """Attend each decoder position to every encoder position.

        Args:
            in_vectors: Decoder-side tensor of shape
                (number of vectors, embedding_dim).
            encoder_vectors: Encoder output of shape
                (number of encoder vectors, embedding_dim).

        Returns:
            Tensor of shape (number of vectors, attention_heads * k_d).
        """
        queries = torch.matmul(in_vectors, self.Wq)       # shape = (heads, num vectors, k_d)
        keys = torch.matmul(encoder_vectors, self.Wk)     # shape = (heads, num encoder vectors, k_d)
        values = torch.matmul(encoder_vectors, self.Wv)   # shape = (heads, num encoder vectors, k_d)
        k_d = keys.shape[2]

        # shape = (heads, num vectors, num encoder vectors)
        scores = torch.matmul(queries, torch.transpose(keys, 1, 2))
        normalized_scores = self.softmax(scores / math.sqrt(k_d))
        Zi = torch.matmul(normalized_scores, values)  # shape = (heads, num vectors, k_d)

        # Concatenate the heads feature-wise.  An explicit permute/reshape keeps
        # the sequence axis even when there is a single vector; a dimension-less
        # squeeze would silently drop it.
        heads, num_vectors, head_dim = Zi.shape
        Z = Zi.permute(1, 0, 2).reshape(num_vectors, heads * head_dim)

        return Z  # shape = (num vectors, embedding_dim)

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, linear layer.

    Each of the three sub-layers is followed by a residual connection and a
    LayerNorm.  ``forward`` takes and returns a ``(vectors, encoder_vectors)``
    tuple so that several blocks can be chained with ``nn.Sequential``, which
    passes a single value from one layer to the next.
    """

    def __init__(self, embedding_dim=config['embedding_dim'], attention_heads=config['attention_heads']):
        super().__init__()

        # FIXME: mask out future self-attention
        self.self_attention = SelfAttention(embedding_dim=embedding_dim, attention_heads=attention_heads)
        self.self_attn_norm = nn.LayerNorm(embedding_dim)
        self.enc_attention = EncoderDecoderAttention(embedding_dim=embedding_dim, attention_heads=attention_heads)
        self.enc_attn_norm = nn.LayerNorm(embedding_dim)
        self.ffnn = torch.nn.Linear(embedding_dim, embedding_dim)
        self.ffnn_norm = nn.LayerNorm(embedding_dim)

    def forward(self, all_vectors):
        """Apply the three sub-layers.

        Args:
            all_vectors: Tuple ``(in_vectors, encoder_vectors)``: the decoder-side
                input and the encoder output it attends to.

        Returns:
            Tuple ``(out_vectors, encoder_vectors)``; the encoder output is
            passed through unchanged for the next block.
        """
        in_vectors, encoder_vectors = all_vectors
        a1 = self.self_attn_norm(in_vectors + self.self_attention(in_vectors))
        a2 = self.enc_attn_norm(a1 + self.enc_attention(a1, encoder_vectors))
        # The linear layer must consume a2 (the cross-attention output); feeding
        # it a1 would let the encoder information skip this sub-layer.
        a3 = self.ffnn_norm(a2 + self.ffnn(a2))
        return (a3, encoder_vectors)


In [None]:
class Decoder(nn.Module):
    """A stack of ``num_blocks`` DecoderBlock layers applied one after another."""

    def __init__(self,
                 embedding_dim=config['embedding_dim'],
                 attention_heads=config['attention_heads'],
                 num_blocks=config['decoder_blocks']):
        super().__init__()

        layers = [
            DecoderBlock(embedding_dim=embedding_dim, attention_heads=attention_heads)
            for _ in range(num_blocks)
        ]
        self.blocks = nn.Sequential(*layers)

    def forward(self, encoder_vectors):
        """Decode from the encoder output.

        The encoder output is used both as the decoder's initial input and as
        the vectors that every block's encoder-decoder attention looks at.
        """
        block_input = (encoder_vectors, encoder_vectors)
        decoded, _unused_encoder_vectors = self.blocks(block_input)
        return decoded

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model that maps a string to per-position vectors.

    Args:
        vocab: Callable module that turns a string into a tensor of token
            embeddings and supports ``len()`` (the number of known tokens).
        embedding_dim: Length of each token vector.
        attention_heads: Number of heads in every attention layer.
        encoder_blocks: Number of encoder layers.
        decoder_blocks: Number of decoder layers.
    """

    def __init__(self, 
                 vocab, 
                 embedding_dim=config['embedding_dim'], 
                 attention_heads=config['attention_heads'], 
                 encoder_blocks=config['encoder_blocks'],
                 decoder_blocks=config['decoder_blocks']):
        super().__init__()
        self.vocab = vocab
        self.encode = Encoder(embedding_dim=embedding_dim, attention_heads=attention_heads, num_blocks=encoder_blocks)
        self.decode = Decoder(embedding_dim=embedding_dim, attention_heads=attention_heads, num_blocks=decoder_blocks)

        # Output head: project each position onto the vocabulary and normalise
        # across the vocabulary axis.
        self.ffnn = nn.Linear(embedding_dim, len(self.vocab))
        self.softmax = nn.Softmax(dim=1)

    def forward(self, string, return_probabilities=False):
        """Embed ``string`` and run it through the encoder and decoder.

        Args:
            string: Text to process.
            return_probabilities: When False (the default) the raw decoder
                output is returned, which is what this method has always
                returned.  When True the output head is applied and a
                distribution over the vocabulary is returned for each position.
                The projection used to sit behind an unconditional ``return``
                and could never run.

        Returns:
            The decoder output, or, with ``return_probabilities=True``, a tensor
            with ``len(vocab)`` columns whose rows each sum to 1.
        """
        embedded = self.vocab(string)
        encoded = self.encode(embedded)
        decoded = self.decode(encoded)
        if return_probabilities:
            return self.softmax(self.ffnn(decoded))
        return decoded

In [None]:
class Vocab(nn.Module):
    """Token vocabulary with one randomly initialised vector per token.

    The vocabulary is built once, at construction time, from the lines of
    ``data_file``.  Two special entries are always registered first:
    ``'<EOS>'`` (end of sequence) and ``None`` (used as padding by ``embed``).

    NOTE: the embedding vectors in ``stoe`` are plain tensors held in a dict.
    They are not registered as module parameters, so an optimizer obtained via
    ``parameters()`` will not update them.

    Args:
        data_file: Path of the text file to read tokens from.
        embedding_dim: Length of each token vector.
        split_field: Separator used to split a string into tokens.  The empty
            string (default) means "one token per character".
        max_tokens: Fixed sequence length produced by ``embed``.
        device: Accepted for interface compatibility; currently unused.
    """

    def __init__(self, 
                 data_file=config['datafile'], 
                 embedding_dim=config['embedding_dim'], 
                 split_field='', 
                 max_tokens=config['max_tokens'],
                 device=device):
        super().__init__()

        self.data_file = data_file
        self.split_field = split_field
        self.embedding_dim = embedding_dim
        self.max_tokens = max_tokens

        self.itos = []                 # index -> token
        self.stoi = {}                 # token -> index
        self.stoe = {}                 # token -> embedding vector
        self.freq = defaultdict(int)   # token -> number of times registered
        self._register_token('<EOS>')
        self._register_token(None)

        self.load_strings(self.data_file)

    def __len__(self):
        """Return the number of distinct tokens, special entries included."""
        return len(self.itos)

    def _register_token(self, token):
        """Count ``token`` and, on first sight, give it an index and a vector."""
        if token not in self.stoi:
            self.stoi[token] = len(self.itos)
            self.itos.append(token)
            self.stoe[token] = torch.randn(self.embedding_dim)
        self.freq[token] += 1

    def tokenize(self, string):
        """Split ``string`` into lower-cased tokens.

        With an empty ``split_field`` every character is a token; otherwise the
        string is split on ``split_field``.
        """
        if self.split_field == '':
            pieces = list(string)
        else:
            pieces = string.split(self.split_field)
        return [piece.lower() for piece in pieces]

    def load_strings(self, filename):
        """Register every token found in the lines of ``filename``."""
        with open(filename, 'r') as f:
            for line in f:
                # Drop the line terminator first.  Otherwise "\n" becomes a
                # token in character mode, and in split mode the last word of
                # every line is stored with a trailing "\n" and can never match
                # the same word passed to embed().
                line = line.rstrip('\n')
                if not line:
                    continue
                for token in self.tokenize(line):
                    self._register_token(token)

    def embed(self, string):
        """Turn ``string`` into a (max_tokens, embedding_dim) tensor.

        The token list is cut to ``max_tokens - 1`` entries, terminated with
        ``'<EOS>'`` and padded with the ``None`` token up to ``max_tokens``.

        Raises:
            KeyError: If ``string`` contains a token that was never registered.
        """
        tokens = self.tokenize(string)
        tokens = tokens[:self.max_tokens - 1] + ['<EOS>']
        # Multiplying a list by a non-positive count yields [], so no explicit
        # length check is needed before padding.
        tokens.extend([None] * (self.max_tokens - len(tokens)))
        return torch.stack([self.stoe[token] for token in tokens], 0)

    def forward(self, string):
        """Module entry point; identical to ``embed``."""
        return self.embed(string)
        

In [None]:
# Build the vocabulary from the configured data file (Vocab's defaults come
# from `config`), then construct the model around it.
vocab = Vocab()
model = Transformer(vocab)

In [None]:
# Switch to evaluation mode. nn.Module.eval() returns the module itself (per
# the torch.nn.Module documentation), so `eval_model` is an alias of `model`.
eval_model = model.eval()

In [None]:
# Smoke test: run one city name through the whole model. `t` holds the value
# returned by Transformer.forward for this string.
t=eval_model('New York')

In [None]:
# Show column 0 of the output, i.e. the first feature of every row.
print(t[:,0])

In [None]:
# Record the interpreter version for reproducibility.
# `sys` is imported with the other modules in the first cell.
print(sys.version)

In [None]:
# NOTE(review): this builds a second Vocab, which re-reads the data file and
# draws new random vectors. From here on `vocab` is a different object from
# the one passed to Transformer above; confirm that this is intended.
vocab=Vocab()

In [None]:
# Number of distinct tokens, including the '<EOS>' and None entries that
# Vocab registers before reading the data file.
len(vocab)

In [None]:
# Index-to-token list, in the order the tokens were first registered.
vocab.itos

In [None]:
# Negative infinity. `Inf` is not a Python name, so the bare `-Inf` raised a
# NameError; `math` is already imported in the first cell.
-math.inf

In [None]:
# Shape of the model output computed in the earlier `t = eval_model(...)` cell.
t.shape