In [None]:
import pandas as pd
import collections
import ast
from wikimapper import WikiMapper
from pathlib import Path
import urllib.parse
import json
import html
from pigeon import annotate

In [None]:
# Inputs used throughout the notebook:
# * bho_dataset: the processed BHO data
# * wikidata_gazetteer: the gazetteer built from wikidata
# * temp_linking: the output of "align_bho_cands_to_wikidata.ipynb" (DeezyMatch candidates)
# NOTE(review): unpickling can execute arbitrary code -- only load pickles from a trusted source.

bho_csv_path = "/home/mcollardanuy/PlaceLinking/bho/bho.csv"
gazetteer_csv_path = "/home/mcollardanuy/PlaceLinking/wikidata/british_isles.csv"
linking_pickle_path = "/home/mcollardanuy/PlaceLinking/toponym_resolution/bho_wikidata/bho_queries_britwikidata_candidates+ids_wikigaz_en_002.pkl"

bho_dataset = pd.read_csv(bho_csv_path, index_col="id", low_memory=False)
wikidata_gazetteer = pd.read_csv(gazetteer_csv_path, index_col=0, low_memory=False)
temp_linking = pd.read_pickle(linking_pickle_path)

In [None]:
# Preview the first rows of the processed BHO dataset.
bho_dataset.head()

In [None]:
# Preview the first rows of the wikidata gazetteer.
wikidata_gazetteer.head()

In [None]:
# List the gazetteer's column labels.
wikidata_gazetteer.columns

In [None]:
# Preview the first rows of the candidate-linking table (one row per query toponym -- TODO confirm).
temp_linking.head()

In [None]:
def combine_dicts(func, *dicts):
    """Merge several dictionaries, reducing the values that share a key.

    Args:
        func: Callable applied to the set of distinct values collected for
            each key (e.g. ``min`` or ``max``).
        *dicts: Dictionaries to merge. Values must be hashable because they
            are gathered in a set, so repeated values are collapsed.

    Returns:
        dict: One entry per key seen in any input, in order of first
        appearance, mapped to ``func(values)``.
    """
    values_by_key = {}
    for mapping in dicts:
        for key, value in mapping.items():
            if key not in values_by_key:
                values_by_key[key] = set()
            values_by_key[key].add(value)

    merged = {}
    for key, values in values_by_key.items():
        merged[key] = func(values)
    return merged

# Candidate scores behave as distances: candidates are merged with `min` and
# sorted in ascending order, so a lower score means a closer match.
almost_exact_threshold = 0.5 # Max similarity threshold for almost-exact match
likely_match_threshold = 5 # Max similarity threshold for likely match
unlikely_match_threshold = 10 # Min similarity threshold for unlikely match


def classify_linking_scenario(sorted_scores):
    """Return the linking scenario for one BHO entry.

    Args:
        sorted_scores: Candidate scores sorted in ascending order (best first).
            May be empty or contain a single score.

    Returns:
        str: One of "no_candidates", "multiple_exact",
        "unique_exact_with_competition", "unique_exact_no_competition",
        "no_exact_potential_match", "potential_no_match" or "unlikely_match".
    """
    # Case 7: no candidates
    if not sorted_scores:
        return "no_candidates"

    best = sorted_scores[0]
    # A lone candidate has no runner-up; indexing [1] unconditionally would fail.
    runner_up = sorted_scores[1] if len(sorted_scores) > 1 else None

    if best < almost_exact_threshold:
        # Case 3: one almost exact match, other matches non competitive (or absent)
        if runner_up is None or runner_up > likely_match_threshold:
            return "unique_exact_no_competition"
        # Case 1: multiple almost exact matches
        if runner_up < almost_exact_threshold:
            return "multiple_exact"
        # Case 2: one almost exact match, other competitive potential matches
        return "unique_exact_with_competition"

    # The upper bound is inclusive (as it is for the unlikely threshold), so a
    # score exactly equal to a threshold always lands in a bucket.
    # Case 4: no almost-exact matches, but likely potential matches
    if best <= likely_match_threshold:
        return "no_exact_potential_match"
    # Case 5: no almost-exact matches, only less likely potential matches
    if best <= unlikely_match_threshold:
        return "potential_no_match"
    # Case 6: only unlikely potential matches
    return "unlikely_match"


# Build the query -> candidates lookup once instead of re-filtering
# temp_linking for every toponym. If a query occurs in several rows, the last
# row is the one kept.
cands_by_query = dict(zip(temp_linking["query"], temp_linking["wikidata_cands"]))

description = []
wikidata_candidates = []
for _, row in bho_dataset.iterrows():
    if row["redirected"] == False:  # noqa: E712 -- explicit comparison kept as in the original
        # Merge the candidates of all toponyms of the entry, keeping the
        # lowest score seen for each wikidata ID.
        wikidata_cands = dict()
        for t in ast.literal_eval(row["toponyms"]):
            toponym_cands = cands_by_query.get(t.strip())
            if toponym_cands is not None:
                wikidata_cands = combine_dicts(min, wikidata_cands, toponym_cands)
        wikidata_cands = dict(sorted(wikidata_cands.items(), key=lambda item: item[1]))
        description.append(classify_linking_scenario(list(wikidata_cands.values())))
        wikidata_candidates.append(wikidata_cands)
    else:
        # Redirected entries are not linked.
        description.append("redirection")
        wikidata_candidates.append({})

bho_dataset["linking_scenario"] = description
bho_dataset["wikidata_cands"] = wikidata_candidates

In [None]:
# Preview the dataset again, now with the `linking_scenario` and `wikidata_cands` columns.
bho_dataset.head()

In [None]:
# Inspect the entries labelled "unlikely_match".
bho_dataset[bho_dataset["linking_scenario"] == "unlikely_match"]

In [None]:
# Number of entries per linking scenario.
bho_dataset["linking_scenario"].value_counts()

In [None]:
# Entries labelled "redirection" are left out of the annotation pool.
bho_dataset_to_annotate = bho_dataset.loc[bho_dataset["linking_scenario"].ne("redirection")]

In [None]:
# Number of entries per linking scenario once redirections are excluded.
bho_dataset_to_annotate["linking_scenario"].value_counts()

In [None]:
# Fixed seed so that the sample to annotate is the same after every
# Restart & Run All; without it each run draws different entries.
SAMPLE_SEED = 42
SAMPLES_PER_SCENARIO = 50

# Draw the same number of entries from every linking scenario. Sampling is
# done with replacement so that scenarios with fewer entries than requested
# do not fail; the repeated rows this produces are dropped afterwards.
sampled_df = (
    bho_dataset_to_annotate
    .groupby('linking_scenario')
    .apply(lambda x: x.sample(SAMPLES_PER_SCENARIO, replace=True, random_state=SAMPLE_SEED))
    .reset_index(drop=True)
)
sampled_df = sampled_df.drop_duplicates(subset=["title", "toponyms", "contextwords"])

In [None]:
# (rows, columns) of the deduplicated sample.
sampled_df.shape

In [None]:
# Mapper used by map_wikidata2wikidump to turn a wikidata ID into Wikipedia titles.
mapper = WikiMapper("/resources/wikidata2wikipedia/index_enwiki-20190420.db")

def map_wikidata2wikidump(wikidataId, path="/resources/wikipedia/extractedResources/Aspects/"):
    """Return the opening text of the Wikipedia page(s) linked to a wikidata ID.

    The titles for ``wikidataId`` are obtained from the module-level
    ``mapper``. For each title, the file ``<path>/<quoted title>.json`` is
    read if it exists and items 1 to 4 of its ``["Main"]["content"]`` list
    are joined with spaces. When several titles have a file, the longest
    joined text is kept.

    Args:
        wikidataId: Wikidata identifier passed to ``mapper.id_to_titles``.
        path: Directory holding the per-page JSON files. A trailing slash is
            optional.

    Returns:
        str: The selected text, or "[No Wikipedia page]" when no file was
        found (or every file produced an empty text).
    """
    base_dir = str(path).rstrip("/")
    wikidata_text = ""
    for title in mapper.id_to_titles(wikidataId):
        # File names use the percent-encoded title with spaces instead of underscores.
        filename = urllib.parse.quote(title.replace("_", " ")) + ".json"
        wikidump = Path(base_dir + "/" + filename)
        if not wikidump.is_file():
            continue
        # Explicit encoding: JSON text is UTF-8, whereas the default used by
        # open() depends on the machine's locale.
        with open(wikidump, encoding="utf-8") as f:
            data = json.load(f)
        tmp_wkdt_text = " ".join(data["Main"]["content"][1:5])
        if len(tmp_wkdt_text) > len(wikidata_text):
            wikidata_text = tmp_wkdt_text
    if wikidata_text == "":
        wikidata_text = "[No Wikipedia page]"
    return wikidata_text

In [None]:
# `mapper` (used inside map_wikidata2wikidump) is already created in the cell
# that defines that function, so it is not instantiated a second time here.

# For every sampled BHO entry, collect the disambiguating information of its
# best-scoring wikidata candidate(s).
resolutions = []
for _, row in sampled_df.iterrows():
    bho_title = row["title"]
    bho_content = ast.literal_eval(row["content"].strip())
    bho_wkcandidates = dict()
    wkcds = row["wikidata_cands"]
    if wkcds:
        # Keep every candidate tied for the lowest (best) score.
        min_value = min(wkcds.values())
        best_wikidata_matches = [key for key, value in wkcds.items() if value == min_value]
        for wkcd in best_wikidata_matches:
            wkdf = wikidata_gazetteer[wikidata_gazetteer["wikidata_id"] == wkcd]
            if wkdf.empty:
                # Candidate not present in the gazetteer: nothing to show.
                continue
            wk_entry = wkdf.iloc[0]
            # Get location's historical counties:
            wkcd_hc = []
            for hc in ast.literal_eval(wk_entry["hcounties"]):
                hcountydf = wikidata_gazetteer[wikidata_gazetteer["wikidata_id"] == hc]
                if not hcountydf.empty:
                    wkcd_hc.append(hcountydf.iloc[0]["english_label"])
            # Wikidata candidate disambiguators are:
            # * English label
            # * Wikidata description
            # * Wikipedia's content first sentences
            # * Historical counties from wikidata
            wkcand_disambiguators = (
                wk_entry["english_label"],
                wk_entry["description_set"],
                map_wikidata2wikidump(wkcd),
                wkcd_hc,
            )
            bho_wkcandidates[wk_entry["wikidata_id"]] = wkcand_disambiguators
    resolutions.append([bho_title, bho_content, bho_wkcandidates])

In [None]:
def _format_resolution(bho_title, bho_content, bho_wkcandidates):
    """Render one BHO entry and its wikidata candidates as the text shown to annotators.

    Args:
        bho_title: Title of the BHO entry.
        bho_content: List of content strings of the entry; only the first one
            is shown (an empty list yields an empty content section).
        bho_wkcandidates: Dict mapping a wikidata ID to the tuple
            (english label, wikidata description, Wikipedia text,
            historical county labels).

    Returns:
        str: The formatted, multi-line text.
    """
    # Must match the placeholder returned by map_wikidata2wikidump when no
    # Wikipedia text is found.
    no_wikipedia_page = "[No Wikipedia page]"
    separator = "=========================\n"
    candidate_rule = "\n-------------------------\n"

    first_content = bho_content[0].strip() if bho_content else ""
    parts = [
        separator, "BHO ENTRY: ", bho_title, "\n",
        separator,
        first_content, "\n", "\n",
        separator, "WIKIDATA CANDIDATES\n", separator,
    ]
    for cd, (label, wikidata_description, wikipedia_text, hcounties) in bho_wkcandidates.items():
        # Candidates without a textual label (e.g. a missing value) are skipped.
        if not isinstance(label, str):
            continue
        parts.append(candidate_rule)
        parts.append("* " + cd + " (" + label + ")")
        parts.append(candidate_rule)
        county_names = [hc for hc in hcounties if isinstance(hc, str)]
        if county_names:
            parts.append("[Historical county] " + ", ".join(county_names) + "\n")
        # Prefer the Wikipedia text; fall back to the wikidata description,
        # and keep the placeholder when neither is available.
        if wikipedia_text != no_wikipedia_page:
            cand_description = wikipedia_text.strip()
        elif isinstance(wikidata_description, str):
            cand_description = wikidata_description.strip()
        else:
            cand_description = no_wikipedia_page
        parts.append("[Description] " + cand_description + "\n")
    parts.append(separator)
    return "".join(parts)


resolution_strings = [_format_resolution(*r) for r in resolutions]

In [None]:
# Each example handed to the annotation widget is the list of lines of one
# formatted resolution text.
annotation_examples = []
for resolution_text in resolution_strings:
    annotation_examples.append(resolution_text.splitlines())

annotations = annotate(annotation_examples)