In [96]:
import re
import numpy as np

from nltk.stem.porter import PorterStemmer
from typing import List, Set, Dict, Tuple, NewType, Optional
from scipy.sparse import dok_matrix
from sklearn.model_selection import ShuffleSplit
from sklearn.svm import SVC
from functools import partial
from nltk.corpus import stopwords
import csv

In [121]:
class NoStemmer():
    """Pass-through stemmer: offers a ``stem`` method but never alters the token."""

    def stem(self, token:str) -> str:
        """Return ``token`` exactly as it was given."""
        return token

class BOW():
    """Vocabulary that maps tokens to integer ids and back.

    Ids are assigned in insertion order starting at 0. The id equal to the
    current vocabulary size (``oov_token``) stands for any token that has not
    been added, so its value shifts whenever the vocabulary grows.
    """

    def __init__(self):
        self.token_to_id = dict()  # token -> id
        self.id_to_token = dict()  # id -> token

    @property
    def nr_tokens(self) -> int:
        """Number of distinct tokens added so far."""
        return len(self.token_to_id)

    @property
    def oov_token(self) -> int:
        """Id used for out-of-vocabulary tokens (equal to the vocabulary size)."""
        return len(self.token_to_id)

    def transform_tokenized_sent(self, tokenized_sent:List[str]) -> List[int]:
        """Map every token to its id; unknown tokens become ``oov_token``."""
        oov_id = self.oov_token
        return [self.token_to_id.get(token, oov_id) for token in tokenized_sent]

    def add_transform_tokenized_sents(self, tokenized_sents:List[List[str]]) -> List[List[int]]:
        """Add the tokens of each sentence to the vocabulary, then encode that sentence.

        Sentences are handled one at a time, so each one is encoded right
        after its own tokens were registered.
        """
        output = []
        for sent in tokenized_sents:
            self.add_list_of_tokens(sent)
            output.append(self.transform_tokenized_sent(sent))
        return output

    def transform_tokenized_sents(self, tokenized_sents:List[List[str]]) -> List[List[int]]:
        """Encode sentences without changing the vocabulary."""
        return [self.transform_tokenized_sent(sent) for sent in tokenized_sents]

    def reverse_transform_sent(self, id_sent:List[int]) -> List[str]:
        """Map ids back to tokens.

        Raises:
            KeyError: if an id (including the OOV id) has no token assigned.
        """
        return [self.id_to_token[token_id] for token_id in id_sent]

    def reverse_transform_sents(self, id_sents:List[List[int]]) -> List[List[str]]:
        """Map a list of id sentences back to token sentences."""
        return [self.reverse_transform_sent(sent) for sent in id_sents]

    def add_token(self, token:str):
        """Register ``token`` with the next free id; known tokens are left alone."""
        if token not in self.token_to_id:
            nr_tokens = len(self.token_to_id)
            self.token_to_id[token] = nr_tokens
            self.id_to_token[nr_tokens] = token

    def add_list_of_tokens(self, tokens:List[str]):
        """Register every token in ``tokens``."""
        for token in tokens:
            self.add_token(token)

    def get_token_id(self, token:str) -> int:
        """Return the id of ``token``; print a notice and return -1 if it is unknown."""
        if token not in self.token_to_id:
            print("Token " + token + " not in the BOW")
            return -1
        return self.token_to_id[token]

    def get_token_from_id(self, token_id:int) -> str:
        """Return the token for ``token_id``; print a notice and return "" if it is unknown."""
        if token_id not in self.id_to_token:
            print("Token id " + str(token_id) + " is not in the BOW.")
            return ""
        # The lookup key is the id that was passed in.
        return self.id_to_token[token_id]
        

class SimpleTokenizer():
    """Regex tokenizer: every match of the compiled pattern becomes one token."""

    def __init__(self, pattern:str):
        """Compile the regular expression used for tokenizing.

        Args:
            pattern (str): pattern to compile (MULTILINE and DOTALL flags are set).
        """
        flags = re.MULTILINE | re.DOTALL
        self.regexp = re.compile(pattern, flags)

    def tokenize_text_lines(self, text_lines:List[str]) -> List[str]:
        """Tokenize several strings into one flat token list.

        Args:
            text_lines (List[str]): Strings to tokenize, in order.

        Returns:
            List[str]: Tokens of all strings, concatenated in input order.
        """
        return [match for text in text_lines for match in self.regexp.findall(text)]

    def tokenize_list_of_strings(self, string_list:List[str]) -> List[List[str]]:
        """Tokenize each string separately, giving one token list per input string."""
        return [self.regexp.findall(text) for text in string_list]

def construct_stopwords_set(stopwords_file_name:str) -> Set[str]:
    """Build the stop-word set from a file plus NLTK's English stop words.

    Args:
        stopwords_file_name (str): Text file with one stop word per line.

    Returns:
        Set[str]: Union of the lines of the file and ``stopwords.words("english")``.
    """
    with open(stopwords_file_name, 'r') as handle:
        file_words = handle.read().splitlines()
    combined = set(file_words)
    return combined | set(stopwords.words("english"))

class SimplePreprocessor():
    """Text pre-processing pipeline: tokenize, lowercase, drop stop words, stem."""

    def __init__(self, tokenizer:SimpleTokenizer, stop_words_set:Set[str], stemmer:PorterStemmer):
        """Store the tokenizer, the stop-word set and the stemmer used by the pipeline."""
        self.tokenizer = tokenizer
        self.stop_words_set = stop_words_set
        self.stemmer = stemmer

    @staticmethod
    def lowercase_word(word:str) -> str:
        """Return ``word`` in lowercase."""
        return str.lower(word)

    def remove_stop_words_lowercase_and_stem(self, tokens:List[str]) -> List[str]:
        """Lowercase each token, skip stop words, and stem whatever remains.

        The stop-word test is applied to the lowercased token, before stemming.
        """
        kept = []
        for raw_token in tokens:
            lowered = SimplePreprocessor.lowercase_word(raw_token)
            if lowered in self.stop_words_set:
                continue
            kept.append(self.stemmer.stem(lowered))
        return kept

    def process_text_lines(self, text_lines:List[str]) -> List[str]:
        """Tokenize ``text_lines`` and run the tokens through the filter/stem step."""
        raw_tokens = self.tokenizer.tokenize_text_lines(text_lines)
        return self.remove_stop_words_lowercase_and_stem(raw_tokens)

def read_tsv_extract_corpora(tsv_file_name:str, corpus_names_to_int:Dict[str, int]) -> Dict[int, List[str]]:
    """Read a two-column TSV (corpus name, text) and group the texts by corpus id.

    Args:
        tsv_file_name (str): Path of the tab-separated file.
        corpus_names_to_int (Dict[str, int]): Maps the corpus name found in the
            first column to its integer id.

    Returns:
        Dict[int, List[str]]: For every id in ``corpus_names_to_int`` the texts
        of that corpus in file order (an empty list if the corpus never occurs).

    Raises:
        KeyError: if a row names a corpus missing from ``corpus_names_to_int``.
        IndexError: if a non-blank row has fewer than two columns.
    """
    corpora = {corpus_id: [] for corpus_id in corpus_names_to_int.values()}
    # The csv module expects files opened with newline='' so that it can do
    # its own line-ending handling.
    with open(tsv_file_name, mode='r', newline='') as f:
        # NOTE(review): default csv quoting stays active, so a field that starts
        # with a double quote is parsed as a quoted field - confirm this suits the data.
        for row in csv.reader(f, delimiter="\t"):
            if not row:
                # Blank lines come back as empty rows; skip them instead of crashing.
                continue
            corpus_id = corpus_names_to_int[row[0]]
            corpora[corpus_id].append(row[1])
    return corpora

def preprocess_corpora(corpora:Dict[int, List[str]],
                       preprocessor:SimplePreprocessor) -> Dict[int, List[List[str]]]:
    """Run every document of every corpus through ``preprocessor``.

    Each document is handed to ``process_text_lines`` as a one-element list, so
    the result holds one token list per document under the same corpus key.
    """
    return {
        corpus_id: [preprocessor.process_text_lines([text]) for text in documents]
        for corpus_id, documents in corpora.items()
    }

def tokenize_corpora(corpora:Dict[int, List[str]],
                     tokenizer:SimpleTokenizer) -> Dict[int, List[List[str]]]:
    """Tokenize every document of every corpus with ``tokenizer``.

    Each document is handed to ``tokenize_text_lines`` as a one-element list, so
    the result holds one token list per document under the same corpus key.
    """
    return {
        corpus_id: [tokenizer.tokenize_text_lines([text]) for text in documents]
        for corpus_id, documents in corpora.items()
    }

def docs_to_bow_sents(docs:List[List[str]], ref_bow:Optional["BOW"]=None) -> Tuple[List[List[int]], "BOW"]:
    """Encode tokenized documents as lists of vocabulary ids.

    Args:
        docs (List[List[str]]): Tokenized documents.
        ref_bow (Optional[BOW]): Existing vocabulary to encode against. When
            omitted, a new ``BOW`` is built from ``docs`` while encoding.

    Returns:
        Tuple[List[List[int]], BOW]: The encoded documents and the vocabulary
        that was used (the new one, or ``ref_bow`` itself when it was given).
    """
    if ref_bow is None:
        bow = BOW()
        return bow.add_transform_tokenized_sents(docs), bow
    # Hand back the vocabulary that was passed in so both branches return a
    # (sentences, vocabulary) pair.
    return ref_bow.transform_tokenized_sents(docs), ref_bow

def bow_sents_to_dok(bow_sents:List[List[int]], bow:BOW) -> dok_matrix:
    """Build a document-by-token count matrix from id-encoded documents.

    The matrix has one row per document and ``bow.nr_tokens + 1`` columns; the
    additional column is there for the out-of-vocabulary id.
    """
    n_columns = bow.nr_tokens + 1
    counts = dok_matrix((len(bow_sents), n_columns), dtype='int')
    for row_idx in range(len(bow_sents)):
        for col_idx in bow_sents[row_idx]:
            counts[row_idx, col_idx] += 1
    return counts

def split_train_dev_improved(train_dev_corpora:Dict[int, List[List[str]]], corpus_ids:Set[int],
                    percentage_dev:Optional[float]=0.1) -> Tuple[List[List[str]], List[int], List[List[str]], List[int]]:
    """Split every corpus into train and dev parts separately, then concatenate.

    A single ``ShuffleSplit`` (one split, ``test_size=percentage_dev``,
    ``random_state=0``) is applied to each corpus on its own.

    Returns:
        Train documents, train labels, dev documents, dev labels; a label is
        the corpus id the document came from.
    """
    train_set, train_labels = [], []
    dev_set, dev_labels = [], []
    splitter = ShuffleSplit(1, test_size=percentage_dev, random_state=0)
    for corpus_id in corpus_ids:
        corpus_docs = train_dev_corpora[corpus_id]
        splits = list(splitter.split(corpus_docs))
        train_indices, dev_indices = splits[0]

        # Sanity check: report any index that landed in both parts.
        seen_in_train = set(train_indices)
        for dev_index in dev_indices:
            if dev_index in seen_in_train:
                print('WRONG YO')

        train_set.extend(corpus_docs[i] for i in train_indices)
        train_labels.extend([corpus_id] * len(train_indices))

        dev_set.extend(corpus_docs[i] for i in dev_indices)
        dev_labels.extend([corpus_id] * len(dev_indices))

    return train_set, train_labels, dev_set, dev_labels

def split_train_dev_baseline(train_dev_corpora:Dict[int, List[List[str]]], corpus_ids:Set[int],
                    percentage_dev:Optional[float]=0.1) -> Tuple[List[List[str]], List[int], List[List[str]], List[int]]:
    """Pool the documents of all corpora and split the pool into train and dev.

    A single ``ShuffleSplit`` (one split, ``test_size=percentage_dev``,
    ``random_state=0``) is applied to the pooled documents.

    Returns:
        Train documents, train labels, dev documents, dev labels; a label is
        the corpus id the document came from.
    """
    splitter = ShuffleSplit(1, test_size=percentage_dev, random_state=0)

    # Pool documents corpus by corpus, keeping a parallel list of labels.
    all_docs, all_labels = [], []
    for corpus_id in corpus_ids:
        corpus_docs = train_dev_corpora[corpus_id]
        all_docs.extend(corpus_docs)
        all_labels.extend([corpus_id] * len(corpus_docs))

    splits = list(splitter.split(all_docs))
    train_indices, dev_indices = splits[0]

    train_set = [all_docs[i] for i in train_indices]
    train_labels = [all_labels[i] for i in train_indices]
    dev_set = [all_docs[i] for i in dev_indices]
    dev_labels = [all_labels[i] for i in dev_indices]

    return train_set, train_labels, dev_set, dev_labels

def split_test(test_corpora:Dict[int, List[List[str]]], corpus_ids:Set[int]) -> Tuple[List[List[str]], List[int]]:
    """Flatten the test corpora into parallel lists of documents and labels.

    Corpora are visited in the iteration order of ``corpus_ids``; every
    document is labelled with the id of the corpus it came from.
    """
    labelled = [(doc, corpus_id)
                for corpus_id in corpus_ids
                for doc in test_corpora[corpus_id]]
    test_docs = [doc for doc, _ in labelled]
    test_labels = [corpus_id for _, corpus_id in labelled]
    return test_docs, test_labels

def compute_prf_scores(true_labels, pred_labels, value):
    """Compute precision, recall and F1 for one class.

    Args:
        true_labels: Indexable sequence of gold labels.
        pred_labels: Indexable sequence of predicted labels, aligned with
            ``true_labels`` (it is indexed with the positions of ``true_labels``).
        value: The label treated as the positive class.

    Returns:
        Tuple ``(precision, recall, f1)``. A score whose denominator would be
        zero (class never predicted, class never present, or no true
        positives at all) is reported as 0.0 instead of raising
        ``ZeroDivisionError``.
    """
    TP = FN = FP = 0
    for idx in range(len(true_labels)):
        is_true = true_labels[idx] == value
        is_pred = pred_labels[idx] == value
        if is_true and is_pred:
            TP += 1
        elif is_true:
            FN += 1
        elif is_pred:
            FP += 1
    # Guard each division: an undefined score is reported as 0.0.
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall = TP / (TP + FN) if (TP + FN) else 0.0
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0
    return precision, recall, f1

def create_row(true:List[int], pred:List[int], system:str, split:str) -> str:
    """Format one CSV line of per-class and macro-averaged P/R/F scores.

    Scores are computed for labels 0, 1 and 2 (in that order), followed by
    their unweighted means. Every number is rounded to three decimals. The
    line starts with ``system`` and ``split`` and ends with a newline.
    """
    per_class = []
    for label in (0, 1, 2):
        per_class.append(compute_prf_scores(true, pred, label))
    first, second, third = per_class

    # Macro average: plain mean of the three per-class values for P, R and F.
    macro = tuple((first[i] + second[i] + third[i]) / 3 for i in range(3))

    cells = [system, split]
    for triple in (first, second, third, macro):
        cells.extend(str(round(metric, ndigits=3)) for metric in triple)
    return ','.join(cells) + '\n'
    
def create_classification_output_file(output_file_name,
                                      baseline_train_true, baseline_train_pred,
                                      baseline_dev_true, baseline_dev_pred,
                                      baseline_test_true, baseline_test_pred,
                                      improved_train_true, improved_train_pred,
                                      improved_dev_true, improved_dev_pred,
                                      improved_test_true, improved_test_pred
                                     ):
    """Write the classification report CSV for both systems and all splits.

    The file gets a header line followed by one ``create_row`` line per
    (system, split) pair, ordered baseline then improved, and within each
    system train, dev, test. The column names assume Quran:0, OT:1, NT:2.
    """
    results = {
        'baseline': {
            'train': (baseline_train_true, baseline_train_pred),
            'dev': (baseline_dev_true, baseline_dev_pred),
            'test': (baseline_test_true, baseline_test_pred),
        },
        'improved': {
            'train': (improved_train_true, improved_train_pred),
            'dev': (improved_dev_true, improved_dev_pred),
            'test': (improved_test_true, improved_test_pred),
        },
    }
    header = 'system,split,p-quran,r-quran,f-quran,p-ot,r-ot,f-ot,p-nt,r-nt,f-nt,p-macro,r-macro,f-macro\n'
    with open(output_file_name, "w") as out:
        out.write(header)
        for system in ('baseline', 'improved'):
            for split in ('train', 'dev', 'test'):
                true, pred = results[system][split]
                out.write(create_row(true, pred, system, split))

In [122]:
# Tokens are runs of ASCII letters; everything else acts as a separator.
tokenizer = SimpleTokenizer('[a-zA-Z]+')

# Input and output file names (relative paths).
# NOTE(review): stopwords_file_name is not referenced by the cells visible here.
tsv_file_name = 'train_and_dev.tsv'
test_tsv_file_name = 'test.tsv'
stopwords_file_name = "englishST.txt"
output_file_name = "classification.csv"

# Label encoding; the report writer assumes Quran:0, OT:1, NT:2.
corpus_names_to_int = {'Quran':0, 'OT':1, 'NT':2}
int_to_corpus_names = {0:'Quran', 1:'OT', 2:'NT'}
corpus_ids = set([0, 1, 2])
# Load the raw documents, grouped by corpus id.
corpora = read_tsv_extract_corpora(tsv_file_name, corpus_names_to_int)
test_corpora = read_tsv_extract_corpora(test_tsv_file_name, corpus_names_to_int)

In [None]:
# Baseline features: regex tokenization only.
tokenized_corpora = tokenize_corpora(corpora, tokenizer)
test_tokenized_corpora = tokenize_corpora(test_corpora, tokenizer)

# Pool all corpora and hold out the default share (percentage_dev=0.1) as the dev set.
baseline_train_docs, baseline_train_labels, baseline_dev_docs, baseline_dev_labels = split_train_dev_baseline(tokenized_corpora, corpus_ids)
baseline_test_docs, baseline_test_labels = split_test(test_tokenized_corpora, corpus_ids)

# The vocabulary is built from the training documents only; dev and test are
# encoded against it, so tokens unseen in training map to the OOV id.
baseline_train_bow_sents, bow = docs_to_bow_sents(baseline_train_docs)
baseline_dev_bow_sents = docs_to_bow_sents(baseline_dev_docs, bow)[0]
baseline_test_bow_sents = docs_to_bow_sents(baseline_test_docs, bow)[0]

# Document-by-token count matrices (one row per document; the last column holds OOV counts).
baseline_train_dok = bow_sents_to_dok(baseline_train_bow_sents, bow)
baseline_dev_dok = bow_sents_to_dok(baseline_dev_bow_sents, bow)
baseline_test_dok = bow_sents_to_dok(baseline_test_bow_sents, bow)

In [111]:
# Baseline classifier: SVC with C=1000, all other settings left at their defaults.
baseline_model = SVC(C=1000)
baseline_model.fit(baseline_train_dok, baseline_train_labels)
# Predictions on all three splits feed the report written in the last cell.
baseline_train_pred = baseline_model.predict(baseline_train_dok)
baseline_dev_pred = baseline_model.predict(baseline_dev_dok)
baseline_test_pred = baseline_model.predict(baseline_test_dok)

In [126]:
# "Improved" system features.
# NOTE(review): despite the `preprocessed` names, this cell calls tokenize_corpora
# (the same tokenization as the baseline), not preprocess_corpora. The only visible
# difference from the baseline is the per-corpus split - confirm this is intended.
tokenizer = SimpleTokenizer('[a-zA-Z]+')

preprocessed_corpora = tokenize_corpora(corpora, tokenizer)
test_preprocessed_corpora = tokenize_corpora(test_corpora, tokenizer)

# Each corpus is split into train/dev separately.
improved_train_docs, improved_train_labels, improved_dev_docs, improved_dev_labels = split_train_dev_improved(preprocessed_corpora, corpus_ids)
improved_test_docs, improved_test_labels = split_test(test_preprocessed_corpora, corpus_ids)

# Vocabulary from the training documents only; dev and test reuse it.
improved_train_bow_sents, improved_bow = docs_to_bow_sents(improved_train_docs)
improved_dev_bow_sents = docs_to_bow_sents(improved_dev_docs, improved_bow)[0]
improved_test_bow_sents = docs_to_bow_sents(improved_test_docs, improved_bow)[0]

# Document-by-token count matrices (one row per document; the last column holds OOV counts).
improved_train_dok = bow_sents_to_dok(improved_train_bow_sents, improved_bow)
improved_dev_dok = bow_sents_to_dok(improved_dev_bow_sents, improved_bow)
improved_test_dok = bow_sents_to_dok(improved_test_bow_sents, improved_bow)

In [127]:
# "Improved" classifier: same SVC configuration as the baseline (C=1000).
improved_model = SVC(C=1000)
improved_model.fit(improved_train_dok, improved_train_labels)
# Predictions on all three splits feed the report written in the last cell.
improved_train_pred = improved_model.predict(improved_train_dok)
improved_dev_pred = improved_model.predict(improved_dev_dok)
improved_test_pred = improved_model.predict(improved_test_dok)

In [128]:
# Write per-class and macro P/R/F for both systems on train/dev/test to output_file_name.
# Requires the baseline and improved prediction cells to have been run first.
create_classification_output_file(output_file_name,
                                 baseline_train_labels, baseline_train_pred,
                                 baseline_dev_labels, baseline_dev_pred,
                                 baseline_test_labels, baseline_test_pred,
                                 improved_train_labels, improved_train_pred,
                                 improved_dev_labels, improved_dev_pred,
                                 improved_test_labels, improved_test_pred)