<a href="https://colab.research.google.com/github/ElFosco/NLP_assignments/blob/main/Assignment_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 2

**Due to**: 23/12/2021 (dd/mm/yyyy)

**Credits**: Andrea Galassi, Federico Ruggeri, Paolo Torroni

**Summary**: Fact checking, Natural Language Inference (**NLI**)

# Imports

In [None]:
import os, shutil  # file management
import sys  # system
import pandas as pd  # dataframe management
import numpy as np  # data manipulation
from tqdm import tqdm  # useful during debugging (progress bars)
from typing import List, Callable, Dict  # typing
import re  # regex
import urllib.request  # download files
import zipfile  # unzip files
import gensim  # embeddings
import gensim.downloader as gloader  # embeddings
from sklearn.preprocessing import LabelEncoder, OneHotEncoder  # one-hot encoding
from matplotlib import pyplot as plt  # Plots
import nltk
from nltk.corpus import stopwords  # Remove stopwords
from nltk.stem import SnowballStemmer  # Stemming
from nltk.stem import WordNetLemmatizer

# Models
import tensorflow as tf
from tensorflow.keras import Sequential
from keras.layers import Bidirectional, Dense, SimpleRNN,GlobalAveragePooling1D,Flatten, Concatenate, Add, Average, Dot, Dropout
from keras.layers import concatenate, add, average, dot
from keras.layers.embeddings import Embedding
from keras.preprocessing import sequence
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
from keras import Input, Model
from keras.regularizers import l2

# F1
from sklearn.metrics import f1_score, accuracy_score, classification_report
from functools import partial

# Grid search
from sklearn.model_selection import GridSearchCV
import copy

#split
from sklearn.model_selection import train_test_split

# Download Data

In [None]:
import os
import requests
import zipfile

def save_response_content(response, destination):
    """Stream the body of ``response`` into the file at ``destination``.

    :param response: object exposing ``iter_content(chunk_size)``
        (here: a streamed ``requests`` response)
    :param destination: path of the file, opened for binary writing
    """
    chunk_size = 32768

    with open(destination, "wb") as out_file:
        # empty chunks are keep-alive markers and carry no payload
        out_file.writelines(
            piece for piece in response.iter_content(chunk_size) if piece
        )

def download_data(data_path):
    """Download the FEVER data splits archive and extract it into ``data_path``.

    Nothing is downloaded (or extracted) when ``fever_data.zip`` already
    exists inside ``data_path``.

    :param data_path: directory that receives the archive and its content;
        created when missing
    :raises requests.HTTPError: when the server answers with an error status
    """
    toy_data_path = os.path.join(data_path, 'fever_data.zip')
    toy_data_url_id = "1wArZhF9_SHW17WKNGeLmX-QTYw9Zscl1"
    toy_url = "https://docs.google.com/uc?export=download"

    os.makedirs(data_path, exist_ok=True)

    if not os.path.exists(toy_data_path):
        print("Downloading FEVER data splits...")
        # Write to a temporary name first: an interrupted download must not
        # leave a truncated 'fever_data.zip' that blocks every later retry.
        partial_path = toy_data_path + '.part'
        with requests.Session() as current_session:
            response = current_session.get(toy_url,
                                           params={'id': toy_data_url_id},
                                           stream=True)
            # fail loudly instead of saving an error page as a zip file
            response.raise_for_status()
            # the body is streamed, so it must be consumed while the
            # session (and its connection) is still open
            save_response_content(response, partial_path)
        os.replace(partial_path, toy_data_path)
        print("Download completed!")

        print("Extracting dataset...")
        with zipfile.ZipFile(toy_data_path) as loaded_zip:
            loaded_zip.extractall(data_path)
        print("Extraction completed!")

download_data('dataset')

# Preprocessing of the Dataset

In [None]:
# function used to preprocess the text
def clean_text(text):
  """Normalise a raw claim/evidence string.

  Steps, in order: drop doubled quotes and "..", strip the leading
  "<number><TAB>" prefix and the trailing punctuation with any tab-separated
  suffix, remove bracketed spans matched by the digit-free patterns below,
  drop the bracket tokens, expand "&", merge "star * reach", blank out a few
  symbols, lemmatize every whitespace-separated word and lower-case the result.

  :param text: raw sentence (str)
  :return: cleaned, lemmatized, lower-cased sentence (str)
  """

  # delete multiple quotes (and "..")
  ris = re.sub(r"''|``|\.\.", '', text)

  # get only the sentence, delete the number before it and the keywords after it
  ris = re.sub(r"^[0-9]*\t", '', ris)
  ris = re.sub(r"( )?[\.|\?|\!|\,]( )?(\t.*)?$", '', ris)

  # convert the round brackets into tokens, done for the claim string
  ris = re.sub(r"\(", " -LRB- ", ris)
  ris = re.sub(r"\)", " -RRB- ", ris)

  # round brackets (LRB/RRB): delete the span when, after its first
  # character, it contains no digits
  ris = re.sub(r"-LRB-(.[^0-9]*)-RRB-", '', ris)

  # square brackets (LSB/RSB): same rule
  ris = re.sub(r"-LSB-(.[^0-9]*)-RSB-", '', ris)

  # delete the remaining bracket tokens
  ris = re.sub(r"-LRB-|-RRB-|-RSB-|-LSB-", ' ', ris)

  # deal with the &
  ris = re.sub(r"\&", ' and ', ris)

  # deal with the *: the asterisk must be escaped to be matched literally
  # (unescaped it quantified the preceding space and never matched "*");
  # the text is not lower-cased yet, hence the case-insensitive match
  ris = re.sub(r"star \* reach", 'star*reach', ris, flags=re.IGNORECASE)

  # remove tokens that we are not interested in
  ris = re.sub(r'[\-"?!#`\$]', ' ', ris)

  # split() already discards leading/trailing/repeated whitespace
  ris = " ".join(LEMMATIZER.lemmatize(word) for word in ris.split())

  return ris.lower()

In [None]:
# English stop-word set, stemmer and lemmatizer (clean_text uses LEMMATIZER)
try:
    STOPWORDS = set(stopwords.words('english'))
except LookupError:
    # stop-word corpus not found locally: download it, then retry
    nltk.download('stopwords')
    STOPWORDS = set(stopwords.words('english'))

STEMMER = SnowballStemmer("english")

# presumably the WordNet data is what WordNetLemmatizer needs -- downloaded on every run
nltk.download('wordnet') 
LEMMATIZER = WordNetLemmatizer()

In [None]:
def _load_pairs(csv_path):
    """Read one split of claim/evidence pairs and preprocess its text columns.

    :param csv_path: path of the CSV file of the split
    :return: DataFrame without the 'Unnamed: 0' column and with the
        'Evidence' and 'Claim' columns passed through ``clean_text``
    """
    pairs = pd.read_csv(csv_path)
    pairs = pairs.drop(['Unnamed: 0'], axis=1)
    # column-wise apply: no need to materialise every row just to read one cell
    for column in ('Evidence', 'Claim'):
        pairs[column] = pairs[column].apply(clean_text)
    return pairs


# training, test and validation datasets, all preprocessed the same way
train_df = _load_pairs('dataset/train_pairs.csv')
test_df = _load_pairs('dataset/test_pairs.csv')
valid_df = _load_pairs('dataset/val_pairs.csv')

In [None]:
# features: every column except the target and the identifier
X_train, X_val, X_test = (
    frame.drop(columns=['Label', 'ID']) for frame in (train_df, valid_df, test_df)
)

# integer-encode the labels; the encoder is fitted on the training labels only
# (original note: 0 for Refutes, 1 for Supports)
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(train_df['Label'])
y_val = label_encoder.transform(valid_df['Label'])
y_test = label_encoder.transform(test_df['Label'])


## Create GloVe embeddings 

In [None]:
def load_embedding_model(model_type: str,
                         embedding_dimension: int = 50) -> gensim.models.keyedvectors.KeyedVectors:
    """
    Fetch a pre-trained word embedding model through the gensim downloader.

    :param model_type: embedding family, one of word2vec / glove / fasttext
        (surrounding whitespace and letter case are ignored)
    :param embedding_dimension: vector size, only used to pick the GloVe model

    :return
        - the object returned by the gensim downloader for the chosen model

    :raises AttributeError: when model_type is not a supported family
    :raises ValueError: re-raised from the downloader after printing a hint
    """

    # downloader identifier of every supported family
    known_models = {
        'word2vec': "word2vec-google-news-300",
        'glove': "glove-wiki-gigaword-{}".format(embedding_dimension),
        'fasttext': "fasttext-wiki-news-subwords-300",
    }

    requested = model_type.strip().lower()
    if requested not in known_models:
        raise AttributeError("Unsupported embedding model type! Available ones: word2vec, glove, fasttext")
    download_path = known_models[requested]

    try:
        return gloader.load(download_path)
    except ValueError as e:
        print("Invalid embedding model name! Check the embedding dimension:")
        print("Word2Vec: 300")
        print("Glove: 50, 100, 200, 300")
        raise e

In [None]:
def check_OOV_terms(embedding_vocabulary: List[str],
                    word_listing: List[str]):
    """
    Find the dataset words that the reference vocabulary does not contain
    (the out-of-vocabulary terms).

    :param embedding_vocabulary: iterable of known words
    :param word_listing: iterable of dataset words

    :return
        - list of the distinct OOV terms, in no particular order
    """

    unknown_terms = set(word_listing)
    unknown_terms.difference_update(embedding_vocabulary)
    return list(unknown_terms)

In [None]:
def build_embedding_matrix(embedding_model: gensim.models.keyedvectors.KeyedVectors,
                           embedding_dimension: int,
                           word_to_idx: Dict[str, int],
                           vocab_size: int,
                           oov_terms: List[str]) -> np.ndarray:
    """
    Assemble the dataset-specific embedding matrix from a pre-trained model.

    :param embedding_model: pre-trained word embedding model, indexed by word
    :param embedding_dimension: length of every embedding vector
    :param word_to_idx: vocabulary map (word -> row index) (dict)
    :param vocab_size: number of rows of the matrix
    :param oov_terms: list of OOV terms (accepted but not used here)

    :return
        - float32 matrix of shape (vocab_size, embedding_dimension); rows not
          referenced by word_to_idx stay at zero
    """

    def vector_for(token):
        # words the model cannot serve get a small random vector
        try:
            return embedding_model[token]
        except (KeyError, TypeError):
            return np.random.uniform(low=-0.05, high=0.05, size=embedding_dimension)

    matrix = np.zeros((vocab_size, embedding_dimension), dtype=np.float32)
    for token, row in tqdm(word_to_idx.items()):
        matrix[row] = vector_for(token)

    return matrix


def update_embedding_matrix(embedding_model: np.ndarray, 
                            embedding_dimension: int,
                            word_to_idx: Dict[str, int],
                            vocab_size: int,
                            oov_terms: List[str],
                            old_word_to_idx: Dict[str, int] = None) -> np.ndarray:
    """
    Builds the embedding matrix of a specific dataset given a pre-trained embedding matrix

    NOTE: without ``old_word_to_idx`` the vector of a word is read from the old
    matrix at the word's *new* index. That is only correct when indices did not
    move between the two vocabularies; refitting a Keras tokenizer can reorder
    its ``word_index``, so pass ``old_word_to_idx`` to copy vectors by word.

    :param embedding_model: pre-trained embedding matrix
    :param embedding_dimension: length of every embedding vector
    :param word_to_idx: new vocabulary map (word -> index) (dict)
    :param vocab_size: size of the vocabulary (number of rows of the result)
    :param oov_terms: list of OOV terms (accepted but not used here)
    :param old_word_to_idx: optional vocabulary map that ``embedding_model``
        was built with; when given, rows are looked up through it

    :return
        - embedding matrix that assigns a high dimensional vector to each word in the dataset specific vocabulary (shape |V| x d)
    """
    embedding_matrix = np.zeros((vocab_size, embedding_dimension), dtype=np.float32)

    for word, idx in tqdm(word_to_idx.items()):
        if old_word_to_idx is None:
            source_idx = idx  # legacy behaviour: assume indices are stable
        else:
            source_idx = old_word_to_idx.get(word)

        embedding_vector = None
        # indexing an ndarray with None would silently add an axis, so
        # unknown words must skip the lookup instead of relying on an exception
        if source_idx is not None:
            try:
                embedding_vector = embedding_model[source_idx]
            except (TypeError, IndexError):
                embedding_vector = None

        if embedding_vector is None:
            embedding_vector = np.random.uniform(low=-0.05, high=0.05, size=embedding_dimension)

        embedding_matrix[idx] = embedding_vector

    return embedding_matrix


## Tokenizer

In [None]:
class KerasTokenizer(object):
    """
    A simple high-level wrapper for the Keras tokenizer.

    Besides fitting the tokenizer it can maintain an embedding matrix whose
    row ``i`` holds the vector of the word with index ``i`` in ``self.vocab``
    (row 0 is left at zero).
    """

    def __init__(self, build_embedding_matrix=False, embedding_dimension=None,
                 embedding_model_type=None, tokenizer_args=None, embedding_model=None):
        """
        :param build_embedding_matrix: whether to build/maintain the embedding matrix
        :param embedding_dimension: vector size (int, required when building the matrix)
        :param embedding_model_type: embedding family name (required when building the matrix)
        :param tokenizer_args: dict of keyword arguments for the Keras tokenizer
        :param embedding_model: already loaded embedding model; loaded lazily when None
        """
        if build_embedding_matrix:
            assert embedding_model_type is not None
            assert embedding_dimension is not None and isinstance(embedding_dimension, int)

        self.build_embedding_matrix = build_embedding_matrix
        self.embedding_dimension = embedding_dimension
        self.embedding_model_type = embedding_model_type
        self.embedding_model = embedding_model
        self.embedding_matrix = None
        self.vocab = None

        tokenizer_args = {} if tokenizer_args is None else tokenizer_args
        # OrderedDict is a dict subclass, so a single check covers both
        assert isinstance(tokenizer_args, dict)

        self.tokenizer_args = tokenizer_args

    def build_vocab(self, data, **kwargs):
        """Fit a fresh tokenizer on ``data`` and, if enabled, build the embedding matrix."""
        print('Fitting tokenizer...')
        self.tokenizer = tf.keras.preprocessing.text.Tokenizer(**self.tokenizer_args)
        self.tokenizer.fit_on_texts(data)
        print('Fit completed!')

        self.vocab = self.tokenizer.word_index

        if self.build_embedding_matrix:
            if self.embedding_model is None:
              print('Loading embedding model! It may take a while...')
              self.embedding_model = load_embedding_model(model_type=self.embedding_model_type, 
                                                          embedding_dimension=self.embedding_dimension)
            
            print('Checking OOV terms in train...')
            # gensim >= 4 exposes the vocabulary as `key_to_index`; older
            # releases expose it as `vocab`
            known_words = getattr(self.embedding_model, 'key_to_index', None)
            if known_words is None:
                known_words = self.embedding_model.vocab
            self.oov_terms_train = check_OOV_terms(embedding_vocabulary=set(known_words.keys()),
                                             word_listing=list(self.vocab.keys()))
            
            print("Total OOV terms: {0} ({1:.2f}%)".format(len(self.oov_terms_train), 100*float(len(self.oov_terms_train)) / max(len(self.vocab), 1)))

            print('Building the embedding matrix for train...')
            self.embedding_matrix = build_embedding_matrix(embedding_model=self.embedding_model,
                                                           word_to_idx=self.vocab,
                                                           vocab_size=len(self.vocab)+1,          
                                                           embedding_dimension=self.embedding_dimension,
                                                           oov_terms=self.oov_terms_train)
            print('Done for train!')

    def update_vocab(self, data, **kwargs):
        """
        Extend the fitted tokenizer with ``data`` and realign the embedding matrix.

        Refitting can give already known words a different index, therefore the
        existing vectors are carried over by word (through the previous
        vocabulary), never by position. Words that are new get a small random
        vector.
        """
        self.tokenizer.fit_on_texts(data)
        old_vocab = self.vocab or {}
        # the vocabulary is refreshed even when no embedding matrix is kept
        self.vocab = self.tokenizer.word_index
        if not self.build_embedding_matrix:
            return

        print('Checking OOV terms...')
        self.oov_terms = [word for word in self.vocab if word not in old_vocab]

        print("Total OOV terms: {0} ({1:.2f}%)".format(len(self.oov_terms), 100*float(len(self.oov_terms)) / max(len(self.vocab), 1)))

        print('Building the embedding matrix...')
        old_matrix = self.embedding_matrix
        new_matrix = np.zeros((len(self.vocab) + 1, self.embedding_dimension), dtype=np.float32)
        for word, idx in self.vocab.items():
            old_idx = old_vocab.get(word)
            if old_idx is not None and old_matrix is not None and old_idx < len(old_matrix):
                new_matrix[idx] = old_matrix[old_idx]
            else:
                new_matrix[idx] = np.random.uniform(low=-0.05, high=0.05, size=self.embedding_dimension)
        self.embedding_matrix = new_matrix

    def get_info(self):
        """Return a summary dict of the configuration, matrix shape and vocabulary size."""
        return {
            'build_embedding_matrix': self.build_embedding_matrix,
            'embedding_dimension': self.embedding_dimension,
            'embedding_model_type': self.embedding_model_type,
            'embedding_matrix': self.embedding_matrix.shape if self.embedding_matrix is not None else self.embedding_matrix,
            'embedding_model': self.embedding_model,
            'vocab_size': len(self.vocab) + 1,
        }

    def tokenize(self, text):
        """Identity: the text is returned unchanged."""
        return text

    def convert_tokens_to_ids(self, tokens):
        """Convert one string (-> list of ids) or a collection of strings (-> list of lists)."""
        if isinstance(tokens, str):
            return self.tokenizer.texts_to_sequences([tokens])[0]
        else:
            return self.tokenizer.texts_to_sequences(tokens)

    def convert_ids_to_tokens(self, ids):
        """Convert sequences of ids back to texts."""
        return self.tokenizer.sequences_to_texts(ids)

### Downloading embeddings

In [None]:
# size of the GloVe vectors requested from the downloader
embedding_dimension = 50
# loaded once here and handed to the tokenizer below, which then skips its own load
embedding_model = load_embedding_model(model_type="glove", 
                                       embedding_dimension=embedding_dimension)

### Creating tokenizer and Vocabulary

In [None]:
# keyword arguments forwarded verbatim to the Keras tokenizer
tokenizer_args = {
    'oov_token': "OOV_TOKEN",  # The vocabulary id for unknown terms during text conversion
    'lower' : True,  # default
    'filters' : '' 
}

tokenizer = KerasTokenizer(tokenizer_args=tokenizer_args,
                           build_embedding_matrix=True,
                           embedding_dimension=embedding_dimension,
                           embedding_model_type="glove", 
                           embedding_model=embedding_model)
# the vocabulary is first fitted on the training evidences, then extended
# with the training claims
tokenizer.build_vocab(X_train["Evidence"])
tokenizer.update_vocab(X_train["Claim"])

# summary dict: configuration, embedding matrix shape and vocabulary size
tokenizer_info = tokenizer.get_info()

print('Tokenizer info: ', tokenizer_info)

### Updating tokenizer with validation and test

In [None]:
# extend the vocabulary (and the embedding matrix) with the validation and
# test texts; every call refits the same tokenizer on additional data
tokenizer.update_vocab(X_val["Claim"])
tokenizer.update_vocab(X_test["Claim"])
tokenizer.update_vocab(X_val["Evidence"])
tokenizer.update_vocab(X_test["Evidence"])

In [None]:
# alphabetically sorted dump of the vocabulary, for manual inspection
a = sorted(tokenizer.vocab.keys())
print(a)

### Padding for x and computation of max sequence length

In [None]:
def convert_text(df, tokenizer, is_training=False, max_seq_length=None):
    """
    Turn the 'Claim' and 'Evidence' texts of a split into fixed-length id sequences.

    :param df: mapping/DataFrame with a 'Claim' and an 'Evidence' entry
    :param tokenizer: object exposing ``convert_tokens_to_ids``
    :param is_training: when True the sequence length is computed here as the
        larger of the two 0.99 length quantiles (claims, evidences)
    :param max_seq_length: sequence length to use when is_training is False
        (must not be None in that case)

    :return
        max_seq_length: the sequence length that was used
        ids: array stacking [claims, evidences]; every sequence is
        right-padded with 0 and cut to max_seq_length
    """

    claim_ids = tokenizer.convert_tokens_to_ids(df['Claim'])
    evidence_ids = tokenizer.convert_tokens_to_ids(df['Evidence'])

    if not is_training:
        assert max_seq_length is not None
    else:
        max_seq_length = max(
            int(np.quantile([len(seq) for seq in claim_ids], 0.99)),
            int(np.quantile([len(seq) for seq in evidence_ids], 0.99)),
        )

    def pad_and_trim(sequences):
        # right-pad with zeros, then drop whatever exceeds the target length
        return np.array([(seq + [0] * (max_seq_length - len(seq)))[:max_seq_length]
                         for seq in sequences])

    return max_seq_length, np.array([pad_and_trim(claim_ids), pad_and_trim(evidence_ids)])
        

# the sequence length is derived from the training split only ...
max_seq_length, x_train = convert_text(X_train, tokenizer, True)
print("Max token sequence: {}".format(max_seq_length))
print('X train shape: ', x_train.shape)

# ... and reused as-is for validation and test
_, x_val = convert_text(X_val, tokenizer, False, max_seq_length)
print('X val shape: ', x_val.shape)

_, x_test = convert_text(X_test, tokenizer, False, max_seq_length)
print('X test shape: ', x_test.shape)

## Sentence embedding

---



In [None]:
# output size of the Embedding layers, kept equal to the GloVe vector size
embedding_vector_length = embedding_dimension

### First Model

Encode token sequences via a RNN and take the last state as the sentence embedding.

In [None]:
def firstModel(embedding_vector_length, dim):
  """
  Sentence encoder: Embedding -> SimpleRNN, exposing the last RNN state.

  The Embedding layer starts from the tokenizer's pre-trained embedding matrix
  when one exists with shape (vocabulary size, embedding_vector_length);
  otherwise it falls back to Keras' default 'uniform' initialisation.
  Reads the globals ``tokenizer`` and ``max_seq_length``.

  :param embedding_vector_length: output size of the Embedding layer
  :param dim: number of units of the SimpleRNN

  :return Keras Model named "firstModel"; because of return_state=True it has
          two outputs and the state is the one at position 1
  """
  vocab_size = len(tokenizer.vocab.keys()) + 1

  # use the GloVe-based matrix built by the tokenizer, if it fits this layer
  pretrained = getattr(tokenizer, 'embedding_matrix', None)
  if pretrained is not None and pretrained.shape == (vocab_size, embedding_vector_length):
    embeddings_init = tf.keras.initializers.Constant(pretrained)
  else:
    embeddings_init = 'uniform'

  tokens = Input(shape=(max_seq_length,))
  x = Embedding(vocab_size, embedding_vector_length,
                embeddings_initializer=embeddings_init,
                input_length=max_seq_length,
                trainable=True,
                mask_zero=True)(tokens)
  # added l2 regularization due to overfitting
  last_state = SimpleRNN(dim, kernel_regularizer=l2(0.01), recurrent_regularizer=l2(0.01), 
                         bias_regularizer=l2(0.01), return_state=True)(x)
  
  RNN = Model(tokens, last_state, name="firstModel")

  return RNN

In [None]:
# smoke test on a single training claim: the model is only built, not trained
model_1 = firstModel(embedding_vector_length, 32)

lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      0.001,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
  
optim = Adam(learning_rate=lr_schedule)
model_1.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# x_train[0] holds the claims; the first one is reshaped into a batch of one
ris = model_1.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))
print(X_train.Claim[0])
# the model has two outputs (return_state=True); position 1 is the state
print(ris[1])

### Second Model

Encode token sequences via a RNN and average all the output states.

In [None]:
def secondModel(embedding_vector_length, dim):
  """
  Sentence encoder: Embedding -> SimpleRNN (all states) -> average over time.

  The Embedding layer starts from the tokenizer's pre-trained embedding matrix
  when one exists with shape (vocabulary size, embedding_vector_length);
  otherwise it falls back to Keras' default 'uniform' initialisation.
  Reads the globals ``tokenizer`` and ``max_seq_length``.

  :param embedding_vector_length: output size of the Embedding layer
  :param dim: number of units of the SimpleRNN

  :return Keras Model named "secondModel" with a single output
  """
  vocab_size = len(tokenizer.vocab.keys()) + 1

  # use the GloVe-based matrix built by the tokenizer, if it fits this layer
  pretrained = getattr(tokenizer, 'embedding_matrix', None)
  if pretrained is not None and pretrained.shape == (vocab_size, embedding_vector_length):
    embeddings_init = tf.keras.initializers.Constant(pretrained)
  else:
    embeddings_init = 'uniform'

  tokens = Input(shape=(max_seq_length,))
  x = Embedding(vocab_size, embedding_vector_length,
                embeddings_initializer=embeddings_init,
                input_length=max_seq_length,
                trainable=True,
                mask_zero=True)(tokens)
  # added l2 regularization due to overfitting
  states = SimpleRNN(dim, kernel_regularizer=l2(0.01), recurrent_regularizer=l2(0.01), 
                     bias_regularizer=l2(0.01), return_sequences=True)(x)
  output = GlobalAveragePooling1D()(states)
  RNN = Model(tokens, output, name="secondModel")
  return RNN

In [None]:
# smoke test on a single training claim: the model is only built, not trained
model_2 = secondModel(embedding_vector_length,32)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      0.001,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
  
optim = Adam(learning_rate=lr_schedule)
model_2.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# x_train[0] holds the claims; the first one is reshaped into a batch of one
ris = model_2.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))
print(X_train.Claim[0])
# first (and only) row of the batch
print(ris[0])

### Third Model

Encode token sequences via a simple MLP layer. 

In [None]:
def thirdModel(embedding_vector_length, dim):
  """
  Sentence encoder: Embedding -> Flatten -> Dense(256) -> Dense(64) -> Dense(dim).

  The Embedding layer starts from the tokenizer's pre-trained embedding matrix
  when one exists with shape (vocabulary size, embedding_vector_length);
  otherwise it falls back to Keras' default 'uniform' initialisation.
  Reads the globals ``tokenizer`` and ``max_seq_length``.

  :param embedding_vector_length: output size of the Embedding layer
  :param dim: size of the final (sigmoid) Dense layer, i.e. of the sentence embedding

  :return Keras Sequential model
  """
  vocab_size = len(tokenizer.vocab.keys()) + 1

  # use the GloVe-based matrix built by the tokenizer, if it fits this layer
  pretrained = getattr(tokenizer, 'embedding_matrix', None)
  if pretrained is not None and pretrained.shape == (vocab_size, embedding_vector_length):
    embeddings_init = tf.keras.initializers.Constant(pretrained)
  else:
    embeddings_init = 'uniform'

  MLP = Sequential()
  MLP.add(Embedding(vocab_size, embedding_vector_length,
                    embeddings_initializer=embeddings_init,
                    input_length=max_seq_length,
                    trainable=True,
                    mask_zero=True))
  MLP.add(Flatten())
  # l2 regularization on every Dense layer
  MLP.add(Dense(256, kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), input_shape=(embedding_vector_length*max_seq_length,), activation='relu'))
  MLP.add(Dense(64, kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), activation='relu'))
  MLP.add(Dense(dim, kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01), activation='sigmoid'))

  return MLP

In [None]:
# smoke test on a single training claim: the model is only built, not trained
model_3 = thirdModel(embedding_vector_length, 32)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      0.001,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
  
optim = Adam(learning_rate=lr_schedule)
model_3.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# x_train[0] holds the claims; the first one is reshaped into a batch of one
ris = model_3.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))
print(X_train.Claim[0])
# first (and only) row of the batch
print(ris[0])

### Fourth Model

Compute the sentence embedding as the mean of its token embeddings.

In [None]:
def fourthModel(embedding_vector_length, dim=-1):  # dim used for convenience
  """
  Sentence encoder: Embedding -> average of the token embeddings.

  The Embedding layer starts from the tokenizer's pre-trained embedding matrix
  when one exists with shape (vocabulary size, embedding_vector_length);
  otherwise it falls back to Keras' default 'uniform' initialisation.
  Reads the globals ``tokenizer`` and ``max_seq_length``.

  :param embedding_vector_length: output size of the Embedding layer
  :param dim: ignored; present so that all model builders share one signature

  :return Keras Sequential model
  """
  vocab_size = len(tokenizer.vocab.keys()) + 1

  # use the GloVe-based matrix built by the tokenizer, if it fits this layer
  pretrained = getattr(tokenizer, 'embedding_matrix', None)
  if pretrained is not None and pretrained.shape == (vocab_size, embedding_vector_length):
    embeddings_init = tf.keras.initializers.Constant(pretrained)
  else:
    embeddings_init = 'uniform'

  EMB = Sequential()
  EMB.add(Embedding(vocab_size, embedding_vector_length,
                    embeddings_initializer=embeddings_init,
                    input_length=max_seq_length,
                    trainable=True,
                    mask_zero=True))
  EMB.add(GlobalAveragePooling1D())
  return EMB

In [None]:
# smoke test on a single training claim: the model is only built, not trained
model_4 = fourthModel(embedding_vector_length)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      0.001,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
  
optim = Adam(learning_rate=lr_schedule)
model_4.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# x_train[0] holds the claims; the first one is reshaped into a batch of one
ris = model_4.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))
print(X_train.Claim[0])
# first (and only) row of the batch
print(ris[0])

## Merging multi-inputs

In [None]:
# demo of the three merge strategies on one claim/evidence pair (no training done)
# x_train[0] are the claims, x_train[1] the evidences; [1] picks the state output of model_1
emb_claim = model_1.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))[1]
emb_evidence = model_1.predict(x_train[1][0].reshape(-1, len(x_train[1][0])))[1]
# 1 case: concatenation
first_emb = concatenate([emb_claim[0], emb_evidence[0]])
print(first_emb)
# 2 case: element-wise sum
second_emb = add([emb_claim[0], emb_evidence[0]])
print(second_emb)
# 3 case: element-wise mean
third_emb = average([emb_claim[0], emb_evidence[0]])
print(third_emb)

## Cosine Similarity

In [None]:
# fourth model taken as example (freshly built, not trained)
model_4 = fourthModel(embedding_vector_length)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      0.001,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
  
optim = Adam(learning_rate=lr_schedule)
model_4.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# sentence embeddings of the first training claim and of its evidence
emb_claim = model_4.predict(x_train[0][0].reshape(-1, len(x_train[0][0])))
emb_evidence = model_4.predict(x_train[1][0].reshape(-1, len(x_train[1][0])))

class_input = concatenate([emb_claim, emb_evidence])  # Concatenation as input
print("Initial classifier input shape: ", class_input.shape)

# cosine similarity computed here: dot product of the normalized embeddings
cos_sim = dot([emb_claim, emb_evidence], axes=1, normalize=True)  # cos similarity
print("\nCosine similarity: ", cos_sim)

# the similarity is appended to the merged embeddings as one extra feature
class_input = concatenate([class_input, cos_sim])
print("\nFinal classifier input shape: ", class_input.shape)

## Model definition

In [None]:
def Classifier(embed_model, model_type, type_merge, cosine_similarity,dense_size):
  '''
  Build the claim/evidence classifier on top of a shared sentence encoder.

  embed_model: model used for the sentence embedding (applied to both inputs)
  model_type: string, the type of sentence embedding used ("firstModel"
              returns its embedding at output position 1)
  type_merge: string, the type of merge used: "concat", "sum" or "mean"
  cosine_similarity: bool, whether to append the cosine similarity of the
                     two embeddings to the merged vector
  dense_size: int, the number of neurons used in the hidden Dense layer

  Returns a Keras Model named "Classifier" with inputs [claim, evidence] and
  one sigmoid output. Raises ValueError for an unknown type_merge.
  Reads the global ``max_seq_length``.
  '''

  merge_ops = {"concat": concatenate, "sum": add, "mean": average}
  # fail early and clearly: an unknown merge used to surface later as an
  # unbound-variable error
  if type_merge not in merge_ops:
    raise ValueError("Unsupported type_merge '{}'; expected one of: concat, sum, mean".format(type_merge))

  input_c = Input(shape=(max_seq_length,))
  input_e = Input(shape=(max_seq_length,))
  embedding_c = embed_model(input_c)
  embedding_e = embed_model(input_e)

  # the first model returns the embeddings in a different position
  if model_type == "firstModel":
    embedding_c = embedding_c[1]
    embedding_e = embedding_e[1]

  # type of merge
  class_input = merge_ops[type_merge]([embedding_c, embedding_e])

  # using cosine_similarity
  if cosine_similarity:
      cos_sim = dot([embedding_c, embedding_e], axes=1, normalize=True)
      class_input = concatenate([class_input, cos_sim])

  x = Dropout(0.2)(class_input)
  x = Dense(dense_size, activation="relu", kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01))(x)
  output = Dense(1, activation="sigmoid")(x)

  return Model([input_c, input_e], output, name="Classifier")

## Evaluate models

In [None]:
def evaluate_f1(model, x_data, y_data):
  """
  Score a model on (x_data, y_data) with accuracy and macro F1.

  The model outputs are rounded before being compared with the labels.

  :return dict with the keys "accuracy" and "f1-score"
  """
  rounded = np.round(model.predict(x_data))

  # metric name -> metric function, in reporting order
  scorers = {
      "accuracy": accuracy_score,
      "f1-score": partial(f1_score, pos_label=1, average='macro'),
  }

  return evaluate_predictions(predictions=np.array(rounded),
                              y=np.array(y_data),
                              metrics=list(scorers.values()),
                              metric_names=list(scorers.keys()))

In [None]:
def evaluate_predictions(predictions: np.ndarray,
                         y: np.ndarray,
                         metrics: List[Callable],
                         metric_names: List[str]):
    """
    Apply every metric function to the predictions and collect the results.

    :param predictions: model predictions in np.ndarray format
    :param y: ground-truth labels in np.ndarray format
    :param metrics: metric functions, called as metric(y_pred=..., y_true=...)
    :param metric_names: names of the metrics, same length and order as metrics

    :return
        metric_info: dictionary mapping each metric name to its value
    """

    assert len(metrics) == len(metric_names)

    return {
        name: scorer(y_pred=predictions, y_true=y)
        for name, scorer in zip(metric_names, metrics)
    }

## Grid-search

In [None]:
# Exhaustive grid search: every combination of the values below is trained
# from scratch on the training split and scored on the validation split.
# sentence-encoder builders, selected by name
models = {'firstModel': firstModel, 'secondModel': secondModel, 
          'thirdModel': thirdModel, 'fourthModel': fourthModel}
# candidate values of every hyper-parameter
parameters = {'epochs': range(10, 60, 10), 
              'batch_size':[128, 256, 512, 1024, 2048],
              'dim': [32,64],
              'merge_ops': ['mean', 'concat', 'sum'],  
              'cos_sim': [True, False],                
              'start_lr': [10**(-3),10**(-2),10**(-1)],
              'dense_size' : [32,256]
              }
# key 1 tracks the best validation F1 seen so far, key 2 the runner-up
best_scores = {1: 0, 2: 0}
best_params = {1: dict(), 2: dict()}

for epochs in parameters['epochs']:
  print("Epochs: ", epochs)
  for start_lr in parameters['start_lr']:
    print(" Start Learning Rate: ", start_lr)
    for batch_size in parameters['batch_size']:
      print("  Batch Size: ", batch_size)
      for dim in parameters['dim']:
        print("   Dim: ", dim)
        for model_name in models.keys():
          print("    Model: ", model_name)
          for merge_ops in parameters['merge_ops']:
            print("     Merge: ", merge_ops)
            for cos_sim in parameters['cos_sim']:
              print("      Cosine Similarity: ", cos_sim)
              for dense_size in parameters['dense_size']:
                print("       Dense size: ", dense_size)
                # fresh schedule, optimizer, encoder and classifier per combination
                lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                      start_lr,
                      decay_steps=100000,
                      decay_rate=0.96,
                      staircase=True)
                optim = Adam(learning_rate=lr_schedule)
                embed_model = models[model_name](embedding_vector_length, dim)
                model = Classifier(embed_model, model_name, merge_ops, 
                                  cosine_similarity=cos_sim,dense_size=dense_size)
                model.compile(loss='binary_crossentropy', optimizer=optim, 
                              metrics=['accuracy'])

                # x_*[0] are the claims, x_*[1] the evidences
                history = model.fit([x_train[0], x_train[1]], y_train, 
                          epochs=epochs, batch_size=batch_size, verbose=0)
                
                scores = evaluate_f1(model, [x_val[0], x_val[1]], y_val)
                print("     Scores: ", scores)
                # keep the two best configurations by validation F1
                if scores['f1-score'] > best_scores[2]:
                  if scores['f1-score'] > best_scores[1]:
                    # new best: the previous best becomes the runner-up
                    best_scores[2] = best_scores[1]
                    best_scores[1] = scores['f1-score']
                    best_params[2] = best_params[1]
                    best_params[1] = {'epochs': epochs, 'batch_size': batch_size, 
                                      'dim': dim, 'start_lr': start_lr, 
                                      'model_name': model_name, 
                                      'merge_ops': merge_ops, 'cos_sim': cos_sim,
                                      'dense_size':dense_size}
                  else:
                    # better than the runner-up only
                    best_scores[2] = scores['f1-score']
                    best_params[2] = {'epochs': epochs, 'batch_size': batch_size, 
                                      'dim': dim, 'start_lr': start_lr, 
                                      'model_name': model_name, 
                                      'merge_ops': merge_ops, 'cos_sim': cos_sim,
                                      'dense_size':dense_size}
print(best_scores)
print(best_params)
                    

# Test

In [None]:
# sentence-encoder builders, selected by name
models = dict(firstModel=firstModel, secondModel=secondModel,
              thirdModel=thirdModel, fourthModel=fourthModel)

# unpack the best configuration found by the grid search (entry 1)
(best_embed, best_epochs, best_batch_size, best_dim,
 best_start_lr, best_merge_ops, best_cos_sim, best_dense_size) = (
    best_params[1][key]
    for key in ('model_name', 'epochs', 'batch_size', 'dim',
                'start_lr', 'merge_ops', 'cos_sim', 'dense_size')
)

In [None]:
# rebuild the best configuration from scratch and train it again, this time
# tracking the validation split at every epoch
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
      best_start_lr,
      decay_steps=100000,
      decay_rate=0.96,
      staircase=True)
optim = Adam(learning_rate=lr_schedule)
embed_model = models[best_embed](embedding_vector_length, best_dim)
base_model = Classifier(embed_model, best_embed, best_merge_ops, 
                        cosine_similarity=best_cos_sim,dense_size=best_dense_size)
base_model.compile(loss='binary_crossentropy', optimizer=optim, 
                   metrics=['accuracy'])
# x_*[0] are the claims, x_*[1] the evidences
history = base_model.fit(x=[x_train[0], x_train[1]], y=y_train, 
                         validation_data=([x_val[0], x_val[1]], y_val), 
                         epochs=best_epochs, batch_size=best_batch_size)

In [None]:
# one figure per tracked quantity: training vs. validation curve over the epochs
for curve in ('accuracy', 'loss'):
    plt.plot(history.history[curve])
    plt.plot(history.history['val_' + curve])
    plt.title('model ' + curve)
    plt.ylabel(curve)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

In [None]:
# per-class report on the validation split (model outputs rounded to 0/1)
predictions = base_model.predict([x_val[0], x_val[1]])
print(classification_report(y_val, np.round(predictions)))

In [None]:
# loss and accuracy (the compiled metric) on the test split
base_model.evaluate([x_test[0], x_test[1]], y_test)

In [None]:
# per-class report on the test split (model outputs rounded to 0/1)
predictions = base_model.predict([x_test[0], x_test[1]])
print(classification_report(y_test, np.round(predictions)))

# Majority voting

In [None]:
# A Multi input classification evaluation
def inputClassificationEvaluation(y, predictions):
  """Return the classification report of predictions vs. y for the labels
  0 ('refutes') and 1 ('supports')."""
  report_options = {'target_names': ['refutes', 'supports'], 'labels': [0, 1]}
  return classification_report(np.array(y), np.array(predictions), **report_options)

# B Claim verification evaluation
def claim_verification_evaluation(X, y, predictions):
  """
  Evaluate at claim level by majority voting over all the rows of a claim.

  For every distinct claim the true label and the predicted label are the
  majority of the row-level values; a tie counts as 1. The caller's
  DataFrame is not modified.

  :param X: DataFrame with a 'Claim' column
  :param y: row-level ground-truth labels (0/1), one per row of X
  :param predictions: row-level predicted labels (0/1), one per row of X;
      a single-column 2D array is accepted
  :return: classification report computed on the per-claim labels
  """
  # work on a private frame: writing 'Label'/'Predicted' into X would leak
  # two extra columns into the caller's data
  votes = pd.DataFrame({
      'Claim': np.asarray(X['Claim']),
      'Label': np.ravel(y),
      'Predicted': np.ravel(predictions),
  })

  # one pass over the data instead of one full scan per claim
  per_claim = votes.groupby('Claim', sort=False)
  group_sizes = per_claim.size()

  # majority vote: at least half of the rows vote 1
  y_final = (2 * per_claim['Label'].sum() >= group_sizes).astype(int).tolist()
  predictions_final = (2 * per_claim['Predicted'].sum() >= group_sizes).astype(int).tolist()

  return inputClassificationEvaluation(y_final, predictions_final)

In [None]:
# claim-level (majority voting) report on the test split
predictions = base_model.predict([x_test[0], x_test[1]])
print(claim_verification_evaluation(X_test, y_test, np.round(predictions)))