# Preamble

In [1]:
from flair.models import SequenceTagger
from flair.data import Sentence, Token
from nltk import word_tokenize
import stanfordnlp
import nltk
import json
import spacy
import re

In [2]:
# Input: tab-separated dump produced by the Wikipedia extraction step.
PATH_WIKIPEDIA_EXTRACTION = (
    "../../data/causality-graphs/extraction/"
    "wikipedia/wikipedia-extraction.tsv-backup"
)
# Folder from which the Flair sequence tagger model is loaded.
PATH_FLAIR_FOLDER = "../../data/flair-models/lists/"

# Local folders used for the NLTK and StanfordNLP resources.
PATH_NLTK_RESOURCES = "../../data/external/nltk/"
PATH_STANFORD_RESOURCES = "../../data/external/stanfordnlp/"

# Output: JSON file that receives the final list-graph.
PATH_OUTPUT_GRAPH = "../../data/causality-graphs/spotting/wikipedia/list-graph.json"

In [3]:
# Download the 'punkt' package into the project-local folder, then register
# that folder on NLTK's data search path.
nltk.download('punkt', download_dir=PATH_NLTK_RESOURCES)
nltk.data.path.append(PATH_NLTK_RESOURCES)

[nltk_data] Downloading package punkt to
[nltk_data]     ../../data/downloads/nltk_data/...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
stanfordnlp.download('en', PATH_STANFORD_RESOURCES)

# Keyword arguments shared by both pipelines below.
shared_pipeline_settings = {
    'processors': 'tokenize,pos',
    'models_dir': PATH_STANFORD_RESOURCES,
    'treebank': 'en_ewt',
    'use_gpu': True,
}

# Pipeline applied to raw text.
stanford_nlp = stanfordnlp.Pipeline(**shared_pipeline_settings)

# Same settings plus tokenize_pretokenized=True, for input that has already
# been split into tokens.
stanford_nlp_pretokenized = stanfordnlp.Pipeline(
    tokenize_pretokenized=True,
    **shared_pipeline_settings)

Using the default treebank "en_ewt" for language "en".
Use device: gpu
	Torch-GPU-ID: 0
---
Loading: tokenize
With settings: 
{'model_path': '../../data/downloads/stanfordnlp_resources/en_ewt_models/en_ewt_tokenizer.pt', 'lang': 'en', 'shorthand': 'en_ewt', 'mode': 'predict'}
---
Loading: pos
With settings: 
{'model_path': '../../data/downloads/stanfordnlp_resources/en_ewt_models/en_ewt_tagger.pt', 'pretrain_path': '../../data/downloads/stanfordnlp_resources/en_ewt_models/en_ewt.pretrain.pt', 'lang': 'en', 'shorthand': 'en_ewt', 'mode': 'predict'}
Done loading processors!
---
Use device: gpu
	Torch-GPU-ID: 0
---
Loading: tokenize
With settings: 
{'model_path': '../../data/downloads/stanfordnlp_resources/en_ewt_models/en_ewt_tokenizer.pt', 'pretokenized': True, 'lang': 'en', 'shorthand': 'en_ewt', 'mode': 'predict'}
---
Loading: pos
With settings: 
{'model_path': '../../data/downloads/stanfordnlp_resources/en_ewt_models/en_ewt_tagger.pt', 'pretrain_path': '../../data/downloads/stanfordn

In [5]:
# Expected versions -- spaCy: 2.1.8, model: 2.1.0
# Install the model with:
# pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.1.0/en_core_web_sm-2.1.0.tar.gz
spacy_nlp = spacy.load('en_core_web_sm')

# Print the library version and the model version, one per line, so they can
# be compared with the expected versions noted above.
print(spacy.__version__, spacy_nlp.meta['version'], sep='\n')

2.1.8
2.1.0


# Loading Lists

In [6]:
list_set = []

# Read the extraction dump inside a context manager so the file handle is
# released even when a malformed row trips the assertion or the JSON parser.
with open(PATH_WIKIPEDIA_EXTRACTION, encoding="utf-8") as extraction_file:
    for line in extraction_file:
        parts = line.strip().split('\t')
        # Only rows whose first column is 'wikipedia_list' are of interest.
        if parts[0] != 'wikipedia_list':
            continue
        assert len(parts) == 9
        list_data = {
            # The last column holds the JSON-encoded list text.
            "list": json.loads(parts[8]),
            "type": "wikipedia_list",
            "payload": {
                "wikipedia_page_id": parts[1],
                "wikipedia_page_title": parts[2],
                "wikipedia_revision_id": parts[3],
                "wikipedia_revision_timestamp": parts[4],
                # [1:-1] drops the first and last character of the column
                # (presumably surrounding quotes -- confirm against the
                # extraction format).
                "list_toc_parent_title": parts[5][1:-1],
                "list_toc_section_heading": parts[6][1:-1],
                "list_toc_section_level": parts[7]
            }
        }
        list_set.append(list_data)

# Preprocessing

In [7]:
def is_valid_article(title):
    '''
    Tell whether a page title is free of all excluded markers.

    Returns False as soon as `title` contains one of the excluded
    substrings (case-sensitive, matched anywhere in the title) and
    True otherwise.
    '''
    excluded_markers = ('Wikipedia:', 'Template:', 'File:',
                        'Portal:', 'Category:', 'Draft:',
                        'List of', 'disambiguation')
    return not any(marker in title for marker in excluded_markers)

In [8]:
def is_in_causal_section(section_title, section_level):
    '''
    Tell whether a list sits directly in a cause/risk/symptom section.

    section_title: heading of the section containing the list.
    section_level: level of that section; compared as the string '2'.

    Returns True only if the heading is one of the allowed headings
    and the level equals '2'.
    '''
    # Every entry needs its own comma: adjacent string literals are silently
    # concatenated by Python ('Risk factors' 'Symptom' -> 'Risk factorsSymptom'),
    # which would drop both headings from the list.
    allowed_sections = ['Cause', 'Causes', 'Risk factor',
                        'Risk factors', 'Symptom', 'Symptoms',
                        'Signs and symptoms']
    return section_title in allowed_sections and section_level == '2'

In [9]:
list_data_preprocessed = []

# Keep only lists from valid articles that sit in an allowed section.
for candidate in list_set:
    payload = candidate['payload']
    keep = (is_valid_article(payload['wikipedia_page_title'])
            and is_in_causal_section(payload['list_toc_section_heading'],
                                     payload['list_toc_section_level']))
    if keep:
        list_data_preprocessed.append(candidate)

In [10]:
def get_pos(sentence):
    '''
    Collect (text, pos) pairs for every word of a sentence.

    Walks sentence.tokens and each token's words in order and returns
    a flat list of (word.text, word.pos) tuples.
    '''
    return [(word.text, word.pos)
            for token in sentence.tokens
            for word in token.words]


def pos_tagging(doc):
    '''
    Flatten the (text, pos) pairs of all sentences of a document.

    Returns one list containing the pairs produced by get_pos for each
    entry of doc.sentences, in document order.

    Note: a later cell defines another function with the same name that
    takes a list of strings; whichever definition ran last is the one
    bound to `pos_tagging`.
    '''
    return [pair
            for sentence in doc.sentences
            for pair in get_pos(sentence)]

In [11]:
def is_verb(pos_tag):
    '''
    Tell whether a POS tag is one of the verb tags.

    Recognised tags:
    VB  - verb, base form
    VBD - verb, past tense
    VBG - verb, gerund or present participle
    VBN - verb, past participle
    VBP - verb, non-3rd person singular present
    VBZ - verb, 3rd person singular present
    '''
    verb_tags = ('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ')
    return pos_tag in verb_tags

In [12]:
def preprocess(string):
    '''
    Turn the raw text of a list into clean, verb-free list items.

    The text is split at newlines. For every non-empty line:
      - bracketed parts "( ... )" are removed,
      - lines starting with "See " are dropped,
      - headings of the form "word:" are removed (e.g. "Initially: ..."),
      - lines that the Stanford pipeline does not split into exactly one
        sentence are dropped (treated as paragraphs),
      - lines containing a verb POS tag are dropped (whole sentences).

    string: raw list text, items separated by newlines.
    Returns the list of remaining item strings, in input order.
    '''
    result = []
    for raw_line in string.split('\n'):
        item = raw_line.strip()
        if item == '':
            continue

        # remove brackets
        item = re.sub(r"\s?\([^)]*\)\s?", "", item).strip()

        # Remove 'See ...'
        if re.match(r"^See\s.*", item) is not None:
            continue

        # Remove leading list headings like "Initially: ... Later: ..."
        # heading at beginning
        item = re.sub(r"^\w+:\s?", "", item).strip()
        # heading within the string
        item = re.sub(r"\w+:\s?", "", item).strip()

        if item == '':
            continue

        doc = stanford_nlp(item)

        # Remove paragraphs
        if len(doc.sentences) != 1:
            continue

        # Remove whole sentences (with verb).
        # The POS tags are read directly from the document instead of going
        # through `pos_tagging`: that name is redefined further down in the
        # notebook with a different signature, so relying on it would make
        # this function break once the later cell has been executed.
        contains_verb = any(is_verb(word.pos)
                            for sentence in doc.sentences
                            for token in sentence.tokens
                            for word in token.words)
        if contains_verb:
            continue

        result.append(item)
    return result

In [13]:
for list_data in list_data_preprocessed:
    raw_list = list_data['list']
    # The entry is overwritten in place, so after the first run it holds a
    # list of items instead of the raw text. Only raw text is preprocessed,
    # which keeps this cell safe to re-run.
    if isinstance(raw_list, str):
        list_data['list'] = preprocess(raw_list)

In [14]:
# One sample per list item; each sample keeps a reference to the type and
# payload of the list it came from.
list_data_for_spotting = [
    {'list': item,
     "sources": [{'type': list_data['type'],
                  'payload': list_data['payload']}]}
    for list_data in list_data_preprocessed
    for item in list_data['list']
]

# POS-Tagging

In [15]:
def get_offset_of_tags_in_sentence(sentence, tags):
    '''
    Determine the character offset of every tagged token in a sentence.

    sentence: the original (untokenized) string.
    tags: sequence of tuples whose first element is the token text, in
          the order the tokens occur in the sentence.

    Returns a list with one non-negative offset per tag. Tokens are
    searched from left to right, each search starting after the
    previously located token.
    '''
    offsets = []
    cursor = 0  # position in `sentence` right after the last located token
    for tag in tags:
        label = tag[0]
        position = sentence.find(label, cursor)

        if position == -1 and label in ('``', "''"):
            # The tokenizer may emit `` and '' in place of a plain double
            # quote; fall back to the character found in the original text.
            label = '"'
            position = sentence.find(label, cursor)

        if position == -1:
            # Token text does not occur in the remaining sentence: anchor it
            # at the current position and consume nothing, instead of
            # propagating find()'s -1 into the offsets.
            offsets.append(cursor)
            continue

        offsets.append(position)
        # prepare for next iteration
        cursor = position + len(label)
    return offsets


def get_pos_tags_of_sentence(sentence):
    '''
    Return the (text, pos) pairs of all words in a sentence.

    Iterates sentence.tokens and the words of each token in order and
    builds one flat list of (word.text, word.pos) tuples.
    '''
    return [(word.text, word.pos)
            for token in sentence.tokens
            for word in token.words]


def calculate_pos_tags_for_string(doc):
    '''
    Group the POS tags of a document by sentence.

    Returns a list with one entry per element of doc.sentences; each
    entry is the list of (text, pos) pairs of that sentence.
    '''
    return [list(get_pos_tags_of_sentence(sentence))
            for sentence in doc.sentences]


def pos_tagging(strings):
    '''
    POS-tag a batch of strings in a single pipeline call.

    Each string is tokenized with word_tokenize and re-joined with
    spaces; the tokenized strings are joined with blank lines and passed
    to the pretokenized Stanford pipeline. The number of tagged sentences
    must equal the number of input strings (asserted).

    Returns, per input string, a list of (text, pos, offset) tuples where
    offset is the token's character offset in the original string,
    converted to str.
    '''
    tokenized_lines = []
    for string in strings:
        tokenized_lines.append(' '.join(word_tokenize(string)))

    doc = stanford_nlp_pretokenized('\n\n'.join(tokenized_lines))
    tags = calculate_pos_tags_for_string(doc)
    assert len(tags) == len(strings)

    result = []
    for string, sentence_tags in zip(strings, tags):
        offsets = get_offset_of_tags_in_sentence(string, sentence_tags)
        result.append([(text, pos, str(offset))
                       for (text, pos), offset in zip(sentence_tags, offsets)])
    return result

In [16]:
# Tag all list items in one batch.
strings = [sample['list'] for sample in list_data_for_spotting]
tags = pos_tagging(strings)

In [17]:
# tags[k] belongs to list_data_for_spotting[k]; store it on the sample.
for position, sample in enumerate(list_data_for_spotting):
    sample['list:POS'] = tags[position]

# List-Spotter: Prediction 

In [18]:
def get_index(chunk):
    '''
    Compute the [start, end] character span of a chunk.

    start is the 'idx' tag value of the chunk's first token; end is the
    'idx' tag value of its last token plus the length of that token's
    text. Both tag values are converted with int().
    '''
    first_token = chunk.tokens[0]
    last_token = chunk.tokens[-1]
    start = int(first_token.get_tag('idx').value)
    end = int(last_token.get_tag('idx').value) + len(last_token.text)
    return [start, end]

In [19]:
# Load the sequence tagger from 'final-model.pt' inside PATH_FLAIR_FOLDER
# (the folder constant already ends with a path separator).
classifier = SequenceTagger.load(PATH_FLAIR_FOLDER + 'final-model.pt')

2020-08-14 21:58:24,028 loading file ../../data/flair-data/lists/final-model.pt


In [20]:
for sample in list_data_for_spotting:
    # Build the sentence from the precomputed tokens; every token carries its
    # POS tag and its character offset ('idx') in the original string.
    sentence = Sentence(use_tokenizer=False)
    for text, pos_tag, offset in sample['list:POS']:
        token = Token(text)
        token.add_tag('POS', pos_tag)
        token.add_tag('idx', offset)
        sentence.add_token(token)

    classifier.predict(sentence)

    # Map every predicted span back to a substring of the original string.
    spans = [get_index(chunk) for chunk in sentence.get_spans('chunk_BIO')]
    sample['extraction'] = [sample['list'][start:end]
                            for start, end in spans]

# Post-processing

In [21]:
def spacy_tagging(sentence):
    '''
    Tag a string with the spaCy pipeline.

    Returns a list of (token.text, token.tag_, token.idx) tuples, one per
    token of the processed string.
    '''
    return [(token.text, token.tag_, token.idx)
            for token in spacy_nlp(sentence)]

In [22]:
def post_process_value(value):
    '''
    Trim leading and trailing cutoff tokens from an extracted value.

    Tokens tagged CC, DT, PRP, PRP$ or with one of the listed punctuation
    tags are removed from both ends of the string.

    value: extracted string, or None.
    Returns None for None; the trimmed substring otherwise. If nothing
    would remain (empty input or only cutoff tokens), the value is
    returned unchanged.
    '''
    if value is None:
        return value

    tags = spacy_tagging(value)

    # NOTE(review): spaCy's English models appear to tag brackets as
    # -LRB-/-RRB- rather than '(' / ')' -- confirm whether those tags should
    # be part of this list.
    punctuation = ['.', ',', ';', '(', ')', '``', "''"]
    cutoff = ['CC', 'DT', 'PRP', 'PRP$'] + punctuation

    # Index of the first token that is kept.
    left = 0
    while left < len(tags) and tags[left][1] in cutoff:
        left += 1

    # Index of the last token that is kept.
    right = len(tags) - 1
    while right >= left and tags[right][1] in cutoff:
        right -= 1

    # Nothing left to keep: fall back to the untouched value. This explicit
    # check replaces a bare `except:` that produced the same result but also
    # swallowed unrelated errors.
    if left > right:
        return value

    start = tags[left][2]
    end = tags[right][2] + len(tags[right][0])
    return value[start:end]

In [23]:
def get_relation(section_heading, parent_title, value):
    '''
    Orient a (cause, effect) pair according to the section heading.

    Symptom sections: the parent title is the cause, the value the effect.
    Cause and risk-factor sections: the value is the cause, the parent
    title the effect.

    Returns the (cause, effect) tuple.
    Raises Exception("Not handled.") for any other heading.
    '''
    parent_is_cause = ('Symptom', 'Symptoms', 'Signs and symptoms')
    value_is_cause = ('Cause', 'Causes', 'Risk factor', 'Risk factors')

    if section_heading in parent_is_cause:
        return (parent_title, value)
    if section_heading in value_is_cause:
        return (value, parent_title)

    raise Exception("Not handled.")

In [24]:
list_graph = []

for sample in list_data_for_spotting:
    for raw_value in sample['extraction']:
        concept = post_process_value(raw_value)

        # Skip extractions that are empty after post-processing.
        if len(concept) == 0:
            continue

        payload = sample['sources'][0]['payload']
        cause, effect = get_relation(payload['list_toc_section_heading'],
                                     payload['list_toc_parent_title'],
                                     concept)

        list_graph.append({
            'causal_relation': {
                'cause': {'concept': cause},
                'effect': {'concept': effect},
            },
            'sources': sample['sources'],
        })

# Save List-Graph

In [25]:
# Serialize the graph first, then write it inside a context manager so the
# file handle is closed even if the write fails.
jsonarray = json.dumps(list_graph)
with open(PATH_OUTPUT_GRAPH, "w", encoding="utf-8") as file_list_graph:
    file_list_graph.write(jsonarray)