<a href="https://colab.research.google.com/github/adefgreen98/NLU2021-Assignment1/blob/main/code/Assignment1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Natural Language Understanging Assignment 1 - Dependency Grammars
_Federico Pedeni_, 223993

### Requirements & Test Sentences

In [1]:
import spacy
from typing import Union

nlp_spacy = spacy.load('en')

In [2]:
sentences = "i saw the man with the telescope"
doc_spacy = nlp_spacy(sentences)

test_sentences = {
    'subtree': ["saw", "I saw a woman that saw a man who saw me yesterday"],
    'check': ["with the telescope", "telescope with the"],
    'head': "the man with the telescope",
    'info': "I gave Pooh all my honey. Also, I told Tigger a good bedtime story. Cristopher Robin was relieved to see all these friends."
}

### 1) Extract a path of dependency relations from ROOT to a token
This function extracts a list of paths that allow to traverse the dependency graph from the root to each node of the tree. Each dependency relation is stated as a `3-tuple` containing `(head, child, type of dependency)`; for this reason, for each list, the `child` value of a tuple is also the `head` value of the subsequent tuple.

For each sentence in the given SpaCy `Doc`, it is created a `dict` that maps each token to its path; for the `root` node it is reported only a recursive relation. Tokens that occurr multiple times inside a sentence are distinguished thanks to __token offsets__ that are part of the dictionary's keys.

Dictionaries' values are lists of dependency relations: while iterating over a sentence, for each key is initialized an empty list and new relations are progressively addedd in a backwards-recursive manner, substituting the token with its head until the root node is reached. At this point, the root-to-itself relation is added and the list is reversed.

This function returns a list where dictionaries at each index refer to the corresponding sentences in the `Doc`.

In [3]:
def get_dependencies(doc:spacy.tokens.Doc=doc_spacy):
    # iterates over all sentences in the doc
    rlist = []
    for sentence in doc.sents:
        res = {}
        rt = sentence.root
        # gets offset for the sentence, so that keys will be indexed independently for each sentence
        offset = sentence[0].i
        for wd in sentence:
            token = wd
            # forms the key
            k = wd.text + f'<{wd.i - offset}>'
            res[k] = []
            while rt != token:
                res[k].append((token.head.text, token.text, token.dep_))
                token = token.head
            # at this point the root should add itself
            res[k].append((token.head.text, token.text, token.dep_))
            res[k].reverse()
        rlist.append(res)
    return rlist

print(*get_dependencies()[0].items(), sep='\n')

('i<0>', [('saw', 'saw', 'ROOT'), ('saw', 'i', 'nsubj')])
('saw<1>', [('saw', 'saw', 'ROOT')])
('the<2>', [('saw', 'saw', 'ROOT'), ('saw', 'man', 'dobj'), ('man', 'the', 'det')])
('man<3>', [('saw', 'saw', 'ROOT'), ('saw', 'man', 'dobj')])
('with<4>', [('saw', 'saw', 'ROOT'), ('saw', 'man', 'dobj'), ('man', 'with', 'prep')])
('the<5>', [('saw', 'saw', 'ROOT'), ('saw', 'man', 'dobj'), ('man', 'with', 'prep'), ('with', 'telescope', 'pobj'), ('telescope', 'the', 'det')])
('telescope<6>', [('saw', 'saw', 'ROOT'), ('saw', 'man', 'dobj'), ('man', 'with', 'prep'), ('with', 'telescope', 'pobj')])


### 2) Extract a subtree of dependants given a token

This exercise can be solved by creating a wrapper function accepting as parameter a `Span` object, so that it can be used again in exercise 3. 

Indeed, the external function does only the sentence parsing and the detection of specifed token in the `Doc` sentences. Since a token can appear multiple times in a sentence, its occurrence are distinguished through their offset (as in exercise 1) and the relative subtrees are returned as a mapping between the __token-occurrence string__ and the __subtree__ of dependants, representend as list of `Token` objects (ordered according to sentence order). Matches between dictionary's keys are computed by taking the first part of the dict's key (the one that comes before the offset specification).

The internal function accepts as input a single sentence and initializes a dict that is used to map each token in the sentence with its subtree. Each subtree is representend as a __list of `Token`__ objects ordered according to sentence order: these are obtained thanks to a BFS-like exploration of the sentence graph; at each step, a queue is filled with a token's children (excluding the token itself) and these will be explored later in discovering order. The exploration continues until leaves are met, which do not add any child to the queue and therefore terminate the scan. 



In [4]:
def _get_subtree(doc:spacy.tokens.Span):
    rsdict = {
        doc.root.text + f'<{doc.root.i}>': list(doc.root.subtree)
    }
    q = list(doc.root.children)
    while len(q) > 0:
        token = q.pop(0)
        rsdict[token.text + f'<{token.i}>'] = list(token.subtree)
        q.extend(filter(lambda x: x.i != token.i, token.subtree))
    return rsdict

def get_subtree(token:str, doc:str, parser=nlp_spacy):
    _doc = parser(doc)
    res = []
    for sent in _doc.sents:
        res.append({})
        for k,v in _get_subtree(sent).items():
            if k.split('<')[0] == token:
                # fills the last added dictionary (for the current sentence) with subtree list
                res[-1][k] = v
    return res
    
print(*get_subtree(test_sentences['subtree'][0], test_sentences['subtree'][1])[0].items(), sep='\n')

('saw<1>', [I, saw, a, woman, that, saw, a, man, who, saw, me, yesterday])
('saw<5>', [that, saw, a, man, who, saw, me, yesterday])
('saw<9>', [who, saw, me, yesterday])


### 3) Check if a given list of tokens (segment of a sentence) forms a subtree

This function accepts a list of tokens (that can be specified as a string with spaces between each token, too) to verify if they are the _all and only_ components of a subtree which is contained inside a specified `Doc`. For first, it iterates over all the sentences in the `Doc`, trying to find if anyone of them contains all the specified tokens; if not, then they surely do not form a subtree for any sentence and thus it is returned `False`.

If at least one suitable sentence is found, then it iterates over all the subtrees of that sentence and return `True` if it finds one where the tokens' ordering matches the sentence ordering, by comparing the tokens' list and the list of `Token.text` for each subtree.

In [5]:
def check_subtree(tokens:Union[list, str], doc:spacy.tokens.Doc=doc_spacy):
    if type(tokens) is str: tokens = tokens.split()
    else: pass
    token_pool = set(tokens)

    for sentence in doc.sents:
        acc = token_pool.issubset(set(sentence.text.split()))
        if acc == True: 
            for subtr in _get_subtree(sentence).values():
                subtr = [el.text for el in subtr]
                if subtr == tokens: return True
    return False

for sent in test_sentences['check']:    
    print(f"Test for check_subtree('{sent}')", " ---> ", check_subtree(sent))

Test for check_subtree('with the telescope')  --->  True
Test for check_subtree('telescope with the')  --->  False


### 4) Identify head of a span, given its tokens

In this case, the input has been considered to be a string; the function also needs a pre-initialized parser, to perform the sentence parsing.

This function creates a new `Doc` containing the specified span and then returns the root token of the single `Span` object that composes that `Doc`, by accessing it directly thanks to __Python slicing__. 

The returned value is a `Token` object.

In [6]:
def head_of_span(sentence:str, parser=nlp_spacy):
    tmp = parser(sentence)
    return tmp[:].root

print(f"Head of sentence '{test_sentences['head']}': ", head_of_span(test_sentences['head']))

Head of sentence 'the man with the telescope':  man


### 5) Extract sentence subject, direct object and indirect object spans

This function defines a mapping between some types of dependency relations and the 3 requested categories. 

Going into details, a span is added to the corresponding list if and only if its head is labelled with one of the following dependencies:
+ subject: `nsubj`, `nsubjpass`;
+ object: `dobj`;
+ indirect object: `pobj`, `iobj`, `dative`.

The function iterates over the sentences in the `Doc` and for each one creates a dictionary mapping each requested dependency relation to the span of words in the __subtree__ of the token labeled with that relation. The subtree is obtained by casting to list the generator stored in `Token.subtree`. 

Since there may be multiple indirect objects, the values of the dictionary are implemented as lists (even for subject and direct object).


In [7]:
def extract_info(doc:spacy.tokens.Doc):
    relations = {
        'nsubj': 'subject',
        'nsubjpass': 'subject',
        'dobj': 'object',
        'pobj': 'indirect object',
        'dative': 'indirect object',
        'iobj': 'indirect object'
    }
    res = []
    for sentence in doc.sents:
        tmp = {
        'subject': None,
        'object': None,
        'indirect object': None
        }
        for word in sentence:
            if word.dep_ in relations.keys():
                tmp[relations[word.dep_]] = (list(word.subtree), word.dep_)
        res.append(tmp)
    return res

for sent, info in zip(nlp_spacy(test_sentences['info']).sents, extract_info(nlp_spacy(test_sentences['info']))):
    print("Current sentence: ", sent)
    print("Info: ")
    print(*info.items(), sep='\n')
    print()

Current sentence:  I gave Pooh all my honey.
Info: 
('subject', ([I], 'nsubj'))
('object', ([all, my, honey], 'dobj'))
('indirect object', ([Pooh], 'dative'))

Current sentence:  Also, I told Tigger a good bedtime story.
Info: 
('subject', ([I], 'nsubj'))
('object', ([a, good, bedtime, story], 'dobj'))
('indirect object', ([Tigger], 'dative'))

Current sentence:  Cristopher Robin was relieved to see all these friends.
Info: 
('subject', ([Cristopher, Robin], 'nsubjpass'))
('object', ([all, these, friends], 'dobj'))
('indirect object', None)



## Optional Section
### 1) Modify NLTK Transition parser's `Configuration` class to use better features
### 2) Evaluate the features comparing performances
### 3) Replace SVM classifier with an alternative


In [8]:
import nltk
from nltk.corpus import dependency_treebank
from nltk.parse import DependencyEvaluator
from nltk.parse.transitionparser import *

nltk.download('dependency_treebank') # 4k samples
origninal_tp = TransitionParser('arc-standard')
sentences = dependency_treebank.parsed_sents()[:150]

[nltk_data] Downloading package dependency_treebank to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping corpora/dependency_treebank.zip.


In [9]:
origninal_tp.train(sentences, 'original_tp.model')

 Number of training examples : 150
 Number of valid (projective) examples : 150
[LibSVM]

In [10]:
test_set = dependency_treebank.parsed_sents()[-10:]
parses = origninal_tp.parse(test_set, 'original_tp.model')
de = DependencyEvaluator(parses, test_set)
las, uas = de.eval()

print(las)
print(uas)

0.8083333333333333
0.8083333333333333


### Custom Parser Classes

In [11]:
class CustomConfig(Configuration):

    def extract_features(self):
        result = []
        # Todo : can come up with more complicated features set for better
        # performance.
        if len(self.stack) > 0:
            # Stack 0
            stack_idx0 = self.stack[len(self.stack) - 1]
            token = self._tokens[stack_idx0]
            if self._check_informative(token["word"], True):
                result.append("STK_0_FORM_" + token["word"])
            if "lemma" in token and self._check_informative(token["lemma"]):
                result.append("STK_0_LEMMA_" + token["lemma"])
            if self._check_informative(token["tag"]):
                result.append("STK_0_POS_" + token["tag"])
            if "feats" in token and self._check_informative(token["feats"]):
                feats = token["feats"].split("|")
                for feat in feats:
                    result.append("STK_0_FEATS_" + feat)
            
            ###################### Head check ###################### 
            # hd = self._tokens.get_by_address(token["head"]) 
            hd = self._tokens[token["head"]]["word"]
            if self._check_informative(hd):
                result.append("STK_0_HEAD_" + hd)
            ###################### ######################

            # Stack 1
            if len(self.stack) > 1:
                stack_idx1 = self.stack[len(self.stack) - 2]
                token = self._tokens[stack_idx1]
                if self._check_informative(token["tag"]):
                    result.append("STK_1_POS_" + token["tag"])

            # Left most, right most dependency of stack[0]
            left_most = 1000000
            right_most = -1
            dep_left_most = ""
            dep_right_most = ""
            for (wi, r, wj) in self.arcs:
                if wi == stack_idx0:
                    if (wj > wi) and (wj > right_most):
                        right_most = wj
                        dep_right_most = r
                    if (wj < wi) and (wj < left_most):
                        left_most = wj
                        dep_left_most = r
            if self._check_informative(dep_left_most):
                result.append("STK_0_LDEP_" + dep_left_most)
            if self._check_informative(dep_right_most):
                result.append("STK_0_RDEP_" + dep_right_most)

        # Check Buffered 0
        if len(self.buffer) > 0:
            # Buffer 0
            buffer_idx0 = self.buffer[0]
            token = self._tokens[buffer_idx0]
            if self._check_informative(token["word"], True):
                result.append("BUF_0_FORM_" + token["word"])
            if "lemma" in token and self._check_informative(token["lemma"]):
                result.append("BUF_0_LEMMA_" + token["lemma"])
            if self._check_informative(token["tag"]):
                result.append("BUF_0_POS_" + token["tag"])
            if "feats" in token and self._check_informative(token["feats"]):
                feats = token["feats"].split("|")
                for feat in feats:
                    result.append("BUF_0_FEATS_" + feat)
            
            ###################### Head check ######################
            # hd = self._tokens.get_by_address(token["head"]) 
            hd = self._tokens[token["head"]]["word"]
            if self._check_informative(hd):
                result.append("BUF_0_HEAD_" + hd)
            ###################### ######################

            # Buffer 1
            if len(self.buffer) > 1:
                buffer_idx1 = self.buffer[1]
                token = self._tokens[buffer_idx1]
                if self._check_informative(token["word"], True):
                    result.append("BUF_1_FORM_" + token["word"])
                if self._check_informative(token["tag"]):
                    result.append("BUF_1_POS_" + token["tag"])
            if len(self.buffer) > 2:
                buffer_idx2 = self.buffer[2]
                token = self._tokens[buffer_idx2]
                if self._check_informative(token["tag"]):
                    result.append("BUF_2_POS_" + token["tag"])
            if len(self.buffer) > 3:
                buffer_idx3 = self.buffer[3]
                token = self._tokens[buffer_idx3]
                if self._check_informative(token["tag"]):
                    result.append("BUF_3_POS_" + token["tag"])
            
            # Left most, right most dependency of buff[0]
            left_most = 1000000
            right_most = -1
            dep_left_most = ""
            dep_right_most = ""
            for (wi, r, wj) in self.arcs:
                if wi == buffer_idx0:
                    if (wj > wi) and (wj > right_most):
                        right_most = wj
                        dep_right_most = r
                    if (wj < wi) and (wj < left_most):
                        left_most = wj
                        dep_left_most = r
            if self._check_informative(dep_left_most):
                result.append("BUF_0_LDEP_" + dep_left_most)
            if self._check_informative(dep_right_most):
                result.append("BUF_0_RDEP_" + dep_right_most)

        return result


In [None]:
class CustomParser(TransitionParser):

    def _create_training_examples_arc_std(self, depgraphs, input_file):
        """
        Create the training example in the libsvm format and write it to the input_file.
        Reference : Page 32, Chapter 3. Dependency Parsing by Sandra Kubler, Ryan McDonal and Joakim Nivre (2009)
        """
        operation = Transition(self.ARC_STANDARD)
        count_proj = 0
        training_seq = []

        for depgraph in depgraphs:
            if not self._is_projective(depgraph):
                continue

            count_proj += 1
            conf = CustomConfig(depgraph)
            while len(conf.buffer) > 0:
                b0 = conf.buffer[0]
                features = conf.extract_features()
                binary_features = self._convert_to_binary_features(features)

                if len(conf.stack) > 0:
                    s0 = conf.stack[len(conf.stack) - 1]
                    # Left-arc operation
                    rel = self._get_dep_relation(b0, s0, depgraph)
                    if rel is not None:
                        key = Transition.LEFT_ARC + ":" + rel
                        self._write_to_file(key, binary_features, input_file)
                        operation.left_arc(conf, rel)
                        training_seq.append(key)
                        continue

                    # Right-arc operation
                    rel = self._get_dep_relation(s0, b0, depgraph)
                    if rel is not None:
                        precondition = True
                        # Get the max-index of buffer
                        maxID = conf._max_address

                        for w in range(maxID + 1):
                            if w != b0:
                                relw = self._get_dep_relation(b0, w, depgraph)
                                if relw is not None:
                                    if (b0, relw, w) not in conf.arcs:
                                        precondition = False

                        if precondition:
                            key = Transition.RIGHT_ARC + ":" + rel
                            self._write_to_file(key, binary_features, input_file)
                            operation.right_arc(conf, rel)
                            training_seq.append(key)
                            continue

                # Shift operation as the default
                key = Transition.SHIFT
                self._write_to_file(key, binary_features, input_file)
                operation.shift(conf)
                training_seq.append(key)

        print(" Number of training examples : " + str(len(depgraphs)))
        print(" Number of valid (projective) examples : " + str(count_proj))
        return training_seq


### Train & Test

In [12]:
custom_tp = CustomParser('arc-standard')
custom_tp.train(sentences, 'custom_tp.model')

 Number of training examples : 150
 Number of valid (projective) examples : 150
[LibSVM]

In [13]:
custom_parses = custom_tp.parse(test_set, 'custom_tp.model')
custom_de = DependencyEvaluator(custom_parses, test_set)
las, uas = de.eval()

print(las)
print(uas)

0.8083333333333333
0.8083333333333333
