# 2b. Posting to Elasticsearch from (large) Wikidata dump
Dump has been created using [wikibase-dump-filter](https://github.com/maxlath/wikibase-dump-filter/).

In [1]:
import sys
sys.path.append("../..")

from heritageconnector.config import config

import json
from itertools import islice
from tqdm.auto import tqdm
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

dump_path = "/Volumes/Kalyan SSD/SMG/wikidata/humans_en.ndjson"

Lazily create pages and push to ES using the [bulk ingest helper](https://github.com/elastic/elasticsearch-py/blob/master/examples/bulk-ingest/bulk-ingest.py):

In [2]:
num_docs = None

def get_data_length():
    # this takes ages
    with open(dump_path) as f:
        return sum([1 for _ in f]) - 1
    
def process_doc(doc, properties=['P31'], lang='en'):
    newdoc = {
        "id": doc['id']
    }
    
    # add label(s)
    if lang in doc['labels']:
        newdoc['labels'] = doc['labels'][lang]['value']

    # add descriptions(s)
    if lang in doc['descriptions']:
        newdoc['descriptions'] = doc['descriptions'][lang]['value']
        
    # add aliases
    if (len(doc['aliases']) > 0) and (lang in doc['aliases']):
        newdoc['aliases'] = [i['value'] for i in doc['aliases'][lang]]
    else:
        newdoc['aliases'] = []
        
    # add claims (property values)
    newdoc['claims'] = {}
    
    for p in properties:
        if p in doc['claims']:
            newdoc['claims'][p] = [i['mainsnak']['datavalue']['value']['id'] for i in doc['claims'][p]]
        
    return newdoc

def generate_actions():
    with open(dump_path) as f:
        objects = (json.loads(line) for line in f)

        # optionally limit number that are loaded
        if num_docs is not None:
            objects = islice(objects, num_docs)
                
        for item in objects:
            # TODO: clean up with english-only dump
            doc = process_doc(item)
            
            yield doc #doc/item

In [3]:
next(generate_actions())

{'id': 'Q1868',
 'labels': 'Paul Otlet',
 'descriptions': 'Belgian author, librarian and colonial thinker',
 'aliases': ['Paul Marie Ghislain Otlet', 'Paul Marie Otlet'],
 'claims': {'P31': ['Q5']}}

In [4]:
es_index = 'wikidump_humans'
chunk_size = 1000 # default 500
queue_size = 8

es = Elasticsearch(
        [config.ELASTIC_SEARCH_CLUSTER],
        http_auth=(config.ELASTIC_SEARCH_USER, config.ELASTIC_SEARCH_PASSWORD),
    )
es.indices.create(index=es_index, ignore=400)

print("Indexing documents...")

successes = 0
errors = []
for ok, action in tqdm(parallel_bulk(client=es, index=es_index, actions=generate_actions(), chunk_size=chunk_size, queue_size=queue_size), total=num_docs):
    if not ok:
        print(action)
        errors.append(action)
    successes += ok
    
print("Indexed %d/%d documents" % (successes, num_docs))

Indexing documents...


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




TypeError: %d format: a number is required, not NoneType