In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import json
import numpy as np
from operator import itemgetter
import random as rnd
import sys
import torch

## Local modules import

In [3]:
sys.path.append('..')

In [4]:
from data_loading import create_word_lists, tidy_sentence_length

In [5]:
from sklearn.model_selection import train_test_split

## Loading data

In [6]:
sys.path.append('../data')

In [7]:
with open('../data/corpus_data.json') as json_file:
    data = json.load(json_file)
data = data['records']

In [8]:
human_transcripts = [entry['human_transcript'] for entry in data]
stt_transcripts   = [entry['stt_transcript'] for entry in data]

In [9]:
human_words, stt_words, word_labels, word_grams, word_sems = \
    create_word_lists(data)

Some of the sentences are too long, so we need to shorten them.

In [10]:
stt_transcripts, stt_words, word_labels, word_grams, word_sems = \
    tidy_sentence_length(stt_transcripts, stt_words, word_labels, word_grams, word_sems)

## Train-test split

We need to extract which sentences contain German words in order to stratify the data split:

In [11]:
max_length = max(map(len, word_labels))
padded_labels = [row + [False] * (max_length - len(row)) for row in word_labels]
padded_labels = np.array(padded_labels)
stat_labels = np.any(padded_labels, axis=1)

Here, we split only indices and not data itself, because the data contains arrays of variable length, which does not work with `train_test_split`:

In [12]:
indices = list(range(len(stt_transcripts)))
tr_indices, te_indices = train_test_split(indices, test_size=0.2, random_state=0, shuffle=True, stratify=stat_labels)

These are hepler functions that will extract data selected by indices:

In [13]:
extract_train = itemgetter(*tr_indices)
extract_test  = itemgetter(*te_indices)

Finally, do data splitting:

In [14]:
tr_stt_transcripts   = extract_train(stt_transcripts)
tr_stt_words         = extract_train(stt_words)

tr_word_labels       = extract_train(word_labels)
tr_word_grams        = extract_train(word_grams)
tr_word_sems         = extract_train(word_sems)

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

te_stt_transcripts   = extract_test(stt_transcripts)
te_stt_words         = extract_test(stt_words)

te_word_labels       = extract_test(word_labels)
te_word_grams        = extract_test(word_grams)
te_word_sems         = extract_test(word_sems)

## BERT

In [15]:
import torch
from transformers import BertTokenizer, BertModel

  from .autonotebook import tqdm as notebook_tqdm


In [16]:
from bert_encoder import encode_sentence

In [17]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model_bert = BertModel.from_pretrained('bert-base-uncased', output_hidden_states=True)
model_bert.eval();

In [18]:
tr_stt_vectors = []
te_stt_vectors = []

Encode the corpus:

In [20]:
for sentence, words in zip(tr_stt_transcripts, tr_stt_words):
    tr_stt_vectors.append(
        encode_sentence(sentence, words, model_bert, tokenizer)
    )

In [21]:
for sentence, words in zip(te_stt_transcripts, te_stt_words):
    te_stt_vectors.append(
        encode_sentence(sentence, words, model_bert, tokenizer)
    )

Let's convert our data to tensors. This functionality will be moved to the Dataset class.

In [51]:
tr_tensor       = torch.vstack(tr_stt_vectors)
tr_label_tensor = torch.tensor([int(element) for sublist in tr_word_labels for element in sublist])
tr_grams_tensor = torch.tensor([int(element) for sublist in tr_word_grams  for element in sublist])
tr_sems_tensor  = torch.tensor([int(element) for sublist in tr_word_sems   for element in sublist])


te_tensor = torch.vstack(te_stt_vectors)
te_label_tensor = torch.tensor([int(element) for sublist in te_word_labels for element in sublist])
te_grams_tensor = torch.tensor([int(element) for sublist in te_word_grams  for element in sublist])
te_sems_tensor  = torch.tensor([int(element) for sublist in te_word_sems   for element in sublist])

# Multi-layer Perceptron

We could also try:
- regularization

In [226]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [227]:
torch_device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {torch_device} device.")

Using cuda device.


## MLP class

In [228]:
class MLP(nn.Module):

    def __init__(self, input_features, hidden_layers, neurons_per_layer):
        super().__init__()

        layers = []
        
        # Append the first layer
        layers.append(nn.Linear(input_features, neurons_per_layer))
        layers.append(nn.ReLU())

        # Append hidden layers
        for _ in range(hidden_layers - 1):
            layers.append(nn.Linear(neurons_per_layer, neurons_per_layer))
            layers.append(nn.ReLU())

        # Append output layer
        layers.append(nn.Linear(neurons_per_layer, 1))
        layers.append(nn.Sigmoid())

        # Create the layer sequence
        self.layers = nn.Sequential(*layers)


    def reset_weights(self):
        for layer in self.children():
            if hasattr(layer, 'reset_parameters'):
                layer.reset_parameters()


    def forward(self, x):
        return self.layers(x)

## Dataset class

In [229]:
class STTDataset(Dataset):

    def __init__(self, embeddings, labels):
        if not torch.is_tensor(embeddings):
            self.embeddings = torch.vstack(embeddings)
        else:
            self.embeddings = embeddings

        if not torch.is_tensor(labels):
            self.labels = torch.tensor([int(element) for sublist in labels for element in sublist])
        else:
            self.labels = labels

    def add_feature(self, feature_tensor):
        if not torch.is_tensor(feature_tensor):
            to_be_added = torch.tensor([int(element) for sublist in feature_tensor for element in sublist])
        else:
            to_be_added = feature_tensor
        self.embeddings = torch.cat((self.embeddings, to_be_added.unsqueeze(1)), dim=1)

    def __len__(self):
        return self.embeddings.shape[0]

    def __getitem__(self, idx):
        return self.embeddings[idx], self.labels[idx].item()

Let's check if Dataset works:

In [230]:
tr_dataset = STTDataset(tr_stt_vectors, tr_word_labels)

In [231]:
tr_dataset.embeddings.shape

torch.Size([49932, 768])

Add list of features:

In [232]:
tr_dataset.add_feature(tr_word_grams)

In [233]:
tr_dataset.embeddings.shape

torch.Size([49932, 769])

Add feature tensor:

In [234]:
tr_dataset.add_feature(tr_sems_tensor)

In [235]:
tr_dataset.embeddings.shape

torch.Size([49932, 770])

Adding the features seems to work.

In [236]:
tr_dataset = STTDataset(tr_stt_vectors, tr_word_labels)

In [237]:
dataloader = DataLoader(tr_dataset, batch_size=128, shuffle=True, num_workers=0)

## Training, testing, cross-validation

We still need to consider how to do data logging.

### Training

In [238]:
def train_model(model, criterion, optimizer, loader):
    model.train()

    losses = []

    for inputs, labels in loader:
        
        optimizer.zero_grad()

        outputs = model(inputs)
        outputs = torch.squeeze(outputs, dim=1)
        loss = criterion(outputs, labels.to(torch.float))

        loss.backward()
        optimizer.step()

        losses.append(loss.item())

    return np.array(losses).mean()

### Validation

Validation loader shuld have `batch_size=len(dataset)`.

In [239]:
 def validate_model(model, criterion, loader):
    model.eval()

    with torch.no_grad():
        for inputs, labels in loader:
            pred = model(inputs)
            pred = torch.squeeze(pred, dim=1)
            loss = criterion(pred, labels.to(torch.float)).item()

    return loss

### Cross-validation

In [240]:
from sklearn.model_selection import StratifiedKFold

If we don't need a dataloader, then this works fine.

In [241]:
def cross_validate_model(model, features, labels, criterion, optimizer, splitter, batch_size=128, num_workers=16):
    training_losses = []
    validation_losses = []

    for fold, (train_indices, val_indices) in enumerate(splitter.split(features, labels)):
        # Training data and dataloader
        tr_data = STTDataset(features[train_indices], labels[train_indices])
        tr_loader = DataLoader(tr_data, batch_size=batch_size, shuffle=True, num_workers=num_workers)

        # Testing data and dataloader
        va_data = STTDataset(features[val_indices], labels[val_indices])
        va_loader = DataLoader(va_data, batch_size=len(va_data), shuffle=False, num_workers=num_workers)

        tr_loss = train_model(model, criterion, optimizer, tr_loader)
        va_loss = validate_model(model, criterion, va_loader)

        training_losses.append(tr_loss)
        validation_losses.append(va_loss)
        model.reset_weights()
    
    training_losses = np.array(training_losses)
    validation_losses = np.array(validation_losses)
    return training_losses.mean(), training_losses.std(), validation_losses.mean(), validation_losses.std()

Let's test this abomination:

In [244]:
model = MLP(tr_tensor.shape[1], 12, 100)
features = tr_tensor
labels = tr_label_tensor
criterion = nn.BCELoss().to(torch_device)
optimizer = optim.Adam(model.parameters(), lr=1e-5)
splitter = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

In [245]:
cross_validate_model(model, features, labels, criterion, optimizer, splitter)

(0.28757988191284123,
 0.22084628393594907,
 0.22555887699127197,
 0.21150503853427366)