# Preliminary Steps
These are some preliminary steps before addressing the task. Import some basic libraries and set a variable that will be used in multiple steps.

In [459]:
import os, sys
import zipfile
import urllib.request
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tqdm import tqdm

# typing
from typing import List, Callable, Dict

# Dataset

## Constant and utilities

In [460]:
DATASET_NAME = "dependency_treebank"
DOCUMENT_EXTENSION = ".dp"

USE_DOCUMENTS = True #True=Use documents; False = Use sentences
file_end_name = "_documents" if USE_DOCUMENTS else "_sentences"

#List of paths to handle the dataset
DATASET_PATHS = {
    "url" : 'https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/packages/corpora/dependency_treebank.zip', #url to dowload the dataset
    "dataset_folder": os.path.join(os.getcwd(), "Datasets", "Original"), #folder containing the original dataset data
    "dataset_path" : os.path.join(os.getcwd(), "Datasets", "Original", "dependency_treebank.zip"), #path to zipped dataset
    "documents_path" : os.path.join(os.getcwd(), "Datasets", "Original", DATASET_NAME), #folder containing extracted documents (NB: it is created automatically during the extraction)
    "dataframe_folder" : os.path.join(os.getcwd(), "Datasets", "Dataframes", DATASET_NAME), #folder containing the dataframe data
    "dataframe_path" : os.path.join(os.getcwd(), "Datasets", "Dataframes", DATASET_NAME, DATASET_NAME + file_end_name + ".pkl") #path to pickle save of built dataframe
}

TRAINING_DOCS = 100
VALIDATION_DOCS = 50
TEST_DOCS = 49

PADDING = 0

##Folders creation

In [461]:
def create_folders(paths):
  for path in paths:
    if not os.path.exists(path):
      os.makedirs(path)

folders = [DATASET_PATHS["dataset_folder"],
           DATASET_PATHS["dataframe_folder"]]
           
create_folders(folders)

## Dataset download

In [462]:
def download_dataset(dataset_path):
  if not os.path.exists(dataset_path):
      urllib.request.urlretrieve(DATASET_PATHS["url"], dataset_path)

      print("Successful download")

download_dataset(DATASET_PATHS["dataset_path"])

## Dataset extraction

In [463]:
def extract_dataset(dataset_path, dataset_folder, documents_path):
  expected_docs_number = TRAINING_DOCS + VALIDATION_DOCS + TEST_DOCS

  if not os.path.exists(documents_path) or len(os.listdir(documents_path))<expected_docs_number:
    with zipfile.ZipFile(dataset_path, 'r') as zip_ref:
        zip_ref.extractall(dataset_folder)

    print("Successful extraction")

extract_dataset(DATASET_PATHS["dataset_path"],DATASET_PATHS["dataset_folder"],DATASET_PATHS["documents_path"])

##Preprocess

In [464]:
def preprocess(token : str) -> str:
	"""
	Calls the function that cleans the text
	Input: the string to process
	Output: the processed string
	"""
	token = text_to_lower(token)
	token = strip_text(token)
	return token

def text_to_lower(text: str) -> str:
	"""
	Returns the string in lower character
	Input: the string to process
	Output: the processed string
	"""
	return text.lower();

def strip_text(text: str) -> str:
	"""
	Removes any left or right spacing (including carriage return) from text.
	Example:
	Input: '  This assignment is cool\n'
	Output: 'This assignment is cool'
	"""

	return text.strip()

## Dataframe creation

In [465]:
TOKEN_SEPARATOR = " " #Character used to separate tokens in the dataframe
SENTENCE_SEPARATOR = "##" #Characters to signal the end of a sentence (if USE_DOCUMENTS=False)
WORDS_CONTAINER = "document" if USE_DOCUMENTS else "sentence"

def list_to_string(_list):
	string = ""

	for index, value in enumerate(_list):
		string+=value
		if index!=len(_list)-1:
			string+=TOKEN_SEPARATOR
	
	return string

def add_row_to_dataframe_rows(dataframe_rows, split, document, labels):
	if USE_DOCUMENTS:
		dataframe_row = {"split": split, WORDS_CONTAINER: document, "labels": labels}
		dataframe_rows.append(dataframe_row)
	
	else:
		sentences = document.split(SENTENCE_SEPARATOR)
		split_labels = labels.split(SENTENCE_SEPARATOR)
		for i in range(len(sentences)):
			dataframe_row = {"split": split, WORDS_CONTAINER: sentences[i], "labels": split_labels[i]}
			dataframe_rows.append(dataframe_row)
	
def rows_to_dataframe(rows):
	dataframe = pd.DataFrame(rows)
	dataframe = dataframe[["split", WORDS_CONTAINER, "labels"]]

	return dataframe

def get_documents(path):
	files = os.listdir(path)
	documents = filter(lambda name: (name.endswith(DOCUMENT_EXTENSION)), files)
	documents = list(documents)
	documents.sort()
 
	return documents
 
def get_document_number(filename):
	return int(filename.split("_")[1].split(".")[0])
 
def extract_data_from_line(line):
	if line != "\n":
		columns = line.split()
		token = columns[0]
		token = preprocess(token)
		label = columns[1]

		return token, label

	else:
		if USE_DOCUMENTS:
			return None, None
		else:
			return SENTENCE_SEPARATOR, SENTENCE_SEPARATOR

def process_document(document, doc_number):
	tokens = []
	labels = []
	split = ""

	try:
		if os.path.isfile(document):
			#Open the file
			with open(document, mode='r', encoding='utf-8') as text_file:

				#Split in different groups
				if doc_number <= TRAINING_DOCS:
					split = "train"
				elif doc_number <= TRAINING_DOCS+VALIDATION_DOCS:
					split = "validation"
				else:
					split = "test"

				#Stop at end of file
				for line in text_file:
					token, label = extract_data_from_line(line)

					if token!=None and label!=None:
						tokens.append(token)
						labels.append(label)
		 
	except Exception as e:
                print('Failed to process %s. Reason: %s' % (document, e))
                sys.exit(0)

	return split, list_to_string(tokens), list_to_string(labels)

###Build/Load Dataframe

In [466]:
def build_dataframe(documents_path,dataframe_path, save=True):

	documents = get_documents(documents_path)
	dataframe_rows = []

	#Iterate along the files
	for filename in documents:
		document = os.path.join(documents_path, filename)
		doc_number = get_document_number(filename)

		split, tokens, labels = process_document(document, doc_number)
		add_row_to_dataframe_rows(dataframe_rows,split,tokens,labels)

	#Transform the list of rows in a proper dataframe
	dataframe = rows_to_dataframe(dataframe_rows)
	print("Dataframe built successfully")
	
	#Save the dataframe
	if save:
		dataframe.to_pickle(dataframe_path)
		print("Dataframe saved successfully")
 
	return dataframe

def load_dataframe(documents_path, dataframe_path, force_rebuild = False):
	if not os.path.exists(dataframe_path) or force_rebuild:
		return build_dataframe(documents_path, dataframe_path)
	else:
		return pd.read_pickle(dataframe_path)

##Split dataset

In [467]:
df = load_dataframe(DATASET_PATHS["documents_path"],DATASET_PATHS["dataframe_path"], True)

#Test
#print(df.iloc[1][WORDS_CONTAINER])
#print(df.iloc[1]["labels"])

training_set = df.loc[df["split"] == "train"]
validation_set = df.loc[df["split"] == "validation"]
test_set = df.loc[df["split"] == "test"]

Dataframe built successfully
Dataframe saved successfully


##Tokenization and vocabularies

In [468]:
STARTING_TOKEN = 1

def get_tokenizer(corpus, starting_dict=None):
  words_to_tokens = {} if starting_dict==None else starting_dict.copy()

  for text in corpus:
    words = text.split()
    for word in words:
      if not word in words_to_tokens:
        words_to_tokens[word] = len(words_to_tokens)+STARTING_TOKEN

  return words_to_tokens

def tokenize(word, words_to_tokens):
  return words_to_tokens[word]

def detokenize(token, words_to_tokens):
  return words_to_tokens.index(token)

def tokenize_string(string, words_to_tokens, max_lenght):
  tokens = string.split()
  tokenized_sequence = [tokenize(token, words_to_tokens)  for token in tokens]
  padding = max_lenght-len(tokenized_sequence)
  return np.pad(tokenized_sequence, (PADDING, padding), 'constant').tolist()

#Define corpus
train_text = training_set[WORDS_CONTAINER].tolist()
val_text = validation_set[WORDS_CONTAINER].tolist()
test_text = validation_set[WORDS_CONTAINER].tolist()

#Define labels
train_labels = training_set["labels"].tolist()
val_labels = validation_set["labels"].tolist()
test_labels = test_set["labels"].tolist()

#Token dictionaries
train_tokens = get_tokenizer(train_text)
val_tokens = get_tokenizer(val_text, starting_dict = train_tokens)
test_tokens = get_tokenizer(test_text, starting_dict = val_tokens)

#Vocabularies
train_vocab = train_tokens.keys()
val_vocab = val_tokens.keys()
test_vocab = test_tokens.keys()

#Vocab sizes
train_vocab_size = len(train_vocab)
val_vocab_size = len(val_vocab)
test_vocab_size = len(test_vocab)

#Max lenght of a token sequence
corpus = train_text+val_text+test_text
n_tokens = [len(doc.split()) for doc in corpus]
max_length = max(n_tokens)

#Tokenized sets
train_tokenized = np.array(list(map(lambda string: tokenize_string(string, train_tokens,max_length),train_text)))
val_tokenized = np.array(list(map(lambda string: tokenize_string(string, val_tokens,max_length),val_text)))
test_tokenized = np.array(list(map(lambda string: tokenize_string(string, test_tokens,max_length),test_text)))

##Labels encoding

In [469]:
ONE_HOT = False # Determine which encoding method to use: True = One_hot encoding; False = Categorical encoding

def get_categorical_encoding(labels_list):
  encoding = {}

  for label_group in labels_list:
    labels = label_group.split()
    for label in labels:
      if not label in encoding:
        encoding[label] = len(encoding)+1

  return encoding

def get_one_hot_encoding(categorical_encoding):
  one_hot_encoding = {}

  size = len(categorical_encoding)
  labels = categorical_encoding.keys()

  for index, label in enumerate(labels):
    encoding = np.zeros(size, dtype=np.int32)
    encoding[index] = 1
    one_hot_encoding[label] = encoding

  return one_hot_encoding

def get_labels_encoding(labels_list, one_hot):
  categorical_encoding = get_categorical_encoding(labels_list)

  if one_hot:
    return get_one_hot_encoding(categorical_encoding)

  else:
    return categorical_encoding

def encode_label(label, encoding):
  return encoding[label]

def decode_label(value, encoding):
  return encoding.index(value)

def encode_string_of_labels(string, encoding, max_lenght):
  labels = string.split()
  encoded_sequence = [encode_label(label, encoding)  for label in labels]
  padding = max_lenght-len(encoded_sequence)
  return np.pad(encoded_sequence, (PADDING, padding), 'constant')

def is_punctuation_label(label):
  return len(label)==1

labels_list = training_set["labels"].tolist() + validation_set["labels"].tolist() + test_set["labels"].tolist()
labels_encoding = get_labels_encoding(labels_list, ONE_HOT)

# GloVe
This section is the one responsible for the implementation of the GloVe embedding system.

## Constants and utilities


In [470]:
URL_BASE = "https://nlp.stanford.edu/data" #Location of the pre-trained GloVe's files
GLOVE_VERSION = "6B"

EMBEDDING_SIZE = 50 #The dimensionality of the embeddings; to be tested

#List of paths to download and extract GloVe's files
PATHS = {
    "url": URL_BASE + "/glove." + GLOVE_VERSION + ".zip",
    "glove_path": os.path.join(os.getcwd(),"Glove",GLOVE_VERSION),
    "glove_zip": os.path.join(os.getcwd(),"Glove", GLOVE_VERSION, "glove."+GLOVE_VERSION+".zip"),
    "glove_file": os.path.join(os.getcwd(),"Glove", GLOVE_VERSION, "glove."+GLOVE_VERSION+"."+str(EMBEDDING_SIZE)+"d.txt")
}

OOV_METHOD = "Mean" #Determine which OOV method to adopt; choose one between "Mean", "Random" and "Placeholder"

## Download
In this part the presence of the GloVe file is checked. In case of a negative response, it will be downloaded and extracted.

In [471]:
def setup_files():

  '''
  Create the folder if it does not exist.
  Then download the zip file from the web archive if it does not exist.
  Finally exctract the zip file of the GloVe txt file does not exist in the folder.
  '''

  if not os.path.exists(PATHS["glove_path"]):
    os.makedirs(PATHS["glove_path"])

  if not os.path.exists(PATHS["glove_file"]):
    if not os.path.exists(PATHS["glove_zip"]):
      download_glove(PATHS["url"])

    extract_glove(PATHS["glove_zip"],PATHS["glove_path"])

def download_glove(url: str):

    '''
    Download GloVe's zip file from the web.
    '''

    urllib.request.urlretrieve(url, PATHS['glove_zip'])
    print("Successful download")

def extract_glove(zip_file: str,
                  glove_path: str):
  
    '''
    Extract GloVe's zip file.
    '''
  
    with zipfile.ZipFile(PATHS["glove_zip"], 'r') as zip_ref:
      zip_ref.extractall(path=PATHS["glove_path"])
      print("Successful extraction")

## Initialization
In this step, the downloaded GloVe file is loaded into an embedding vocabulary

In [472]:
def load_model(glove_file: str) ->Dict:

  '''
  Open GloVe's txt file and store each of its contained words
  into a dictionary along with their correspondent embedding weights.

  Parameters:
  ----------
  glove_file : str
      GloVe's txt file path.

  Returns:
  -------
  vocabulary: Dict
      GloVe's vocabulary

  '''

  print("Loading GloVe Model...")

  with open(glove_file, encoding="utf8" ) as f: #Open the txt file
      lines = f.readlines() #Read the file line by line

  vocabulary = {}
  for line in lines:
      splits = line.split()
      #Save the first part of the line (word) as the dictionary's key and the second part (the embedding) as the key
      vocabulary[splits[0]] = np.array([float(val) for val in splits[1:]])

  print("GloVe model loaded")

  return vocabulary

## OOV
In this section, some possible "Out Of Vocabulary" handling methods are implemented, along with other OOV-related functions.

In [473]:
#OOV-handling: possible methods

PLACEHOLDER = np.random.uniform(low=-0.05, high=0.05, size=EMBEDDING_SIZE)

def random_embedding(embedding_size: int) ->np.array:
  '''
  Return a numpy array with random values sampled from a uniform distribution

  Parameters:
  ----------
  embedding_size: int
    The embedding size that is used as the size of the numpy array.

  Results:
  -------
  np.array
  A randomized numpy array.
  '''

  return np.random.uniform(low=-0.05, high=0.05, size=embedding_size)

def placeholder_embedding() ->np.ndarray:

  return PLACEHOLDER

def neighbours_mean_embedding(word: str,
                              glove_embedding: Dict[str,int],
                              sentences: List[str],
                              labels_list: List[str]):
  
  '''
  Compute the embedding of an OOV word by taking the mean
  of its neighbours.

  Parameters:
  ---------
  word: str
      The OOV that needs to be embedded.
  glove_embedding: Dict[str, int]
      GloVe's embedding.
  sentences: List[List[str]]
      A list of all the sentences (lists of words) in the current set.

  Returns:
  -------
  mean: int
      The mean of the embedding values of OOV-word's neighbours.
  '''
  
  neighbours = set()

  for sentence,labels_group in zip(sentences,labels_list):
    tokens = sentence.split()
    labels = labels_group.split()
    for index, token in enumerate(tokens):
      if word == token:
        if index!=0:
          left_neighbour = tokens[index-1]
          if left_neighbour in glove_embedding and not is_punctuation_label(labels[index-1]): #Consider only words that are not OOV and not punctuation
            neighbours.add(left_neighbour)
        if index!=len(tokens)-1:
          right_neighbour = tokens[index+1]
          if right_neighbour in glove_embedding and not is_punctuation_label(labels[index+1]): #Consider only words that are not OOV and not punctuation
            neighbours.add(right_neighbour)

  neighbours_embeddings = np.array([glove_embedding[neighbour] for neighbour in neighbours])
  return np.mean(neighbours_embeddings) if len(neighbours)>0 else PLACEHOLDER


#Others
def get_oov_list(words: List[str],
                 glove_embedding: Dict[str, int]) ->List[str]:

  '''
  Return a list of all the words that are not part of the GloVe embedding

  Parameters:
  ----------
  words: List[str]
      A list of unique words from a set of documents.
  glove_embedding: Dict[str, int]
      GloVe's embedding.

  Returns:
  -------
  oov: List[str]
      A list of all the OOV terms.
  '''

  embedding_vocabulary = set(glove_embedding.keys())
  oov = set(words).difference(embedding_vocabulary)
  return list(oov)

##Embedding matrix
Now, having opted for an OOV method, it is possible to create the embedding matrix, which associates the embedding to the correspondent word for the entire vocabulary.

In [474]:
def update_embeddings(glove_embedding: Dict[str, int],
                     new_embeddings: Dict[str, int]):
  
  '''
  Update the GloVe's embeddings by adding the new embeddings of
  the previous OOV words.

  Parameters:
  ----------
  glove_embedding: Dict[str, int]
      GloVe's embedding.
  new_embeddings: Dict[str, int]
      A dictionary containing the new embeddings
      for the analyzed OOV words.
  '''
  
  #Merge GloVe's embeddings with the new discoveries
  glove_embedding.update(new_embeddings)

def build_embedding_matrix(vocab_size: int,
                            glove_embedding: Dict[str, int],
                            embedding_size: int,
                            words_to_tokens: Dict[str,int],
                            oov_method: str,
                            sentences: List[str],
                            labels: List[str]) ->np.ndarray:

  embedding_matrix = np.zeros((vocab_size, embedding_size), dtype=np.float32) #Create an empty embedding matrix

  oov_terms = get_oov_list(words_to_tokens.keys(),glove_embedding)
  discovered_embeddings = {}

  for word, token in tqdm(words_to_tokens.items()):

    if np.all((embedding_matrix[token-STARTING_TOKEN] == 0)):

      if word in oov_terms: #Hanlde the OOV case with one of the methods
        if oov_method == "Random":
          embedding_vector = random_embedding(embedding_size)
        elif oov_method == "Placeholder":
          embedding_vector = placeholder_embedding()
        elif oov_method == "Mean":
          embedding_vector = neighbours_mean_embedding(word, glove_embedding, sentences, labels)
        else:
          raise "Invalid OOV method"
        
        discovered_embeddings[word] = embedding_vector

      else:
        embedding_vector = glove_embedding[word]

      embedding_matrix[token-STARTING_TOKEN] = embedding_vector #Update the embedding matrix

  #The computed values for the OOV words update the GloVe embeddings at the end of the process.
  #Updating these values at runtime affects the "Mean" OOV method.
  update_embeddings(glove_embedding, discovered_embeddings)

  return embedding_matrix

##Train, validation and test embedding matrices
Here all the previous methods defined in the above sections are exploited to create three different vocabularies.

In [475]:
setup_files() #Create a path, download and extract the files, if necessary
glove_embedding = load_model(PATHS["glove_file"]) #Load the GloVe model

#Build the embedding matrix with the training set data
train_embedding_matrix = build_embedding_matrix(train_vocab_size,
                                                glove_embedding,
                                                EMBEDDING_SIZE,
                                                train_tokens,
                                                OOV_METHOD,
                                                train_text,
                                                train_labels)

#Get an updated version of the embedding matrix with the validation set data
val_embedding_matrix = build_embedding_matrix(val_vocab_size,
                                                glove_embedding,
                                                EMBEDDING_SIZE,
                                                val_tokens,
                                                OOV_METHOD,
                                                val_text,
                                                val_labels)

#Get an updated version of the embedding matrix with the test set data
test_embedding_matrix = build_embedding_matrix(test_vocab_size,
                                                glove_embedding,
                                                EMBEDDING_SIZE,
                                                test_tokens,
                                                OOV_METHOD,
                                                test_text,
                                                test_labels)

Loading GloVe Model...
GloVe model loaded


100%|██████████| 7404/7404 [00:03<00:00, 1982.11it/s]
100%|██████████| 9901/9901 [00:01<00:00, 7957.08it/s] 
100%|██████████| 9901/9901 [00:00<00:00, 120762.49it/s]


## Models' input initialization

In [476]:
def build_input(embedding_matrix, tokenized_sequence, embedding_size):
  n_docs = len(tokenized_sequence)
  n_tokens = len(tokenized_sequence[0])

  input = np.zeros((n_docs, n_tokens, embedding_size))

  for doc_index, tokens in enumerate(tokenized_sequence):
    for token_index, token in enumerate(tokens):
      if token!=PADDING:
        input[doc_index][token_index] = embedding_matrix[token-STARTING_TOKEN]
      else:
        input[doc_index][token_index] = np.zeros(embedding_size)

  return input

#Input values
X_train = build_input(train_embedding_matrix, train_tokenized, EMBEDDING_SIZE)
X_val = build_input(val_embedding_matrix, val_tokenized, EMBEDDING_SIZE)
X_test = build_input(test_embedding_matrix, test_tokenized, EMBEDDING_SIZE)

#Class values
y_train = np.array(list(map(lambda string: encode_string_of_labels(string, labels_encoding,max_length),train_labels)))
y_val = np.array(list(map(lambda string: encode_string_of_labels(string, labels_encoding,max_length),val_labels)))
y_test = np.array(list(map(lambda string: encode_string_of_labels(string, labels_encoding,max_length),test_labels)))

# Models
This section is used for creating different models, going from a baseline to slightly more complicated ones.

## Constants and utilities
First of all, define some constants, parameter dictionaries and methods that will be reused by each architecture.

In [477]:
# TODO: all the following constants are temporary 
N_CLASSES = 20  # this must be equal to the number of tags
VOCABULARY_SIZE = 1000  # this must be obtained from the dataset
MAX_SEQUENCE_SIZE = 100  # this must be obtained from the dataset

BATCH_SIZE = 128  # hyper-parameter to properly set
EPOCHS = 5


# Model common compile information
# Use sparse_categorical_crossentropy because labels are one hot encoded
model_compile_info = {
    'optimizer': keras.optimizers.Adam(learning_rate=1e-3),
    'loss': 'sparse_categorical_crossentropy',
    'metrics': [keras.metrics.SparseCategoricalAccuracy()],
}

# Model common training information
training_info = {
    'verbose': 1,
    'epochs': EPOCHS,
    'batch_size': BATCH_SIZE,
    'callbacks': [keras.callbacks.EarlyStopping(monitor='val_loss', 
                                                patience=10,
                                                restore_best_weights=True)]
}

In [478]:
# This tensor should contain the weights obtained by GloVe
embedding_weights = np.zeros(shape=(VOCABULARY_SIZE, EMBEDDING_SIZE))

Define utility methods that will be used to **create**, **train** and **test** the models.

In [479]:
def create_model(name,
                 layers, 
                 compile_info, 
                 show_summary=True) -> keras.Model:
    """
    Create the model using the layers passed as parameters.
    After the creation, the model is compiled and its summary is possibly 
    printed to console.

    Parameters
    ----------
    layers : array
        Array that contains a list of layers that must be added 
        to the model.
    compile_info: Dictionary
        Contains information required for compiling the model.
    show_summary: bool
        If true, then the summary of the model will be printed to console
    

    Returns
    -------
    model : keras.Model
        The keras model.
    """
    model = keras.Sequential(name=name)
    
    for idx, layer in enumerate(layers):

        # Sanity checks for being sure that the last layer has been 
        # correctly set
        if idx == len(layers) - 1:
            assert layer.activation == keras.activations.softmax, 'Wrong activation function'
            assert layer.units == N_CLASSES, 'Wrong number of units'

        model.add(layer)

    # Compile
    model.compile(**compile_info)

    # Print model summary
    if show_summary:
        model.summary()
    
    return model


def train_model(model: keras.Model,
                x_train: np.ndarray,
                y_train: np.ndarray,
                x_val: np.ndarray,
                y_val: np.ndarray,
                training_info: dict):
    """
    Training routine for the Keras model.
    At the end of the training, retrieved History data is shown.

    :param model: Keras built model
    :param x_train: training data in np.ndarray format
    :param y_train: training labels in np.ndarray format
    :param x_val: validation data in np.ndarray format
    :param y_val: validation labels in np.ndarray format
    :param training_info: dictionary storing model fit() argument information

    :return
        model: trained Keras model
    """
    print("Start training! \nParameters: {}".format(training_info))
    history = model.fit(x=x_train, y=y_train,
                        validation_data=(x_val, y_val),
                        **training_info)
    print("Training completed! Showing history...")

    show_history(history)

    return model


def predict_data(model: keras.Model,
                 x: np.ndarray,
                 prediction_info: dict) -> np.ndarray:
    """
    Inference routine of a given input set of examples

    :param model: Keras built and possibly trained model
    :param x: input set of examples in np.ndarray format
    :param prediction_info: dictionary storing model predict() argument information

    :return
        predictions: predicted labels in np.ndarray format
    """

    print('Starting prediction: \n{}'.format(prediction_info))
    print('Predicting on {} samples'.format(x.shape[0]))

    predictions = model.predict(x, **prediction_info)
    return predictions


def evaluate_predictions(predictions: np.ndarray,
                         y: np.ndarray,
                         metrics: List[Callable],
                         metric_names: List[str]):
    """
    Evaluates given model predictions on a list of metric functions

    :param predictions: model predictions in np.ndarray format
    :param y: ground-truth labels in np.ndarray format
    :param metrics: list of metric functions
    :param metric_names: list of metric names

    :return
        metric_info: dictionary containing metric values for each input metric
    """

    assert len(metrics) == len(metric_names)

    print("Evaluating predictions! Total samples: ", y.shape[0])

    metric_info = {}

    for metric, metric_name in zip(metrics, metric_names):
        metric_value = metric(y_pred=predictions, y_true=y)
        metric_info[metric_name] = metric_value

    return metric_info

def model_sanity_check(model: keras.Model, 
                       use_embedding_layer: bool = False):
    """
    Create a random input_tensor and try to pass through the model.
    This method should be used in order to check if the model is 
    working as expected.

    Parameters
    ----------
    model : keras.Model
        The model that must be tested.
    use_embedding_layer: depending on this flag the shape of the input must be 
        treated differently.

    """
    print(f'Sanity check for the model with name: {model.name}')
    # Model sanity check for seeing if it runs correctly
    if use_embedding_layer:
        input_tensor = np.random.uniform(
            size=(BATCH_SIZE, MAX_SEQUENCE_SIZE)
            )
    else:
        input_tensor = np.random.uniform(
            size=(BATCH_SIZE, MAX_SEQUENCE_SIZE, EMBEDDING_SIZE)
            )
    print(f'Input tensor shape: {input_tensor.shape}')
    output_tensor = model(input_tensor)
    print(f'Output tensor shape: {output_tensor.shape}')

Define utility methods for **creating layers** in order to: 
* reduce the code verbosity.
* be sure to always create different architectures with the same layer structures.

In [480]:
# EMBEDDING
# NOTE: Actually this layer has not been used in the final models, 
# but it has been used for some experimentations
def embedding_layer(embedding_weights: np.array,
                    layer_name: str='embedding') -> layers.Embedding:
    """
    Create an embedding layer.

    Parameters
    ----------
    embedding_weights : np.array
        The weights for the embedding layer.
    layer_name : str
        The name of the layer
    
    Returns
    -------
    layer : layers.Embedding
        The created embedding layer.
    """
    layer = layers.Embedding(
        input_dim=VOCABULARY_SIZE, 
        output_dim=EMBEDDING_SIZE, 
        input_length=MAX_SEQUENCE_SIZE,
        weights=[embedding_weights],
        mask_zero=True,
        name=layer_name
        )
    return layer

# MASKING
def masking_layer(input_shape: tuple,
                  mask_value: float=0.0,
                  layer_name: str='masking') -> layers.Masking:
    """
    Create a masking layer.

    Parameters
    ----------
    input_shape : tuple
        The weights for the embedding layer.
    mask_value : the value to mask because it represents the padding 
    layer_name : str
        The name of the layer
    
    Returns
    -------
    layer : layers.Masking
        The created masking layer.
    """
    layer = layers.Masking(
        input_shape=input_shape, 
        mask_value=mask_value,
        name=layer_name
        )
    return layer

# RNN (LSTM and GRU)
def _rnn_size(layer_depth: int) -> int:
    """
    Simple logic used for assigning the number of units 
    to the rnn layer.

    Parameters
    ----------
    layer_depth : int
        The depth of the layer.

    Returns
    -------
    size : int
        The number units.
    """
    size = 64
    if layer_depth > 1:
        size = 128
    return size

def bilstm_layer(layer_depth: int,
                 layer_name: str='bi-lstm') -> layers.Bidirectional:
    """
    Create a bidirectional lstm layer.

    Parameters
    ----------
    layer_depth : int
        The depth of the layer.
    layer_name : str
        The name of the layer
    
    Returns
    -------
    layer : layers.Bidirectional
        The created bidirectional lstm layer.
    """
    size = _rnn_size(layer_depth)
    layer = layers.Bidirectional(
        layers.LSTM(size, 
                    return_sequences=True, 
                    activation='relu'),
                    name=layer_name,
                    
        )
    return layer

def bigru_layer(layer_depth: int,
                layer_name: str='bi-gru') -> layers.Bidirectional:
    """
    Create a bidirectional gru layer

    Parameters
    ----------
    layer_depth : int
        The depth of the layer.
    layer_name : str
        The name of the layer
    
    Returns
    -------
    layer : layers.Bidirectional
        The created bidirectional gru layer.
    """
    size = _rnn_size(layer_depth)
    layer = layers.Bidirectional(
        layers.GRU(size, 
                   return_sequences=True, 
                   activation='relu'),
                   name=layer_name
        )
    return layer

# DENSE
def _dense_size(last_layer:bool) -> int:
    """
    Simple logic for assigning the size of the dense layer.

    Parameters
    ----------
    last_layer : bool
        Indicates if the layer that must be created is the last
        one of the network.
    
    Returns
    -------
    size : int
        The size of the dense layer.
    """
    size = N_CLASSES
    if not last_layer:
        size = 256
    return size

def _dense_activation(last_layer:bool) -> str:
    """
    Simple logic for assigning the activation function of the dense layer.

    Parameters
    ----------
    last_layer : bool
        Indicates if the layer that must be created is the last
        one of the network.
    
    Returns
    -------
    activation : str
        The activation function of the layer.
    """
    activation = 'relu'
    if last_layer:
        activation = 'softmax'
    return activation

def dense_layer(last_layer:bool,
                layer_name: str='dense') -> layers.Dense:
    """
    Create a dense layer

    Parameters
    ----------
    last_layer : bool
        Indicates if the layer that must be created is the last
        one of the network.
    layer_name : str
        The name of the layer
    
    Returns
    -------
    layer : layers.Dense
        The created dense layer.
    """
    size = _dense_size(last_layer)
    activation = _dense_activation(last_layer)
    
    return layers.Dense(size, 
                        activation=activation, 
                        name=layer_name)

## Baseline

In [481]:
# Create layers
baseline_layers = [
                # embedding_layer(embedding_weights=embedding_weights),
                masking_layer(input_shape=(MAX_SEQUENCE_SIZE, EMBEDDING_SIZE), 
                              layer_name='masking_0'),
                bilstm_layer(layer_depth=1, 
                             layer_name='bi-lstm_0'),
                dense_layer(last_layer=True, 
                            layer_name='dense_0')
]

# Create the model
baseline_model = create_model('baseline', 
                              baseline_layers, 
                              model_compile_info)

# Check if the model can actually run
model_sanity_check(baseline_model)

Model: "baseline"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_0 (Masking)         (None, 100, 50)           0         
                                                                 
 bi-lstm_0 (Bidirectional)   (None, 100, 128)          58880     
                                                                 
 dense_0 (Dense)             (None, 100, 20)           2580      
                                                                 
Total params: 61,460
Trainable params: 61,460
Non-trainable params: 0
_________________________________________________________________
Sanity check for the model with name: baseline
Input tensor shape: (128, 100, 50)
Output tensor shape: (128, 100, 20)


## Variations
What follows is the implementation of small variations to the baseline architecture.

### GRU
Change the LSTM layer with the GRU layer

In [482]:
# Create layers
baseline_var1_layers = [
                # embedding_layer(embedding_weights=embedding_weights),
                masking_layer(input_shape=(MAX_SEQUENCE_SIZE, EMBEDDING_SIZE),
                              layer_name='masking_0'),
                bigru_layer(layer_depth=1,
                            layer_name='bi-gru_0'),
                dense_layer(last_layer=True,
                            layer_name='dense_0')
]

# Create the model
baseline_var1_model = create_model('baseline_var1', 
                              baseline_var1_layers, 
                              model_compile_info)

# Check if the model can actually run
model_sanity_check(baseline_var1_model)

Model: "baseline_var1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_0 (Masking)         (None, 100, 50)           0         
                                                                 
 bi-gru_0 (Bidirectional)    (None, 100, 128)          44544     
                                                                 
 dense_0 (Dense)             (None, 100, 20)           2580      
                                                                 
Total params: 47,124
Trainable params: 47,124
Non-trainable params: 0
_________________________________________________________________
Sanity check for the model with name: baseline_var1
Input tensor shape: (128, 100, 50)
Output tensor shape: (128, 100, 20)


### Additional LSTM layer

In [483]:
# Create layers
baseline_var2_layers = [
                # embedding_layer(embedding_weights=embedding_weights),
                masking_layer(input_shape=(MAX_SEQUENCE_SIZE, EMBEDDING_SIZE),
                              layer_name='masking_0'),
                bilstm_layer(layer_depth=1,
                             layer_name='bi-lstm_0'),
                bilstm_layer(layer_depth=2,
                             layer_name='bi-lstm_1'),
                dense_layer(last_layer=True,
                            layer_name='dense_0')
]

# Create the model
baseline_var2_model = create_model('baseline_var2', 
                              baseline_var2_layers, 
                              model_compile_info)

# Check if the model can actually run
model_sanity_check(baseline_var2_model)

Model: "baseline_var2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_0 (Masking)         (None, 100, 50)           0         
                                                                 
 bi-lstm_0 (Bidirectional)   (None, 100, 128)          58880     
                                                                 
 bi-lstm_1 (Bidirectional)   (None, 100, 256)          263168    
                                                                 
 dense_0 (Dense)             (None, 100, 20)           5140      
                                                                 
Total params: 327,188
Trainable params: 327,188
Non-trainable params: 0
_________________________________________________________________
Sanity check for the model with name: baseline_var2
Input tensor shape: (128, 100, 50)
Output tensor shape: (128, 100, 20)


### Additional Dense layer

In [484]:
# Create layers
baseline_var3_layers = [
                # embedding_layer(embedding_weights=embedding_weights),
                masking_layer(input_shape=(MAX_SEQUENCE_SIZE, EMBEDDING_SIZE),
                              layer_name='masking_0'),
                bilstm_layer(layer_depth=1,
                             layer_name='bi-lstm_0'),
                dense_layer(last_layer=False,
                            layer_name='dense_0'),
                dense_layer(last_layer=True,
                            layer_name='dense_1')
]

# Create the model
baseline_var3_model = create_model('baseline_var3', 
                              baseline_var3_layers, 
                              model_compile_info)

# Check if the model can actually run
model_sanity_check(baseline_var3_model)

Model: "baseline_var3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 masking_0 (Masking)         (None, 100, 50)           0         
                                                                 
 bi-lstm_0 (Bidirectional)   (None, 100, 128)          58880     
                                                                 
 dense_0 (Dense)             (None, 100, 256)          33024     
                                                                 
 dense_1 (Dense)             (None, 100, 20)           5140      
                                                                 
Total params: 97,044
Trainable params: 97,044
Non-trainable params: 0
_________________________________________________________________
Sanity check for the model with name: baseline_var3
Input tensor shape: (128, 100, 50)
Output tensor shape: (128, 100, 20)


# Training and Experiments

# Disussion and Error Analysis