In [1]:
import numpy as np
import os
import bz2
import random
import json
import qwikidata
from qwikidata.json_dump import WikidataJsonDump
from qwikidata.linked_data_interface import get_entity_dict_from_api
from collections import Counter
from qwikidata.entity import WikidataItem, WikidataProperty, WikidataLexeme
import sqlite3
from itertools import islice
import time
from pprint import pprint
import traceback

In [2]:
wjd = WikidataJsonDump('latest-all.json.bz2')

In [4]:
db = sqlite3.connect('wikidata_claims_refs_parsed.db')
cursor = db.cursor()

cursor.execute('''
    CREATE TABLE IF NOT EXISTS claims(
        entity_id TEXT,
        claim_id TEXT,
        claim_rank TEXT,
        property_id TEXT,
        datatype TEXT,
        datavalue TEXT,
        PRIMARY KEY (
            claim_id
        )
)''')

cursor.execute('''
    CREATE TABLE IF NOT EXISTS claims_refs(
        claim_id TEXT,
        reference_id TEXT,
        PRIMARY KEY (
            claim_id,
            reference_id
        )
)''')

cursor.execute('''
    CREATE TABLE IF NOT EXISTS refs(
        reference_id TEXT,
        reference_property_id TEXT,
        reference_index TEXT,
        reference_datatype TEXT,
        reference_value TEXT,
        PRIMARY KEY (
            reference_id,
            reference_property_id,
            reference_index
        )
)''')
db.commit()

In [5]:
def consume(iterator, n=None):
    "Advance the iterator n-steps ahead. If n is None, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

In [6]:
def parse_picks(picks, last_pick, f, savepoints, extraction_fun, get_sizes=False, verbose=True):
    sizes = []
    '''
    - picks is the list of picked positions in the dump to extract
    - last_pick: is the last picked position in the dump (from picks) to have been succesfully extracted to the DB.
    when it is -1 it means no picked position from the dump was extracted. This is returned at the end to mark where
    the extraction stopped.
    - f is the dump file itself
    - savepoints is the actual byte position in the file where the entity in the respective picked position was found. For
    example, savepoints[42] returns the byte position in the file where the entity in position picks[24] of the dump begins.
    - index_start is the index on the picks list where to start reading next. If last_pick is -1, then index_start
    should be 0, as the first entity should be the one to be extracted next. Otherwise, it should begin on the entity immediately after
    last_pick, which means picks.index(last_pick)+1.
    - extraction_fun is what to do with each entity
    ''' 
    if last_pick == -1:
        index_start = 0
        f.seek(0) #If last_pick is -1, no extraction took place and we seek the beginning of the file
        consume(f,1) #skipping first [
    else:
        '''
        If the extraction is continuing from a past run, then last_pick is the picked position last extracted successfully.
        Thus we need to jump to the position the file reader would be if it didn't finish halfway through.
        This would be on the position right after consuming the entity represented by last_pick.
        To find that, we do:
        '''
        index_start = picks.index(last_pick)+1 # As described at the start, we take the index of the pick following last_pick
        # to start the loop
        f.seek(savepoints[index_start-1]) # We get the position in the file where the last_pick entity starts 
        consume(f,1)    # and then we skip that entity
    try:
        for i, pick in enumerate(picks[index_start:]):
            '''
            The first line here is consume(f, pick - last_pick - 1). This is because:
            If the extraction is starting from zero, last_pick is -1 and we consume pick - (-1) -1, which is pick, so
            we jump 'pick' number of lines and land on the 'pick'st line, as pick is 0-indexed 
            Exemple: pick is 0, so we jump 0 lines and get the first element of the dump.
            Exemple: pick is 42, so we jump 42 lines and get the 43rd element of the dump; which, picks being 0-indexed,
            is what pick=42 means!
            
            If the extraction is continuing a loop that started from scratch, then pick is the picked position following
            last_pick in the picks list. In this case, we jump the lines between the end of last_pick and the beginning of
            pick. This would be pick- last_pick -1, and not just pick - last_pick, as we are skipping the number of entities 
            BETWEEN them.
            Exemple: last_pick is 42, and pick is 46. We already read entity 42, so the reader is at the very start of 
            43. We jump 43, 44, and 45, landing on the beginning of 46, so 3 entities. 46-42-1 = 3.
            
            If, however, the extraction is following another extraction after successfully extracting a last_pick entity,
            then as per the code above, pick will start by being the picked position following last_pick in the picks list,
            so we go back to the case above.
            '''
            consume(f, pick - last_pick - 1)
            savepoints[index_start+i] = f.tell() # We register the starting position of this entity in the savepoints
            linebytes = f.readline() # We consume the entity. 
            # THIS HAS PUT THE POINTER AT THE END OF THIS ENTITY/START OF THE NEXT
            if get_sizes:
                sizes.append(len(linebytes))
            line_str = linebytes.decode('utf-8') # We decode the bytes
            line_str = line_str.rstrip(',\n')
            entity = json.loads(line_str)
            #print('{} : {} : {} : {}'.format(index_start+i,pick,entity['id'], savepoints[index_start+i]))
            if verbose:
                print(str(pick/(RANDOM_SAMPLE*TOTAL_SIZE)*100)+'%'+20*' ',end='\r')
            extraction_fun(entity)        
            last_pick = pick
    except Exception as err:
        print(err, entity['id'])
        traceback.print_exc()
        raise err
    finally:
        #db.commit()
        if get_sizes:
            return last_pick, savepoints, sizes
        return last_pick, savepoints
        
def reset_picks(filename, picks):
    f = bz2.open(filename, mode="rb")
    last_pick = -1
    savepoints = [None] * len(picks)
    return last_pick, f, savepoints

In [3]:
# GENERATE THE PICKED POSITIONS REPRESENTING 20% OF THE DUMPS
TOTAL_SIZE = 93396717 #based on decompressing and calling both "wc -l" and "grep -c $"
RANDOM_SAMPLE = .2
if RANDOM_SAMPLE < 1:
    np.random.seed(42) #we use the estimated size as seed too because why not
    picks = np.random.choice(TOTAL_SIZE,size=int(RANDOM_SAMPLE*TOTAL_SIZE),replace=False) # We randomly pick 20% of total
    picks = sorted(picks)
else:
    picks = list(range(93396717 - 2))#two of the lines are "[" and "]"

In [7]:
len(picks)

18679343

In [8]:
# SANITY CHECK TO SEE IF THE EXTRACTION IS GETTING THE PICKS, CHECK THE FIRST Nth PICKS
first_n = []
n = 100
for i,e in enumerate(wjd):
    if i in picks[:n]:
        first_n.append(e)
    if len(first_n) == n:
        break

first_n_extraction = []
def put_entity_in_list(e):
    global first_n_extraction
    first_n_extraction.append(e)
    
last_pick, f, savepoints = reset_picks('latest-all.json.bz2', picks)
last_pick, savepoints = parse_picks(picks[:n], last_pick, f, savepoints, put_entity_in_list, verbose=False)

try:
    assert(first_n == first_n_extraction)
except:
    print("Something is wrong, Gaby!") 
else:
    print("All is good, Gaby!")

All is good, Gaby!


In [9]:
def extract_claim(entity_id,claim):
    if claim['mainsnak']['snaktype'] == 'value':
        value = str(claim['mainsnak']['datavalue'])
    else:
        value = claim['mainsnak']['snaktype']
    try:
        cursor.execute('''
        INSERT INTO claims(entity_id, claim_id, claim_rank, property_id, datatype, datavalue)
        VALUES($var,$var,$var,$var,$var,$var)'''.replace('$var','?'), (
            entity_id,claim['id'],claim['rank'],
            claim['mainsnak']['property'],claim['mainsnak']['datatype'],value
        ))
        #db.commit()
    except UnicodeEncodeError:
        print(entity_id,claim['id'],claim['rank'],
            claim['mainsnak']['property'],claim['mainsnak']['datatype'],value)
        raise
    except sqlite3.IntegrityError as err:
        #db.rollback()
        cursor.execute(
            '''SELECT *
            FROM claims 
            WHERE claim_id=$var
            '''.replace('$var','?'), (claim['id'],)
        )
        conflicted_value = cursor.fetchone()
        if conflicted_value == (entity_id,claim['id'],claim['rank'],
                claim['mainsnak']['property'],claim['mainsnak']['datatype'],value):
            pass
        else:
            print(err, claim['id'])
            traceback.print_exc()
            raise err

def extract_reference(ref):
    for snaks in ref['snaks'].values():
        for i, snak in enumerate(snaks):
            if snak['snaktype'] == 'value':
                value = str(snak['datavalue'])
            else:
                value = snak['snaktype']
            try:
                cursor.execute('''
                INSERT INTO refs(reference_id, reference_property_id, reference_index,
                reference_datatype, reference_value)
                VALUES($var,$var,$var,$var,$var)'''.replace('$var','?'), (
                    ref['hash'],snak['property'],str(i),snak['datatype'],value
                ))
                #db.commit()
            except sqlite3.IntegrityError as err:
                #db.rollback()
                cursor.execute(# WE DONT USE THE INDEX HERE, THEY TEND TO COME SHUFFLED FROM API AND SORTING TAKES TOO LONG
                    '''SELECT reference_id, reference_property_id, reference_datatype, reference_value
                    FROM refs 
                    WHERE reference_id = $var
                    AND reference_property_id = $var
                    '''.replace('$var','?'), (ref['hash'],snak['property'])
                )
                conflicted_values = cursor.fetchall()
                if  (ref['hash'],snak['property'],snak['datatype'],value) in conflicted_values:
                    pass
                else:
                    print(err, ref['hash'],snak['property'],i)
                    print('in the db:', conflicted_value)
                    print('trying to insert:',(ref['hash'],snak['property'],str(i),snak['datatype'],value))
                    traceback.print_exc()
                    raise err
            
def extract_claim_reference(claim,ref):
    claim['id'],ref['hash']
    try:
        cursor.execute('''
        INSERT INTO claims_refs(claim_id, reference_id)
        VALUES($var,$var)'''.replace('$var','?'), (
            claim['id'],ref['hash']
        ))
        #db.commit()
    except sqlite3.IntegrityError as err:
        #db.rollback()
        pass
    
def extract_entity(e):
    for outgoing_property_id in e['claims'].values():
        for claim in outgoing_property_id:
            extract_claim(e['id'],claim)
            if 'references' in claim:
                for ref in claim['references']: 
                    extract_claim_reference(claim, ref)
                    extract_reference(ref)

In [10]:
# ACTUALLY RUN THE EXTRACTION
last_pick, f, savepoints = reset_picks('latest-all.json.bz2', picks)
last_pick, savepoints = parse_picks(picks, last_pick, f, savepoints, extract_entity)
last_pick, savepoints

0.10195219174566916%                       

(19044,
 [512223,
  763997,
  3482113,
  4879078,
  5521709,
  6142097,
  7181228,
  8119298,
  11107678,
  12286296,
  13541171,
  13690260,
  14186437,
  15094527,
  15769148,
  15794210,
  16709806,
  17053195,
  17735451,
  17817487,
  18026120,
  18126047,
  18722284,
  19118234,
  19504290,
  20453769,
  20822946,
  20976836,
  21648224,
  22048380,
  22129082,
  22224394,
  22454234,
  23789147,
  24190161,
  24905828,
  24950366,
  25095450,
  25593698,
  26086050,
  26209710,
  26248959,
  26380550,
  26647971,
  26826514,
  27034445,
  27069037,
  27462293,
  27509946,
  27982167,
  28108447,
  28285458,
  28753469,
  28938645,
  29013166,
  29030896,
  29257700,
  29292875,
  29472730,
  29743534,
  30004321,
  30042345,
  30045964,
  30112696,
  30444297,
  30775413,
  31042758,
  31113654,
  31233365,
  31512538,
  31723772,
  31846911,
  31901217,
  32051060,
  32328068,
  32348149,
  32367109,
  32595237,
  32643152,
  33068826,
  33273894,
  33409893,
  33515592,
  3380