<a href="https://colab.research.google.com/github/ankur19030/EntityNeighbours/blob/master/EntityNeighbours.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from qwikidata.json_dump import WikidataJsonDump
from qwikidata.linked_data_interface import get_entity_dict_from_api
from qwikidata.entity import WikidataItem, WikidataProperty
import lmdb
from contextlib import closing
import six
from uuid import uuid1
from multiprocessing.pool import Pool


from functools import partial
import zlib
import pickle
from tqdm import tqdm
import multiprocessing as mp

In [2]:
!pip install qwikidata

Collecting qwikidata
  Downloading https://files.pythonhosted.org/packages/a9/40/4273aaaacd7269f80d8ce475aff7115ab8fce31488ba08f3eaca776d110a/qwikidata-0.4.0-py3-none-any.whl
Collecting mypy-extensions
  Downloading https://files.pythonhosted.org/packages/5c/eb/975c7c080f3223a5cdaff09612f3a5221e4ba534f7039db34c35d95fa6a5/mypy_extensions-0.4.3-py2.py3-none-any.whl
Installing collected packages: mypy-extensions, qwikidata
Successfully installed mypy-extensions-0.4.3 qwikidata-0.4.0


In [4]:
!pip install lmdb



In [0]:
# Load the entities to process. Despite its name, chunk_dict is sliced by
# chunks(), so it must be a sequence (presumably a list of QIDs — confirm).
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open("/content/drive/My Drive/chunk_0", "rb") as chunk_file:
    chunk_dict = pickle.load(chunk_file)


In [3]:
!pip install sparqlwrapper

Collecting sparqlwrapper
  Downloading https://files.pythonhosted.org/packages/00/9b/443fbe06996c080ee9c1f01b04e2f683b2b07e149905f33a2397ee3b80a2/SPARQLWrapper-1.8.5-py3-none-any.whl
Collecting rdflib>=4.0
  Downloading https://files.pythonhosted.org/packages/d0/6b/6454aa1db753c0f8bc265a5bd5c10b5721a4bb24160fb4faf758cf6be8a1/rdflib-5.0.0-py3-none-any.whl (231kB)
     |████████████████████████████████| 235kB 4.1MB/s 
Collecting isodate
  Downloading https://files.pythonhosted.org/packages/9b/9f/b36f7774ff5ea8e428fdcfc4bb332c39ee5b9362ddd3d40d9516a55221b2/isodate-0.6.0-py2.py3-none-any.whl (45kB)
     |████████████████████████████████| 51kB 7.8MB/s 
Installing collected packages: isodate, rdflib, sparqlwrapper
Successfully installed isodate-0.6.0 rdflib-5.0.0 sparqlwrapper-1.8.5


In [0]:
import sys
from SPARQLWrapper import SPARQLWrapper, JSON

# Wikidata Query Service SPARQL endpoint passed to get_results().
endpoint_url = "https://query.wikidata.org/sparql"
# Entity IDs whose lookups failed; pickled to Drive by the driver cell.
# NOTE(review): no visible code appends to this list.
entities_failed_list = list()



def get_results(endpoint_url, query, user_agent=None, timeout=None):
    """Run a SPARQL query and return the response converted from JSON.

    Args:
        endpoint_url: SPARQL endpoint URL.
        query: SPARQL query text.
        user_agent: HTTP User-Agent to send. Defaults to the generic
            "WDQS-example Python/X.Y" string; see https://w.wiki/CX6
            (Wikimedia User-Agent policy) for what a real client should send.
        timeout: optional request timeout in whole seconds, forwarded to
            ``SPARQLWrapper.setTimeout``. ``None`` keeps the library default,
            under which a stalled request can block the caller indefinitely.

    Returns:
        The decoded JSON response (for SELECT queries, a dict whose
        ``["results"]["bindings"]`` holds the result rows).
    """
    if user_agent is None:
        user_agent = "WDQS-example Python/%s.%s" % (sys.version_info[0], sys.version_info[1])
    sparql = SPARQLWrapper(endpoint_url, agent=user_agent)
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    if timeout is not None:
        sparql.setTimeout(timeout)
    return sparql.query().convert()


def preprocess_multiple_results(results):
    """Group SPARQL result rows into compressed per-entity neighbour maps.

    A row is kept only when its predicate URI contains
    ``http://www.wikidata.org/prop/direct/`` and its object URI contains
    ``http://www.wikidata.org/entity/``. For kept rows, the last path segment
    of the predicate (property ID) and of the object (entity ID) are recorded.

    Args:
        results: iterable of SPARQL JSON binding dicts with ``researcher``,
            ``p`` and ``v`` entries, each holding a ``value`` string.

    Returns:
        List of ``(subject_key, blob)`` tuples in first-seen subject order,
        where ``subject_key`` is the full subject URI encoded as UTF-8 and
        ``blob`` is ``zlib.compress(pickle.dumps({prop_id: [value_id, ...]}))``.
    """
    neighbours_by_subject = {}

    for binding in results:
        predicate_uri = binding['p']['value']
        # Checked before touching 'v' so rows are skipped in the same order.
        if 'http://www.wikidata.org/prop/direct/' not in predicate_uri:
            continue
        object_uri = binding['v']['value']
        if 'http://www.wikidata.org/entity/' not in object_uri:
            continue

        subject_uri = binding['researcher']['value']
        neighbours = neighbours_by_subject.setdefault(subject_uri, {})
        prop_id = predicate_uri.rsplit("/", 1)[-1]
        value_id = object_uri.rsplit("/", 1)[-1]
        neighbours.setdefault(prop_id, []).append(value_id)

    return [
        (subject_uri.encode("utf-8"), zlib.compress(pickle.dumps(neighbours)))
        for subject_uri, neighbours in neighbours_by_subject.items()
    ]

def process_multiple_entities(entities):
    """Fetch the entity-valued direct claims of several Wikidata items in one query.

    Args:
        entities: iterable of Wikidata IDs without prefix (presumably QIDs such
            as "Q42"); each one becomes a ``wd:<id>`` entry of the SPARQL
            ``VALUES`` clause.

    Returns:
        List of ``(key, blob)`` pairs as built by preprocess_multiple_results.
        Empty input returns ``[]`` without sending any request.
    """
    entity_ids = list(entities)
    if not entity_ids:
        return []

    # One "wd:<id>" per line. The previous version prefixed the list with an
    # extra bare "wd:" (the namespace URI itself), which added a bogus value.
    values_clause = "\n".join("wd:" + entity_id for entity_id in entity_ids)

    # NOTE: ?researcherLabel / ?vLabel (label service) are not read by
    # preprocess_multiple_results; only ?researcher, ?p and ?v are used.
    query = """SELECT ?researcher ?researcherLabel ?p ?v ?vLabel WHERE {
          VALUES ?researcher {
            """ + values_clause + """
          } 
          ?researcher ?p ?v.
          ?x wikibase:directClaim ?p.
          FILTER(CONTAINS(STR(?v), "http://www.wikidata.org/entity/"))
          SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
        }"""

    results = get_results(endpoint_url, query)

    return preprocess_multiple_results(results["results"]["bindings"])


def chunks(lst, n):
    """Lazily yield consecutive slices of ``lst`` of length ``n``.

    The final slice may be shorter. Each slice has the same type as ``lst``
    (list, tuple, str, ...), since it is produced by ordinary slicing.
    """
    yield from (lst[start:start + n] for start in range(0, len(lst), n))


def _write_with_resize(env, db, items):
    """Write all ``items`` into ``db`` in one transaction, growing the map if needed.

    On ``lmdb.MapFullError`` the write transaction is aborted by its context
    manager, the environment's map size is doubled and the batch is retried.
    """
    while True:
        try:
            with env.begin(db=db, write=True) as txn:
                txn.cursor().putmulti(items)
            return
        except lmdb.MapFullError:
            env.set_mapsize(env.info()['map_size'] * 2)


def process_all_entities(out_file, pool_size, chunk_size,
          init_map_size=1000000000000, buffer_size=3000,
          entities=None, entities_per_query=1):
    """Query neighbours for every entity and store them in an LMDB file.

    Args:
        out_file: path of the LMDB data file (opened with ``subdir=False``,
            so this path *is* the database; nothing else may be written to it).
        pool_size: number of worker processes issuing SPARQL queries.
        chunk_size: ``chunksize`` passed to ``Pool.imap_unordered``.
        init_map_size: initial LMDB map size in bytes; doubled whenever a
            write raises ``MapFullError``.
        buffer_size: number of rows accumulated before they are committed
            in a single write transaction.
        entities: sequence of entity IDs to process; defaults to the global
            ``chunk_dict`` loaded earlier in the notebook.
        entities_per_query: number of entities sent per SPARQL query
            (1 reproduces the previous behaviour).

    The file gets a ``__meta__`` sub-database (keys ``id`` and ``version``)
    and an ``__entity_neighbours__`` sub-database holding the rows returned
    by process_multiple_entities.
    """
    if entities is None:
        entities = chunk_dict
    query_batches = chunks(entities, entities_per_query)
    # Exactly the number of batches chunks() will yield, so the bar can finish.
    total_batches = len(range(0, len(entities), entities_per_query))

    with closing(lmdb.open(out_file, subdir=False, map_async=True, map_size=init_map_size,
                           max_dbs=3)) as env:
        meta_db = env.open_db(b'__meta__')
        with env.begin(db=meta_db, write=True) as txn:
            txn.put(b'id', six.text_type(uuid1().hex).encode('utf-8'))
            txn.put(b'version', six.text_type("").encode('utf-8'))

        entity_db = env.open_db(b'__entity_neighbours__')

        with closing(Pool(pool_size)) as pool:
            # Buffer rows so many of them are committed in one transaction.
            entity_buf = []

            with tqdm(total=total_batches, mininterval=0.5) as bar:
                for rows in pool.imap_unordered(process_multiple_entities, query_batches,
                                                chunksize=chunk_size):
                    entity_buf.extend(rows)

                    if len(entity_buf) >= buffer_size:
                        _write_with_resize(env, entity_db, entity_buf)
                        entity_buf = []

                    bar.update(1)

            if entity_buf:
                _write_with_resize(env, entity_db, entity_buf)
    # Do not reopen out_file here: it is the LMDB data file itself, and
    # writing to it (the old pickle.dump of an empty list) truncates the DB.


In [20]:
process_all_entities("ENTITY_NEIGHBOURS_DB", mp.cpu_count(), 1)

# Persist the failed-entity list next to the input chunk on Drive.
# NOTE(review): no visible code appends to entities_failed_list, and appends
# made inside Pool worker processes would not reach this (parent) process,
# so this presumably always writes an empty list — confirm.
with open("/content/drive/My Drive/entities_failed_list", "wb") as entities_failed_list_file:
    pickle.dump(entities_failed_list, entities_failed_list_file)


  0%|          | 3590/850000 [06:14<24:31:12,  9.59it/s]Process ForkPoolWorker-25:
Process ForkPoolWorker-26:

Traceback (most recent call last):


KeyboardInterrupt: ignored

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "<ipython-input-19-6e4b3f98ec74>", line 56, in process_multiple_entities
    results = get_results(endpoint_url, query)
  File "<ipython-input-19-6e4b3f98ec74>", line 56, in process_multiple_entities
    results = get_results(endpoint_url, query)
  File "<ipython-input-19-6e4b3f98ec74>", line 15, in get