In [249]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import nltk
import string
import gensim
import numpy as np
import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from nltk.tokenize import sent_tokenize, word_tokenize
from gensim.models.word2vec import Word2Vec
from gensim.models import Word2Vec
from sklearn.model_selection import KFold
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import ParameterGrid

In [6]:
nltk.download('treebank')

[nltk_data] Downloading package treebank to
[nltk_data]     C:\Users\Admin\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\treebank.zip.


True

In [254]:
class RNN_1(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.i2h = nn.Linear(input_size, hidden_size)
        self.h2h = nn.Linear(hidden_size, hidden_size)
        self.h2o = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        hidden = F.tanh(self.i2h(input) + self.h2h(hidden))
        output = self.h2o(hidden)
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        return torch.zeros(1, self.hidden_size)

class RNN_2(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.rnn = torch.nn.RNN(
            input_size,
            hidden_size,
            nonlinearity='relu',
            batch_first=False
        )
        self.linear = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h = self.rnn(x)[0]
        x = self.linear(h)
        return x
    
    def reset_parameters(self):
        for layer in model.children():
            if hasattr(layer, 'reset_parameters'):
                layer.reset_parameters()

In [255]:
def f1_score(tp, fp, fn):
    return (2 * tp) / (2 * tp + fp + fn)

def precision_score(tp, fp):
    return tp / (tp + fp)

def accuracy_score(tp, fp, tn, fn):
    return (tp + tn) / (tp + fp + tn + fn)

def recall_score(tp, fn):
    return tp / (tp + fn)

def flatten(matrix):
    flat_list = []
    for row in matrix:
        flat_list += row
    return flat_list

In [292]:
all_categories = ['DET', 'NOUN', 'ADJ', 'VERB', 'ADP', '.', 'ADV', 'CONJ', 'PRT', 'PRON', 'NUM', 'X']
embedding_size = 100
feature_size = embedding_size + 6

# prepare dataset
tagged_sentences = nltk.corpus.treebank.tagged_sents(tagset='universal')
all_sentences = []
max_length = -1
for sentence in tagged_sentences:
    t_sentence = []
    for (word, tag) in sentence:
        t_sentence.append(word)
        
    if len(t_sentence) > max_length:
        max_length = len(t_sentence)
    all_sentences.append(t_sentence)

# create word-embeddings
embeddings = Word2Vec(
    sentences=all_sentences,
    vector_size=embedding_size,
    window=5,
    min_count=1,
    workers=4
)

def feature_engineer(word):
    w = np.concatenate((
            embeddings.wv[word],
            np.array([
                word.isalpha(),  # alphabetic
                "-" in word,
                word.isdigit(),
                word.islower(),
                word.istitle(),
                word.isupper(),
            ]).astype(int)
        )
    )
    return w

# create feature/class tensors
X = torch.zeros((len(all_sentences), max_length, feature_size))
y = torch.zeros((len(all_sentences), max_length))
print(X.size(), y.size())

for i, sentence in enumerate(tagged_sentences):
    s_len = len(sentence)
    for j, (word, tag) in enumerate(sentence):
        j_offset = max_length - (s_len - j)  # pre-pad
        X[i, j_offset] = torch.from_numpy(feature_engineer(word))
        y[i, j_offset] = torch.tensor([all_categories.index(tag)], dtype=torch.long)
    
# normalize tensors
X = torch.nn.functional.normalize(X)

torch.Size([3914, 271, 106]) torch.Size([3914, 271])


In [263]:
K_FOLDS = 5

def run_cv(model, model_idx, cv):
    results = []
    k_fold = KFold(n_splits=K_FOLDS, shuffle=True, random_state=777)
    for k, (train_idx, test_idx) in enumerate(k_fold.split(X, y)):
        
        # reset parameters
        model.reset_parameters()
        criterion = cv['criterion']()
        optimizer = cv['optimizer'](
            model.parameters(),
            lr=cv['lr'],
        )
        
        # split fold into training & testing sets
        X_train, y_train, X_test, y_test = X[train_idx], y[train_idx], X[test_idx], y[test_idx]

        for epoch in range(cv['epoch']):
            # train the model
            loss = train(model, X_train, y_train, criterion, optimizer)
            
            # test the model
            y_pred, loss = test(model, X_test, y_test, criterion)
            
            # evaluate the model
            accuracy = float(torch.sum(y_pred == y_test) / y_test.nelement())
            results.append({
                'fold': k,
                'epoch': epoch,
                'loss': loss,
                'accuracy': accuracy,
                'model_id': model_idx,
                'criterion': cv['criterion'].__name__,
                'optimizer': cv['optimizer'].__name__,
                'learning_rate': cv['lr'],
                'n_hidden': cv['hidden'],
                'max_epochs': cv['epoch'],
            })

    return results

In [257]:
def train(model, X_train, y_train, criterion, optimizer):
    # set the model to training mode
    model.train()
    
    # Clear the gradient buffers of the optimized parameters.
    optimizer.zero_grad()

    # Perform the forward pass of the model
    output = model(X_train)

    # Pick only the output corresponding to last sequence element (input is pre padded)
    output = output[:, -1, :]
    target = y_train[:, -1].long()
    
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

    return loss.item()

In [258]:
def test(model, X_test, y_test, criterion):
    model.eval()
    
    with torch.no_grad():
        output = model(X_test)
        
        y_pred = output.argmax(dim=2)

        # Pick only the output corresponding to last sequence element (input is pre padded)
        output = output[:, -1, :]
        target = y_test[:, -1].long()
        loss = criterion(output, target)
        
    return y_pred, loss.item()

In [296]:
"""
Run K-Folds Cross-Validation.
"""

# set the random seed
torch.manual_seed(777)

# define cross-validation search parameters
cv_params = {
    'criterion': [torch.nn.NLLLoss, torch.nn.CrossEntropyLoss],
    'optimizer': [torch.optim.RMSprop, torch.optim.Adam],
    'hidden': [64, 128],
    'lr': [0.001, 0.0001], # list(np.logspace(-4, -2, num=3)),
    'epoch': [100],
}

# run & evaluate each model configuration
results = []
for model_idx, cv in enumerate(list(ParameterGrid(cv_params))):
    # debug
    print(f"# model_idx={model_idx}, {cv}\n")
    
    # define model
    model = RNN_2(
        input_size=feature_size,
        hidden_size=cv['hidden'],
        output_size=len(all_categories)
    )

    # run cross-validation & report results
    results.append(
        run_cv(
            model=model,
            model_idx=model_idx,
            cv=cv,
        )
    )

# model_idx=0, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 64, 'lr': 0.001, 'optimizer': <class 'torch.optim.rmsprop.RMSprop'>}

# model_idx=1, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 64, 'lr': 0.001, 'optimizer': <class 'torch.optim.adam.Adam'>}

# model_idx=2, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 64, 'lr': 0.0001, 'optimizer': <class 'torch.optim.rmsprop.RMSprop'>}

# model_idx=3, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 64, 'lr': 0.0001, 'optimizer': <class 'torch.optim.adam.Adam'>}

# model_idx=4, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 128, 'lr': 0.001, 'optimizer': <class 'torch.optim.rmsprop.RMSprop'>}

# model_idx=5, {'criterion': <class 'torch.nn.modules.loss.NLLLoss'>, 'epoch': 100, 'hidden': 128, 'lr': 0.001, 'optimizer': <class 'torch.optim.adam.Adam'>}

# model_idx=6, {'criterion': <

In [302]:
# analyze run results
df = pd.DataFrame.from_records(flatten(results)).set_index(['model_id'])
# print(df)
df.groupby(by=['model_id', 'fold'])[['loss', 'accuracy']].agg("max")

Unnamed: 0_level_0,Unnamed: 1_level_0,loss,accuracy
model_id,fold,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0,-0.858403,0.912980
0,1,-2.132133,0.914644
0,2,-1.115744,0.912325
0,3,-1.347072,0.912617
0,4,-1.164448,0.913987
...,...,...,...
15,0,2.500520,0.012220
15,1,2.598292,0.011532
15,2,2.649918,0.017800
15,3,2.416424,0.918499
