In [212]:
import torch
from torch import nn
import pandas as pd
import numpy as np
import re
from transformers import PreTrainedTokenizerFast
import unidecode
import yaml
from typing import List, Dict

In [213]:
# Load & configure tokeninzer
tokenizer = PreTrainedTokenizerFast.from_pretrained('./tokenizers/gpt2_2k')
tokenizer.add_special_tokens({'pad_token': '[PAD]', 'mask_token': '[MASK]'})

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


0

In [214]:
# Load config file for the model
with open(f'./configs/lstm.yaml', 'r') as in_file:
    cfg = yaml.load(in_file, Loader=yaml.FullLoader)
hparams = cfg['model']['architecture']

In [226]:
# Function to clean & normlize text
def clean(text):
    t = text.lower()
    t = t.replace('\\n', ' ').replace('\\t', ' ').replace('\t', ' ').replace('. com', '.com')
    t = re.sub(r'https?:\/\/[a-z.\/A-Z\d]*', ' ', t)
    t = re.sub(r"\ [A-Za-z]*\.com", ' ', t)
    t = re.sub(r"@\S+", '', t)
    t = t.replace('@', '')
    t = unidecode.unidecode(t)
    t = t.replace('#', '_')
    
    to_replace = ["&quot;", ':&lt;', ':&gt;', '&amp;', '-&lt;', '-&gt;', '=&lt;', '=&gt;', 's&lt;', 's&gt;']
    for x in to_replace:
        t = t.replace(x, '')
    pattern = re.compile(r"([A-Za-z])\1{1,}", re.DOTALL)
    t = pattern.sub(r"\1\1", t)
    
    pattern = re.compile(r"([\s.,\/#!$%^&*?;:{}=_`()+-])\1{1,}")
    t = pattern.sub(r'\1', t)
    t = re.sub(' {2,}', '', t)
    t = t.lower()
    t = t.strip()
    t = t.rstrip()
    return t

In [227]:
#twitt = "@Orange I looooooove the new Livebox, it's so pretty #Livebox6"
twitt = "@Bosch my freaking battery catch fire last night rosting my cat #RIPfluffy"

In [228]:
clean(twitt)

'my freaking battery catch fire last night rosting my cat _ripfluffy'

In [229]:
input_ids = tokenizer(twitt)['input_ids']

In [230]:
# Load the model weights
model_weights = torch.load('./models/lstm_128.pt')

## Embeddings Layer
A simple lookup table that stores embeddings of a fixed dictionary and size.

This module is used to store tokens embeddings and retrieve them using indices. The input to the module is a list of indices, and the output is the corresponding word embeddings.

In [231]:
class NpEmbeddings:
    def __init__(self, model: Dict):
        self.embs = dict(zip(range(2000), model['encoder.embs.weight'].numpy()))
        
    def forward(self, x: List[int]):
        return np.array([self.embs[idx] for idx in x])

    def __call__(self, x: List[int]):
        return self.forward(x)

In [232]:
embs = NpEmbeddings(model_weights)

In [233]:
x = embs(input_ids)
x

array([[-0.00664131, -0.00703674, -0.01127713, ...,  0.00683154,
         0.00050304, -0.01573904],
       [-0.00697829,  0.01343679, -0.0078092 , ...,  0.00431453,
        -0.00435441, -0.00223201],
       [ 0.06620484,  0.16069634, -0.03993553, ..., -0.10422891,
        -0.05582594,  0.02631515],
       ...,
       [ 0.06088695, -0.0635412 , -0.08631796, ...,  0.01144394,
         0.12519038,  0.05046234],
       [ 0.2222735 , -0.13870618,  0.13226719, ...,  0.10218486,
        -0.04203468, -0.00406977],
       [ 0.0276247 ,  0.03676317, -0.0945969 , ..., -0.01917078,
        -0.09409595,  0.19517466]], dtype=float32)

## Layer Normalization
Layer Normalization normalizes the data with respect to the last dimension of the tensor. The formula is 

$y = \frac{x-E[x]}{\sqrt{Var[x]+1e-4}}*\gamma+\beta$

The mean and the standard deviation are calculated over the last dimension. The parameters $\gamma$ and $\beta$ are learn during training.

In [234]:
class NpLayerNorm:
    def __init__(self, hparams: Dict, model: Dict):
        self.hparams = hparams
        self.eps = 1e-5
        self.gamma = torch.load('./models/lstm_128.pt')['encoder.embs_norm.weight'].numpy()
        self.beta = torch.load('./models/lstm_128.pt')['encoder.embs_norm.bias'].numpy()
        
    def forward(self, x: np.array):
        assert x.shape[-1] == self.hparams['hidden_dim']
        assert len(x.shape) == 2
        r = np.zeros(x.shape)
        for i in range(x.shape[0]):
            r[i] = (x[i] - np.mean(x[i])) / (np.sqrt(np.var(x[i]) + self.eps)) * self.gamma + self.beta
        return r
    
    def __call__(self, x: np.array):
        return self.forward(x)

In [235]:
npLN = NpLayerNorm(hparams, model_weights)

In [236]:
x = npLN(x)

# LSTM Layer
Applies a multi-layer long short-term memory (LSTM) RNN to an input sequence.

For each element in the input sequence, the following function is computed:

![](img/lstm.png)

In [237]:
def sigmoid(x: np.array):
    return 1 / (1 + np.exp(-x))

class NpLSTM:
    def __init__(self, hparams: Dict, model: Dict):
        self.hparams = hparams
        self.W_ii, self.W_if, self.W_ig, self.W_io = model['encoder.lstm.weight_ih_l0'].split(self.hparams['hidden_dim'])
        self.W_hi, self.W_hf, self.W_hg, self.W_ho = model['encoder.lstm.weight_hh_l0'].split(self.hparams['hidden_dim'])
        self.b_ii, self.b_if, self.b_ig, self.b_io = model['encoder.lstm.bias_ih_l0'].split(self.hparams['hidden_dim'])
        self.b_hi, self.b_hf, self.b_hg, self.b_ho = model['encoder.lstm.bias_hh_l0'].split(self.hparams['hidden_dim'])
        
        self.W_ii, self.W_if, self.W_ig, self.W_io = self.W_ii.numpy(), self.W_if.numpy(), self.W_ig.numpy(), self.W_io.numpy()
        self.W_hi, self.W_hf, self.W_hg, self.W_ho = self.W_hi.numpy(), self.W_hf.numpy(), self.W_hg.numpy(), self.W_ho.numpy()
        self.b_ii, self.b_if, self.b_ig, self.b_io = self.b_ii.numpy(), self.b_if.numpy(), self.b_ig.numpy(), self.b_io.numpy()
        self.b_hi, self.b_hf, self.b_hg, self.b_ho = self.b_hi.numpy(), self.b_hf.numpy(), self.b_hg.numpy(), self.b_ho.numpy()
        
    def lstm_cell(self, x_t, h_tm1, c_tm1):
        i_t = sigmoid(
            np.dot(self.W_ii, x_t) + self.b_ii + np.dot(self.W_hi, h_tm1) + self.b_hi
        )
        f_t = sigmoid(
            np.dot(self.W_if, x_t) + self.b_if + np.dot(self.W_hf, h_tm1) + self.b_hf
        )
        g_t = np.tanh(
            np.dot(self.W_ig, x_t) + self.b_ig + np.dot(self.W_hg, h_tm1) + self.b_hg
        )
        o_t = sigmoid(
            np.dot(self.W_io, x_t) + self.b_io + np.dot(self.W_ho, h_tm1) + self.b_ho
        )

        c_t = f_t * c_tm1 + i_t * g_t
        h_t = o_t * np.tanh(c_t)

        return o_t, h_t, c_t

    def forward(self, x: np.array):
        assert len(x.shape) == 2
        assert x.shape[1] == self.hparams['hidden_dim']
        
        x_t, h_t, c_t = None, np.zeros(self.hparams['hidden_dim']), np.zeros(self.hparams['hidden_dim'])
        for i in range(x.shape[0]):
            x_t = x[i]
            _, h_t, c_t = self.lstm_cell(x_t, h_t, c_t)

        return h_t
    
    def __call__(self, x: np.array):
        return self.forward(x)

In [238]:
nplstm = NpLSTM(hparams, model_weights)

In [239]:
x = nplstm(x)

## Classification Head
The classification head is simply two dense layer with a tanh in between. Both layers have biases.

The head output the logit, to get the probability don't forget to apply a sigmoid to the logit.

In [240]:
class NpClassificationHead:
    def __init__(self, hparams: Dict, model: Dict):
        self.hparams = hparams
        self.dense1_w = model['fc.dense1.weight'].numpy().T
        self.dense1_b = model['fc.dense1.bias'].numpy()
        
        self.dense2_w = model['fc.dense2.weight'].numpy().T
        self.dense2_b = model['fc.dense2.bias'].numpy()
    
    def forward(self, x: np.array):
        assert len(x.shape) == 1
        assert x.shape[0] == self.hparams['hidden_dim']
        
        x = np.dot(x, self.dense1_w) + self.dense1_b
        x = np.tanh(x)
        x = np.dot(x, self.dense2_w) + self.dense2_b
        
        return x
    
    def __call__(self, x: np.array):
        return self.forward(x)

In [241]:
npfc = NpClassificationHead(hparams, model_weights)

In [242]:
x = npfc(x)

In [243]:
prob = sigmoid(x)
prob

array([0.1304966])

## Putting it all Together
By putting all layers together gives us the classifier model.

In [244]:
class NpClassifier:
    def __init__(self, hparams: Dict, model: Dict):
        self.hparams = hparams
        self.embs = NpEmbeddings(model)
        self.lnorm = NpLayerNorm(hparams, model)
        self.lstm = NpLSTM(hparams, model)
        self.fc = NpClassificationHead(hparams, model)
        
    def forward(self, input_ids: List[int]):
        x = self.embs(input_ids)
        x = self.lnorm(x)
        x = self.lstm(x)
        x = self.fc(x)
        x = sigmoid(x)
        return float(x)
    
    def __call__(self, input_ids: List[int]):
        return self.forward(input_ids)

In [245]:
classifier = NpClassifier(hparams, model_weights)

In [246]:
classifier(input_ids)

0.13049660450326692

In [247]:
# Function to evaluate the model on the data.
# in:
#    data: list of twitts
# out:
#    res: a list of tuples (twitt, pred)

def predict(data: List[str]):
    data_clean = [clean(t) for t in data]
    preds = [classifier(tokenizer(t)['input_ids']) for t in data_clean]
    preds = [float(x) for x in preds]
    return list(zip(data, preds))

In [248]:
data = [
    "I just created my first LaTeX file from scratch. That didn't work out very well. (See @amandabittner , it's a great time waster)",
    "AHH YES LOL IMA TELL MY HUBBY TO GO GET ME SUM MCDONALDS =]",
    "RT @shrop: Awesome JQuery reference book for Coda! http://www.macpeeps.com/coda/ #webdesign"
]

In [249]:
predict(data)

[("I just created my first LaTeX file from scratch. That didn't work out very well. (See @amandabittner , it's a great time waster)",
  0.25448374732423096),
 ('AHH YES LOL IMA TELL MY HUBBY TO GO GET ME SUM MCDONALDS =]',
  0.8471391608879367),
 ('RT @shrop: Awesome JQuery reference book for Coda! http://www.macpeeps.com/coda/ #webdesign',
  0.9531386291444308)]