# Universal Dependencies

In [1]:
import os
import re
import glob
import pickle
import logging
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
from io import StringIO
from datetime import datetime
from collections import Counter
from tqdm import tqdm
tqdm.monitor_interval = 0
import spacy
import numpy as np
import pandas as pd

# Log to a file that is truncated on every run (filemode='w').
# The handler cannot open 'logs/UD.log' when the 'logs' directory is missing,
# so create it first; exist_ok makes this a no-op on later runs.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(filename='logs/UD.log', filemode='w', level=logging.INFO,
                    format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')

# Git repositories of the Universal Dependencies treebanks, grouped by language.
# Each list is cloned by fetch_datasets() via the DATASETS mapping below.

# English treebanks.
EN_LINKS = ['https://github.com/UniversalDependencies/UD_English-ParTUT.git',
             'https://github.com/UniversalDependencies/UD_English-GUM.git',
             'https://github.com/UniversalDependencies/UD_English-EWT.git',
             'https://github.com/UniversalDependencies/UD_English-PUD.git',
             'https://github.com/UniversalDependencies/UD_English-LinES.git']

# French treebanks (the FTB entry is commented out, so it is never cloned).
FR_LINKS = ['https://github.com/UniversalDependencies/UD_French-ParTUT.git',
             'https://github.com/UniversalDependencies/UD_French-GSD.git',
             'https://github.com/UniversalDependencies/UD_French-Sequoia.git',
             'https://github.com/UniversalDependencies/UD_French-PUD.git',
             #'https://github.com/UniversalDependencies/UD_French-FTB.git'
            ]

# Italian treebanks.
IT_LINKS = ['https://github.com/UniversalDependencies/UD_Italian-ISDT.git',
             'https://github.com/UniversalDependencies/UD_Italian-ParTUT.git',
             'https://github.com/UniversalDependencies/UD_Italian-PUD.git']

# Spanish treebanks.
ES_LINKS = ['https://github.com/UniversalDependencies/UD_Spanish-AnCora.git',
            'https://github.com/UniversalDependencies/UD_Spanish-GSD.git',
            'https://github.com/UniversalDependencies/UD_Spanish-PUD.git']


# Language code -> list of repository URLs; the keys are also used as the
# top-level local directory names by fetch_datasets().
DATASETS = {'en': EN_LINKS,
            'fr': FR_LINKS,
            'it': IT_LINKS,
            'es':  ES_LINKS}

# Directory-name component shared by the download paths and the output paths.
UD_DIR = 'UD'

In [2]:
def download(url, path):
    """Clone the git repository at ``url`` into the local directory ``path``.

    The clone is best-effort: a failure is logged as a warning and never
    raised, so one unavailable repository does not stop the remaining
    downloads.

    Args:
        url: Address of the git repository to clone.
        path: Local destination directory for the clone.
    """
    import subprocess  # local import: not part of the file-level imports

    # Pass an argument list instead of a formatted shell string so that spaces
    # or shell metacharacters in ``url``/``path`` are never re-interpreted.
    # '--' stops git from reading a value that starts with '-' as an option.
    try:
        completed = subprocess.run(['git', 'clone', '--', url, path])
    except OSError as err:
        # Raised e.g. when the git executable cannot be started.
        logging.warning('Could not run git to clone %s: %s', url, err)
        return
    if completed.returncode != 0:
        logging.warning('git clone of %s into %s exited with status %s',
                        url, path, completed.returncode)

def fetch_datasets(datasets):
    """Clone every repository in ``datasets`` that is not yet on disk.

    Args:
        datasets: Mapping of language code to a list of repository URLs.
            Each repository is stored under ``<language>/<UD_DIR>/<name>``,
            where ``<name>`` is the last URL component without its extension.
    """
    for lg in datasets:
        for link in tqdm(datasets[lg]):
            repo_name, _ = os.path.splitext(os.path.basename(link))
            target = os.path.join(lg, UD_DIR, repo_name)
            if os.path.exists(target):
                # Already present: skip the download.
                continue
            download(link, target)

# Clone every treebank listed in DATASETS whose local directory does not exist yet.
fetch_datasets(DATASETS)

100%|██████████| 5/5 [00:00<00:00, 21013.55it/s]
100%|██████████| 4/4 [00:00<00:00, 16777.22it/s]
100%|██████████| 3/3 [00:00<00:00, 13400.33it/s]
100%|██████████| 3/3 [00:00<00:00, 15988.45it/s]


In [3]:
class Sentence:
    """One CoNLL-U sentence: its text, its word table and tense features.

    Subclasses must provide ``find_tense_id()``. It returns the id (an index
    label of ``self.words``) of the word taken to carry the tense of the
    sentence, or ``None`` when no such word is found.

    Attributes set by ``__init__``:
        text: Value of the ``# text = ...`` comment.
        words: DataFrame indexed by token id (strings) with the columns
            form, lemma, upos, head, deprel, tense, verbform and multiword.
        root_id: Id of the first word whose head is ``'0'``.
        tense_id: Result of ``find_tense_id()``.
        The remaining feature attributes are filled by ``extract_features()``.
    """

    TEXT_PATTERN = re.compile(r'#\s+?text\s+?=\s+?(.*)\n')
    WORDLINES_PATTERN = re.compile(r'\n(1.*\n)\n', re.DOTALL)
    TENSE_PATTERN = re.compile(r'Tense=([A-Z0-9][a-zA-Z0-9]*)')
    VERBFORM_PATTERN = re.compile(r'VerbForm=([A-Z0-9][a-zA-Z0-9]*)')
    NULL_VALUE = '_'
    COLUMNS = ['id', 'form', 'lemma', 'upos', 'xpos', 'feats', 'head', 'deprel', 'deps', 'misc']

    def __init__(self, raw):
        """Parse ``raw``, the text of one sentence ending in a blank line."""
        self.text = self.TEXT_PATTERN.search(raw).group(1)
        self.words = self.parse_words(raw)
        self.root_id = self.words[self.words['head'] == '0'].index[0]
        self.tense_id = self.find_tense_id()
        self.extract_features()

    def parse_words(self, raw):
        """Return the word table of the sentence as a DataFrame indexed by id.

        Rows whose id contains a dot are dropped. A multiword-range row
        (id such as ``2-3``) is kept, and every word inside that range gets
        the range id in its ``multiword`` column. Missing values are
        ``NULL_VALUE``.
        """
        word_lines = self.WORDLINES_PATTERN.search(raw).group(1)
        # Read every field as a plain string and switch off NA detection:
        # with the defaults, word forms such as "None", "NA" or "null" are
        # parsed as missing values and would later be replaced by '_'.
        words = pd.read_csv(StringIO(word_lines), sep='\t', names=self.COLUMNS,
                            quoting=3, dtype=str, keep_default_na=False)
        is_emptynode = words['id'].str.contains(r'\.')
        # copy() so the column assignments below act on an independent frame.
        words = words[~is_emptynode].copy().set_index('id')
        words['tense'] = words['feats'].str.extract(self.TENSE_PATTERN, expand=False)
        words['verbform'] = words['feats'].str.extract(self.VERBFORM_PATTERN, expand=False)
        words = words.fillna(self.NULL_VALUE)
        words['multiword'] = self.NULL_VALUE
        is_range = words.index.str.contains('-')
        for idx in words.index[is_range]:
            start, end = idx.split('-')
            members = [str(i) for i in range(int(start), int(end) + 1)]
            # Single .loc assignment on the frame itself: the chained form
            # words.loc[i]['multiword'] = idx may write to a temporary row.
            words.loc[words.index.isin(members), 'multiword'] = idx
        return words.drop(['xpos', 'feats', 'deps', 'misc'], axis=1)

    def extract_features(self):
        """Compute the sentence-level features from ``words`` and ``tense_id``.

        ``length`` is always set. Every other feature is ``None`` when
        ``tense_id`` is ``None``.
        """
        # Rows with an empty multiword column: range rows and words outside
        # any range. Members of a range are not counted.
        self.length = len(self.words[self.words['multiword'] == self.NULL_VALUE])
        if self.tense_id is None:
            self.tense = None
            self.tense_wordform = None
            self.tense_lemma = None
            self.relation_to_head = None
            self.dist_from_right = None
            self.other_tensed_words = None
            self.other_tensed_words_conflicting = None
            self.other_tensed_words_right = None
            self.other_tensed_words_conflicting_right = None
            return

        tensed_word = self.words.loc[self.tense_id]
        self.tense = tensed_word['tense']
        self.tense_wordform = self.get_tensed_wordform()
        self.tense_lemma = tensed_word['lemma']
        self.relation_to_head = tensed_word['deprel']

        all_tensed_words = self.words[self.words['tense'] != self.NULL_VALUE]
        self.other_tensed_words = len(all_tensed_words) - 1
        self.other_tensed_words_conflicting = (all_tensed_words['tense'] != self.tense).sum()

        # Slice by position, not by the label of the next numeric id: a range
        # row such as '5-6' precedes word '5' in the table, so a label slice
        # starting at '5' would leave that row out of the count.
        position = self.words.index.get_loc(self.tense_id)
        words_to_the_right = self.words.iloc[position + 1:]
        self.dist_from_right = int((words_to_the_right['multiword'] == self.NULL_VALUE).sum())
        tensed_words_to_the_right = words_to_the_right[words_to_the_right['tense'] != self.NULL_VALUE]
        self.other_tensed_words_right = len(tensed_words_to_the_right)
        self.other_tensed_words_conflicting_right = int(
            (tensed_words_to_the_right['tense'] != self.tense).sum())

    def get_tensed_wordform(self):
        """Return the form of the tensed word.

        When the word belongs to a multiword range, the form of the range
        row is returned instead of the form of the word itself.
        """
        multiword_id = self.words.loc[self.tense_id]['multiword']
        if multiword_id != self.NULL_VALUE:
            return self.words.loc[multiword_id]['form']
        return self.words.loc[self.tense_id]['form']

class EnglishSentence(Sentence):
    """Sentence whose tense-bearing word is located with English rules."""

    def find_tense_id(self):
        """
        Return the id of the word carrying the sentence tense, or None.

        1) AUX direct dependent of root, related by (aux, cop or aux:pass), is finite and has tense
        2) If found above but no tense,
            if lemma is will, take
            otherwise UD is wrong
        3) VERB that is root, is finite and has tense
        """
        words = self.words
        is_root_auxiliary = (
            (words['upos'] == 'AUX')
            & (words['head'] == self.root_id)
            & words['deprel'].isin(['aux', 'aux:pass', 'cop'])
            & (words['verbform'] == 'Fin')
        )
        finite_aux = words[is_root_auxiliary]

        if len(finite_aux):
            # Build the tense mask from finite_aux itself so that mask and
            # frame share one index (no implicit reindexing of the key).
            tensed_finite_aux = finite_aux[finite_aux['tense'] != self.NULL_VALUE]
            if len(tensed_finite_aux):
                return tensed_finite_aux.index[0]
            # .iloc for the first row: the index holds string ids, so a plain
            # [0] depends on the deprecated positional fallback.
            if finite_aux['lemma'].iloc[0] == 'will':
                return finite_aux.index[0]
            return None

        root = words.loc[self.root_id]
        if (root['upos'] == 'VERB') and (root['verbform'] == 'Fin') and (root['tense'] != self.NULL_VALUE):
            return self.root_id
        return None

class ItalianSentence(Sentence):
    """Sentence whose tense-bearing word is located with Italian rules."""

    def find_tense_id(self):
        """
        Return the id of the word carrying the sentence tense, or None.

        This is the hook called by ``Sentence.__init__``; without it the
        class could not be instantiated.

        1) Root of the sentence if it's a verb and has tense
        2) The first direct dependent of the root that is related to the root by cop
        3) The first direct dependent of the root that is related to the root by aux
        4) The first direct dependent of the root that is an auxiliary
        """
        words = self.words
        root = words.loc[self.root_id]
        if (root['upos'] == 'VERB') and (root['tense'] != self.NULL_VALUE):
            return self.root_id

        # 'head' and the index both hold ids as strings, so compare with root_id.
        is_dep_of_root = words['head'] == self.root_id
        # Rules 2-4, tried in this order.
        # NOTE(review): as in the rules above, the chosen dependent is not
        # required to carry a Tense feature, so the resulting tense may be '_'.
        candidate_masks = (
            words['deprel'] == 'cop',
            words['deprel'] == 'aux',
            words['upos'] == 'AUX',
        )
        for mask in candidate_masks:
            matches = words[is_dep_of_root & mask]
            if len(matches) > 0:
                return matches.index[0]
        return None

    def find_tensed_word(self):
        """Return the row of ``self.words`` selected by ``find_tense_id()``, or None.

        Kept for callers of the previous method name. The word id is the
        name of the returned row.
        """
        tense_id = self.find_tense_id()
        if tense_id is None:
            return None
        return self.words.loc[tense_id]

class FrenchSentence(Sentence):
    """Sentence whose tense-bearing word is located with French rules."""

    def find_tense_id(self):
        """
        Return the id of the word carrying the sentence tense, or None.

        1) Root is verb, is finite and has tense
        2) AUX points to head as aux:pass or cop, has tense and is finite
        3) Some other AUX points to head, has tense and is finite; then take
            the root if it is a verb, has tense and is a participle,
            otherwise an AUX pointing to head as cop that has tense and is a participle
        4) Root is verb and is an infinitive
            AUX points to head as aux, has tense, is finite

        The special case for the lemma "aller" (future) is not implemented:
        the auxiliary itself is returned in rule 4.
        """
        words = self.words
        root = words.loc[self.root_id]
        root_is_verb = root['upos'] == 'VERB'
        root_has_tense = root['tense'] != self.NULL_VALUE
        if root_is_verb and (root['verbform'] == 'Fin') and root_has_tense:
            return self.root_id

        is_aux = words['upos'] == 'AUX'
        is_dep_head = words['head'] == self.root_id
        is_finite = words['verbform'] == 'Fin'
        has_tense = words['tense'] != self.NULL_VALUE
        is_tensed_finite_aux = is_aux & is_dep_head & is_finite & has_tense

        if is_tensed_finite_aux.any():
            # Combine the masks on the full frame instead of applying a
            # full-frame mask to an already filtered frame, which relies on
            # implicit reindexing of the key.
            is_auxpass_cop_deprel = words['deprel'].isin(['aux:pass', 'cop'])
            auxpass_or_cop = words[is_tensed_finite_aux & is_auxpass_cop_deprel]
            if len(auxpass_or_cop):
                return auxpass_or_cop.index[0]
            if root_is_verb and root_has_tense and (root['verbform'] == 'Part'):
                return self.root_id
            is_cop = words['deprel'] == 'cop'
            is_participle = words['verbform'] == 'Part'
            participle_cop = words[is_aux & is_dep_head & is_cop & has_tense & is_participle]
            if len(participle_cop):
                return participle_cop.index[0]

        # Also reached when the branch above found nothing to return.
        if root_is_verb and (root['verbform'] == 'Inf'):
            is_aux_deprel = words['deprel'] == 'aux'
            aux = words[is_tensed_finite_aux & is_aux_deprel]
            if len(aux):
                return aux.index[0]  # doesn't take aller into account
        return None

class SpanishSentence(Sentence):
    """Sentence whose tense-bearing word is located with Spanish rules."""

    def find_tense_id(self):
        """
        Return the id of the word carrying the sentence tense, or None.

        This is the hook called by ``Sentence.__init__``; without it the
        class could not be instantiated.

        1) Root of the sentence if it's finite and has tense
        2) Root of the sentence if it's a verb and has tense
        3) The first direct dependent of the root that is related to the root by cop
        4) The first direct dependent of the root that is related to the root by aux
        5) The first direct dependent of the root that is an auxiliary
        """
        words = self.words
        root = words.loc[self.root_id]
        root_has_tense = root['tense'] != self.NULL_VALUE
        if root_has_tense and ((root['verbform'] == 'Fin') or (root['upos'] == 'VERB')):
            return self.root_id

        # 'head' and the index both hold ids as strings, so compare with root_id.
        is_dep_of_root = words['head'] == self.root_id
        # Rules 3-5, tried in this order.
        # NOTE(review): as in the rules above, the chosen dependent is not
        # required to carry a Tense feature, so the resulting tense may be '_'.
        candidate_masks = (
            words['deprel'] == 'cop',
            words['deprel'] == 'aux',
            words['upos'] == 'AUX',
        )
        for mask in candidate_masks:
            matches = words[is_dep_of_root & mask]
            if len(matches) > 0:
                return matches.index[0]
        return None

    def find_tensed_word(self):
        """Return the row of ``self.words`` selected by ``find_tense_id()``, or None.

        Kept for callers of the previous method name. The word id is the
        name of the returned row.
        """
        tense_id = self.find_tense_id()
        if tense_id is None:
            return None
        return self.words.loc[tense_id]

class CoNLLUReader:
    """Read a .conllu file and turn each of its sentences into a Sentence object.

    The sentence class is chosen by language code through SENTENCE_READERS.
    """

    # A sentence runs from its '# sent_id' comment up to the first blank line.
    SENTENCE_PATTERN = re.compile(r'# sent_id.*?\n\n', re.DOTALL)

    SENTENCE_READERS = {'en': EnglishSentence,
                        'it': ItalianSentence,
                        'fr': FrenchSentence,
                        'es': SpanishSentence}

    def __init__(self, lg, fname):
        """Parse the file ``fname`` with the sentence class registered for ``lg``."""
        self.lg = lg
        raw = self.read(fname)
        self.sentences = self.parse_sentences(raw)

    def read(self, fname):
        """Return the whole content of ``fname``, decoded as UTF-8."""
        with open(fname, encoding='utf-8') as handle:
            contents = handle.read()
        return contents

    def parse_sentences(self, raw):
        """Return one sentence object per SENTENCE_PATTERN match in ``raw``."""
        reader_cls = self.SENTENCE_READERS[self.lg]
        return [reader_cls(match.group(0))
                for match in self.SENTENCE_PATTERN.finditer(raw)]

In [4]:
class UDReader:
    """Build the tense ground truth and vocabulary metadata for one language.

    ``prepare()`` reads every .conllu file found below the language
    directory, collects the features of all sentences that have a tense,
    counts token frequencies over their texts and writes both to disk.
    """

    # Sentence attributes exported as the columns of the ground truth.
    FEATURES = ['text', 'length', 'tense', 'tense_wordform', 'tense_lemma', 'relation_to_head',
                'dist_from_right', 'other_tensed_words', 'other_tensed_words_conflicting',
                'other_tensed_words_right', 'other_tensed_words_conflicting_right']
    # Part-of-speech tags whose tokens are left out of the frequency counts.
    BAD_POS = ['INTJ', 'SYM', 'X']

    def __init__(self, lg):
        """Load the spaCy pipeline for ``lg`` and list the .conllu files to read."""
        self.lg = lg
        self.nlp = spacy.load(lg)
        # recursive=True is needed for '**' to span several directory levels.
        # Without it '**' acts like '*', and files stored two levels down
        # (as in <lg>/UD/<treebank>/*.conllu) are never found.
        self.fnames = glob.glob('{}/**/*.conllu'.format(lg), recursive=True)

    def prepare(self):
        """Run the whole pipeline and write the results to disk."""
        self.tensed_types = set() # built up during prepare_gt_and_tensed_types()
        self.ground_truth = self.prepare_gt_and_tensed_types()
        self.freq_dist = self.prepare_freq_dist()
        self.responsible_types = self.prepare_responsible()
        self.write()

    def prepare_gt_and_tensed_types(self):
        """Return a DataFrame with the FEATURES of every sentence that has a tense.

        As a side effect, the lowercased forms of all tensed words of those
        sentences are added to ``self.tensed_types``.
        """
        gt = []
        for fname in tqdm(self.fnames):
            reader = CoNLLUReader(self.lg, fname)
            for s in reader.sentences:
                if s.tense:
                    gt.append(self.extract(s, self.FEATURES))
                    self.add_tensed_words(s)
        return pd.DataFrame(gt, columns=self.FEATURES)

    def prepare_responsible(self):
        """Return the set of word forms found in the 'tense_wordform' column."""
        return set(self.ground_truth['tense_wordform'])

    def add_tensed_words(self, sent):
        """Add the lowercased form of every tensed word of ``sent`` to ``tensed_types``.

        For a word inside a multiword range, the form of the range row is used.
        """
        ids = sent.words[sent.words['tense'] != Sentence.NULL_VALUE].index
        for i in ids:
            multiword_i = sent.words.loc[i]['multiword']
            if multiword_i != Sentence.NULL_VALUE:
                i = multiword_i
            t = sent.words.loc[i]['form'].lower()
            self.tensed_types.add(t)

    def prepare_freq_dist(self):
        """Return a Counter of preprocessed tokens over all ground-truth texts."""
        freq_dist = Counter()
        for sent in self.ground_truth['text']:
            doc = self.nlp(sent)
            freq_dist.update(self.preprocess(doc))
        return freq_dist

    def preprocess(self, doc):
        """Return the normalised token texts of ``doc``.

        Unwanted tokens are dropped: URLs, e-mail addresses, whitespace,
        tokens tagged with a BAD_POS tag and punctuation longer than one
        character. Number-like tokens become 'num'; all other tokens are
        lowercased.
        """
        result = []
        for t in doc:
            is_bad_pos = t.pos_ in self.BAD_POS
            is_bad_punct = t.is_punct and (len(t.text) > 1)
            is_bad_token = t.like_url or t.like_email or t.is_space or is_bad_pos or is_bad_punct
            if is_bad_token:
                # Skip only this token. Stopping the loop here would also
                # discard every valid token that follows it in the sentence.
                continue
            if t.is_digit or t.like_num:
                result.append('num')
            else:
                result.append(t.text.lower())
        return result

    def extract(self, sent, features):
        """Return the values of the attributes named in ``features``, in order."""
        return [getattr(sent, ft) for ft in features]

    def write(self):
        """Write ground_truth.csv and metadata.pkl to ``<UD_DIR>/<lg>/``."""
        out_dir = os.path.join(UD_DIR, self.lg)
        # The output directory is not created anywhere else in this file.
        os.makedirs(out_dir, exist_ok=True)
        ofile = os.path.join(out_dir, 'ground_truth.csv')
        self.ground_truth.to_csv(ofile, index=False)
        ofile = os.path.join(out_dir, 'metadata.pkl')
        obj = {'freq_dist': self.freq_dist, 'tensed_types': self.tensed_types,
               'responsible_types': self.responsible_types}
        with open(ofile, mode='wb') as f:
            pickle.dump(obj, f)

In [5]:
def main(lgs):
    """Prepare the data of every language in ``lgs`` and log how long each took.

    Args:
        lgs: Iterable of language codes; one UDReader is built and run per code.
    """
    for lg in lgs:
        started_at = datetime.now()
        UDReader(lg).prepare()
        elapsed = datetime.now() - started_at
        logging.info('Processing {} took {}'.format(lg, elapsed))

In [6]:
#main(['en', 'fr', 'it', 'es'])