# Entity ElasticSearch Index

Here we build an Elasticsearch index of FB5M entities (MID, name, and facts) so that text can be linked to entities in the FB5M knowledge base.

In [11]:
# Path to the raw FB5M knowledge-base dump; it is read below as tab-separated
# rows of exactly three fields.
FB5M = '../../data/simple_qa/freebase-FB5M.txt'

# DOWNLOADED FROM: https://www.dropbox.com/s/yqbesl07hsw297w/FB5M.name.txt
# Tab-separated file used below to fill the `mid_to_name` lookup.
MID_TO_NAME = '../../data/simple_qa/FB5M.name.txt'

In [23]:
from collections import defaultdict

import random
import os
import pprint

from tqdm import tqdm

# Group every FB5M fact under the MID found in the first column of its row.
# NOTE: the key names follow this notebook's existing convention -- the first
# column is called the "object" and the third column holds the "subjects";
# they are kept as-is because they become field names in the index.
object_to_fact = defaultdict(list)

# The context manager closes the dump even if a malformed row aborts the loop.
with open(FB5M, 'r') as fb5m_file:
    # `total` is the expected number of rows; it only sizes the progress bar.
    for line in tqdm(fb5m_file, total=12010500):
        split = line.split('\t')
        assert len(split) == 3, 'Malformed row'
        object_ = split[0].replace('www.freebase.com/m/', '').strip()
        property_ = split[1].replace('www.freebase.com/', '').strip()
        # The third column is a whitespace-separated list of URLs.
        subjects = [url.replace('www.freebase.com/m/', '').strip()
                    for url in split[2].split()]
        object_to_fact[object_].append({'property': property_, 'subjects': subjects})

pp = pprint.PrettyPrinter(indent=2)
print()
print('Number of Objects:', len(object_to_fact))
# `random.sample` requires a sequence (dict views raise TypeError on
# Python 3.11+), so sample from a list of keys; clamp the sample size so a
# small or empty mapping does not raise ValueError.
sampled_mids = random.sample(list(object_to_fact), min(5, len(object_to_fact)))
print('Sample:', pp.pformat([(mid, object_to_fact[mid]) for mid in sampled_mids]))

100%|██████████| 12010500/12010500 [01:09<00:00, 173896.43it/s]



Number of Objects: 1972702
Sample: [ ( '01hmz0g',
    [ { 'property': 'music/album/genre',
        'subjects': ['07sbbz2', '06by7', '02yv6b', '0155w', '0xhtw']},
      {'property': 'music/album/album_content_type', 'subjects': ['0l14g2']},
      {'property': 'music/album/release_type', 'subjects': ['02lx2r']},
      {'property': 'music/album/artist', 'subjects': ['07mvp']}]),
  ( '04y82y6',
    [ { 'property': 'people/deceased_person/place_of_death',
        'subjects': ['01pr6n']},
      {'property': 'people/person/gender', 'subjects': ['05zppz']},
      {'property': 'soccer/football_player/position_s', 'subjects': ['0dgrmp']},
      {'property': 'people/person/nationality', 'subjects': ['02jx1']}]),
  ( '0gys2jp',
    [ { 'property': 'film/film/language',
        'subjects': ['03115z', '03_9r', '02h40lc', '02k30q']},
      { 'property': 'film/film/genre',
        'subjects': ['07s9rl0', '082gq', '03hn0']},
      {'property': 'film/film/directed_by', 'subjects': ['014hdb']},
      {'

In [5]:
def _bare_mid(token):
    """ Reduce a Freebase identifier token to its bare MID (e.g. '01hmz0g').

    NOTE(review): the identifier style used in FB5M.name.txt is not visible in
    this notebook, so the common spellings are all accepted -- confirm against
    the first row printed below.
    """
    token = token.strip().strip('<>')
    for prefix in ('www.freebase.com/m/', 'fb:m.', '/m/', 'm.'):
        if token.startswith(prefix):
            return token[len(prefix):]
    return token


def _bare_name(token):
    """ Return the text between the first and last double quote, if quoted.

    Unquoted tokens are returned stripped but otherwise untouched.
    """
    token = token.strip()
    if token.startswith('"') and token.rfind('"') > 0:
        return token[1:token.rfind('"')]
    return token


# Build the MID -> name lookup. The previous version printed the first row and
# stopped, which left `mid_to_name` empty.
mid_to_name = {}
with open(MID_TO_NAME) as names_file:
    for i, line in enumerate(names_file):
        split = line.strip().split('\t')
        if i == 0:
            # Keep a one-row preview so the assumed layout can be checked.
            print(split)
        if len(split) < 2:
            continue  # not enough columns to hold both a MID and a name
        # NOTE(review): assumes column 0 is the MID and the name is column 2
        # when a predicate column is present (else column 1) -- TODO confirm.
        name_column = split[2] if len(split) > 2 else split[1]
        # Later rows overwrite earlier ones for the same MID.
        mid_to_name[_bare_mid(split[0])] = _bare_name(name_column)
print('Got %d mappings' % len(mid_to_name))

Got 1954466 mappings


In [6]:
from elasticsearch_dsl.connections import connections

# Define a default Elasticsearch client
connections.create_connection(hosts=['localhost'])

# Display cluster health
# The value must be interpolated with `%`; passing it as a second argument to
# print() left the literal "%s" in the output. The one-element tuple keeps the
# formatting correct whatever mapping-like type `health()` returns.
print('Health: %s' % (connections.get_connection().cluster.health(),))

Health: %s {'active_primary_shards': 15, 'number_of_nodes': 1, 'number_of_in_flight_fetch': 0, 'number_of_data_nodes': 1, 'relocating_shards': 0, 'initializing_shards': 0, 'cluster_name': 'elasticsearch_petrochuk', 'active_shards': 15, 'timed_out': False, 'number_of_pending_tasks': 0, 'delayed_unassigned_shards': 0, 'unassigned_shards': 15, 'task_max_waiting_in_queue_millis': 0, 'status': 'yellow', 'active_shards_percent_as_number': 50.0}


In [7]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()
ENTITY_INDEX = 'fb5m_entities'

# Count the documents already indexed so the (expensive) re-index further down
# can report what it would delete. Stays 0 when the index does not exist.
num_entities = 0
if client.indices.exists(index=ENTITY_INDEX):
    search = Search(using=client, index=ENTITY_INDEX)
    query = search.query("match_all")
    # Count once and reuse the value: each `count()` is a separate request, so
    # the printed number could otherwise differ from `num_entities`.
    num_entities = query.count()
    print('Found %d documents in index "%s"' % (num_entities, ENTITY_INDEX))
else:
    print('%s index does not exist' % ENTITY_INDEX)

Found 1113000 documents in index "fb5m_entities"


In [8]:
from elasticsearch_dsl import DocType
from elasticsearch_dsl import Index
from elasticsearch_dsl import Integer
from elasticsearch_dsl import Nested
from elasticsearch_dsl import Search
from elasticsearch_dsl import String

class FreebaseEntity(DocType):
    """ Document mapping for one entity stored in the `ENTITY_INDEX` index.

    Fields:
        mid: entity identifier, declared `not_analyzed`.
        name: entity name, declared with the default `String` settings.
        facts: required nested list; each item has `subjects` and `property`
            sub-fields, both declared `not_analyzed`.
    """
    mid = String(index='not_analyzed')
    name = String()
    facts = Nested(
        properties=dict(subjects=String(index='not_analyzed'),
                        property=String(index='not_analyzed')),
        required=True)

    class Meta:
        # Index that documents of this type are written to.
        index = ENTITY_INDEX

    def save(self, **kwargs):
        """ Persist the document by delegating to the parent `save`. """
        return super(FreebaseEntity, self).save(**kwargs)

In [None]:
from datetime import datetime

import os

from elasticsearch.helpers import streaming_bulk

# save entities to elastic search
def get_entities(facts_by_mid=None, names_by_mid=None):
    """ Yield one `{'mid', 'name', 'facts'}` dict per entity that has both facts and a name.

    The previous version iterated `all_mids`, which is not defined anywhere in
    this notebook; iterating the fact mapping directly visits every MID that
    can pass the "has facts and has a name" filter.

    Args:
        facts_by_mid: mapping of MID -> list of facts. Defaults to the
            notebook-level `object_to_fact`.
        names_by_mid: mapping of MID -> name. Defaults to the notebook-level
            `mid_to_name`.

    Yields:
        dict with keys `mid`, `name` and `facts`, in `facts_by_mid` order.
    """
    # Resolve the defaults lazily so the globals are read at call time.
    if facts_by_mid is None:
        facts_by_mid = object_to_fact
    if names_by_mid is None:
        names_by_mid = mid_to_name

    for mid, facts in facts_by_mid.items():
        if mid in names_by_mid:
            yield {
                'mid': mid,
                'name': names_by_mid[mid],
                'facts': facts
            }
    

def serialize_entity(mid, name, facts):
    """ Build a `FreebaseEntity` for the given values and return it as a dictionary.

    The MID is also used as the document id (`meta={'id': mid}`).
    """
    entity = FreebaseEntity(mid=mid, facts=facts, name=name, meta={'id': mid})
    # NOTE(review): the positional True is presumably `to_dict`'s
    # `include_meta` flag -- confirm against the installed elasticsearch_dsl.
    return entity.to_dict(True)


def save_entities():
    """ Save every entity from `get_entities` in bulk via `streaming_bulk`.

    Entities are serialized lazily with `serialize_entity` and sent in chunks
    of 100. A progress counter is rewritten in place every 100 results, and any
    result reported as not ok is printed.
    """
    elasticsearch_connection = connections.get_connection()
    task_generator = (serialize_entity(**kwargs) for kwargs in get_entities())
    for i, (ok, item) in enumerate(streaming_bulk(elasticsearch_connection, task_generator,
                                                  chunk_size=100, request_timeout=120)):
        if i % 100 == 0:
            # '\r' rewrites the same line. print(..., end='', flush=True) emits
            # the same bytes as the old sys.stdout.write/flush pair without
            # needing `sys`, which this notebook never imports.
            print("\r%d - Done" % (i), end='', flush=True)
        if not ok:
            print(item)
    print()


# (re)build the entity index, asking before an existing index is deleted
def create_index():
    """ Create the entity index and fill it, replacing any existing index.

    When the index already exists the user must type `YES` to delete and
    rebuild it; any other answer leaves it untouched. When it does not exist
    there is nothing to delete, so it is created without a prompt (previously
    the unconditional delete failed on a missing index).
    """
    if client.indices.exists(index=ENTITY_INDEX):
        input_ = input('WARNING - Delete %d entities? [YES/no] ' % num_entities)
        if input_ != 'YES':
            print('Not Deleting Index! Wohoo!')
            print('Done!')
            return
        client.indices.delete(index=ENTITY_INDEX)

    # create the mappings in elasticsearch
    FreebaseEntity.init()
    save_entities()
    print('Done!')


create_index()

1086000 - Done