In [1]:
import enum
from collections import namedtuple

import pandas as pd

import pattern.en as en
from pycorenlp import StanfordCoreNLP

import nltk.tokenize

In [2]:
# Shared client for the Stanford CoreNLP server; the annotate() calls in the
# cells below all go through this object.
# NOTE(review): presumably a CoreNLP server must already be listening on
# localhost:9000 before the notebook is run — confirm the setup steps.
stanford_nlp = StanfordCoreNLP('http://localhost:9000')

In [3]:
# One node of a constituency parse tree: its syntactic tag, the token span it
# covers (startIndex inclusive, endIndex exclusive), its depth in the tree and
# the tag of the node that contains it.
Constituency = namedtuple(
    "Constituency", "tag startIndex endIndex depth parent_tag"
)


def skip_blank_space(parse, pointer):
    """Return the index of the first non-whitespace character at or after `pointer`.

    If only whitespace remains the result is `len(parse)`; if `pointer` is
    already at or past the end of the string it is returned unchanged.
    """
    for position in range(pointer, len(parse)):
        if not parse[position].isspace():
            return position
    # Ran off the end (or started there): mirror where a manual scan would stop.
    return max(pointer, len(parse))


def read_tag(parse, pointer):
    """Read the tag that begins at `pointer`.

    The tag is the run of non-whitespace characters starting at `pointer`.
    Returns `(tag, next_pointer)` where `next_pointer` has been advanced past
    any whitespace following the tag.
    """
    tag_end = pointer
    while tag_end < len(parse):
        if parse[tag_end].isspace():
            break
        tag_end += 1
    tag = parse[pointer:tag_end]
    return tag, skip_blank_space(parse, tag_end)


def read_token(parse, pointer, tokens):
    """Read a leaf token starting at `pointer` and append it to `tokens`.

    The token extends up to (not including) the next ")" or the end of the
    string. Returns the index at which reading stopped.
    """
    for token_end in range(pointer, len(parse)):
        if parse[token_end] == ")":
            break
    else:
        # No closing parenthesis ahead: the token runs to the end of the string.
        token_end = max(pointer, len(parse))
    tokens.append(parse[pointer:token_end])
    return token_end
    
    
def read_body(parse, pointer, constituencies, tokens, depth, parent_tag):
    """Read whatever follows a tag: a nested node or a leaf token.

    Returns `(constituency, next_pointer)` for a nested node and
    `(None, next_pointer)` for a leaf token (which is appended to `tokens`).
    """
    if parse[pointer] != "(":
        return None, read_token(parse, pointer, tokens)
    return read_constituency(
        parse, pointer, constituencies, tokens, depth, parent_tag
    )
        
        
def read_constituency(parse, pointer, constituencies, tokens, depth, parent_tag):
    """Parse the parenthesised node starting at `pointer` and record it.

    `parse[pointer]` must be "(". The node's tag is read first, then its body,
    which is either a single leaf token or one or more nested nodes. The node
    and (recursively) all of its descendants are appended to `constituencies`,
    children before their parent; leaf tokens are appended to `tokens`.

    Returns `(constituency, next_pointer)`, with `next_pointer` placed after
    the node's closing ")" and any whitespace that follows it.
    """
    assert parse[pointer] == "("
    pointer += 1
    tag, pointer = read_tag(parse, pointer)
    first_child, pointer = read_body(
        parse, pointer, constituencies, tokens, depth + 1, parent_tag=tag
    )
    if first_child is None:
        # Leaf node: read_body has already appended its token, so len(tokens)
        # is the 1-based position of that token; the span covers just it.
        constituency = Constituency(
            tag=tag, 
            startIndex=len(tokens), 
            endIndex=len(tokens) + 1, 
            depth=depth,
            parent_tag=parent_tag
        )
        assert parse[pointer] == ")"
    else:
        # Inner node: keep reading children until the closing parenthesis.
        child = first_child
        while parse[pointer] != ")":
            child, pointer = read_body(
                parse, pointer, constituencies, tokens, depth + 1, parent_tag=tag
            )
        # The span runs from the first child's start to the last child's end.
        constituency = Constituency(
            tag=tag, 
            startIndex=first_child.startIndex, 
            endIndex=child.endIndex, 
            depth=depth,
            parent_tag=parent_tag
        )
    # Step over the closing ")".
    pointer += 1
    constituencies.append(constituency)  
    return constituency, skip_blank_space(parse, pointer)


def read_constituencies(parse):
    """Return every constituency found in a bracketed parse string.

    Parsing starts at index 0 with depth 0 and an empty parent tag. The tokens
    gathered along the way are only used internally for indexing.
    """
    collected, seen_tokens = [], []
    read_constituency(parse, 0, collected, seen_tokens, depth=0, parent_tag="")
    return collected

In [4]:
def get_subject(parse):
    """Return the `(startIndex, endIndex)` span of the shallowest NP in `parse`.

    When several NPs share the smallest depth, the one that appears first in
    the list returned by `read_constituencies` wins. Returns None if the parse
    contains no NP at all.
    """
    noun_phrases = [
        node for node in read_constituencies(parse) if node.tag == "NP"
    ]
    if not noun_phrases:
        return None
    shallowest = min(noun_phrases, key=lambda node: node.depth)
    return shallowest.startIndex, shallowest.endIndex

In [5]:
def create_resolution_map(corefs):
    """Map every coreference mention to the text of its representative mention.

    `corefs` is a mapping whose values are lists of mention dicts with the keys
    "isRepresentativeMention", "text", "sentNum", "startIndex" and "endIndex".
    The result maps `(sentNum, startIndex, endIndex)` to the representative
    mention's text (None if a group has no representative mention).
    """
    resolution_map = {}

    for mentions in corefs.values():
        representative_text = None
        mention_keys = []
        for mention in mentions:
            if mention["isRepresentativeMention"]:
                representative_text = mention["text"]
            mention_keys.append(
                (mention["sentNum"], mention["startIndex"], mention["endIndex"])
            )
        # All mentions of one group resolve to the same representative text.
        resolution_map.update(dict.fromkeys(mention_keys, representative_text))

    return resolution_map


def resolve_subject(sentence_num, subject, resolution_map, tokens):
    """Return the text for a subject span, resolving coreferences when possible.

    `subject` is a `(startIndex, endIndex)` pair. If `resolution_map` has an
    entry for `(sentence_num, startIndex, endIndex)` that entry is returned;
    otherwise the "originalText" of the covered tokens is joined with spaces
    (indices are 1-based, hence the `- 1` when indexing `tokens`).
    """
    lookup_key = (sentence_num, *subject)
    if lookup_key not in resolution_map:
        covered_words = []
        for index in range(*subject):
            covered_words.append(tokens[index - 1]["originalText"])
        return " ".join(covered_words)
    return resolution_map[lookup_key]

In [6]:
def get_resolved_subjects(text, stanford_nlp=stanford_nlp):
    """Return the coreference-resolved subject of every sentence in `text`.

    The text is annotated with the 'parse' and 'coref' annotators. For each
    sentence the shallowest NP is taken as the subject and resolved through the
    coreference map; sentences without any NP contribute None.
    """
    annotation = stanford_nlp.annotate(
        text,
        properties={
           'annotators': 'parse,coref',
           'outputFormat': 'json',
           'timeout': 60000,
        }
    )
    resolution_map = create_resolution_map(annotation["corefs"])

    resolved_subjects = []
    # Sentence numbers in the resolution map are 1-based.
    for sentence_number, sent in enumerate(annotation["sentences"], start=1):
        subject = get_subject(sent["parse"])
        resolved_subjects.append(
            None
            if subject is None
            else resolve_subject(
                sentence_number, subject, resolution_map, sent["tokens"]
            )
        )
    return resolved_subjects

In [7]:
def test__get_resolved_subjects():
    """Check subject extraction and coreference resolution on a short review."""
    review = "This movie was actually neither that funny, nor super witty. The movie was meh. I liked watching that movie. If I had a choice, I would not watch that movie again."
    expected_subjects = ["This movie", "This movie", "I", "I"]
    assert get_resolved_subjects(review, stanford_nlp) == expected_subjects


test__get_resolved_subjects()

In [9]:
# include previous sentences to do the coref resolution
def move_st(text, st, n_prev_sentences):
    """Walk backwards from index `st` to find where the context should begin.

    Sentence terminators (".", "!", "?") are counted while scanning from `st`
    down to index 1. As soon as more than `n_prev_sentences` terminators have
    been seen, the position just after the last one is returned together with
    the number of terminators that remain inside the window. If the start of
    the text is reached first, `(0, terminators_seen)` is returned.
    """
    terminators = {".", "!", "?"}
    seen = 0
    for position in range(st, 0, -1):
        if text[position] not in terminators:
            continue
        seen += 1
        if seen > n_prev_sentences:
            return position + 1, seen - 1
    return 0, seen

In [10]:
def find_subject(text, st, end):
    """Return the resolved subject of the last sentence in `text[:end]`.

    The window handed to the parser is widened backwards from `st` by up to
    five sentence terminators (see `move_st`) so coreferences can be resolved.
    Returns None when no sentence was found.
    """
    context_start, _ = move_st(text, st, 5)
    subjects = get_resolved_subjects(text[context_start:end])
    return subjects[-1] if subjects else None
    
    
def test__find_subject():
    """The subject is 'Sam' both directly and via the pronoun 'he'."""
    spans = [(17, 42), (0, 17)]
    for st, end in spans:
        assert find_subject(
            "Sam likes pizza. So he eats it frequently.", st, end
        ) == "Sam"


test__find_subject()

In [8]:
def normalise_verb(verb):
    """Conjugate `verb` to first person singular, present tense, indicative."""
    target_form = dict(
        tense=en.PRESENT,      # INFINITIVE, PRESENT, PAST, FUTURE
        person=1,              # 1, 2, 3 or None
        number=en.SINGULAR,    # SG, PL
        mood=en.INDICATIVE,    # INDICATIVE, IMPERATIVE, CONDITIONAL, SUBJUNCTIVE
        negated=False,         # True or False
        parse=True,
    )
    return en.conjugate(verb, **target_form)


def is_plural(word):
    """Return True when singularizing `word` changes it, False otherwise."""
    return not (word == en.singularize(word))
    

class Tense(enum.Enum):
    """Verb tenses accepted by `conjugate`."""
    PRESENT = 0
    PAST = 1

    
class Number(enum.Enum):
    """Grammatical numbers accepted by `conjugate`."""
    SINGULAR = 0
    PLURAL = 1
    
    
def conjugate(verb, tense=Tense.PAST, person=1, number=Number.SINGULAR):
    """Conjugate `verb` using the notebook's `Tense` / `Number` enums.

    The enum values are translated to the corresponding `pattern.en` constants
    and passed on to `en.conjugate`. Any other `tense` or `number` value fails
    an assertion.
    """
    assert tense == Tense.PRESENT or tense == Tense.PAST
    en_tense = en.PRESENT if tense == Tense.PRESENT else en.PAST

    assert number == Number.SINGULAR or number == Number.PLURAL
    en_number = en.SINGULAR if number == Number.SINGULAR else en.PLURAL

    return en.conjugate(verb, tense=en_tense, person=person, number=en_number)


def find_first_verb(text):
    """Return the position of the first token whose POS tag starts with 'V'.

    Positions index the sequence returned by `en.tag(text)`. Returns None if
    no such token exists.
    """
    verb_positions = (
        position
        for position, (_, pos_tag) in enumerate(en.tag(text))
        if pos_tag[:1] == 'V'
    )
    return next(verb_positions, None)


def find_first_adjective(text):
    """Return the position of the first token tagged exactly 'JJ'.

    Positions index the sequence returned by `en.tag(text)`. Returns None if
    no such token exists.
    """
    adjective_positions = (
        position
        for position, (_, pos_tag) in enumerate(en.tag(text))
        if pos_tag == 'JJ'
    )
    return next(adjective_positions, None)

In [None]:
# (Scratch cell cleared: it held only the incomplete expression "en.", which
# is a SyntaxError and would stop a Restart & Run All.)

In [93]:
# Discourse markers / filler expressions that process_and_join_tokens() strips
# from a sentence. Each expression is stored as a tuple of lowercase tokens so
# it can be compared against a lowercased k-gram of sentence tokens.
# NOTE(review): nltk may tokenize "e.g." differently from CoreNLP, in which
# case that entry can never match — confirm against real parser output.
tokenized_odd_expressions = {
    tuple(nltk.tokenize.word_tokenize(odd_expression))
    for odd_expression in [
        "in fact",
        "as a matter of fact",
        "actually",
        "indeed",
        "also",
        "besides",
        "too",
        "as well",
        "in particular",
        "first of all",
        "secondly",
        "finally",
        "to sum up",
        "in conclusion",
        "briefly",
        "in short",
        "on the whole",
        "in general",
        "in some cases",
        "to some extent",
        "broadly speaking",
        "as a result",
        "despite that",
        "in spite of that",
        "however",
        "on the other hand",
        "after all",
        "on the contrary",
        "of course",
        "certainly",
        "moreover",
        "furthermore",
        "further",
        "in addition",
        "what is more",
        "for instance",
        "for example",
        "e.g.",
        "honestly",
        "no doubt",
        "that is to say",
        "in other words",
        "apparently",
        # Original (misspelled) entry kept so text containing the same
        # misspelling is still stripped.
        "apparantly",
        "really",
        "more or less",
        "at least",
        "in any case",
        "already",
    ]
}
# Length (in tokens) of the longest expression; bounds the k-gram search.
max_odd_expr_len = max(len(toe) for toe in tokenized_odd_expressions)

In [113]:
def process_and_join_tokens(sent, offset=1, resolution_map=None, verbose=False):
    """Rebuild the text of a parsed sentence, dropping filler expressions.

    Tokens are taken from `sent["tokens"]` starting at the 1-based position
    `offset`. While rebuilding:

    * any run of tokens that matches an entry of `tokenized_odd_expressions`
      (longest match first) is skipped, together with a comma directly before
      and/or directly after it;
    * "-LRB-" / "-RRB-" are turned back into "(" / ")";
    * a space is inserted before a token unless it is closing punctuation,
      starts with an apostrophe, or follows "(";
    * if `resolution_map` is given, the pronouns he/she/it/they/him/her/them
      are replaced by the mapped text when the map has an entry for
      `(sent["index"] + 1, token["index"], token["index"] + 1)`.

    With `verbose=True` every removal and substitution is printed.
    Returns the rebuilt sentence as a single string without a leading space.
    """
    output_words = []
    prev_word = ""
    i = offset - 1
    while i < len(sent["tokens"]):
        
        skipped_tokens = False
        
        # Try the longest candidate expression first, then shorter ones.
        k = max_odd_expr_len
        while not skipped_tokens and k >= 1:
            k_gram = tuple([t["word"].lower() for t in sent["tokens"][i:(i + k)]])
            if k_gram in tokenized_odd_expressions:
                if prev_word == ",":
                    # Drop the comma already emitted in front of the expression
                    # and fall back to whatever was emitted before it.
                    assert len(output_words) >= 1
                    output_words.pop()
                    if len(output_words) > 0:
                        prev_word = output_words[-1]
                    else:
                        prev_word = ""
                if verbose:
                    print(f"Removing {' '.join(k_gram)}")
                i += k
                skipped_tokens = True
                # Also swallow a comma that directly follows the expression.
                if i < len(sent["tokens"]) and sent["tokens"][i]["word"] == ",":
                    i += 1
            k -= 1
                    
        if not skipped_tokens:        
            token = sent["tokens"][i]
            word = token["word"]
            # Undo the parser's bracket escaping.
            if word == "-LRB-":
                word = "("
            elif word == "-RRB-":
                word = ")"
            # Decide whether a separating space goes in front of this token.
            if (
                word not in {".", ",", "?", "!", ";", ":", ")"}
                    and word[:1] != "'"
                    and prev_word != "("
            ):
                output_words.append(" ")
            if resolution_map is None:
                output_words.append(word)
            else:
                if word.lower() in {"he", "she", "it", "they", "him", "her", "them"}:
                    # Same key layout as the resolution map:
                    # (1-based sentence number, token start, token end).
                    key = (sent["index"] + 1, token["index"], token["index"] + 1)
                    if not key in resolution_map:
                        output_words.append(word)
                    else:
                        if verbose:
                            print(
                                f"Substituting '{resolution_map[key]}'"
                                f" for '{token['word']}'."
                            )
                        output_words.append(resolution_map[key])
                else:
                    output_words.append(word)
            prev_word = word
            i += 1
            
    # The first emitted token usually carries a separating space; strip it.
    if len(output_words) > 0 and output_words[0] == " ":
        output_words = output_words[1:]
    return "".join(output_words)           


def test__process_and_join_tokens():
    """Exercise odd-expression removal, comma handling and pronoun resolution."""

    def apply_to_sentence(sentence, offset=1, resolution_map=None):
        annotation = stanford_nlp.annotate(
            sentence,
            properties={
               'annotators': 'parse',
               'outputFormat': 'json',
               'timeout': 60000,
            }
        )
        first_sentence = annotation["sentences"][0]
        return process_and_join_tokens(first_sentence, offset, resolution_map)

    # Each case: (arguments for apply_to_sentence, expected output).
    cases = [
        # Nothing is removed when there're no odd expressions
        (
            ("We, them, and you all got a solution.",),
            "We, them, and you all got a solution.",
        ),
        # Correctly handles several odd expressions following each other
        # Correctly handles commas
        (
            ("As a matter of fact, at least, we got a solution.",),
            "we got a solution.",
        ),
        # Correctly removes odd expr. at the end of the sentence
        # Correctly handles commas
        (("We got a solution, at least",), "We got a solution"),
        # Correctly removes odd expr. in the middle of the sentence
        # Correctly handles commas
        (("We, at least, got a solution.",), "We got a solution."),
        # Correctly removes odd expr. at the end of the sentence
        # Correctly handles the no-comma situation
        (("We got a solution too",), "We got a solution"),
        # Correctly removes odd expr. in the middle of the sentence
        # Correctly handles the no-comma situation
        (("We also got a solution.",), "We got a solution."),
        # Brackets are re-attached to their content
        (
            ("( 4 ) As a matter of fact, at least, we got a solution.",),
            "(4) we got a solution.",
        ),
        # Offset skips leading tokens; pronoun is substituted from the map
        (
            ("ABCDE He really wanted to go.", 2, {(1, 2, 3): "Sam"}),
            "Sam wanted to go.",
        ),
    ]
    for arguments, expected in cases:
        assert apply_to_sentence(*arguments) == expected


test__process_and_join_tokens()

In [116]:
def trim_and_fix_punctuation(sent):
    """Strip surrounding whitespace and make the text end like a sentence.

    A trailing ",", ";" or ":" is replaced by "."; text that does not already
    end in "!", "?" or "." gets a "." appended. Blank input yields "".
    """
    trimmed = sent.strip()
    if not trimmed:
        return ""

    last_char = trimmed[-1]
    if last_char in (",", ";", ":"):
        return trimmed[:-1] + "."
    if last_char in ("!", "?", "."):
        return trimmed
    return trimmed + "."
    

def is_imperative_verb(token):
    """Heuristically decide whether `token` looks like an imperative verb.

    True only when the token's POS tag starts with "V" and its lowercased word
    is identical to the form produced by `normalise_verb`.
    """
    pos = token["pos"]
    if not pos or pos[0] != "V":
        return False
    word = token["word"]
    return normalise_verb(word) == word.lower()
    
    
def find_comma_index(sent):
    """Return the "index" of the first comma token in `sent`, or -1 if none."""
    comma_indices = (
        token["index"] for token in sent["tokens"] if token["word"] == ","
    )
    return next(comma_indices, -1)

 
def get_offset(sent, verbose=False):
    """Return the 1-based token position at which the kept text should start.

    Sentences that open with "if", "when" or "as", or with an imperative verb,
    are kept whole (offset 1). Otherwise the offset is the smallest start index
    among the NP constituencies whose parent is tagged "S"; if there is no such
    NP the offset is 1 as well.
    """
    if verbose:
        print("-- syntactic parsing result\n", sent["parse"])

    first_token = sent["tokens"][0]
    keeps_leading_words = (
        first_token["word"].lower() in {"if", "when", "as"}
        or is_imperative_verb(first_token)
    )
    if keeps_leading_words:
        if verbose:
            print(
                "No tokens before the first NP will be removed because"
                " the sentence starts with if/when/as or an imperative verb."
            )
        return 1

    np_starts = (
        c.startIndex
        for c in read_constituencies(sent["parse"])
        if c.tag == "NP" and c.parent_tag == "S"
    )
    return min(np_starts, default=1)


def take_first_sentence_and_remove_leading_words(text, verbose=False):
    """Return the first sentence of `text`, cleaned up, or None.

    The text is parsed, the first sentence is taken, the tokens before its
    first subject NP are dropped (see `get_offset`), filler expressions are
    removed (see `process_and_join_tokens`) and the end punctuation is
    normalised (see `trim_and_fix_punctuation`).

    Returns None when the parser yields no usable first sentence. With
    `verbose=True` the intermediate steps are printed.
    """
    if verbose:
        print("Taking the first sentence and removing leading words:")
    
    result = stanford_nlp.annotate(
        text,
        properties={
           'annotators': 'parse',
           'outputFormat': 'json',
           'timeout': 60000,
        }
    )

    # NOTE(review): belong_to_one_vp already guards against a response without
    # "sentences"; the same precaution is taken here — presumably the client
    # can hand back something other than the parsed JSON dict on failure.
    sentences = result.get("sentences", []) if isinstance(result, dict) else []

    # "Empty" means no tokens; the length of the sentence dict itself only
    # counts its keys and never signals an empty sentence.
    if len(sentences) == 0 or len(sentences[0].get("tokens", [])) == 0:
        if verbose:
            print(
                "The parser didn't extract any sentences"
                " or the length of the first sentence is zero."
            )
        return None

    sent = sentences[0]
    offset = get_offset(sent, verbose)
    new_text = trim_and_fix_punctuation(process_and_join_tokens(sent, offset))
    if verbose:
        print(f"{text} \n---> \n{new_text}")
    return new_text

        
def test__take_first_sentence_and_remove_leading_words():
    """Check first-sentence extraction on a handful of representative inputs."""
    expectations = [
        ("In fact, we expected it. We did!", "we expected it."),
        ("In fact we expected it.  ", "we expected it."),
        ("Do what I say! Now!", "Do what I say!"),
        ("If so, let's skip it ( 4 ) :", "If so, let's skip it (4)."),
        ("When in doubt, ask them, ", "When in doubt, ask them."),
        ("As agreed, we'll write it down.", "As agreed, we'll write it down."),
        ("At last they went to Spain, ", "they went to Spain."),
    ]
    for raw_text, expected in expectations:
        assert take_first_sentence_and_remove_leading_words(raw_text) == expected

    # Empty input yields no sentence at all.
    assert take_first_sentence_and_remove_leading_words("") is None


test__take_first_sentence_and_remove_leading_words()

In [120]:
def take_last_sentence_and_resolve_pronouns(text, verbose=False):
    """Return the last sentence of `text` with pronouns resolved, or None.

    Only the tail of the text (found with `move_st`) is sent to the parser, so
    that coreference resolution has some preceding context without annotating
    the whole text. The last parsed sentence then goes through the same
    clean-up as `take_first_sentence_and_remove_leading_words`, with
    he/she/it/they/him/her/them replaced by their representative mentions.

    Returns None when the parser yields no usable last sentence. With
    `verbose=True` the intermediate steps are printed.
    """
    if verbose:
        print("Taking the last sentence and resolving pronouns:")
    
    # move_st returns an index counted from the START of the text, so the
    # context window is text[st:]. (text[-st:] would keep the last `st`
    # characters instead, which is only correct when st == 0.)
    st, _ = move_st(text, len(text) - 1, 4)
    result = stanford_nlp.annotate(
        text[st:],
        properties={
           'annotators': 'parse,coref',
           'outputFormat': 'json',
           'timeout': 100000,
        }
    )

    # NOTE(review): belong_to_one_vp already guards against a response without
    # "sentences"; the same precaution is taken here — presumably the client
    # can hand back something other than the parsed JSON dict on failure.
    sentences = result.get("sentences", []) if isinstance(result, dict) else []

    # "Empty" means no tokens; the length of the sentence dict itself only
    # counts its keys and never signals an empty sentence.
    if len(sentences) == 0 or len(sentences[-1].get("tokens", [])) == 0:
        if verbose:
            print(
                "The parser didn't extract any sentences"
                " or the length of the last sentence is zero."
            )
        return None

    resolution_map = create_resolution_map(result.get("corefs", {}))

    sent = sentences[-1]
    offset = get_offset(sent)
    new_text = trim_and_fix_punctuation(
        process_and_join_tokens(sent, offset, resolution_map, verbose)
    )
    if verbose:
        print(f"{text} \n---> \n{new_text}")
    return new_text
    
    
def test__take_last_sentence_and_resolve_pronouns():
    """Check last-sentence extraction and pronoun substitution."""
    cases = [
        ("Sam likes icecream. He eats it everyday.", "Sam eats icecream everyday."),
        ("Sam likes icecream. See (5 )", "See (5)."),
        ("", None),
        (
            "Sam likes icecream. But he does not eat it everyday.",
            "Sam does not eat icecream everyday.",
        ),
    ]
    for raw_text, expected in cases:
        outcome = take_last_sentence_and_resolve_pronouns(raw_text)
        if expected is None:
            assert outcome is None
        else:
            assert outcome == expected


test__take_last_sentence_and_resolve_pronouns()

In [45]:
def print_parse_trees(text):
    """Print the constituency parse of every sentence in `text` (debug aid)."""
    annotation = stanford_nlp.annotate(
        text,
        properties={
           'annotators': 'parse',
           'outputFormat': 'json',
           'timeout': 60000,
        }
    )

    sentence_no = 0
    for sentence in annotation["sentences"]:
        print(f"SENTENCE no. {sentence_no}:\n")
        print(sentence["parse"])
        print()
        sentence_no += 1

In [46]:
# -- Code for checking if the verbs of the parts of a relation belong to the same subject

def print_if_verbose(text, verbose):
    """Print `text` only when `verbose` is truthy."""
    if not verbose:
        return
    print(text)


def make_constituencies_df(constituencies):
    """Tabulate constituencies as a DataFrame with 0-based token positions.

    Columns are "type" (the tag), "start", "end" and "depth"; start and end are
    the constituency's startIndex / endIndex shifted down by one.
    """
    rows = []
    for node in constituencies:
        rows.append((node.tag, node.startIndex - 1, node.endIndex - 1, node.depth))
    return pd.DataFrame(rows, columns=["type", "start", "end", "depth"])


def count_tokens(text, st, end):
    """Return how many tokens the CoreNLP tokenizer finds in `text[st:end]`."""
    fragment = text[st:end]
    annotation = stanford_nlp.annotate(
        fragment,
        properties={
           'annotators': 'tokenize',
           'outputFormat': 'json',
           'timeout': 60000,
        }
    )
    tokens = annotation["tokens"]
    return len(tokens)
# def count_tokens(text, st, end):
#     return len(
#         nltk.tokenize.word_tokenize(text[st:end])
#     )


def find_boundaries(text, left_st, left_end, right_st, right_end, verbose=False):
    """Convert the character spans of two text parts into token positions.

    Token counts are accumulated from the start of the sentence containing
    `left_st`. Returns the tuple `(first_left_token_no, last_left_token_no,
    first_right_token_no, last_right_token_no)`.
    """
    sentence_st, _ = move_st(text, left_st, 0)
    print_if_verbose(f"Sentence starts at {sentence_st}.", verbose)

    # Consecutive character cut points; each gap is tokenized on its own and
    # the running total after each gap is one of the four boundaries.
    cut_points = [sentence_st, left_st, left_end, right_st, right_end]
    token_positions = []
    running_total = 0
    for span_start, span_end in zip(cut_points, cut_points[1:]):
        running_total += count_tokens(text, span_start, span_end)
        token_positions.append(running_total)
    return tuple(token_positions)


def get_vps_in_boundaries(constituencies_df, left_boundary, right_boundary):
    """Select the VP rows lying entirely within `[left_boundary, right_boundary]`."""
    is_vp = constituencies_df["type"] == "VP"
    starts_inside = constituencies_df["start"] >= left_boundary
    ends_inside = constituencies_df["end"] <= right_boundary
    return constituencies_df.loc[is_vp & (starts_inside & ends_inside)]


def find_vp_boundaries(constituencies_df, left_boundary, right_boundary):
    """Return `(start, end)` of the shallowest VP inside the given boundaries.

    If several VPs share the smallest depth, the first one in row order is
    used. Returns None when no VP lies inside the boundaries.
    """
    candidates = get_vps_in_boundaries(
        constituencies_df, left_boundary, right_boundary
    )
    if len(candidates) == 0:
        return None
    shallowest_depth = candidates["depth"].min()
    top_vp = candidates.loc[candidates["depth"] == shallowest_depth].iloc[0]
    return top_vp["start"], top_vp["end"]

    
def find_umbrella_vps(constituencies_df, left_vp_boundaries, right_vp_boundaries):
    """Select the VP rows that span from the left VP's start to the right VP's end."""
    left_start = left_vp_boundaries[0]
    right_end = right_vp_boundaries[1]
    is_vp = constituencies_df["type"] == "VP"
    covers_left = constituencies_df["start"] <= left_start
    covers_right = constituencies_df["end"] >= right_end
    return constituencies_df.loc[is_vp & (covers_left & covers_right)]


def belong_to_one_vp(text, left_st, left_end, right_st, right_end, verbose=False):
    """Tell whether two text parts have verb phrases under one common VP.

    The text from the start of the sentence containing `left_st` is parsed and
    only its first sentence is inspected. The shallowest VP inside each part's
    token boundaries is located, and the function returns True if some VP
    covers both of them. Returns False when parsing fails or when either part
    contains no VP. With `verbose=True` intermediate results are printed.
    """
    sentence_start, _ = move_st(text, left_st, 0)
    annotation = stanford_nlp.annotate(
        text[sentence_start:],
        properties={
           'annotators': 'parse',
           'outputFormat': 'json',
           'timeout': 60000,
        }
    )

    parse_available = (
        "sentences" in annotation
        and len(annotation["sentences"]) != 0
        and "parse" in annotation["sentences"][0]
    )
    if not parse_available:
        print_if_verbose("Failed to parse the sentence.", verbose)
        return False

    parse = annotation["sentences"][0]["parse"]
    print_if_verbose(f"Parsing result:\n{parse}\n", verbose)

    constituencies_df = make_constituencies_df(read_constituencies(parse))
    print_if_verbose(f"Constituencies:\n{constituencies_df}\n", verbose)

    boundaries = find_boundaries(
        text, left_st, left_end, right_st, right_end, verbose
    )
    print_if_verbose(f"Boundaries: {boundaries}", verbose)
    left_first, left_last, right_first, right_last = boundaries[:4]

    left_vp_boundaries = find_vp_boundaries(constituencies_df, left_first, left_last)
    if left_vp_boundaries is None:
        print_if_verbose("Didn't find the left VP.", verbose)
        return False

    right_vp_boundaries = find_vp_boundaries(constituencies_df, right_first, right_last)
    if right_vp_boundaries is None:
        print_if_verbose("Didn't find the right VP.", verbose)
        return False
    print_if_verbose(
        f"Left and right VP boundaries: {left_vp_boundaries}, {right_vp_boundaries}", verbose
    )

    umbrella_vps = find_umbrella_vps(
        constituencies_df, left_vp_boundaries, right_vp_boundaries
    )
    print_if_verbose(f"Umbrella VPs:\n{umbrella_vps}", verbose)

    return len(umbrella_vps) > 0


def test__belong_to_one_vp():
    """Coordinated VPs share an umbrella VP; parts of different sentences don't."""
    text = (
        "I saw Beijing and climbed the Great Wall. "
        "This is our last chance to swim in the ocean and enjoy the warm weather."
    )

    def after(word):
        # Position one past the first character of the word's first occurrence.
        return text.index(word) + 1

    # Each case: (left_st, left_end, right_st, right_end), expected result.
    cases = [
        ((text.index("I"), after("Beijing"), text.index("and"), after("Wall")), True),
        ((text.index("to"), after("ocean"), after("ocean"), after("weather")), True),
        ((text.index("I"), after("Beijing"), text.index("This"), after("weather")), False),
    ]
    for spans, expected in cases:
        assert belong_to_one_vp(text, *spans, False) == expected


test__belong_to_one_vp()