# Donald Trump tweet generator

We will be creating a model that learns from text input and generates more text on the fly.

Using a dataset of Donald Trump tweets, we'll see if our model can successfully generate Trump-like tweets.

The model makes character-based predictions: given a sequence of characters, it predicts the most likely next character. For example, given the sequence `Donald Trum`, it should ideally predict `p`.

# Dataset

The dataset consists of 5674 tweets from Donald Trump.
Originally taken from https://www.kaggle.com/ayushggarg/all-trumps-twitter-insults-20152021

We create a `TweetDataset` class that inherits from PyTorch's `Dataset` class.
In doing so, we must override the following methods:
- `__len__(..)`: return the number of training samples
- `__getitem__(..)`: return sample with given index

We don't train on a per-tweet basis but on a per-sequence basis. That is, we define a sequence to be a string of `seq_len` characters, which we feed in as training data.
In the constructor, we therefore split all our tweets into sequences, space-padding any that are too short.

Note that if our sequence length is 4, we must actually sample from a string of 5 characters. For instance, the string `hello` will give input sequence `hell` and target sequence `ello`, shifted by one since we feed in a character to predict the next one.
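
The splitting, padding and shifting described above can be sketched in isolation. This is a minimal illustration, not the class used below: `make_pairs` is a hypothetical helper name, and it works on a single string rather than on the tweet dataset.

```python
def make_pairs(text, seq_len, pad=' '):
    """Split text into chunks of seq_len + 1 chars and return (input, target) pairs."""
    pairs = []
    step = seq_len + 1
    for i in range(0, len(text), step):
        chunk = text[i:i + step]
        # Space-pad the final chunk if it is too short
        chunk = chunk.ljust(step, pad)
        # Input drops the last char, target drops the first char
        pairs.append((chunk[:-1], chunk[1:]))
    return pairs

pairs = make_pairs('hello world', seq_len=4)
for inpt, target in pairs:
    print(repr(inpt), '->', repr(target))
```

Every input and target has exactly `seq_len` characters, and the target is always the input shifted one character to the right.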

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd

class TweetDataset(Dataset):
    def __init__(self, path, seq_len):
        self.seq_len = seq_len
        
        self.data = pd.read_csv(path)
        
        self.chars = self._get_unique_chars()
        self.vocab_size = len(self.chars)
        print('Vocab size: ' + str(self.vocab_size))
        print(self.chars)
        
        # Dicts used to assign a unique number to every distinct char in data
        self.int2char = dict(enumerate(self.chars))
        self.char2int = { c: i for i, c in self.int2char.items() }
        
        # List of pairs [[input, target], ...]
        self.sequences = self._create_sequences(self.seq_len)
        
    def __len__(self):
        return len(self.sequences)
        
    # Returns input and target sequences for given index
    # input is of shape [seq_len, vocab_size] and is one-hot encoded
    # target is of shape [seq_len] and is NOT one-hot encoded
    def __getitem__(self, idx):
        inpt = self.sequences[idx][0] # Get input sequence
        inpt = self._encode_sequence(inpt) # Encode every char into its integer representation
        inpt = self._one_hot_sequence(inpt, self.vocab_size) # One hot every integer
        
        target = self.sequences[idx][1]
        target = self._encode_sequence(target)
        
        inpt = torch.tensor(inpt, dtype=torch.float)
        # Targets are class indices, so keep them as integers
        target = torch.tensor(target, dtype=torch.long)
        
        return inpt, target
    
    # Encodes a given sequence.
    def _encode_sequence(self, seq):
        return [self.char2int[c] for c in seq]
    
    # One-hot encodes a sequence of integers.
    # Returns a list of len(seq) lists, each of size vocab_size
    def _one_hot_sequence(self, seq, vocab_size):
        enc = []

        for s in seq:
            vec = [0] * vocab_size
            vec[s] = 1
            enc.append(vec)

        return enc
    
    def _create_sequences(self, seq_len):
        seqs = []
        for tweet in self.data['tweet'].values:
            for i in range(0, len(tweet), seq_len + 1):
                seq = tweet[i:i+seq_len+1]

                # Pad sequence if needed
                if len(seq) < seq_len+1:
                    seq += ' ' * (seq_len+1 - len(seq))

                assert(len(seq) == seq_len+1)

                seqs.append(seq)
                
        inputs = []
        targets = []

        for seq in seqs:
            inputs.append(seq[:-1])
            targets.append(seq[1:])
            
        return list(zip(inputs, targets))
    
    def _get_unique_chars(self):
        chars = set()

        for tweet in self.data['tweet'].values:
            for char in tweet:
                chars.add(char)

        chars = list(chars)
        chars.sort()

        return chars
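
As a standalone illustration of the encoding steps in the class above, the sketch below builds the two lookup dicts for a toy string and one-hot encodes it. It uses `torch.nn.functional.one_hot` instead of the hand-written loop; that is a swapped-in alternative for illustration, not what `TweetDataset` does.

```python
import torch
import torch.nn.functional as F

text = 'hello'
chars = sorted(set(text))                        # unique chars, sorted
char2int = {c: i for i, c in enumerate(chars)}   # char -> integer id
int2char = {i: c for c, i in char2int.items()}   # integer id -> char

# Integer ids, shape [len(text)]
encoded = torch.tensor([char2int[c] for c in text])
# One-hot vectors, shape [len(text), vocab_size]
one_hot = F.one_hot(encoded, num_classes=len(chars)).float()

# Decoding: the position of the 1 in each row gives back the char id
decoded = ''.join(int2char[i] for i in one_hot.argmax(dim=1).tolist())
print(one_hot.shape, decoded)
```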

# Model

The model is a simple RNN with one GRU cell that then feeds into a fully connected layer for output.

The `input_size` is the size of the input to the GRU cell. In our case this will be the size of our vocab list, or in other words the number of possible characters we're using.

`hidden_size` is the size of the hidden vector of the GRU cell which recurs at each step. We can try out different sizes to see if performance changes.

`output_size` is the output size of the fully connected layer that takes in as input the GRU hidden state and generates the output of the model. Since we're predicting characters, this will also have to be the size of our vocab list.

`n_layers` is the number of stacked GRU layers we want. We'll stick with 1 for now, as stacking multiple layers slows down training quite substantially. Feel free to experiment.

The `init_hidden(..)` method is used to initialise the hidden state of the GRU. We start off with a zero tensor.

Note that the `forward` method takes as input arguments `x` (the input) and `hidden` (the hidden state) and returns an output along with the *updated* hidden state. This is what allows us to recur by passing the hidden state around at every forward pass.
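
To make the role of the hidden state concrete, here is a small sketch using `nn.GRU` directly with made-up sizes (all dimensions below are arbitrary assumptions). It checks that feeding a sequence one step at a time, while passing the hidden state along, ends in the same state as feeding the whole sequence at once.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
batch_size, seq_len, vocab_size, hidden_size = 2, 5, 10, 8

gru = nn.GRU(vocab_size, hidden_size, num_layers=1, batch_first=True)
x = torch.randn(batch_size, seq_len, vocab_size)
h0 = torch.zeros(1, batch_size, hidden_size)    # [n_layers, batch_size, hidden_size]

# Whole sequence in a single call
out_full, h_full = gru(x, h0)

# Same sequence one time step at a time, passing the hidden state along
h = h0
for t in range(seq_len):
    out_step, h = gru(x[:, t:t + 1, :], h)

print(out_full.shape)   # [batch_size, seq_len, hidden_size]
print(h_full.shape)     # [n_layers, batch_size, hidden_size]
print(torch.allclose(h, h_full, atol=1e-5))
```

This is the property that generation relies on later: the model can be fed one character at a time as long as the returned hidden state is passed back in.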

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(GRU, self).__init__()

        # Size of input vector (= number of possible characters)
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Size of output vector (= number of possible characters)
        self.output_size = output_size
        self.n_layers = n_layers

        # With batch_first off, expected input shape is [seq_len, batch_size,
        # input_size]. With on, it's [batch_size, seq_len, input_size]
        self.gru = nn.GRU(input_size, hidden_size, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    # Shape of x: [batch_size, seq_len, input_size]
    # Shape of hidden: [n_layers, batch_size, hidden_size]
    # Shape of output: [batch_size * seq_len, output_size]
    def forward(self, x, hidden):
        output, hidden = self.gru(x, hidden)

        output = output.contiguous().view(-1, self.hidden_size)
        output = self.fc(output)

        return output, hidden

    def init_hidden(self, batch_size):
        return torch.zeros(self.n_layers, batch_size, self.hidden_size)

# Training

We first define a few hyperparameters which we'll be able to tweak.

In [None]:
BATCH_SIZE = 32
N_EPOCHS = 10
PRINT_EVERY = 1
LEARNING_RATE = 0.001
SEQ_LEN = 32
N_LAYERS = 1
HIDDEN_SIZE = 128

We then define our training loop, iterating for the given number of epochs and, within each epoch, through the entire dataset.

The dataloader gives us batches which we feed into our model before backpropagating.
Note that the model doesn't take in individual samples: the `DataLoader` batches them for us, so `x` and `y` each contain `BATCH_SIZE` samples which we feed in all at once.
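
The batching behaviour can be seen on dummy data. The sketch below uses a `TensorDataset` of zeros as a stand-in for the tweet dataset; the sizes are arbitrary assumptions chosen so that the last, incomplete batch gets dropped.

```python
import torch
from torch.utils.data import TensorDataset, DataLoader

# 10 fake samples: inputs [seq_len=4, vocab_size=6], targets [seq_len=4]
inputs = torch.zeros(10, 4, 6)
targets = torch.zeros(10, 4, dtype=torch.long)

# drop_last=True discards the final batch if it has fewer than batch_size samples
loader = DataLoader(TensorDataset(inputs, targets), batch_size=3, drop_last=True)

shapes = [(x.shape, y.shape) for x, y in loader]
print(len(loader))   # number of full batches
print(shapes[0])     # x: [batch_size, seq_len, vocab_size], y: [batch_size, seq_len]
```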

Note that we create a fresh zero hidden state at the start of every batch, so no state carries over from one batch to the next.

In [None]:
def train(model, optimizer, loss_fn, data, device='cpu'):
    for epoch in range(1, N_EPOCHS + 1):
        loss_avg = 0

        # Iterate over batches
        for i, batch in enumerate(data):
            # Get input and target and send them to given device (cpu/gpu)
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            
            # Init a new hidden state sized to the current batch
            hidden = model.init_hidden(x.size(0)).to(device)

            # Reset optimizer
            optimizer.zero_grad()
            
            # Pass in batch (output is already on the model's device)
            output, hidden = model(x, hidden)

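            # Compute loss
            loss = loss_fn(output, y.view(-1).long())
            # Accumulate a plain float so the autograd graph is not kept alive
            loss_avg += loss.item()

            # Backprop
            loss.backward()
            optimizer.step()

        # Average the loss over the number of batches in the epoch
        loss_avg = loss_avg / len(data)
        if epoch % PRINT_EVERY == 0:
            print('Epoch ' + str(epoch) + ': ' + str(loss_avg))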
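
The loss call in the loop relies on two shapes lining up: the model returns logits of shape `[batch_size * seq_len, vocab_size]`, and `y.view(-1)` flattens the targets to `[batch_size * seq_len]` in the same row order. A minimal sketch with dummy tensors (sizes are arbitrary assumptions):

```python
import math
import torch
import torch.nn as nn

batch_size, seq_len, vocab_size = 3, 4, 6

# Stand-in for the model output: one row of logits per character position
logits = torch.zeros(batch_size * seq_len, vocab_size)
# Stand-in for a batch of targets: integer class ids, NOT one-hot encoded
y = torch.randint(0, vocab_size, (batch_size, seq_len))

loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits, y.view(-1))

# All-zero logits mean a uniform prediction, so the loss is ln(vocab_size)
print(loss.item(), math.log(vocab_size))
```

This also gives a useful reference point when reading the training output: an untrained model that guesses uniformly would score about `ln(vocab_size)`.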

Now we can instantiate all our classes and call our training loop.

In [None]:
dataset = TweetDataset('./trump.csv', SEQ_LEN)
# Make dataloader out of our dataset, this will allow us to serve batches
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, drop_last=True)

# Train on GPU if possible
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Training on ' + str(device))

# Create model
input_size = dataset.vocab_size
model = GRU(input_size, HIDDEN_SIZE, input_size, N_LAYERS)
model.to(device)

# Instantiate optimizer & loss function
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
loss_fn = nn.CrossEntropyLoss()

# Train (pass the device so batches end up on the same device as the model)
train(model, optimizer, loss_fn, dataloader, device)

# Save model
torch.save(model.state_dict(), './model.pt')

To load the model, run

In [None]:
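model = GRU(input_size, HIDDEN_SIZE, input_size, N_LAYERS)
# map_location lets a model saved on GPU be loaded on a CPU-only machine
model.load_state_dict(torch.load('./model.pt', map_location=device))
model.to(device)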
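
The save and load pattern can be checked end to end on a tiny model. In this sketch an `nn.Linear` layer is a hypothetical stand-in for the trained GRU, and an in-memory buffer replaces the file on disk so that nothing is written.

```python
import io
import torch
import torch.nn as nn

torch.manual_seed(0)
source = nn.Linear(4, 2)   # hypothetical stand-in for the trained model

# Save only the parameters (the state dict), here to an in-memory buffer
buffer = io.BytesIO()
torch.save(source.state_dict(), buffer)
buffer.seek(0)

# Rebuild the same architecture, then load the saved parameters into it
restored = nn.Linear(4, 2)
restored.load_state_dict(torch.load(buffer, map_location='cpu'))

same = all(torch.equal(a, b)
           for a, b in zip(source.parameters(), restored.parameters()))
print(same)
```

Since only the parameters are stored, the model must be re-created with the same sizes before `load_state_dict` is called.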

# Generating tweets

Now that our model has been trained we can use it to generate tweets on the fly!

Remember that our model does character-based prediction, so we need a start sequence to kick it off.
We also ask for the length of the sequence we wish to generate.

First we pass in the characters of the start sequence, discarding the output but keeping the hidden state, which primes our GRU cell.

Then, for the requested length, we repeatedly sample the next character from the model's output and feed it back in to get the next prediction, and so on.
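
The prediction step turns the model's raw scores into a probability distribution and then draws a character from it. The sketch below contrasts sampling with `torch.multinomial` against always taking the most likely character; the three logits are made-up values for a hypothetical 3-character vocab.

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.tensor([2.0, 1.0, 0.1])   # hypothetical scores for 3 characters
probs = F.softmax(logits, dim=0)          # non-negative, sums to 1

# Greedy choice: always the single most likely character
greedy = torch.argmax(probs).item()

# Sampling: draw indices in proportion to their probability
samples = torch.multinomial(probs, 1000, replacement=True)
counts = torch.bincount(samples, minlength=3)

print(probs)
print(greedy, counts)
```

Sampling keeps the generated text varied: less likely characters still get picked occasionally, whereas a greedy choice returns the same continuation every time for a given start sequence.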

In [None]:
def generate(model, dataset, start, length):
    # Run on whatever device the model lives on
    device = next(model.parameters()).device

    # Format input
    start = dataset._encode_sequence(start)
    start = dataset._one_hot_sequence(start, dataset.vocab_size)
    start = torch.Tensor(start).to(device)

    # Init hidden state
    hidden = model.init_hidden(1).to(device)

    res = ''
    # No gradients are needed for generation
    with torch.no_grad():
        # Prime the hidden state on all but the last start char, ignoring
        # output, so that the last char is not fed in twice
        for s in start[:-1]:
            s = s.view(1, 1, -1)
            _, hidden = model(s, hidden)

        # The last start char is the first input of the generation loop
        inpt = start[-1]
        inpt = inpt.view(1, 1, -1)

        for i in range(length):
            output, hidden = model(inpt, hidden)

            # Discard batch size dim
            output = output.view(-1)

            # Get softmax distribution
            output_dist = F.softmax(output, dim=0)

            # Sample predicted char (.item() also works for GPU tensors)
            top = torch.multinomial(output_dist, 1).item()
            char = dataset.int2char[top]
            res += char

            # Create next input as one hot encoded vector of top char
            inpt = dataset._encode_sequence(char)
            inpt = dataset._one_hot_sequence(inpt, dataset.vocab_size)[0]
            inpt = torch.Tensor(inpt).to(device)
            inpt = inpt.view(1, 1, -1)

    return res

In [None]:
res = generate(model, dataset, 'hello', 140)

res