In [30]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
import tensorflow as tf
import pandas as pd
import nltk
import numpy as np
import re
import os
import requests
import zipfile
import json
import glob
from urllib import request
from tqdm import tqdm
import itertools
from functools import reduce


nltk.download('punkt')


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [32]:
# Load dataset
input_file_name = 'ED_trial.csv'

test_data = pd.read_csv(input_file_name)

In [33]:
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@`\']')
REMOVE_BEGINNING_EVIDENCES_RE = re.compile(r'\b[0-9]{1,}')
REMOVE_REF_EVIDENCE_RE = re.compile(r'\[REF|ref\]?\.?')
SPLIT_COMPOUND_RE = re.compile(r'\w+(-)\w+')
GOOD_SYMBOLS_RE = re.compile(r'[^0-9a-z \.;]')

def lower(text: str) -> str:
    """
    Transforms given text to lower case.
    Example:
    Input: 'I really like New York city'
    Output: 'i really like new your city'
    """
    return text.lower()

def remove_beginning_evidence(text: str) -> str:
    """
    Removes the beginning evidence from the text.
    Example:
    Input: '1. I really like New York city'
    Output: 'I really like New York city'
    """
    return REMOVE_BEGINNING_EVIDENCES_RE.sub('', text)

def split_compound_words(text: str) -> str:
    """
    Splits compound words in the text.
    Example:
    Input: 'I really like Winston-Salem city'
    Output: 'I really like Winston Salem city'
    """
    return SPLIT_COMPOUND_RE.sub(' ', text)

def replace_special_characters(text: str) -> str:
    """
    Replaces special characters, such as paranthesis,
    with spacing character
    """
    return REPLACE_BY_SPACE_RE.sub(' ', text)

def remove_reference_markers(text: str) -> str:
    """
    Remove [REF] or [REF at the at of evidence texts
    """
    if isinstance(text, str):
        return REMOVE_REF_EVIDENCE_RE.sub('', text)
    else:
        return ""

def filter_out_uncommon_symbols(text: str) -> str:
    """
    Removes any special character that is not in the
    good symbols list (check regular expression)
    """
    return GOOD_SYMBOLS_RE.sub('', text)

def strip_text(text: str) -> str:
    """
    Removes any left or right spacing (including carriage return) from text.
    Example:
    Input: '  This assignment is cool\n'
    Output: 'This assignment is cool'
    """
    return text.strip()

PREPROCESSING_PIPELINE_CLAIM = [
                          lower,
                          replace_special_characters,
                          split_compound_words,
                          filter_out_uncommon_symbols,
                          strip_text
                          ]

PREPROCESSING_PIPELINE_EVIDENCE = [
                          remove_reference_markers,
                          remove_beginning_evidence,
                          lower,
                          replace_special_characters,
                          split_compound_words,
                          filter_out_uncommon_symbols,
                          strip_text
                          ]


def text_prepare(text, filter_methods):
    """
    Applies a list of pre-processing functions in sequence (reduce).
    Note that the order is important here!
    """
    return reduce(lambda txt, f: f(txt), filter_methods, text)


#Preprocess the Claim and Evidence the same way that the training dataset was preprocessed on which the model was trained.

print('Pre-processing text...')

test_data['Claim'] = test_data['Claim'].apply(lambda txt: text_prepare(txt, PREPROCESSING_PIPELINE_CLAIM))
test_data['Evidence'] = test_data['Evidence'].apply(lambda txt: text_prepare(txt, PREPROCESSING_PIPELINE_EVIDENCE))

print("Pre-processing completed!")

Pre-processing text...
Pre-processing completed!


In [34]:
class NotAdaptedError(Exception):
    pass


class TextVectorizer:
    def __init__(
        self,
        glove_url="http://nlp.stanford.edu/data/glove.6B.zip",
        embedding_dim=100,
        embedding_folder="glove"
    ):
        """
        This class parses the GloVe embeddings, the input documents are expected
        to be in the form of a list of lists.
        [["word1", "word2", ...], ["word1", "word2", ...], ...]

        Parameters
        ----------
        glove_url : The url of the GloVe embeddings.
        embedding_dim : The dimension of the embeddings (pick one of 50, 100, 200, 300).
        embedding_folder : folder where the embedding will be downloaded
        """
        self.embedding_dim = embedding_dim
        self.download_glove_if_needed(
            glove_url=glove_url, embedding_folder=embedding_folder
        )

        # create the embeddings vocabulary
        self.vocabulary = self.parse_glove(embedding_folder)

    def download_glove_if_needed(self, glove_url, embedding_folder):
        """
        Downloads the glove embeddings from the internet

        Parameters
        ----------
        glove_url : The url of the GloVe embeddings.
        embedding_folder: folder where the embedding will be downloaded
        """
        # create embedding folder if it does not exist
        if not os.path.exists(embedding_folder):
            os.makedirs(embedding_folder)

        # extract the embedding if it is not extracted
        if not glob.glob(
            os.path.join(embedding_folder, "**/glove*.txt"), recursive=True
        ):

            # download the embedding if it does not exist
            embedding_zip = os.path.join(embedding_folder, glove_url.split("/")[-1])
            print(embedding_zip)
            if not os.path.exists(embedding_zip):
                print("Downloading the GloVe embeddings...")
                request.urlretrieve(glove_url, embedding_zip)
                print("Successful download!")

            # extract the embedding
            print("Extracting the embeddings...")
            with zipfile.ZipFile(embedding_zip, "r") as zip_ref:
                zip_ref.extractall(embedding_folder)
                print("Successfully extracted the embeddings!")
            os.remove(embedding_zip)

    def parse_glove(self, embedding_folder):
        """
        Parses the GloVe embeddings from their files, filling the vocabulary.

        Parameters
        ----------
        embedding_folder : folder where the embedding files are stored

        Returns
        -------
        dictionary representing the vocabulary from the embeddings
        """
        print("Creating glove vocabulary...")
        vocabulary = {"<pad>": np.zeros(self.embedding_dim)}
        embedding_file = os.path.join(
            embedding_folder, "glove.6B." + str(self.embedding_dim) + "d.txt"
        )
        print(embedding_file)
        with open(embedding_file, encoding="utf8") as f:
            for line in f:
                word, coefs = line.split(maxsplit=1)
                coefs = np.fromstring(coefs, "f", sep=" ")
                vocabulary[word] = coefs
        return vocabulary
    def adapt(self, dataset, columns):
        """
        Computes the OOV words for a single data split, and adds them to the vocabulary and recreate the dictionary of index encodings.
        Then build the embedding matrix.

        Parameters
        ----------
        dataset : The data split (might be training set, validation set, or test set).
        columns : The columns to be adapted.

        Returns
        ----------
        The embedding matrix of shape (vocabulary_size, embedding_dim)
        """
        # create a set containing words from the documents in a given data split
        words = {word for column in columns for sentence in dataset[column] for word in nltk.word_tokenize(sentence)}
        oov_words = words - self.vocabulary.keys()

        # add the OOV words to the vocabulary giving them a random encoding
        for word in oov_words:
            self.vocabulary[word] = np.random.uniform(-1, 1, size=self.embedding_dim)

        # create the dictionary of index encodings for the words in the embedding vocabulary (idx 0 is reserved for padding)
        self.word_to_idx = {word: i for i, word in enumerate(self.vocabulary.keys())}
        self.idx_to_word = {i: word for i, word in enumerate(self.vocabulary.keys())}

        # the embedding matrix shape will be (vocabulary_size, embedding_dim)
        self.embedding_matrix = np.array(list(self.vocabulary.values()))
        print(f"Generated embeddings for {len(oov_words)} OOV words.")

    def transform(self, dataset, columns):
        """
        Transform the data into the input structure for the training. This method should be used always after the adapt method.

        Parameters
        ----------
        dataset : The data split (might be training set, validation set, or test set).
        columns : The columns to be transformed.

        Returns
        -------
        Pair of docuemnts into idx sequences
        """
        X_claim, X_evidence = [], []
        for _, row in tqdm(dataset.iterrows(), total=len(dataset), desc="Converting data into idx sequences..."):
            X_claim.append(self._transform_document_to_encoding(row["Claim"]))
            X_evidence.append(self._transform_document_to_encoding(row["Evidence"]))
        return X_claim, X_evidence


    def _transform_document_to_encoding(self, document):
        """
        Transforms a single document to a list of word encodings.

        Parameters
        ----------
        document : The document to be transformed.

        Returns
        -------
        List of word encodings
        """
        try:
            return [self.word_to_idx[word] for word in nltk.word_tokenize(document)]
        except KeyError:
            raise NotAdaptedError(
                f"The whole document is not in the vocabulary. Please adapt the vocabulary first."
            )

In [35]:
def encode_input(dataset, columns, vectorizer, is_training=False, max_tokens=None):
    """
    Convert the text into a given dataset split into idx sequeces.

    Parameters
    ----------
    dataset : The data split (might be training set, validation set, or test set).
    columns : The columns to be converted.
    vectorizer : The vectorizer to be used.
    is_training : Whether input texts are from the training split or not
    max_tokens : The max token sequence previously computed with

    Return
    ---------
    X_claim: a numpy array of shape (num_documents, max_tokens) representing claims
    X_evidence: a numpy array of shape (num_documents, max_tokens) representing evidences
    max_seq_length: the max token sequence computed with training texts.
    """

    # compute embeddings for terms in the dataset that are out of vocabulary and add them
    vectorizer.adapt(dataset, columns)

    # use the vocabulary of word_to_idx built to convert the claim and the evidences into idx sequences
    X_claim, X_evidence = vectorizer.transform(dataset, columns)

    # compute max_tokens
    if is_training:
        max_tokens = int(np.quantile([len(seq) for seq in X_claim+X_evidence], 0.999))
    else:
        assert max_tokens is not None

    # apply padding to idx sequences
    X_claim = [seq + [0] * (max_tokens - len(seq)) for seq in X_claim]
    X_evidence = [seq + [0] * (max_tokens - len(seq)) for seq in X_evidence]
    X_claim = np.stack([seq[:max_tokens] for seq in X_claim])
    X_evidence = np.stack([seq[:max_tokens] for seq in X_evidence])


    if is_training:
        return X_claim, X_evidence, max_tokens
    else:
        return X_claim, X_evidence


def encode_target(target_series):
    """
    Encodes the target column of the dataset
    """
    return target_series.apply(lambda x: 1 if x == 1 else 0)

In [36]:
# initialize the vectorizer
embedding_dim = 300
max_tokens = 118 # Determined from the training samples when encoding them
PATH_TO_GLOVE = './drive/MyDrive/glove'

vectorizer = TextVectorizer(embedding_dim=embedding_dim, embedding_folder=PATH_TO_GLOVE) # Remove the embedding_folder and it will download from standford but sometimes the website is down in which case the glove embeddings saved in google drive are used

input_columns = ["Evidence", "Claim"]
target_column = "label"

# Encoding the test data
print("\nTEST SET:")
X_claim_test, X_evidence_test = encode_input(test_data, columns=input_columns, vectorizer=vectorizer, max_tokens=max_tokens) # Encoder the testing data

print("\nEmbedding matrix shape: {}".format(vectorizer.embedding_matrix.shape))

Creating glove vocabulary...
./drive/MyDrive/glove/glove.6B.300d.txt

TEST SET:
Generated embeddings for 6 OOV words.


Converting data into idx sequences...: 100%|██████████| 50/50 [00:00<00:00, 1749.43it/s]


Embedding matrix shape: (400008, 300)





In [37]:
models_path_2 = './drive/MyDrive/'
model_base = tf.keras.models.load_model(os.path.join(models_path_2, "bi_lstm_pooling"))

In [38]:
import csv
from collections import Counter

def evaluate_model(model, X_claim_test, X_evidence_test, file_name='Group_16_B.csv'):
    """
    Show classification report using model to predict output on X_test,
    write predictions to a CSV file, and count the number of predictions as 1s and 0s.
    """
    # Predicting the outputs
    y_pred = model.predict({"claim": X_claim_test, "evidence": X_evidence_test})
    y_pred = [1 if y > 0.5 else 0 for y in y_pred]

    # Writing predictions to a CSV file
    with open(file_name, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['prediction'])  # Writing the header
        for pred in y_pred:
            writer.writerow([pred])  # Writing each prediction on a new row

    # Counting the number of 1s and 0s
    counts = Counter(y_pred)
    print(f"Number of 1s (SUPPORTED): {counts[1]}")
    print(f"Number of 0s (REFUTED): {counts[0]}")
    return y_pred

In [39]:
predictions_test = evaluate_model(model_base,X_claim_test,X_evidence_test,"predictions.csv")

Number of 1s (SUPPORTED): 15
Number of 0s (REFUTED): 35


In [40]:
# y_test = encode_target(test_data[target_column])

In [41]:
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# # Calculate Accuracy
# accuracy = accuracy_score(y_test, predictions_test)
# print(f"Accuracy: {accuracy:.2f}")

# # Calculate Precision
# precision = precision_score(y_test, predictions_test)
# print(f"Precision: {precision:.2f}")

# # Calculate Recall
# recall = recall_score(y_test, predictions_test)
# print(f"Recall: {recall:.2f}")

# # Calculate F1 Score
# f1 = f1_score(y_test, predictions_test)
# print(f"F1 Score: {f1:.2f}")


Accuracy: 0.92
Precision: 0.87
Recall: 0.87
F1 Score: 0.87
