# Process Physicists Raw Data

Here I convert the [notable physicists raw data](../data/raw/notable_physicists.jsonl.gz), which is stored in *JSON lines* format, into an intermediate *pandas dataframe* that is more convenient to work with. This intermediate format is close to the final format of the data that I'll be working with for analysis.

My goal is to parse the JSON data in order to extract the interesting fields of information such as *name*, *birth date*, *death date*, *citizenships*, *workplaces*, *awards*, *alma mater*, *academic advisors*, *doctoral students*, *DBpedia categories*, *description*, etc. I believe that this information may be useful in helping to predict whether or not a physicist is awarded a Nobel Prize.

## Setting up the Environment

A few initialization steps are needed to set up the environment:
- The top-level module directory of the repository needs to be added to the system path to enable the loading of python modules.
- The locale needs to be set for all categories to the user’s default setting (typically specified in the LANG environment variable) to enable correct sorting of words with accents.
- [spacy](https://spacy.io/) is a python package that is used for *natural language processing*. It relies on statistical [language models](https://spacy.io/usage/models) which must be loaded before use. So here I load `en_core_web_sm` which is the default English language model in spacy.  

In [None]:
import sys
import locale
import spacy

repo_dir = '../'
if repo_dir not in sys.path:
    sys.path.append(repo_dir)
    
locale.setlocale(locale.LC_ALL, '')

nlp = spacy.load('en_core_web_sm')

In [None]:
import collections
import gzip
import os
import re
import shutil
import time
from datetime import datetime
from urllib import parse

import jsonlines
import pandas as pd
try:
    from pandas import json_normalize  # pandas >= 1.0
except ImportError:
    from pandas.io.json import json_normalize  # older pandas

from src.data.url_utils import DBPEDIA_RESOURCE_URL
from src.data.url_utils import get_filename_from_url
from src.data.url_utils import get_pathname_from_url
from src.data.url_utils import get_redirect_urls
from src.data.url_utils import quote_url
from src.data.url_utils import urls_progress_bar

## Reading in the JSON Lines Data

First let's extract the JSON lines data and read it into a list so that we can parse it later.

In [None]:
gzip_file = '../data/raw/notable_physicists.jsonl.gz'
jsonl_file = gzip_file[:-3]
with gzip.open(gzip_file, 'rb') as src, open(jsonl_file, 'wb') as dest:
    shutil.copyfileobj(src, dest)

json_lines = []
with jsonlines.open(jsonl_file, mode='r') as reader:
    for json_line in reader:
        json_lines.append(json_line)
os.remove(jsonl_file)
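
The cell above decompresses to a temporary file before reading. As an alternative sketch (not what this notebook does), the gzip stream can be read line by line with only the standard library, assuming each non-blank line holds one JSON object. The helper name `read_jsonl_gz` is hypothetical.

```python
import gzip
import json
import os
import tempfile


def read_jsonl_gz(path):
    """Read a gzip-compressed JSON lines file into a list of dicts."""
    records = []
    # 'rt' opens the gzip stream in text mode so each line is a str
    with gzip.open(path, mode='rt', encoding='utf-8') as lines:
        for line in lines:
            line = line.strip()
            if line:  # skip blank lines
                records.append(json.loads(line))
    return records


# demonstrate on a small throwaway file
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, 'demo.jsonl.gz')
    with gzip.open(demo_path, mode='wt', encoding='utf-8') as out:
        out.write(json.dumps({'name': 'Marie Curie'}) + '\n')
        out.write(json.dumps({'name': 'Niels Bohr'}) + '\n')
    demo_records = read_jsonl_gz(demo_path)

print(len(demo_records))
```

This avoids the intermediate `.jsonl` file and the later `os.remove` call, at the cost of not using the `jsonlines` reader.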

## Defining the Fields of Interest

Here I define the keys and values that I wish to extract from the JSON lines data and also some that I explicitly wish to exclude. These keys and values are in the form of *semantic URLs*, which allow anyone to visit the resource in a web browser and see its meaning. The URLs are in five namespaces:

- [DBpedia Ontology](https://wiki.dbpedia.org/services-resources/ontology)
- DBpedia Property
- PURL
- W3
- FOAF

In [None]:
DBPEDIA_JSON_KEYS = [
    # DBpedia ontology
    'http://dbpedia.org/ontology/abstract',
    'http://dbpedia.org/ontology/academicAdvisor',
    'http://dbpedia.org/ontology/almaMater',
    'http://dbpedia.org/ontology/award',
    'http://dbpedia.org/ontology/birthDate',
    'http://dbpedia.org/ontology/birthName',
    'http://dbpedia.org/ontology/birthPlace',
    'http://dbpedia.org/ontology/child',
    'http://dbpedia.org/ontology/citizenship',
    'http://dbpedia.org/ontology/deathDate',
    'http://dbpedia.org/ontology/deathPlace',
    'http://dbpedia.org/ontology/doctoralAdvisor',
    'http://dbpedia.org/ontology/doctoralStudent',
    'http://dbpedia.org/ontology/education',
    'http://dbpedia.org/ontology/field',
    'http://dbpedia.org/ontology/influenced',
    'http://dbpedia.org/ontology/knownFor',
    'http://dbpedia.org/ontology/nationality',
    'http://dbpedia.org/ontology/notableStudent',
    'http://dbpedia.org/ontology/parent',
    'http://dbpedia.org/ontology/residence',
    'http://dbpedia.org/ontology/spouse',
    'http://dbpedia.org/ontology/thumbnail',
    'http://dbpedia.org/ontology/wikiPageID',
    'http://dbpedia.org/ontology/wikiPageRevisionID',

    # DBpedia property
    'http://dbpedia.org/property/academicAdvisor',
    'http://dbpedia.org/property/almaMater',
    'http://dbpedia.org/property/award',
    'http://dbpedia.org/property/birthDate',
    'http://dbpedia.org/property/birthName',
    'http://dbpedia.org/property/birthPlace',
    'http://dbpedia.org/property/child',
    'http://dbpedia.org/property/children',
    'http://dbpedia.org/property/citizenship',
    'http://dbpedia.org/property/deathDate',
    'http://dbpedia.org/property/deathPlace',
    'http://dbpedia.org/property/doctoralAdvisor',
    'http://dbpedia.org/property/doctoralStudent',
    'http://dbpedia.org/property/education',
    'http://dbpedia.org/property/field',
    'http://dbpedia.org/property/influenced',
    'http://dbpedia.org/property/knownFor',
    'http://dbpedia.org/property/nationality',
    'http://dbpedia.org/property/notableStudent',
    'http://dbpedia.org/property/parent',
    'http://dbpedia.org/property/residence',
    'http://dbpedia.org/property/signature',
    'http://dbpedia.org/property/spouse',
    'http://dbpedia.org/property/thesisTitle',
    'http://dbpedia.org/property/thesisUrl',
    'http://dbpedia.org/property/thesisYear',
    'http://dbpedia.org/property/thumbnail',
    'http://dbpedia.org/property/workInstitutions',
    'http://dbpedia.org/property/workplaces',

    # PURL
    'http://purl.org/dc/terms/description',

    # W3
    'http://www.w3.org/2000/01/rdf-schema#comment',
    'http://www.w3.org/ns/prov#wasDerivedFrom',

    # FOAF
    'http://xmlns.com/foaf/0.1/gender',
    'http://xmlns.com/foaf/0.1/givenName',
    'http://xmlns.com/foaf/0.1/isPrimaryTopicOf',
    'http://xmlns.com/foaf/0.1/name',
    'http://xmlns.com/foaf/0.1/surname'
]

DBPEDIA_JSON_VALUES = [
    # DBpedia ontology
    'http://dbpedia.org/ontology/influenced',  # target is influencedBy
    'http://dbpedia.org/ontology/influencedBy',  # target is influenced

    # DBpedia property
    'http://dbpedia.org/property/theorized'
]

DBPEDIA_IGNORE_URLS = [
    'http://dbpedia.org/resource/Bachelor_of_Arts',
    'http://dbpedia.org/resource/Bachelor_of_Science',
    'http://dbpedia.org/resource/Doctor_of_Philosophy',
    'http://dbpedia.org/resource/Master_of_Arts',
    'http://dbpedia.org/resource/Master_of_Philosophy',
    'http://dbpedia.org/resource/Master_of_Science',
    'http://dbpedia.org/resource/None',
    'http://dbpedia.org/resource/PhD',
    'http://dbpedia.org/resource/Ph.D',
    'http://dbpedia.org/resource/Ph.D.'
]

As you can see, many keys are duplicated across the ontology and property namespaces (e.g. citizenship). There are also many differently named keys that sound like they hold similar information (e.g. alma mater and education). Clearly, a set of rules is needed to handle these duplicate keys and named variants.
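
To make the overlap concrete, the keys can be grouped by their final path segment. This is a small illustrative sketch on a hand-picked subset of the keys; `sample_keys` and `group_by_name` are hypothetical names and not part of the repository.

```python
from collections import defaultdict

# hand-picked subset of the keys, for illustration only
sample_keys = [
    'http://dbpedia.org/ontology/citizenship',
    'http://dbpedia.org/ontology/almaMater',
    'http://dbpedia.org/property/citizenship',
    'http://dbpedia.org/property/education',
]


def group_by_name(keys):
    """Map each key's final path segment to the namespaces it appears in."""
    groups = defaultdict(list)
    for key in keys:
        namespace, _, name = key.rpartition('/')
        groups[name].append(namespace.rsplit('/', 1)[-1])
    return dict(groups)


groups = group_by_name(sample_keys)
# names that occur in more than one namespace
duplicates = sorted(name for name, spaces in groups.items() if len(spaces) > 1)
print(duplicates)
```

Note that this only finds exact name clashes such as *citizenship*. Near-synonyms such as *almaMater* and *education* still have to be paired by hand.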

## Creating the Physicists Dictionaries

Now I parse the JSON lines data to create dictionaries. The following rules apply when creating the dictionaries:

1. Values in the DBpedia Ontology namespace take precedence over those in the DBpedia Property namespace since, as described in section 4.3 of [DBpedia datasets](https://wiki.dbpedia.org/services-resources/datasets/dbpedia-datasets), it contains the cleanest data. Property namespace values are only used when there are no corresponding ontology namespace values.

2. Several keys holding similar data are merged together. *Education* is merged into *alma mater*, *work institutions* is merged into *workplaces* and *children* is merged into *child*. *InfluencedBy* and *Influenced* are also merged appropriately.

3. Some of the fields (e.g. the *abstract*) are multilingual. In such cases, only the English value is extracted.

4. The data is messy and some cleanup is done on the fields involving *dates*, *nationality*, *citizenship*, *spouse* and *children*. However, a lot of noise remains in the data. The sources of the noise are: 
    - Fields containing semantic URLs that are redirected.
    - Semi-structured text which contains no information content (e.g. *stopwords*).
    - Semi-structured text containing valuable information that is not easy for a machine to understand. Either the text mixes concepts which should be in multiple fields or contains variant representations that essentially refer to the same "thing".
    
I will handle some of these issues now and some of them later prior to and when generating features for machine learning.
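
Rules 1 and 2 can be sketched on a toy record. This simplified version assumes a flat dict keyed by property URL and a key list in which ontology keys come before property keys, which is how `DBPEDIA_JSON_KEYS` is ordered. The names `extract_fields` and `merge_into` and the record itself are hypothetical. The real functions handle many more cases.

```python
def extract_fields(record, keys):
    """Keep the first value seen per field name (ontology keys come first)."""
    fields = {}
    for key in keys:
        name = key.rsplit('/', 1)[-1]
        if key in record and name not in fields:
            fields[name] = record[key]
    return fields


def merge_into(fields, left_key, right_key):
    """Use right_key's value for left_key only when left_key is missing."""
    merged = dict(fields)
    right = merged.pop(right_key, None)
    if right and not merged.get(left_key):
        merged[left_key] = right
    return merged


keys = [
    'http://dbpedia.org/ontology/citizenship',
    'http://dbpedia.org/property/citizenship',
    'http://dbpedia.org/property/education',
]
# toy record, for illustration only
record = {
    'http://dbpedia.org/ontology/citizenship': 'Germany',
    'http://dbpedia.org/property/citizenship': 'German',
    'http://dbpedia.org/property/education': 'ETH Zurich',
}

fields = merge_into(extract_fields(record, keys), 'almaMater', 'education')
print(fields)
```

The ontology value for *citizenship* wins over the property value, and *education* is folded into *almaMater* because no *almaMater* value exists.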

OK let's now generate the dictionaries and take a look at the first entry.

In [None]:
def create_physicist_data(json_line):
    """Create physicist data from json_line data.

    Args:
        json_line (dict): JSON dict.
    Returns:
        dict: Dictionary of physicist data.

    """

    # flatten the json
    flat_json = json_normalize(json_line)

    # find the resource, source and fullName
    resource = _find_resource(flat_json)
    source = _get_source(resource)
    full_name = get_filename_from_url(resource).replace('_', ' ')

    # construct the dictionary
    dict_ = {'resource': resource, 'source': source, 'fullName': full_name,
             **_process_json_keys(resource, flat_json),
             **_process_json_values(resource, flat_json),
             **_process_categories(flat_json)}

    # merge keys
    dict_ = _merge_keys(dict_)

    # clean fields
    dict_ = _clean_fields(dict_)

    return dict_


def _process_json_keys(resource, flat_json):
    dict_ = {}

    # loop over the keys
    for json_key in DBPEDIA_JSON_KEYS:
        flat_json_key = resource + '.' + json_key
        filename = get_filename_from_url(json_key)
        dict_key = filename[filename.rfind('#') + 1:]  # take fragment
        # sanitize for later merging in _merge_influences
        if dict_key == 'influenced':
            dict_key = 'influenced_'
        if flat_json_key not in flat_json:
            continue

        # loop and get the values
        for list_ in flat_json[flat_json_key].values:
            val_list = []
            for val in list_:
                if not _val_is_english(val):
                    continue

                if isinstance(val['value'], int):
                    value = val['value']
                else:  # str
                    value = val['value'].lstrip('* ')

                if value in DBPEDIA_IGNORE_URLS:
                    continue

                if not _value_has_information_content(value):
                    continue

                if isinstance(value, str) and value.startswith(
                        'http://wikidata.dbpedia.org/'):
                    continue
                val_list.append(value)

            if not val_list:
                continue
            elif len(val_list) == 1 and dict_key not in dict_:
                dict_[dict_key] = val_list[0]
            else:
                if not all(isinstance(v, str) for v in val_list):
                    # multiple values must all be of 'str' type
                    # (some child and spouse have int values)
                    val_list = [v for v in val_list if
                                not isinstance(v, int)]
                if dict_key not in dict_:
                    val_list.sort(key=locale.strxfrm)
                    dict_[dict_key] = '|'.join(val_list)
    return dict_


def _process_json_values(resource, flat_json):
    dict_ = {}

    # loop over the values
    for json_value in DBPEDIA_JSON_VALUES:
        # loop and get the keys
        key_list = []
        for json_key in flat_json:
            sep_position = json_key.rfind('.http')
            key, val = json_key[:sep_position], json_key[sep_position + 1:]
            # only consider keys other than the resource
            if json_value == val and not key == resource:
                dict_key = key
                key_list.append(dict_key)
                dict_val = get_filename_from_url(val).replace('_', ' ')
        if not key_list:
            continue
        elif len(key_list) == 1:
            dict_[dict_val] = key_list[0]
        else:
            key_list.sort(key=locale.strxfrm)
            dict_[dict_val] = '|'.join(key_list)
    return dict_


def _process_categories(flat_json):
    dict_ = {}

    val_list = []
    for list_ in flat_json.values:
        for item in list_:
            for val in item:
                value = val['value']
                if (not isinstance(value, str) or not value.startswith(
                        DBPEDIA_RESOURCE_URL + 'Category:')):
                    continue
                val_list.append(value)
    if val_list:
        val_list.sort(key=locale.strxfrm)
        dict_['categories'] = '|'.join(val_list)
    return dict_


def _merge_keys(dict_):
    merged_dict = dict_.copy()

    merged_dict = _merge_influences(merged_dict)
    merged_dict = _merge_right_key_into_left_key(
        merged_dict, 'almaMater', 'education')
    merged_dict = _merge_right_key_into_left_key(
        merged_dict, 'workplaces', 'workInstitutions')
    merged_dict = _merge_right_key_into_left_key(
        merged_dict, 'child', 'children')
    return merged_dict


def _clean_fields(dict_):
    cleaned_dict = dict_.copy()

    cleaned_dict = _clean_dates(cleaned_dict, 'birthDate')
    cleaned_dict = _clean_dates(cleaned_dict, 'deathDate')
    cleaned_dict = _clean_citizenship_nationality(cleaned_dict,
                                                  'citizenship')
    cleaned_dict = _clean_citizenship_nationality(cleaned_dict,
                                                  'nationality')
    cleaned_dict = _clean_spouse_child(cleaned_dict, 'spouse')
    cleaned_dict = _clean_spouse_child(cleaned_dict, 'child')
    cleaned_dict = _clean_almaMater(cleaned_dict)
    return cleaned_dict


def _find_resource(flat_json):
    # resource is the most common value
    vals = [val.split('.h')[0] for val in flat_json.columns.values]
    return collections.Counter(vals).most_common(1)[0][0]


def _get_source(resource):
    parsed_url = parse.urlparse(resource)
    filename = get_filename_from_url(resource)
    return ('{uri.scheme}://{uri.netloc}/'.format(uri=parsed_url) +
            'data/' + filename + '.json')



def _val_is_english(val):
    language = val.get('lang')
    if not language or language == 'en':
        return True
    return False


def _value_has_information_content(value):
    if not isinstance(value, str):
        return True
    if (not value or
            value == '*' or
            value.startswith('* \n') or
            value.endswith('\n*') or
            value.startswith('--') or
            '_family' in value or  # e.g. child contains this
            value in spacy.lang.en.STOP_WORDS):
        return False
    return True


def _merge_influences(dict_):
    merged_dict = dict_.copy()

    influencedBy = merged_dict.get('influencedBy')
    influenced = merged_dict.get('influenced')
    influenced_ = merged_dict.get('influenced_')

    if influenced_:
        if influencedBy:
            merged_dict['influencedBy'] += '|' + influenced_
        else:
            merged_dict['influencedBy'] = influenced_
        del merged_dict['influenced_']

    influencedBy = merged_dict.get('influencedBy')
    if influencedBy and influenced:
        merged_dict['influencedBy'], merged_dict['influenced'] = \
            merged_dict['influenced'], merged_dict['influencedBy']
    elif influencedBy and not influenced:
        merged_dict['influenced'] = influencedBy
        del merged_dict['influencedBy']
    elif not influencedBy and influenced:
        merged_dict['influencedBy'] = influenced
        del merged_dict['influenced']
    
    influencedBy = merged_dict.get('influencedBy')
    influenced = merged_dict.get('influenced')
    if influencedBy:
        merged_dict['influencedBy'] = '|'.join(sorted(
            merged_dict['influencedBy'].split('|'), key=locale.strxfrm))
    if influenced:
        merged_dict['influenced'] = '|'.join(sorted(
            merged_dict['influenced'].split('|'), key=locale.strxfrm))

    return merged_dict


def _merge_right_key_into_left_key(dict_, left_key, right_key):
    merged_dict = dict_.copy()

    left = merged_dict.get(left_key)
    right = merged_dict.get(right_key)

    if not left and right:
        merged_dict[left_key] = right
    merged_dict.pop(right_key, None)
    return merged_dict


def _clean_citizenship_nationality(dict_, key):
    cleaned_dict = dict_.copy()

    key_present = cleaned_dict.get(key)
    if (key_present and
            key_present != 'United Kingdom of Great Britain and Ireland'):
        cleaned_dict[key] = (
            key_present
            .replace(' / ', '|')
            .replace(' , ', '|')
            .replace(', ', '|')
            .replace(' and then ', '|')
            .replace(' and ', '|')
            .replace('Citizenship in the ', '')
            .replace('Citizen of ', '')
            .replace(' citizen', '')
            .replace(' law', '')
            .replace('List of ', '')
            .replace('Nationality law of the ', '')
            .replace('Nationality of the ', '')
            .replace(' nationality law', '')
            .replace(' nationality', '')
            .replace('People of ', '')
            .replace(' people', '')
            .replace(' subject', '')
        )
        cleaned_dict[key] = '|'.join(sorted(cleaned_dict[key].split('|'),
                                            key=locale.strxfrm))
    return cleaned_dict


def _clean_almaMater(dict_):
    cleaned_dict = dict_.copy()

    key_present = cleaned_dict.get('almaMater')
    if key_present:
        cleaned_dict['almaMater'] = (
            key_present
            .replace('|Theoretical Physics', '')
            .replace('|Physics', '')
        )
    return cleaned_dict


def _clean_spouse_child(dict_, key):
    cleaned_dict = dict_.copy()

    key_present = cleaned_dict.get(key)
    if key_present and isinstance(key_present, str):
        cleaned_dict[key] = (
            key_present
            .replace(', and ', '|')
            .replace(' and ', '|')
            .replace(', ', '|')
            .replace(' ; 1 child', '')
            .replace('|divorced', '')
            .replace('divorced|', '')
            .replace('sons ', '')
        )
        cleaned_dict[key] = '|'.join(sorted(cleaned_dict[key].split('|'),
                                            key=locale.strxfrm))
    return cleaned_dict


def _clean_dates(dict_, key):
    cleaned_dict = dict_.copy()

    date = dict_.get(key)
    if date:
        # replace invalid months and days with first day and first month
        # the dates only need to be approximately correct as years lived
        # will be estimated from them
        if isinstance(date, int):  # year only
            first_date = str(date) + '-01-01'
        else:
            first_date = date.split('|')[0]
            first_date = first_date.replace('-0-0', '-01-01')
            if first_date.endswith('-0'):
                first_date = first_date[:-2] + '-1'
            if 'c. ' in first_date:  # deal with circa (year only)
                first_date = first_date.replace('c. ', '') + '-01-01'
            # ignore dates with a year below 1000 as datetime does not
            # handle these
            if (first_date.startswith('-') or
                int(first_date.split('-')[0]) < 1000):
                return cleaned_dict
        cleaned_dict[key] = str(
            datetime.strptime(first_date, '%Y-%m-%d').date())
    return cleaned_dict
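
As a standalone illustration of the date cleanup idea, the sketch below normalizes a few partial dates. It is a simplified stand-in for `_clean_dates`, not a copy of it: `normalize_date` is a hypothetical helper that assumes `YYYY-M-D` strings in which a zero month or day means "unknown".

```python
from datetime import datetime


def normalize_date(text):
    """Return an ISO date string, replacing unknown (0) parts with 1.

    Returns None when the text cannot be parsed or the year is below 1000.
    """
    parts = text.split('-')
    if len(parts) != 3 or not all(part.isdigit() for part in parts):
        return None
    year, month, day = (int(part) for part in parts)
    if year < 1000:
        return None
    month = month or 1  # unknown month -> January
    day = day or 1  # unknown day -> first of the month
    try:
        return str(datetime(year, month, day).date())
    except ValueError:  # e.g. month 13 or day 31 in a 30-day month
        return None


print(normalize_date('1879-0-0'))
print(normalize_date('1879-03-14'))
print(normalize_date('-0384-1-1'))
```

As in the notebook, the dates only need to be approximately right, since they are later used to estimate years lived.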

In [None]:
data = [create_physicist_data(json_line) for json_line in json_lines]
data[0]

You can see that many of the values in the dictionary contain lists of semantic URLs which are each meant to refer to a unique "thing" such as a person or a place. If the URL is redirected, it is important to know where it is redirected to so that identical "things" in fact resolve to the same URL. It is very important to do this so that the machine learning models have clean data in order to differentiate signal from noise.

## Imputing Redirects in the Physicists Dictionaries

In order to impute redirects in the dictionaries I need to perform the following steps:

1. *Parse the dictionaries to obtain a list of all URLs.* I restrict this to the redirect keys in the list that you see below since these are the only fields of interest involving URLs that features will be extracted from.
2. *Submit HTTP requests to fetch the URLs and determine their redirect URLs.* A cache is kept mapping the URLs to the redirect URLs, so an HTTP request is only made if the URL is not found in the cache. This greatly helps with performance.
3. *Replace the URLs in the dictionaries with the redirect URLs.* In fact I use just the filename since the paths are identical for every URL.
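
Step 2 can be sketched as a cache-first lookup. This is not the implementation of `get_redirect_urls`, whose internals are not shown in this notebook. The function `resolve_redirects` and the injected `fetch` callable are hypothetical, and a fake fetcher with made-up `example.org` URLs stands in for real HTTP requests so that the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def resolve_redirects(urls, cache, fetch, max_workers=4):
    """Resolve URLs to redirect targets, fetching only on cache misses."""
    missing = [url for url in urls if url not in cache]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the same order as `missing`
        for url, target in zip(missing, executor.map(fetch, missing)):
            if target is not None:  # leave unresolved URLs out of the cache
                cache[url] = target
    return {url: cache[url] for url in urls if url in cache}


calls = []


def fake_fetch(url):
    """Stand-in for an HTTP request that follows redirects."""
    calls.append(url)
    return {'http://example.org/PhD': 'http://example.org/Doctor'}.get(url)


cache = {'http://example.org/A': 'http://example.org/B'}
urls = ['http://example.org/A', 'http://example.org/PhD',
        'http://example.org/missing']
resolved = resolve_redirects(urls, cache, fake_fetch)
print(sorted(calls))
```

Only the two URLs missing from the cache are fetched, and the URL that cannot be resolved is simply absent from the result.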

In [None]:
REDIRECT_KEYS = ['academicAdvisor', 'almaMater', 'award', 'birthPlace',
                 'categories', 'child', 'citizenship', 'deathPlace',
                 'doctoralAdvisor', 'doctoralStudent', 'field', 'influenced',
                 'influencedBy', 'knownFor', 'nationality', 'notableStudent',
                 'parent', 'residence', 'spouse', 'theorized', 'workplaces']

In [None]:
def contruct_urls(data):
    """Construct DBpedia resource URLs from physicists data.

    Args:
        data (list of `dict`): List of dicts containing physicists
            data.

    Returns:
        list of `str`: List of URLs.

    """
    
    urls_to_check = set()
    for key in REDIRECT_KEYS:
        for datum in data:
            text = datum.get(key)
            if not text or isinstance(text, int):
                continue
            texts = text.split('|')
            urls = (item for item in texts if item.startswith(
                DBPEDIA_RESOURCE_URL))
            for url in urls:
                quoted_url = quote_url(url)
                urls_to_check.add(quoted_url)
    
    urls_to_check = list(urls_to_check)
    return urls_to_check

In [None]:
urls_to_check = contruct_urls(data)
len(urls_to_check)

In [None]:
url_cache_path = '../data/raw/dbpedia-redirects.csv'
redirects = get_redirect_urls(
    urls_to_check, url_cache_path=url_cache_path, max_workers=20,
    timeout=30, progress_bar=urls_progress_bar(len(urls_to_check)))

In [None]:
len(urls_to_check) - len(redirects)

You can see that only one of the redirected URLs was not found. It is safe to ignore it and proceed, as it is not important for feature extraction.

Now I sort and persist the URL cache to disk in case any new URLs are found.

In [None]:
dbpedia_redirects = pd.DataFrame(
    sorted(redirects.items(), key=lambda x: locale.strxfrm(x[0])),
    columns=['url', 'redirect_url'])
dbpedia_redirects.to_csv(url_cache_path, index=False)
dbpedia_redirects.head()

Now I replace the URLs in the dictionaries with the redirect URLs making sure to just use the filename since the paths are identical for every URL.

In [None]:
def impute_redirects(data, redirects):
    """Impute the filenames from redirected URLs in the physicists data.

    Args:
        data (list of `dict`): List of dicts containing physicists
            data.
        redirects (dict): The DBpedia redirected URLs. The key is
            original URL and the value is the redirected URL.

    Returns:
        list of `dict`: List of dicts containing physicists data.

        Identical to `data` except that it contains the filenames
        from the redirected URLs.
    """
    
    # copy each dict so that the input `data` is left unmodified
    imputed_data = [datum.copy() for datum in data]
    
    for key in REDIRECT_KEYS:
        for datum in imputed_data:
            text = datum.get(key)
            if not text or isinstance(text, int):
                continue

            # split up fields with these symbols
            text = text.replace(' \n* ', '|')
            text = text.replace('\n* ', '|')
            text = text.replace(' \n', '|')
            texts = text.split('|')
            
            impute_texts = set()
            for text in texts:
                if text.startswith(DBPEDIA_RESOURCE_URL):
                    if text in redirects:
                        name = get_filename_from_url(
                            redirects[text]).replace('_', ' ')
                    else:
                        name = get_filename_from_url(
                            text).replace('_', ' ')
                else:
                    name = text
                if name.startswith('Category:'):
                    name = name.replace('Category:', '')
                impute_texts.add(name)
            impute_texts = list(impute_texts)
            impute_texts.sort(key=locale.strxfrm)
            datum[key] = '|'.join(impute_texts) 
   
    return imputed_data

In [None]:
imputed_data = impute_redirects(data, redirects)
imputed_data[0]
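
The "filename" used above is the last path segment of the URL with underscores turned into spaces. A rough equivalent using only the standard library is sketched below. `name_from_url` is a hypothetical helper, and the repository's `get_filename_from_url` may differ in details such as percent-decoding.

```python
import posixpath
from urllib.parse import unquote, urlparse


def name_from_url(url):
    """Return a readable name from the last path segment of a URL."""
    path = urlparse(url).path
    filename = posixpath.basename(unquote(path))
    name = filename.replace('_', ' ')
    # drop the DBpedia category prefix, as done for the categories field
    if name.startswith('Category:'):
        name = name[len('Category:'):]
    return name


print(name_from_url('http://dbpedia.org/resource/Albert_Einstein'))
print(name_from_url(
    'http://dbpedia.org/resource/Category:Nobel_laureates_in_Physics'))
```

Because every resource URL shares the same prefix, keeping only this name loses no information while making the values much easier to read.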

## Creating the Physicists Dataframe

Now I use the dictionaries of imputed data to create a dataframe. Let's confirm that it contains the expected number of physicists and take a look at it.

In [None]:
physicists = pd.DataFrame(imputed_data)
assert len(physicists) == 1049
with pd.option_context('display.max_rows', 1100):
    display(physicists)

## Persisting the Data

Now that I have the dataframe, I'd like to persist it for later analysis. So I'll write out the contents to a CSV file.

In [None]:
physicists.to_csv('../data/interim/notable_physicists.csv', index=False)

## Cleaning Up

A clean up step is needed:

- Remove the top-level module directory of the repository from the system path.

In [None]:
sys.path.remove(repo_dir)