In [5]:
# Maps (verb, label) -> list of evaluation sentences read from eval_set.txt.
verb_label_tuple_to_sentences_list = {}

# File layout as handled below: a line with exactly two whitespace-separated
# fields ("<verb> <label>") opens a new group; every other non-empty line is a
# sentence appended to the most recently opened group.
with open('eval_set.txt') as f:
    verb_label_tuple = None
    for line_number, line in enumerate(f, start=1):
        stripped = line.strip()
        if not stripped:
            continue
        fields = stripped.split()
        try:
            if len(fields) == 2:
                verb, label = fields[0], int(fields[1])
                verb_label_tuple = (verb, label)
                verb_label_tuple_to_sentences_list[verb_label_tuple] = []
            else:
                # A sentence before any header hits the None key and fails here.
                verb_label_tuple_to_sentences_list[verb_label_tuple].append(stripped)
        except Exception as exc:
            # Catch only real errors (not KeyboardInterrupt/SystemExit) and keep
            # the original cause chained so the failure is diagnosable.
            raise Exception(
                f"Could not parse line {line_number} of eval_set.txt: {stripped!r}"
            ) from exc

In [6]:
# Folder expected to hold one `<verb>.csv` file per verb. It is None here and
# must be set to a real path before the CSV-based `get_annotated_sentences`
# below is called, because that function concatenates it into a file path.
complete_data_folder = None

def get_annotated_sentences(verb, label):
    """Look up the annotated CSV line for each evaluation sentence of (verb, label).

    Reads ``<complete_data_folder>/<verb>.csv`` line by line. For every sentence
    registered under ``(verb, label)`` in ``verb_label_tuple_to_sentences_list``,
    the stripped file line containing that sentence as a substring is recorded.
    Reading stops as soon as every distinct sentence has been matched.

    NOTE(review): a later cell in this notebook defines another function with
    the same name; once that cell runs, this version is shadowed.

    Args:
        verb: Verb whose CSV file is searched.
        label: Label that, together with ``verb``, selects the sentence list.

    Returns:
        dict mapping each sentence to the stripped CSV line that contains it.

    Raises:
        Exception: if no sentences are registered for ``(verb, label)``, or if
            the file ends before every sentence has been found.
    """
    verb_file = complete_data_folder + '/' + verb + '.csv'
    sentences = verb_label_tuple_to_sentences_list[(verb, label)]

    # Checked once up front: inside the file loop it was skipped for an empty
    # CSV, which then reported the misleading "Not enough sentences" error.
    if len(sentences) == 0:
        raise Exception('No sentences for verb ' + verb + ' and label ' + str(label))

    # The result dict de-duplicates sentences, so the completion target must
    # count distinct sentences; otherwise a repeated sentence can never finish.
    distinct_sentence_count = len(set(sentences))

    sentences_to_annotated_sentences = {}
    with open(verb_file) as f:
        for line in f:
            for sentence in sentences:
                if sentence in line:
                    sentences_to_annotated_sentences[sentence] = line.strip()
            if len(sentences_to_annotated_sentences) == distinct_sentence_count:
                return sentences_to_annotated_sentences

    # Show which sentences were never found before failing.
    print([sentence for sentence in sentences if sentence not in sentences_to_annotated_sentences])
    raise Exception('Not enough sentences for verb ' + verb + ' and label ' + str(label))

In [7]:
# Single- and two-word adpositions that `is_cm` hands to `check_adp`, which
# compares them against token text (exact, case-sensitive match).
adpositions = ['across', 'at', 'down', 'from', 'in', 'inside', 'into', 'near', 'off', 'off of', 'on', 'onto', 'out of', 'out to', 'outside', 'outside of', 'over', 'through', 'to', 'towards', 'under', 'up', 'within']

def check_dobj(i, dependencies, verb):
    """Follow head links from token ``i`` until a direct object of ``verb`` is found.

    Each entry of ``dependencies`` is a tuple
    ``(text, lemma, pos, dep, head_text, head_index)``.

    Args:
        i: Index of the token to start from.
        dependencies: List of token tuples for one sentence.
        verb: Text of the verb the direct object must be attached to.

    Returns:
        The index of the first token on the head chain whose dependency is
        ``dobj`` and whose head text equals ``verb``; ``None`` if the chain runs
        past the end of the list, or reaches a VERB/AUX token or the ROOT first.
    """
    # Walked off the end of the token list: nothing left to inspect.
    if not i < len(dependencies):
        return None

    _text, _lemma, tag, relation, parent_text, parent_index = dependencies[i]

    # A verbal token or the sentence root ends the search without a match.
    if tag in ('VERB', 'AUX') or relation == 'ROOT':
        return None

    is_object_of_verb = relation == 'dobj' and parent_text == verb
    if is_object_of_verb:
        return i

    # Otherwise climb to this token's head and try again.
    return check_dobj(parent_index, dependencies, verb)


def check_adp(start_i, dependencies, adpositions):
    """Check whether an adposition from ``adpositions`` starts at ``start_i``.

    Only token text is compared; dependency labels are deliberately ignored
    because spaCy can't be trusted on those.

    Args:
        start_i: Index of the token where the adposition should begin.
        dependencies: List of ``(text, lemma, pos, dep, head_text, head_index)`` tuples.
        adpositions: Collection of accepted one- and two-word adpositions.

    Returns:
        ``([start_i, start_i + 1], "<text> <text>")`` when the two-token text
        matches, else ``([start_i], lemma)`` when the single token's text
        matches, else ``(None, None)``.
    """
    token_count = len(dependencies)

    # Two-word adpositions take precedence over the single-word reading.
    if start_i + 1 < token_count:
        first_text = dependencies[start_i][0]
        second_text = dependencies[start_i + 1][0]
        two_word = first_text + " " + second_text
        if two_word in adpositions:
            return ([start_i, start_i + 1], two_word)

    if start_i < token_count:
        token = dependencies[start_i]
        if token[0] in adpositions:
            # The single-word case reports the lemma, not the surface text.
            return ([start_i], token[1])

    return (None, None)

def check_pobj(dependencies, adp_index):
    """Find the prepositional object attached to the given adposition token(s).

    Args:
        dependencies: List of ``(text, lemma, pos, dep, head_text, head_index)`` tuples.
        adp_index: Collection of token indices that make up the adposition.

    Returns:
        ``(lemma, index)`` of the single token whose head index is in
        ``adp_index``, whose dependency is ``pobj`` and whose part of speech is
        neither NUM nor PUNCT; ``(None, None)`` when there is no such token.

    Raises:
        Exception: if more than one token qualifies.
    """
    candidates = []
    for position, token in enumerate(dependencies):
        _text, lemma, pos, dep, _head, head_i = token
        if head_i not in adp_index or dep != 'pobj' or pos in ('NUM', 'PUNCT'):
            continue
        candidates.append((lemma, position))

    # More than one object for the same adposition is treated as an error.
    if len(candidates) > 1:
        raise Exception

    return candidates[0] if candidates else (None, None)

# Example token tuple (text, lemma, pos, dep, head text, head index): Why  why  SCONJ  advmod  is  1
def is_cm(dependencies, nlp, sentence):
    """Find the first verb + direct object + adposition + prepositional object pattern.

    For every VERB token, the function looks for a direct object reachable from
    the token right after the verb (via ``check_dobj``), then for an adposition
    from the module-level ``adpositions`` list directly after that object (via
    ``check_adp``), and finally for the adposition's prepositional object (via
    ``check_pobj``) in a fresh parse of ``sentence``.

    Args:
        dependencies: List of ``(text, lemma, pos, dep, head_text, head_index)``
            tuples; presumably produced from ``sentence`` by the same ``nlp``
            pipeline (verify against the caller).
        nlp: Callable that parses ``sentence`` into tokens exposing ``text``,
            ``lemma_``, ``pos_``, ``dep_`` and ``head``.
        sentence: Raw sentence text, re-parsed once an adposition is found.

    Returns:
        A dict with keys ``verb``, ``verb_i``, ``direct_object``,
        ``direct_object_i``, ``preposition``, ``preposition_i``,
        ``prepositional_object`` and ``prepositional_object_i`` for the first
        match, or ``None`` when no verb in the sentence matches the pattern.
    """
    already_reparsed = False
    for i, (token_text, lemma, pos, dep, head, head_i) in enumerate(dependencies):
        if pos != 'VERB':
            continue

        # Look for a direct object (dobj) starting right after the verb.
        dobj_i = check_dobj(i + 1, dependencies, token_text)
        # Compare against None explicitly: index 0 is a valid token position
        # and must not be discarded as falsy.
        if dobj_i is None:
            continue
        dobj_lemma = dependencies[dobj_i][1]

        # The token(s) right after the object must be a listed adposition.
        (adp_index, adp_lemma) = check_adp(dobj_i + 1, dependencies, adpositions)
        if not adp_index:
            continue

        # Re-parse the sentence, but only once: later candidates reuse the same
        # fresh parse instead of invoking the pipeline again.
        if not already_reparsed:
            doc = nlp(sentence)
            dependencies = [(token.text, token.lemma_, token.pos_, token.dep_, token.head.text, token.head.i) for token in doc]
            already_reparsed = True

        # Look for a prepositional object (pobj) child of the adposition.
        pobj_lemma, pobj_i = check_pobj(dependencies, adp_index)
        if pobj_lemma:
            return {
                'verb': lemma,
                'verb_i': i,
                'direct_object': dobj_lemma,
                'direct_object_i': dobj_i,
                'preposition': adp_lemma,
                'preposition_i': adp_index,
                'prepositional_object': pobj_lemma,
                'prepositional_object_i': pobj_i,
            }

    return None

In [12]:
import spacy
# Load the spaCy `en_core_web_sm` pipeline once at module level; it is reused by
# `dependency_parsing` and passed into `is_cm` by `get_annotated_sentences`.
nlp = spacy.load("en_core_web_sm")

def dependency_parsing(sentences):
    """Parse each sentence with the module-level ``nlp`` pipeline.

    Args:
        sentences: Iterable of sentence strings.

    Returns:
        A list with one dict per sentence, in input order. Each dict has the
        key ``"sentence"`` (the input text) and ``"dependencies"``, a list of
        ``(text, lemma, pos, dep, head_text, head_index)`` tuples, one per token.
    """
    def describe(token):
        # Flatten one parsed token into the tuple layout used across this notebook.
        return (token.text, token.lemma_, token.pos_, token.dep_, token.head.text, token.head.i)

    return [
        {"sentence": sentence, "dependencies": [describe(token) for token in nlp(sentence)]}
        for sentence in sentences
    ]

def get_annotated_sentences(verb, label):
    """Annotate every evaluation sentence of (verb, label) using the spaCy parse.

    All sentences registered under ``(verb, label)`` in
    ``verb_label_tuple_to_sentences_list`` are parsed with ``dependency_parsing``
    and matched with ``is_cm``. Each match is rendered as one comma-joined line:
    sentence, verb, direct object, preposition, prepositional object, followed
    by the four corresponding index fields.

    NOTE(review): fields are joined with plain commas and not escaped, so a
    sentence containing a comma, or a two-token preposition index such as
    ``[3, 4]``, adds extra commas to the line.

    Args:
        verb: Verb selecting the sentence list.
        label: Label selecting the sentence list.

    Returns:
        dict mapping each sentence to its annotated line.

    Raises:
        Exception: if ``is_cm`` finds no match for a sentence (the sentence is
            printed first).
    """
    sentences = verb_label_tuple_to_sentences_list[(verb, label)]
    field_order = (
        'verb', 'direct_object', 'preposition', 'prepositional_object',
        'verb_i', 'direct_object_i', 'preposition_i', 'prepositional_object_i',
    )

    annotated = {}
    for sentence, parsed in zip(sentences, dependency_parsing(sentences)):
        match = is_cm(parsed['dependencies'], nlp, sentence)
        if not match:
            print('No result for sentence: ' + sentence)
            raise Exception
        columns = [sentence] + [str(match[key]) for key in field_order]
        annotated[sentence] = ",".join(columns)
    return annotated

In [14]:
from tqdm import tqdm
out_file = "annotated_eval_set.csv"

# Open the output once for the whole run instead of re-opening it per verb.
# NOTE(review): append mode is kept from the original, so re-running this cell
# adds the same rows again; delete the file first (or switch to 'w') if each
# run should start from a clean file.
with open(out_file, 'a') as f:
    for verb_label_tuple in tqdm(verb_label_tuple_to_sentences_list):
        verb, label = verb_label_tuple
        sentences_to_annotated_sentences = get_annotated_sentences(verb, label)
        if not sentences_to_annotated_sentences:
            raise Exception('No sentences for verb ' + verb + ' and label ' + str(label))
        # One output row per sentence: the annotated line plus the label.
        for annotated_line in sentences_to_annotated_sentences.values():
            f.write(annotated_line + "," + str(label) + '\n')

100%|██████████| 107/107 [00:07<00:00, 14.47it/s]
