In [None]:
import os
from pathlib import Path
from typing import List

import tensorflow as tf
from keras.layers import (Dense, Conv1D)
from keras.models import Sequential
from transformers import BertTokenizer, TFBertModel

import matplotlib.pyplot as plt

# Read the natural language understanding dataset and BERT model

Clone both repositories inside the `intent-detection` directory:
```
git clone https://github.com/tilde-nlp/NLU-datasets.git
git clone https://huggingface.co/bert-base-multilingual-cased
```

The directory tree should then look as follows:
```
/intent-detection
├── NLU-datasets
├── bert-base-multilingual-cased
├── run-on-windows.ipynb
```

In [None]:
# Step into the cloned dataset repository so that the relative "chatbot/..."
# paths used when loading the data resolve. The substring check skips the
# chdir when the working directory is already inside NLU-datasets, which keeps
# the cell safe to re-run.
if "NLU-datasets" not in os.getcwd():
    os.chdir("./NLU-datasets")

In [None]:
def get_data(path: str) -> List[str]:
    """Load a UTF-8 text file as a list of lines with the line break removed.

    The encoding is passed explicitly so that non-ASCII text (e.g. the Russian
    files) is read correctly.

    Args:
        path: Location of the text file to read.

    Returns:
        One string per line of the file, in file order, e.g.
        ['FindConnection', 'FindConnection', ..., 'FindConnection'].
    """
    with open(path, encoding='utf-8') as handle:
        # Keep only the part of each line in front of its newline character.
        return [raw_line.partition('\n')[0] for raw_line in handle]

In [None]:
# Load every chatbot text file into its dataset variable.
# Paths are compared in POSIX form (forward slashes): this matches on Windows
# as well as on Linux/macOS and avoids backslash escape sequences in the
# string literals.
path_list = Path("chatbot").glob("**/*.txt")

for path in path_list:
    # get_data expects a string, while glob yields Path objects
    path_in_str = str(path)
    # platform-independent form used only for the comparisons below
    path_posix = path.as_posix()
    if path_posix == "chatbot/chatbot_train_ans.txt":
        train_answers = get_data(path_in_str)
    elif path_posix == "chatbot/chatbot_test_ans.txt":
        test_answers = get_data(path_in_str)
    elif path_posix == "chatbot/en/chatbot_test_q.txt":
        en_test = get_data(path_in_str)
    elif path_posix == "chatbot/en/chatbot_train_q.txt":
        en_train = get_data(path_in_str)
    elif path_posix == "chatbot/lv/chatbot_test_q.txt":
        lv_test = get_data(path_in_str)
    elif path_posix == "chatbot/lv/chatbot_train_q.txt":
        lv_train = get_data(path_in_str)
    elif path_posix == "chatbot/ru/chatbot_test_q.txt":
        ru_test = get_data(path_in_str)
    elif path_posix == "chatbot/ru/chatbot_train_q.txt":
        ru_train = get_data(path_in_str)
    elif path_posix == "chatbot/et/chatbot_test_q.txt":
        et_test = get_data(path_in_str)
    elif path_posix == "chatbot/et/chatbot_train_q.txt":
        et_train = get_data(path_in_str)
    elif path_posix == "chatbot/lt/chatbot_test_q.txt":
        lt_test = get_data(path_in_str)
    elif path_posix == "chatbot/lt/chatbot_train_q.txt":
        lt_train = get_data(path_in_str)


print(train_answers)

In [None]:
# Sanity check: the training label list and the English training questions
# must have the same number of entries.
assert len(train_answers) == len(en_train)

In [None]:
# Go back up one level after loading the data; the BERT checkout is loaded
# from "./bert-base-multilingual-cased" relative to that directory.
# The check skips the chdir when the working directory is not inside NLU-datasets.
if "NLU-datasets" in os.getcwd():
    os.chdir("..")

# Show the resulting working directory for verification.
print(os.getcwd())

In [None]:
# define model and tokenizer
# NOTE: the first assignment is immediately overwritten by the second one, so
# the local checkout is what actually gets loaded. Remove the second
# assignment to load the model by its Hugging Face hub name instead.
model_name = "bert-base-multilingual-cased" # loading from huggingface
model_name = "./bert-base-multilingual-cased" # loading from local path

# Tokenizer and TensorFlow BERT model are both loaded from the same location.
tokenizer = BertTokenizer.from_pretrained(model_name)
model_bert = TFBertModel.from_pretrained(model_name)

# Define classifier

In [None]:
def create_model_one_layer(sentence_length: int, units: int = 2, hidden_size: int = 768):
    """Build the (not yet compiled) Keras classifier that consumes token embeddings.

    The layers are, in order:
      1. an input of shape (sentence_length, hidden_size) per example,
      2. Dense(units) with softmax activation,
      3. Conv1D(units) with kernel size ``sentence_length``, "valid" padding
         and softmax activation,
      4. Dense(units) with softmax activation.

    Because the convolution kernel is as long as the input sequence and the
    padding is "valid", the sequence axis of the output has length 1. Calling
    the model on a single example therefore yields a tensor of shape
    (1, 1, units), e.g. (1, 1, 2) for the default units=2.

    NOTE(review): despite the function name, the stack contains three layers
    and applies softmax three times in a row - confirm this is intended.

    Args:
        sentence_length: Number of tokens per input sentence.
        units: Number of output classes.
        hidden_size: Size of each token embedding.

    Returns:
        A ``Sequential`` model; compiling it is left to the caller.
    """
    stack = [
        tf.keras.Input(shape=(sentence_length, hidden_size)),
        Dense(units, activation='softmax'),
        Conv1D(units, sentence_length, padding="valid", activation="softmax"),
        Dense(units, activation='softmax'),
    ]
    classifier = Sequential()
    for layer in stack:
        classifier.add(layer)
    return classifier


def create_adam_optimizer(lr=0.001, beta_1=0.9, beta_2=0.999, decay=0, epsilon=None, amsgrad=False):
    """Create a legacy Keras Adam optimizer from the given hyper-parameters.

    Every argument is forwarded unchanged to
    ``tf.keras.optimizers.legacy.Adam``; ``lr`` is passed as ``learning_rate``.
    """
    # TODO: Replace legacy optimizer with current version of Adam
    adam_settings = dict(
        learning_rate=lr,
        beta_1=beta_1,
        beta_2=beta_2,
        epsilon=epsilon,
        decay=decay,
        amsgrad=amsgrad,
    )
    return tf.keras.optimizers.legacy.Adam(**adam_settings)


# TODO: consider replacing this helper with Keras' to_categorical utility
def encode_labels(answers: List) -> List:
    """One-hot encode the intent labels of the two-class task.

    'FindConnection' is encoded as [[1, 0]]; every other label
    (e.g. 'DepartureTime') is encoded as [[0, 1]].

    Args:
        answers: Label strings, one per example.

    Returns:
        A list with one nested [[a, b]] entry per input label, in input order.
    """
    return [
        [[1, 0]] if answer == 'FindConnection' else [[0, 1]]
        for answer in answers
    ]

In [None]:
# One-hot encode the training labels and turn the nested list into a tensor.
# Every label is encoded as [[a, b]], so the tensor has shape (N, 1, 2).
labels = encode_labels(train_answers)
labels_expanded = tf.convert_to_tensor(labels)

# Test on a small example

## Sentence -> word embedding

In [None]:
# Settings for the small example: number of sentences and tokens per sentence.
batch_size = 4
sentence_length = 20

# Take the first `batch_size` English training sentences.
text = en_train[0:batch_size]
# Pad/truncate each sentence to `sentence_length` tokens and return TensorFlow tensors.
encoded_input = tokenizer(text, padding='max_length', max_length=sentence_length, truncation=True, return_tensors='tf')
encoded_input

In [None]:
# Keys available in the BERT output:
# odict_keys(['last_hidden_state', 'pooler_output'])
# Only 'last_hidden_state' is used as input for the classifier.
inputs = model_bert(encoded_input)["last_hidden_state"]
inputs.shape

## Word embedding -> class

In [None]:
# Build the classifier for the small example and compile it with Adam,
# categorical cross-entropy loss and an accuracy metric.
learning_rate = 0.03
optimizer = create_adam_optimizer(lr=learning_rate)
classification_model = create_model_one_layer(sentence_length=sentence_length)

classification_model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# initial probabilities
classification_model(inputs)

In [None]:
epochs = 5

# Use only the labels that belong to the sentences of the small batch.
# NOTE: this overwrites the full-size labels_expanded tensor created earlier.
labels_expanded = tf.convert_to_tensor(labels[0:batch_size])

classification_model.fit(inputs, y=labels_expanded, epochs=epochs)

In [None]:
# view the output of the classification_model after fitting: probabilities for labels
# (compare with the initial probabilities displayed before the fit)

classification_model(inputs)

# Run on training dataset

In [None]:
# Rebuild the label tensor for the complete training set; the small example
# above replaced labels_expanded with a batch-sized slice.
labels = encode_labels(train_answers)
labels_expanded = tf.convert_to_tensor(labels)

In [None]:
def get_classification_model(learning_rate: float, sentence_length: int):
    """Create the classifier and compile it for training.

    Args:
        learning_rate: Learning rate handed to the Adam optimizer
            (a float such as 0.0003).
        sentence_length: Number of tokens per input sentence; determines the
            input length of the model.

    Returns:
        The compiled model, using categorical cross-entropy as loss and
        reporting accuracy as metric.
    """
    classification_model = create_model_one_layer(sentence_length=sentence_length)
    classification_model.compile(
        optimizer=create_adam_optimizer(lr=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return classification_model


def plot_performance(data, dataset: str, x_label: str = 'accuracy'):
    """Plot one training metric per epoch, save it as a PNG and display it.

    Args:
        data: Metric values, one per epoch (e.g. ``history.history['accuracy']``).
        dataset: Name used as prefix of the output file.
        x_label: Name of the plotted metric. Despite its name it labels the
            y-axis (the x-axis is always 'epochs'); it is also used in the
            title and in the file name.

    Side effects:
        Writes "<dataset>-<x_label>.png" relative to the current working directory.
    """
    # Draw on a dedicated figure instead of pyplot's implicit "current" one, so
    # consecutive calls can never end up on the same axes (this happens on
    # backends where plt.show() does not discard the figure).
    fig, ax = plt.subplots()
    ax.plot(data)
    ax.set_xlabel('epochs')
    ax.set_ylabel(x_label)
    ax.set_title(f"model {x_label}")
    fig.savefig(f"{dataset}-{x_label}.png")
    # For LaTeX a PDF can be written instead:
    # fig.savefig(f"{dataset}{x_label}.pdf", dpi=150)
    plt.show()
    # Release the figure so that repeated training runs do not accumulate open figures.
    plt.close(fig)


def training(train_dataset, dataset_name: str, learning_rate: float, sentence_length: int,
             labels_expanded=labels_expanded, fit_batch_size=None, num_epochs=None):
    """Train a fresh classifier on BERT embeddings of ``train_dataset`` and plot its progress.

    Args:
        train_dataset: List of sentences to train on.
        dataset_name: Prefix for the saved accuracy and loss plots.
        learning_rate: Learning rate for the optimizer (a float such as 0.0003).
        sentence_length: Number of tokens every sentence is padded/truncated to.
        labels_expanded: Targets passed to ``fit``. NOTE: the default is bound
            to the value the notebook variable ``labels_expanded`` had when
            this definition was executed, not when the function is called.
        fit_batch_size: Batch size for ``fit``. When omitted, the notebook-level
            ``batch_size`` is used.
        num_epochs: Number of training epochs. When omitted, the notebook-level
            ``number_of_epochs`` is used.

    Returns:
        The trained classification model.
    """
    # Fall back to the notebook-level settings (looked up at call time) so that
    # existing calls keep working while new calls can be explicit.
    if fit_batch_size is None:
        fit_batch_size = batch_size
    if num_epochs is None:
        num_epochs = number_of_epochs

    classification_model = get_classification_model(learning_rate, sentence_length)

    # Sentences -> fixed-length token ids -> BERT hidden state per token.
    encoded_input = tokenizer(train_dataset, padding='max_length', max_length=sentence_length, truncation=True, return_tensors='tf')
    classification_input = model_bert(encoded_input)["last_hidden_state"]

    print(labels_expanded)
    history = classification_model.fit(classification_input, y=labels_expanded, batch_size=fit_batch_size, epochs=num_epochs)

    plot_performance(history.history['accuracy'], dataset=dataset_name, x_label='accuracy')
    plot_performance(history.history['loss'], dataset=dataset_name, x_label='loss')

    return classification_model

In [None]:
# Hyper-parameters for the full training runs.
# batch_size and number_of_epochs are read as notebook globals inside
# training(); learning_rate and sentence_length are passed to it explicitly.
batch_size = 25
sentence_length = 20
learning_rate = 0.0003
number_of_epochs = 150

## English

In [None]:
# Train a classifier on the English training questions; plots are saved with the "en_train" prefix.
classification_model_en = training(en_train, dataset_name="en_train", learning_rate=learning_rate, sentence_length=sentence_length)