Training a decoder only transformer for generating Elon Musk tweets

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

Loading Elon Tweets (test dataset), cleaning up, and tokenizing with dict representing the vocab, also np arr will be saved with all the tweet combination lengths 

In [3]:
df = pd.read_csv('TweetsElonMusk.csv')
df = df['tweet']

# number of df rows
n = len(df)

# take all rows and concat them into one string
text = ''
for i in range(n):
    text += df[i]

# find unique chars in text
set_of_unique_chars = set(text)

# create a dictionary of unique chars, mapping each char to an int
char_to_int = {}

for i, char in enumerate(set_of_unique_chars):
    char_to_int[char] = i

# add 395 as <sos> token, 396 as <eos> token, 397 as <pad> token
char_to_int['<sos>'] = 395
char_to_int['<eos>'] = 396
char_to_int['<pad>'] = 397

# find row with longest string
max_len = 0
for i in range(n):
    if len(df[i]) > max_len:
        max_len = len(df[i])

elon_data = np.zeros((n, max_len + 2))

print(elon_data.shape)

# 395 is the sos token and 396 is the eos token, 397 is the padding token
for i in range(n):
    elon_data[i][0] = 395
    for j in range(len(df[i])):
        elon_data[i][j + 1] = char_to_int[df[i][j]]
    elon_data[i][len(df[i]) + 1] = 396
    elon_data[i][len(df[i]) + 2:] = 397

# save to np file
np.save('elon_data.npy', elon_data)

(12562, 426)


In [4]:
print(char_to_int['<pad>'])

397


functions for converting tokens into chars and chars into tokens

In [5]:
def tokens_to_chars(tokens):
    chars = []
    for token in tokens:
        for char in char_to_int:
            if char_to_int[char] == token:
                chars.append(char)
    return chars

def chars_to_tokens(chars):
    tokens = []
    for char in chars:
        tokens.append(char_to_int[char])
    return tokens

architecture parameters

In [91]:
layers = 3
epochs = 10
batch_size = 100
embedding_dim = 64
dict_size = len(char_to_int)
attention_dim = 16
feed_forward_dim = embedding_dim
num_heads = 8 

## Decoder Architecture 

In [147]:
class Decoder():
    def __init__(self,layers, epochs, batch_size, embedding_dim, dict_size, feed_forward_dim, num_heads, attention_dim):
        self.layers = layers
        self.epochs = epochs
        self.batch_size = batch_size
        self.embedding_dim = embedding_dim
        self.dict_size = dict_size
        self.feed_forward_dim = feed_forward_dim
        self.num_heads = num_heads
        self.attention_dim = attention_dim

        self.embedding = nn.Embedding(num_embeddings=dict_size+1, embedding_dim=embedding_dim)
        
        self.decoder_layer_1 = DecoderLayer(self.embedding_dim, self.attention_dim, self.feed_forward_dim, self.num_heads)
        self.decoder_layer_2 = DecoderLayer(self.embedding_dim, self.attention_dim, self.feed_forward_dim, self.num_heads)
        self.decoder_layer_3 = DecoderLayer(self.embedding_dim, self.attention_dim, self.feed_forward_dim, self.num_heads)

        self.linear = nn.Linear(in_features=embedding_dim, out_features=dict_size+1)
        self.softmax = nn.Softmax(dim=1)


    def forward(self, x):
        x = self.embedding(x)
        positional_encodings = self.positional_encoding(len(x), self.embedding_dim)
        x += positional_encodings
        
        x = self.decoder_layer_1.forward(x)

        pass

    ## This was generated by chat gpt-3, so hopefully it works
    def positional_encoding(self, seq_len, embedding_dim):
        positions = np.arange(seq_len)[:, np.newaxis]
        angles = np.power(10000, -(2 * (np.arange(embedding_dim) // 2) / embedding_dim))
        angles = angles[np.newaxis, :]

        positional_encodings = positions * angles

        # Apply sine to even indices in the array; 2i
        positional_encodings[:, 0::2] = np.sin(positional_encodings[:, 0::2])

        # Apply cosine to odd indices in the array; 2i+1
        positional_encodings[:, 1::2] = np.cos(positional_encodings[:, 1::2])

        positional_encodings = torch.from_numpy(positional_encodings).float()
        return positional_encodings

class DecoderLayer(nn.Module):    
    def __init__(self, embedding_dim, attention_dim, feed_forward_dim, num_heads):
        super(DecoderLayer, self).__init__()
        self.multi_head_attention = MultiHeadAttention(embedding_dim=embedding_dim, attention_dim=attention_dim, num_heads=num_heads)
        self.feed_forward = FeedForward()
        self.layer_norm = LayerNorm()

    def forward(self, x):
        x = self.multi_head_attention.forward(x)
        return x 

class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_dim, attention_dim, num_heads):
        super(MultiHeadAttention, self).__init__()

        self.num_heads = num_heads
        self.attention_dim = attention_dim
        self.embedding_dim = embedding_dim

        # linear transform from embedding -> attention_dim 
        # later divide into attention heads 
        self.Q = nn.Linear(in_features=embedding_dim, out_features=attention_dim * num_heads)
        self.K = nn.Linear(in_features=embedding_dim, out_features=attention_dim * num_heads)
        self.V = nn.Linear(in_features=embedding_dim, out_features=attention_dim * num_heads)

    def forward(self, x):
        Q_vectors = self.Q(x)
        K_vectors = self.K(x)
        V_vectors = self.V(x)

        # this results in the shape sequence length, num_heads, attention_dim
        Q_vectors = torch.reshape(Q_vectors, (x.shape[0], self.num_heads, self.attention_dim))
        K_vectors = torch.reshape(K_vectors, (x.shape[0], self.num_heads, self.attention_dim))
        V_vectors = torch.reshape(V_vectors, (x.shape[0], self.num_heads, self.attention_dim))

        # now reshape into atten num heads, sequence length, attention dim
        Q_vectors = Q_vectors.transpose(0,1)
        K_vectors = K_vectors.transpose(0,1)
        V_vectors = V_vectors.transpose(0,1)

        # dot product of Q and K but not first dimension
        attention_scores = torch.matmul(Q_vectors, K_vectors.transpose(1,2))
        attention_scores = attention_scores / np.sqrt(self.attention_dim)

        # now the shape is attention head, sequence length, softmax values 

        # softmax over attention dim 
        attention_scores = nn.Softmax(dim=2)(attention_scores)

        # Multiply the value vectors by the attention scores {possible point of error}
        weighted_sum = torch.matmul(attention_scores, V_vectors)

        # Concatenate the heads back together
        concatenated = weighted_sum.transpose(0, 1).reshape(x.shape[0], -1, self.attention_dim * self.num_heads)

        return concatenated

class FeedForward(nn.Module):
    def __init__(self):
        super(FeedForward, self).__init__()
        pass

    def forward(self, x):
        pass

class LayerNorm(nn.Module):
    def __init__(self):
        super(LayerNorm, self).__init__()
        pass

    def forwards(self, x):
        pass

In [146]:
decoder = Decoder(layers=layers, epochs=epochs, batch_size=batch_size, embedding_dim=embedding_dim, dict_size=dict_size, feed_forward_dim=feed_forward_dim, num_heads=num_heads, attention_dim=attention_dim)

# numpy arr to tensor
test_tensor = torch.from_numpy(elon_data[0])
test_tensor = test_tensor.int()

decoder.forward(test_tensor)

torch.Size([8, 426, 16])
torch.Size([426, 1, 128])
