In [2]:
import json
from impresso_commons.path.path_s3 import IMPRESSO_STORAGEOPT
from impresso_commons.utils.s3 import fixed_s3fs_glob
from dask import bag as db
# from tabulate import tabulate
#import matplotlib
#from matplotlib import pyplot as plt

In [3]:
import os
from dask_k8 import DaskCluster
from dask import bag as db
from impresso_commons.path.path_s3 import read_s3_issues
from impresso_commons.text.rebuilder import rebuild_issues, compress, upload, cleanup

In [4]:
import dask

In [5]:
dask.__version__

'2.3.0'

## Set up dask kubernetes cluster

In [6]:
from impresso_commons.utils.kube import (
    make_scheduler_configuration,
    make_worker_configuration,
)

In [7]:
image_uri = "ic-registry.epfl.ch/dhlab/impresso_pycommons:v1"

In [8]:
memory = "30G"

In [9]:
cluster = DaskCluster(
            namespace="dhlab",
            cluster_id="impresso-entity-linking",
            scheduler_pod_spec=make_scheduler_configuration(),
            worker_pod_spec=make_worker_configuration(
                docker_image=image_uri, memory=memory
            ),
        )

In [10]:
cluster.create()

Scheduler: tcp://10.90.47.30:8884
Dashboard: http://10.90.47.30:7705


In [44]:
cluster.close()

In [11]:
cluster.scale(100, blocking=False)

In [12]:
dask_client = cluster.make_dask_client()

In [13]:
dask_client.get_versions(check=True)

{'scheduler': {'host': (('python', '3.7.3.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '4.15.0-70-generic'),
   ('machine', 'x86_64'),
   ('processor', ''),
   ('byteorder', 'little'),
   ('LC_ALL', 'C.UTF-8'),
   ('LANG', 'C.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')),
  'packages': {'required': (('dask', '2.3.0'),
    ('distributed', '2.3.2'),
    ('msgpack', '0.6.1'),
    ('cloudpickle', '1.2.1'),
    ('tornado', '6.0.3'),
    ('toolz', '0.10.0')),
   'optional': (('numpy', '1.16.2'),
    ('pandas', '0.24.2'),
    ('bokeh', '1.3.4'),
    ('lz4', None),
    ('dask_ml', None),
    ('blosc', '1.8.1'))}},
 'workers': {},
 'client': {'host': [('python', '3.6.7.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '4.15.0-46-generic'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')],
  'packages': {'required': [('dask', '2.3.0'),
    

In [14]:
dask_client

0,1
Client  Scheduler: tcp://10.90.47.30:8884  Dashboard: http://10.90.47.30:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 60.00 GB


## Process named entity mentions

In [22]:
files = fixed_s3fs_glob(f"s3://processed-canonical-data/ne-mentions/public_release/*.bz2")

In [21]:
len(files)

397

In [72]:
mentions_bag = db.read_text(
    files,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

In [27]:
dask_client.cancel(mentions_bag)

In [74]:
mentions_bag.take(1)

({'id': 'LSE-1897-09-25-a-i0032',
  's3v': None,
  'ts': '2019-06-17T11:00:30Z',
  'sys_id': 'nerb-0001',
  'nes': [{'type': 'Person',
    'surface': 'RUE DU GRENIER',
    'name': 'RUE DU GRENIER',
    'firstname': 'RUE',
    'surname': 'DU GRENIER',
    'lOffset': 146,
    'rOffset': 160,
    'mrule': 'person1_basic_6_passed',
    'confidence': 'high',
    'id': 'LSE-1897-09-25-a-i0032:146:160:person:nerb-0001'}]},)

In [76]:
mentions_bag.count().compute()

30233580

### try join between dataframes

Merging between dataframes is more performant than between bags.

In [15]:
from impresso_commons.utils.s3 import alternative_read_text

In [22]:
dask_client

0,1
Client  Scheduler: tcp://10.90.47.30:8884  Dashboard: http://10.90.47.30:8787/status,Cluster  Workers: 100  Cores: 100  Memory: 3.00 TB


In [23]:
mentions_bucket = "s3://processed-canonical-data/mentions/v01/"
files = fixed_s3fs_glob(f"{mentions_bucket}*.bz2")

In [24]:
# files = fixed_s3fs_glob(f"s3://processed-canonical-data/mentions_public/*.bz2")

In [25]:
mentions_df = db.read_text(
    files,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).to_dataframe().set_index('id').persist()

In [26]:
mentions_df.index

Dask Index Structure:
npartitions=397
BDC-1839-01-20-a-i0001          object
BDC-1839-03-28-a-i0005             ...
                                 ...  
waeschfra-1884-05-24-a-i0006       ...
waeschfra-1884-07-05-a-i0017       ...
Name: id, dtype: object
Dask Name: sort_index, 794 tasks

In [17]:
rebuilt_bucket = "s3://canonical-rebuilt-release-evenized-light"
rebuilt_files = fixed_s3fs_glob(f"{rebuilt_bucket}/*.bz2")

In [56]:
from random import shuffle

In [57]:
shuffle(rebuilt_files)

In [58]:
rebuilt_files[:5]

['s3://canonical-rebuilt-release-evenized-light/1450.jsonl.bz2',
 's3://canonical-rebuilt-release-evenized-light/0079.jsonl.bz2',
 's3://canonical-rebuilt-release-evenized-light/0984.jsonl.bz2',
 's3://canonical-rebuilt-release-evenized-light/0199.jsonl.bz2',
 's3://canonical-rebuilt-release-evenized-light/0605.jsonl.bz2']

In [20]:
# s3://evenized-light-canonical-rebuilt-pubrelease
rebuilt_df = db.from_sequence(
    rebuilt_files,
    partition_size=2
).map(
    alternative_read_text,
    IMPRESSO_STORAGEOPT
).flatten().map(json.loads).to_dataframe().set_index('id').persist()

In [21]:
rebuilt_df.index

Dask Index Structure:
npartitions=1099
BDC-1839-01-20-a-i0001          object
BLB-1846-09-12-a-i0004             ...
                                 ...  
waeschfra-1884-03-30-a-i0015       ...
waeschfra-1884-07-05-a-i0021       ...
Name: id, dtype: object
Dask Name: sort_index, 2198 tasks

In [27]:
merged_dfs = mentions_df.merge(rebuilt_df, left_index=True, right_index=True)

In [22]:
merged_dfs.head()

KeyboardInterrupt: 

In [28]:
merged_bag = merged_dfs[['ft', 'nes', 'sys_id']].to_bag(index=True)

In [29]:
dask_client.cancel(merged_bag)

In [29]:
def prepare_input(ci_id, fulltext, mentions, system_id):
    start = 0
    prepared_input = ""
    
    for mention in mentions:
        
        if isinstance(fulltext, float):
            continue
        
        if isinstance(mention, float):
            continue
        
        prepared_input += fulltext[start:mention['lOffset']]
        prepared_input += f"[[{fulltext[mention['lOffset']:mention['rOffset']]}]]"
        start = mention['rOffset']
    return {
        "id": ci_id,
        "ft": fulltext,
        "input": prepared_input,
        "nes": mentions,
        "sys_id": system_id
    }
        

In [None]:
prepared_input = merged_bag.starmap(prepare_input).map(json.dumps).to_textfiles(
    f"s3://processed-canonical-data/entities/v01/aida-input/*.bz2",
    storage_options=IMPRESSO_STORAGEOPT
)

In [30]:
prepared_input = merged_bag.starmap(prepare_input).map(json.dumps).to_textfiles(
    f"/scratch/*.bz2"
)

In [30]:
len(prepared_input)

3198

In [32]:
cluster.close()

## try disambiguating

**NB**: `findExtractedMentions()` doesn't seem to work when the character preceding a mention is an apostrophe (e.g. l'[[Egypte]]).

In [118]:
from py4j.java_gateway import JavaGateway, GatewayParameters
gateway = JavaGateway()

aida_server = gateway.entry_point.getAida()

In [214]:
x = prepared_input.take(21, npartitions=100)[-1]

In [215]:
test_sentence = x['input']

In [216]:
x['input']

"RÉPUBLIQUE [[SUISSE]]. De St. Gall, le 28 juillet. L'on recommence de nouveau, d'après le système ifincamération trichienne-, à procéder contre toutes les propriétés helvétiques situées de l'autre côté du Rhin, & déjà les possessions des communes du Rheinthal, qui sont d'une valeur très-considérable, viennent d'être séquestrées. De Berne, le 29 juillet. Parmi le grand nombre de voyageurs de marque qui ont passé ici, l'on distingue le général Sebastiani, connu par son rapport sur l'[[Egypte]]. Il n'a fait qu'un très-petit séjour dans notre ville, qu'il a en majeure partie . passé chez son excellence le ministre français. Il y a eu en outre plusieurs suédois, danois & allemands, qui tous sont allés voir l'institut de Pestalozzi, à Bouchsée. — Dans la séance de la diète du 14, à l'occasion de la discussion sur les troubles du canton de [[Zurich]], la proposition de faire une loi organique en explication du 20. art. de fafte de médiation, a été mise à l'ordre du jour. Cet article est de l

In [217]:
mentions = gateway.entry_point.findExtractedMentions(test_sentence)
annotations = aida_server.disambiguate(test_sentence, mentions, "fullSettings")

In [218]:
for mention in annotations.keySet():
    wikipedia_entity_id = f"http://en.wikipedia.org/wiki/{annotations.get(mention).getName()}"
    print(mention.getMention(), wikipedia_entity_id)

Zurich http://en.wikipedia.org/wiki/Zürich
SUISSE http://en.wikipedia.org/wiki/Suisse,_Moselle
Fribourg http://en.wikipedia.org/wiki/Fribourg


In [166]:
prepared_input.count().compute()

4990

In [220]:
cluster.close()

## Disambiguation test

In [5]:
from dask.distributed import Client, progress
from py4j.java_gateway import JavaGateway
from time import strftime

In [6]:
dask_client = Client()

In [7]:
dask_client

0,1
Client  Scheduler: tcp://127.0.0.1:36657,Cluster  Workers: 4  Cores: 8  Memory: 33.65 GB


In [17]:
gateway = JavaGateway()

In [115]:
def aida_disambiguate_documents(documents, gateway_port):

    # do py4j business
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=gateway_port)
    )
    aida_server = gateway.entry_point.getAida()

    output = []

    for document in documents:

        prepared_input = document['input']
        mentions = gateway.entry_point.findExtractedMentions(prepared_input)
        annotations = aida_server.disambiguate(
            prepared_input,
            mentions,
            "fullSettings"
        )

        output_doc = {
            "id": document['id'],
            "sys_id": "aidalight-fullSettings",
            "cdt": strftime("%Y-%m-%d %H:%M:%S")
        }
        linked_entities = []

        annotations = [
            {
                "surface": ann.getMention(),
                "entity_id": annotations.get(ann).getName(),
                "normalized_name": annotations.get(ann).getNMEnormalizedName(),
                "char_offset": ann.getCharOffset(),
                "mention_entity_similarity": annotations.get(ann).getMentionEntitySimilarity()
            }
            for ann in annotations.keySet()
        ]

        for mention in document['nes']:

            # since some character offsets are broken we rely
            # on the surface to realign mention and disambiguations
            matching_entity = [
                annotation
                for annotation in annotations
                if annotation['surface'] == mention['surface']
            ]

            if len(matching_entity) == 0:
                continue
            else:
                matching_entity = matching_entity[0]

            # import ipdb; ipdb.set_trace()

            entity_id = matching_entity['entity_id']
            linked_entities.append(
                {
                    "mention_id": mention['id'],
                    "surface": mention['surface'],
                    "entity_id": entity_id,
                    "entity_link": f"http://en.wikipedia.org/wiki/{entity_id}",
                    "normalized_name": matching_entity['normalized_name'],
                    "mention_entity_similarity": matching_entity["mention_entity_similarity"]
                }
            )
        output_doc['ne_links'] = linked_entities
        output.append(output_doc)

    return output

In [66]:
!s3cmd ls s3://processed-canonical-data/entities/aida-output/

In [11]:
aida_input_files = fixed_s3fs_glob("s3://processed-canonical-data/entities/aida-input/000*.bz2")

In [12]:
aida_input_files

['s3://processed-canonical-data/entities/aida-input/0000.bz2',
 's3://processed-canonical-data/entities/aida-input/0001.bz2',
 's3://processed-canonical-data/entities/aida-input/0002.bz2',
 's3://processed-canonical-data/entities/aida-input/0003.bz2',
 's3://processed-canonical-data/entities/aida-input/0004.bz2',
 's3://processed-canonical-data/entities/aida-input/0005.bz2',
 's3://processed-canonical-data/entities/aida-input/0006.bz2',
 's3://processed-canonical-data/entities/aida-input/0007.bz2',
 's3://processed-canonical-data/entities/aida-input/0008.bz2',
 's3://processed-canonical-data/entities/aida-input/0009.bz2']

In [13]:
len(aida_input_files)

10

In [19]:
aida_input_bag = db.read_text(
    aida_input_files,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

In [29]:
test = aida_input_bag.take(10)

In [31]:
aida_disambiguate_documents(test, gateway)

[{'id': 'BDC-1839-01-20-a-i0001',
  'sys_id': 'aidalight-fullSettings',
  'cdt': '2019-06-20 18:12:12',
  'ne_links': [{'mention_id': 'BDC-1839-01-20-a-i0001:0:9:person:nerb-0001',
    'surface': 'AVIS Dans',
    'entity_id': '--NME--',
    'entity_link': 'http://en.wikipedia.org/wiki/--NME--',
    'normalized_name': '--NME--',
    'mention_entity_similarity': -1.0},
   {'mention_id': 'BDC-1839-01-20-a-i0001:1438:1444:location:nerb-0001',
    'surface': 'Suisse',
    'entity_id': 'Suisse,_Moselle',
    'entity_link': 'http://en.wikipedia.org/wiki/Suisse,_Moselle',
    'normalized_name': 'Suisse,_Moselle',
    'mention_entity_similarity': -1.0}]},
 {'id': 'BDC-1839-01-20-a-i0002',
  'sys_id': 'aidalight-fullSettings',
  'cdt': '2019-06-20 18:12:12',
  'ne_links': [{'mention_id': 'BDC-1839-01-20-a-i0002:2691:2698:location:nerb-0001',
    'surface': 'Maurice',
    'entity_id': 'Maurice_Cox',
    'entity_link': 'http://en.wikipedia.org/wiki/Maurice_Cox',
    'normalized_name': 'Maurice_Cox

In [32]:
local_input_dir = "/media/romanell/4T/matteo/impresso-entities/aida-input/"
local_output_dir = "/media/romanell/4T/matteo/impresso-entities/aida-output/"

In [62]:
range_start = 0
range_end = 1000

In [66]:
aida_input_files = [
    os.path.join(local_input_dir, file)
    for file in os.listdir(local_input_dir)
    if int(file.replace(".bz2", "")) >= range_start and int(file.replace(".bz2", "")) < range_end
]

In [67]:
aida_output_files = [
    file.replace('aida-input', 'aida-output')
    for file in aida_input_files
]

In [68]:
len(aida_input_files)

1000

In [69]:
aida_input_files[:10]

['/media/romanell/4T/matteo/impresso-entities/aida-input/0013.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0128.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0973.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0058.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0095.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0601.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0099.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0433.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0721.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-input/0756.bz2']

In [71]:
aida_output_files[:10]

['/media/romanell/4T/matteo/impresso-entities/aida-output/0013.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0128.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0973.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0058.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0095.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0601.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0099.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0433.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0721.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/0756.bz2']

In [72]:
aida_input_bag = db.read_text(
    aida_input_files,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads)\
.map_partitions(aida_disambiguate_documents)\
.map(json.dumps)\
.to_textfiles(
    aida_output_files
    # storage_options=IMPRESSO_STORAGEOPT
)

KeyboardInterrupt: 

## process remaining partitions

In [84]:
aida_input_files = fixed_s3fs_glob("s3://processed-canonical-data/entities/aida-input/*.bz2")

In [85]:
len(aida_input_files)

4394

In [94]:
aida_output_files = fixed_s3fs_glob("s3://processed-canonical-data/entities/aida-output/*.bz2")

In [95]:
len(aida_output_files)

4173

In [None]:
# find out remaining file
# change destination path

In [99]:
input_filenames = [
    os.path.basename(file)
    for file in aida_input_files
]

In [100]:
output_filenames = [
    os.path.basename(file)
    for file in aida_output_files
]

In [101]:
len(input_filenames)

4394

In [102]:
len(output_filenames)

4173

In [103]:
remaining_filenames = set(input_filenames).difference(set(output_filenames))

In [104]:
len(remaining_filenames)

221

In [106]:
aida_input_remaining = [
    file
    for file in aida_input_files
    if os.path.basename(file) in remaining_filenames
]

In [None]:
aida_input_remaining = [
    file
    for file in aida_input_files
    if os.path.basename(file) in remaining_filenames
]

In [107]:
len(aida_input_remaining)

221

In [114]:
aida_output_remaining

['/media/romanell/4T/matteo/impresso-entities/aida-output/1008.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1010.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1021.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1028.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1034.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1044.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1062.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1069.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1089.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1102.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1104.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1109.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1114.bz2',
 '/media/romanell/4T/matteo/impresso-entities/aida-output/1116.bz2',
 '/media/romanell/4T/matteo/impres

In [111]:
aida_output_remaining = [
    os.path.join(local_output_dir, os.path.basename(file))
    for file in aida_input_remaining
]

In [113]:
assert len(aida_input_remaining) == len(aida_output_remaining)

In [None]:
result = db.read_text(
    aida_input_remaining,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads)\
.map_partitions(aida_disambiguate_documents, 25333)\
.map(json.dumps)\
.to_textfiles(
    aida_output_remaining
)

## Disambiguation (s3 input)

In [17]:
# aida_input_path = f"s3://canonical-rebuilt/{np}/*.jsonl.bz2"

aida_input = "s3://processed-canonical-data/entities/aida-input/GDL/*.bz2"
aida_input_bag = db.read_text(
    aida_input,
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads)\
.map(aida_disambiguate)

In [19]:
%%time
aida_results = aida_input_bag\
    .map(json.dumps)\
    .to_textfiles(
        f"s3://processed-canonical-data/entities/aida-output/GDL/*.bz2",
        storage_options=IMPRESSO_STORAGEOPT
    )

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/romanell/.local/share/virtualenvs/impresso-ne-linking-3S6qaXXb/lib/python3.6/site-packages/py4j/java_gateway.py", line 1188, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/romanell/.local/share/virtualenvs/impresso-ne-linking-3S6qaXXb/lib/python3.6/site-packages/py4j/java_gateway.py", line 1014, in send_command
    response = connection.send_command(command)
  File "/home/romanell/.local/share/virtualenvs/impresso-ne-linking-3S6qaXXb/lib/python3.6/site-packages/py4j/java_gateway.py", line 1193, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


CPU times: user 1h 23min 39s, sys: 9min 14s, total: 1h 32min 54s
Wall time: 19h 11min 15s
