In [6]:
import spacy
from spacy.matcher import Matcher
from zipfile import ZipFile
from pathlib import Path
from tqdm import autonotebook as tqdm
import os
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from spacy.tokens import Doc
from spacy.matcher import Matcher
import re
from collections import Counter
from collections.abc import Iterable


In [7]:
nlp = spacy.load("en_core_web_sm")
data_dir = Path("./data/teaching-dataset")
with (data_dir / "relation_extraction_text_train.zip").open("rb") as file:
    zip_file = ZipFile(file)
    with zip_file.open("input.txt") as f:
        sentences = [
            sentence.split("\n") for sentence in f.read().decode("utf-8").split("\n\n")
        ]
with (data_dir / "relation_extraction_references_train.zip").open("rb") as file:
    zip_file = ZipFile(file)
    with zip_file.open("references.txt") as f:
        labels = []
        for line in f.read().decode("utf-8").split("\n"):
            relations = []
            for relation in re.finditer(r"\(\((\d+),(\d+)\),\((\d+),(\d+)\)\)", line):
                relation = (
                    (int(relation.group(1)), int(relation.group(2))),
                    (int(relation.group(3)), int(relation.group(4))),
                )
                relations.append(relation)
            labels.append(relations)
assert len(sentences) == len(labels)
doc = nlp(Doc(nlp.vocab, words=sentences[0]))
doc

The longest serving spacecraft goes into retirement . 

In [8]:
train_sentences, test_sentences, train_labels, test_labels =  train_test_split(sentences, labels, test_size=0.2, random_state=42)
print(len(train_sentences))
print(len(test_sentences))

748
188


In [9]:
# same here
class is_causal_predictor:
    def __init__(self, sentences, labels, n=None):
        self.sentences = sentences
        self.labels = labels
        self.n = n
        self.test_sentences = test_sentences
        self.test_labels = test_labels
        
        self.init_words()
        if n != None:
            self.causal_cues = self.get_causal_cues(self.n)


    def init_words(self):
        self.words = []
        self.nonCausalWords = []

        for label, sentence in zip(self.labels, self.sentences):
            if type(sentence) == list:
                sentence = ' '.join(sentence)
            if "B-EVENT" in label: #if sentence is causal
                doc = nlp(sentence)
                wordsHelp = [token.text for token in doc if not token.is_stop and not token.is_punct and token.pos_ != "NOUN" and token.pos_ != "ADJ"] 
                self.words.extend(wordsHelp) #append all words to a list if they are NOT nouns & NOT punctuation & NOT adjectives

            else: #if sentence is NOT causal
                doc = nlp(sentence)
                wordsHelp = [token.text for token in doc if not token.is_stop and not token.is_punct and token.pos_ != "NOUN" and token.pos_ != "ADJ"] 
                self.nonCausalWords.extend(wordsHelp)  
    
    # returns n best causal cues
    def get_causal_cues(self, n):
        def flatten(lis): #pretty ugly solution but we have to flatten the list since every new sentence adds "[]" which Counter can't deal with
            for item in lis:
                if isinstance(item, Iterable) and not isinstance(item, str):
                    for x in flatten(item):
                        yield x
                else:        
                    yield item
        
        def get_n_lemmata(causal_freq, n):
            # sort words
            sorted_words = np.array(causal_freq.most_common(len(causal_freq)))[:,0]
            converted_return = []
            for word in sorted_words:
                # lemmatize
                doc = nlp(str(word))
                word = " ".join([token.lemma_ for token in doc])
                if not word in converted_return:
                    converted_return.append(word)
                # break if n lemmata found
                if len(converted_return)==n:
                    break
            return converted_return

        causal_freq = Counter(self.words)
        nonCausal_freq = Counter(self.nonCausalWords)
        for word in causal_freq:
            causal_freq[word] = causal_freq[word]/(nonCausal_freq[word]+1)
        return get_n_lemmata(causal_freq, n)


    def predict_causality(self, sentence):
        if type(sentence) == list: # convert to str
            sentence = ' '.join(sentence)
        matcher = Matcher(nlp.vocab)
        pattern = [[{"LEMMA": cue}] for cue in self.causal_cues]
        matcher.add("CAUSAL", pattern)
        doc = nlp(sentence)
        matches = matcher(doc)
        return bool(matches)


    # Find best value for n given a testset and initialize causal cues according to best n
    def init_causal_cues_best_n(self, test_sentences, test_labels, step_size=20):
        new_labels = []
        for label in test_labels:
            if "B-EVENT" in label:
                new_labels.append(1)
            else:
                new_labels.append(0)

        old_f1 = 0
        f1 = 0
        n = 0
        while f1 >= old_f1:
            n = n+step_size
            predictions = []

            # predict
            self.causal_cues = self.get_causal_cues(n)
            for sentence, l in zip(test_sentences, test_labels):
                p = self.predict_causality(sentence)
                predictions.append(p)

            # evaluate
            old_f1 = f1
            tp = sum([int(p) == 1 and int(l) == 1 for p, l in zip(predictions, new_labels)])
            fp = sum([int(p) == 1 and int(l) == 0 for p, l in zip(predictions, new_labels)])
            fn = sum([int(p) == 0 and int(l) == 1 for p, l in zip(predictions, new_labels)])
            precision = tp / (tp + fp)
            recall = tp / (tp + fn)
            f1 = 2 * precision * recall / (precision + recall)

        self.f1 = old_f1
        self.n = n-20
        self.causal_cues = self.get_causal_cues(self.n)