## Imports

In [None]:
!git clone https://github.com/daniel-furman/Capstone.git
!pip install -r /content/Capstone/requirements.txt
#!pip install -r /content/Capstone/requirements_llama.txt

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import json
from json import JSONDecodeError
import urllib.parse
import urllib.request

import os

import re
from ftfy import fix_text
from string import punctuation

import spacy

In [None]:
# Switch into the error-analysis notebook directory of the cloned repository so
# that the relative data path used by the read_csv call later in this notebook
# ("../../data/error_analysis/...") resolves.
# NOTE(review): the absolute /content/... path presumably targets Google Colab — confirm.
os.chdir("/content/Capstone/notebooks/error_analysis")

## Load Entity Data and Spacy models

In [None]:
# Language code -> English language name.
# The names are used in the debug messages of get_wikipedia_pages, and the keys
# drive the quote clean-up of the `alternate_forms` column further down.
code_to_lang_dict = {
    "bg": "Bulgarian",
    "ca": "Catalan",
    "cs": "Czech",
    "da": "Danish",
    "de": "German",
    "en": "English",
    "es": "Spanish",
    "fr": "French",
    "hr": "Croatian",
    "hu": "Hungarian",
    "it": "Italian",
    "nl": "Dutch",
    "pl": "Polish",
    "pt": "Portuguese",
    "ro": "Romanian",
    "ru": "Russian",
    "sl": "Slovenian",
    "sr": "Serbian",
    "sv": "Swedish",
    "uk": "Ukrainian",
}

In [None]:
# Language code -> text of the section heading at which get_article_info
# truncates an article (everything from "\n== <heading>" or "\n=== <heading>"
# onwards is dropped).
# The heading is matched as a substring, so an entry only needs to be the start
# of the real heading text.
# NOTE(review): "Referèncie" looks like a deliberately truncated form of the
# Catalan heading — confirm.
# bg, cs, hu, sl and sr appear in code_to_lang_dict but have no entry here.
code_to_wiki_cleanup_dict = {
    "ca": "Referèncie",
    "da": "Litteratur",
    "de": "Literatur",
    "en": "References",
    "es": "Referencias",
    "fr": "Notes et références",
    "hr": "Izvori",
    "it": "Note",
    "nl": "Literatuur",
    "pl": "Przypisy",
    "pt": "Referências",
    "ro": "Note",
    "ru": "Примечания",
    "sv": "Källor",
    "uk": "Література",
}

In [None]:
# Language code -> name of the spaCy pipeline used for entity tagging in that
# language. These could be swapped for a different model, such as spaCy's
# transformer-based variants, but those are not available for every language
# we want to work with.
# Every model listed here is loaded by load_spacy_models, so each one must be
# downloaded first.
code_to_spacy_model_dict = {
    "ca": "ca_core_news_lg",
    "da": "da_core_news_lg",
    "de": "de_core_news_lg",
    "en": "en_core_web_lg",
    "es": "es_core_news_lg",
    "fr": "fr_core_news_lg",
    "hr": "hr_core_news_lg",
    "it": "it_core_news_lg",
    "nl": "nl_core_news_lg",
    "pl": "pl_core_news_lg",
    "pt": "pt_core_news_lg",
    "ro": "ro_core_news_lg",
    "ru": "ru_core_news_lg",
    "sv": "sv_core_news_lg",
    "uk": "uk_core_news_lg",
}

In [None]:
!python -m spacy download {code_to_spacy_model_dict["ca"]}
# !python -m spacy download {code_to_spacy_model_dict["da"]}
# !python -m spacy download {code_to_spacy_model_dict["de"]}
!python -m spacy download {code_to_spacy_model_dict["en"]}
# !python -m spacy download {code_to_spacy_model_dict["es"]}
!python -m spacy download {code_to_spacy_model_dict["fr"]}
# !python -m spacy download {code_to_spacy_model_dict["hr"]}
# !python -m spacy download {code_to_spacy_model_dict["it"]}
# !python -m spacy download {code_to_spacy_model_dict["nl"]}
# !python -m spacy download {code_to_spacy_model_dict["pl"]}
# !python -m spacy download {code_to_spacy_model_dict["pt"]}
# !python -m spacy download {code_to_spacy_model_dict["ro"]}
# !python -m spacy download {code_to_spacy_model_dict["ru"]}
# !python -m spacy download {code_to_spacy_model_dict["sv"]}
# !python -m spacy download {code_to_spacy_model_dict["uk"]}

In [None]:
def load_spacy_models(code_to_spacy_model_dict):
    """Load one spaCy pipeline per language.

    Args:
        code_to_spacy_model_dict: mapping of language code -> spaCy model name.

    Returns:
        dict mapping each language code to the object returned by
        ``spacy.load`` for its model name, in the input's iteration order.
    """
    return {
        lang_code: spacy.load(model_name)
        for lang_code, model_name in code_to_spacy_model_dict.items()
    }

In [None]:
# Load a pipeline for every language in code_to_spacy_model_dict.
# NOTE(review): the download cell above only fetches the ca, en and fr models
# (the rest are commented out); loading a model that is not installed will
# presumably fail here on a fresh kernel — confirm and either download all
# models or restrict the dict that is passed in.
spacy_models = load_spacy_models(code_to_spacy_model_dict)

In [None]:
# All language codes known to the notebook, in the order they are declared.
lang_codes = list(code_to_lang_dict)

In [None]:
# Per-entity analysis table. The path is relative to the working directory set
# by the os.chdir call near the top of the notebook.
# The cells below read its `entity` and `alternate_forms` columns.
entity_analysis_df = pd.read_csv(
    "../../data/error_analysis/entity_analysis_language_and_accuracy_by_entity.csv"
)

In [None]:
entity_analysis_df.head()

We have roughly 23k entities to work with. We're interested in how many times each of them is mentioned on Wikipedia.

## Extract Entities and Their Translated Forms

In [None]:
print(len(entity_analysis_df["entity"]))

In [None]:
# Build a lookup from the English form of an entity to its multilingual forms.
# The `alternate_forms` column holds a single-quoted dict rendering, so the
# quotes are rewritten by hand until the string is valid JSON.
# TODO: update the upstream notebook so that it outputs well-formatted JSON
# into this column.
target_entities_multiling = {}
for _, record in entity_analysis_df.iterrows():
    raw_forms = record.alternate_forms

    # quote the language-code keys
    for lang_code in code_to_lang_dict:
        raw_forms = raw_forms.replace(f"'{lang_code}'", f'"{lang_code}"')

    # quote the values; the order of these substitutions matters
    for old, new in ((": '", ': "'), ("',", '",'), ("'}", '"}'), ('""', '"')):
        raw_forms = raw_forms.replace(old, new)

    try:
        target_entities_multiling[record.entity] = json.loads(raw_forms)
    except JSONDecodeError:
        print(f"couldn't parse {raw_forms}")

In [None]:
len(target_entities_multiling)

## Get Data From Wikipedia

In [None]:
# for a given language, randomly sample <n> articles (max of 500).
# return a dict of their id and title.
def get_wikipedia_pages(lang, debug=False, n=50):
    """Fetch a random sample of article ids and titles from one Wikipedia.

    Args:
        lang: language code used as the Wikipedia subdomain (e.g. "en").
        debug: when True, print progress and failure details.
        n: number of random articles to request, between 1 and 500.
            Defaults to 50, the value that used to be hard-coded.

    Returns:
        dict mapping each page's ``id`` field (as decoded from the JSON
        response) to its ``title``.

    Raises:
        ValueError: if ``n`` is outside the 1-500 range.
        RuntimeError: if the response has no ``query.random`` section.
    """
    if not 1 <= n <= 500:
        raise ValueError(f"n must be between 1 and 500, got {n}.")

    # fall back to the raw code so debug output never fails on an unknown language
    lang_name = code_to_lang_dict.get(lang, lang)

    # construct URL for API call (rnnamespace=0 limits the sample to namespace 0)
    articles_url = (
        f"https://{lang}.wikipedia.org/w/api.php"
        f"?action=query&list=random&format=json&rnnamespace=0&rnlimit={n}"
    )

    # grab and read the data, closing the connection as soon as it is consumed
    with urllib.request.urlopen(articles_url) as response:
        data = response.read()
        encoding = response.info().get_content_charset("utf-8")

    obj = json.loads(data.decode(encoding))

    if "query" not in obj or "random" not in obj["query"]:
        message = f"Unable to grab articles from {lang_name} using URL {articles_url}."
        if debug:
            print(message)
        raise RuntimeError(message)

    ids = {entry["id"]: entry["title"] for entry in obj["query"]["random"]}

    if debug:
        print(f"Fetched {len(ids)} articles from {lang_name} wikipedia")
    return ids

In [None]:
def get_article_info(
    article_title, pageid, lang, code_to_wiki_cleanup_dict, debug=False
):
    """Download the plain-text extract of one Wikipedia article and clean it up.

    For an inputted article_id:title combination we want to hit, e.g.:
    https://en.wikipedia.org/w/api.php?action=query&format=json&titles=Kerala&prop=extracts&explaintext
    good response - {"batchcomplete":"","query":{"pages":{"14958":{"pageid":14958,"ns":0,"title":"Kerala"
    bad response  - {"batchcomplete":"","query":{"pages":{"-1":{"ns":0,"title":"Kerala","missing":""}}}}

    Args:
        article_title: title of the article to fetch.
        pageid: expected page id, as an int or a string; it is compared with
            the page key of the response as a string.
        lang: language code used as the Wikipedia subdomain (case-insensitive).
        code_to_wiki_cleanup_dict: mapping of language code -> heading text at
            which the article is truncated (references section and below).
            Languages without an entry are returned untruncated.
        debug: when True, print progress and failure details.

    Returns:
        ``{article_title: cleaned_text}`` on success, or ``{}`` when the
        inputs are empty, the page is missing, the page id does not match, or
        the response carries no extract.

    Raises:
        RuntimeError: if the response has no ``query.pages`` section.
    """
    # validate inputs
    if not article_title:
        if debug:
            print("Can't parse empty title.")
        return {}

    if not lang:
        if debug:
            print("Input a language.")
        return {}

    lang = lang.lower()

    # format title via quote escapes to ensure non-ascii chars can get handed off properly
    quoted_title = urllib.parse.quote(article_title)

    # construct url
    # where lang is the language we are requesting
    # and quoted title refers to our article
    info_url = (
        f"https://{lang}.wikipedia.org/w/api.php"
        f"?action=query&format=json&titles={quoted_title}&prop=extracts&explaintext"
    )

    if debug:
        print(
            f"calling {info_url} to retrieve info about {article_title} from {lang} wiki."
        )

    # grab data
    try:
        response = urllib.request.urlopen(info_url)
    except UnicodeDecodeError:
        print(
            f"could not decode API call for {article_title} on {lang} wiki; url is {info_url}."
        )
        return {}

    # read content and release the connection
    with response:
        data = response.read()
        encoding = response.info().get_content_charset("utf-8")

    obj = json.loads(data.decode(encoding))

    if "query" not in obj or "pages" not in obj["query"]:
        message = f"Error parsing response for {article_title} from {lang} wiki."
        if debug:
            print(message)
        raise RuntimeError(message)

    pages = obj["query"]["pages"]

    # check for a 'missing'/bad response; JSON object keys are strings, so the
    # sentinel key is "-1", not the integer -1
    if "-1" in pages:
        if debug:
            print(f"No wiki data found for {article_title} on {lang} wiki.")
        return {}

    # get pageid of the returned article
    data_pageid = next(iter(pages))

    # double check pageid matches the one returned by API; compare as strings
    # so that integer ids (as produced by get_wikipedia_pages) match too
    if data_pageid != str(pageid):
        if debug:
            print(
                f"id mismatch -- excpected {pageid} but retrieved {data_pageid} for {article_title} on {lang} wiki."
            )
        return {}

    # check if text is properly returned
    if "extract" not in pages[data_pageid]:
        if debug:
            print(
                f"could not retrieve text from {pageid} {article_title} on {lang} wiki."
            )
        return {}

    # get text and repair mis-encoded characters
    content = fix_text(pages[data_pageid]["extract"])

    # remove references and whatever is below that as well; a level-2 heading
    # takes precedence over a level-3 heading
    references_line = code_to_wiki_cleanup_dict.get(lang)
    cut_at = -1
    if references_line:
        for marker in ("\n== " + references_line, "\n=== " + references_line):
            cut_at = content.find(marker)
            if cut_at != -1:
                break

    if cut_at != -1:
        content = content[:cut_at]
    elif debug:
        print(
            f"Couldn't remove references for {article_title} with content {content} searching for {references_line}"
        )

    # light string substitutions; whitespace runs collapse to ONE space so that
    # neighbouring words are not glued together
    content = content.replace("\n", " ")
    content = content.replace("=", " ")
    content = re.sub(r"\s{2,}", " ", content).strip()

    return {article_title: content}

In [None]:
# for inputted article content
# how many entities appear in the article?
# (total as well as unique)
# how many of our target entities appear in the text?
# (total as well as unique)
# how many words are in the article?


# always search english and the native language in-case of translation inconsistencies
def count_entities_in_article(
    target_entities_multiling, article_content, spacy_models, lang, debug=False
):
    """Count words, tagged entities and target-entity mentions in one article.

    Args:
        target_entities_multiling: mapping of English entity name ->
            {language code: translated entity name}.
        article_content: single-item mapping ``{title: text}``, as returned by
            get_article_info (which returns ``{}`` on failure).
        spacy_models: mapping of language code -> loaded spaCy pipeline.
        lang: language code of the article; selects the pipeline and the
            translation to match against.
        debug: when True, print progress details.

    Returns:
        On success a tuple ``(word_count, all_entities, target_entities)``:
          * word_count: number of tokens whose text is not found in
            ``string.punctuation``;
          * all_entities: ``{"<text>___<label>": mentions}`` for every tagged
            entity except those labelled ``MISC``;
          * target_entities: ``{english_name: mentions}`` for every target
            that has a translation for ``lang`` and whose translated or
            English form equals the text of a tagged entity. Mentions are
            summed over all matching tagged entities.
        When the article content is missing or empty, ``{}`` is returned
        instead of a tuple, so callers should check before unpacking.

    Raises:
        KeyError: if ``lang`` has no entry in ``spacy_models``.
    """
    nlp = spacy_models[lang]

    # covers both None and the empty dict that get_article_info returns on failure
    if not article_content:
        if debug:
            print("article content is empty.")
        return {}

    article_title, article_text = next(iter(article_content.items()))

    if not article_title or not article_text:
        if debug:
            print(f"Could not parse article content -- {article_content}")
        return {}

    if debug:
        print(f"Parsing article {article_title}.")

    doc = nlp(article_text)

    word_count = sum(1 for token in doc if token.text not in punctuation)

    if debug:
        print(f"{article_title} has {word_count} words.")

    all_entities = {}
    # mentions per plain entity text, summed over labels, for the target lookup
    mentions_by_text = {}
    for ent in doc.ents:
        if ent.label_ == "MISC":
            continue
        formatted_entity = ent.text + "___" + ent.label_
        all_entities[formatted_entity] = all_entities.get(formatted_entity, 0) + 1
        mentions_by_text[ent.text] = mentions_by_text.get(ent.text, 0) + 1

    if debug:
        print(f"{article_title} mentions {len(all_entities)} unique entities.")
        print(
            f"{article_title} includes {sum(list(all_entities.values()))} entity mentions"
        )

    target_entities = {}

    # look through our target entity mapping
    # which connects an english entity to the languages its translated into and that form
    for target_entity_english, translated_info in target_entities_multiling.items():
        # only targets that have a form in the article's language are considered
        if lang not in translated_info:
            continue
        translated_entity = translated_info[lang]

        # always search english and the native form in case of translation
        # inconsistencies, counting each distinct form once
        mentions = mentions_by_text.get(target_entity_english, 0)
        if (
            isinstance(translated_entity, str)
            and translated_entity != target_entity_english
        ):
            mentions += mentions_by_text.get(translated_entity, 0)

        if mentions:
            target_entities[target_entity_english] = mentions

    if debug:
        print(f"{article_title} mentions {len(target_entities)} target entities.")
        print(
            f"{article_title} includes {sum(list(target_entities.values()))} target entity mentions."
        )

    return word_count, all_entities, target_entities

In [None]:
# Smoke test: fetch and clean the English "Barack Obama" article.
# The page id is passed as a string because get_article_info compares it with
# the page key of the JSON response.
# The commented-out calls are other sample articles kept for manual checks.
# get_article_info('Скворцов Борис Дмитрович','3446530', 'uk', code_to_wiki_cleanup_dict, debug=True)
obama_info = get_article_info(
    "Barack Obama", "534366", "en", code_to_wiki_cleanup_dict, debug=True
)
# swedish_aricle_info = get_article_info("Lambula aethalocis", "2924062", "sv", code_to_wiki_cleanup_dict, debug=True)
# catalan_article_info = get_article_info("Cúmul de l'Ànec Salvatge", "260276", "ca", code_to_wiki_cleanup_dict, debug=True)
# french_article_info = get_article_info('Angleterre', '4925', 'fr', code_to_wiki_cleanup_dict, debug=True)

# NOTE: the call below is stale — it omits the code_to_wiki_cleanup_dict
# argument and would raise a TypeError if uncommented.
# get_article_info('Barack Obama','430434', 'es', debug=True)

In [None]:
# Tag the Obama article with the English pipeline and unpack the three results
# (word count, all tagged entities, matched target entities).
# NOTE: count_entities_in_article returns {} instead of a 3-tuple when the
# article content is unusable, in which case this unpacking raises ValueError.
(
    obama_word_count,
    obama_article_entities,
    obama_target_entities,
) = count_entities_in_article(
    target_entities_multiling, obama_info, spacy_models, "en", debug=True
)

In [None]:
# Preview at most the first ten target entities found in the Obama article,
# together with their mention counts.
for entity_name, mention_count in list(obama_target_entities.items())[:10]:
    print(entity_name, mention_count)