# Create Sample Wikidata

As of 22 June 2022
* Translate Stream Updater JSON to SPARQL UPDATE (INSERT/DELETE DATA) statements
** To be used in testing of the stress test infrastructure and process
** Saved as sparql-update.txt
* Capture the earliest (first) triple for each deleted subject/predicate pair in a new file, deleted_triples.nt
* Reverse the INSERT/DELETE sequence to completely restore a Wikidata RDF load
* Transform the CSV data used in testing the query analysis infrastructure to N-Triples
** 3 files were provided with entities from the human, scholarly articles, taxon, gene and film subgraphs
* Manually add the results from translating the 3 files above to the deleted_triples.nt file, to create wikidata-subset.nt

As of 23 June 2022
* Reprocessed Stream Updater JSON to create 1-second batches of INSERTs and DELETEs
** If a triple is both inserted and removed in the same "batch", then it is ignored

In [1]:
import json
import csv
import queue

## Process the Streaming Updater JSON

In [2]:
# Take dump of Stream Updater data and convert to SPARQL INSERT/DELETE statements
#
# Input:  wikidata_update_stream_6k_edits_20220531.ndjson (one JSON event per line)
# Output: sparql-update.txt (one "INSERT DATA { ... }" or "DELETE DATA { ... }" request per line)
#
# For every event that is not a "reconcile", each RDF payload that is present is flattened
# onto a single line (newlines become spaces, tabs are dropped, the result is stripped) and
# written in the fixed order below.
payload_keywords = (("rdf_added_data", "INSERT DATA { "),
                    ("rdf_deleted_data", "DELETE DATA { "),
                    ("rdf_linked_data", "INSERT DATA { "))
# Explicit UTF-8: the default text encoding depends on the platform/locale, which would
# otherwise change how non-ASCII literals are read and written
with open("wikidata_update_stream_6k_edits_20220531.ndjson", "r", encoding="utf-8") as update_data:
    with open('sparql-update.txt', 'w', encoding="utf-8") as sparql_update:
        for line in update_data:           # Read an entry
            if not line.strip():           # Tolerate blank lines; json.loads would raise on them
                continue
            entry = json.loads(line)
            operation = entry["operation"]
            if operation == "reconcile":   # Reconciles are not relevant at this time
                continue
            request = ''
            for payload, keyword in payload_keywords:
                if payload in entry:
                    request += keyword + \
                        entry[payload]["data"].replace("\n", " ").replace("\t", "").strip() + "}\n"
            # Ignore rdf_unlinked_data
            sparql_update.write(request)
            # Note that operations with sequence_lengths different than 1 are not treated differently
            # All information in the JSON output is written as complete triples

In [3]:
# TODO: Capture all integer and decimal properties to correctly process the JSON
#
# Predicate IRIs (written with their surrounding angle brackets) whose object values are
# emitted as typed literals: xsd:integer for integer_predicates, xsd:decimal for
# decimal_predicates. Both lists are only consulted with membership tests ("in").
integer_predicates = ['<http://wikiba.se/ontology#identifiers>', '<http://wikiba.se/ontology#sitelinks>',
                      '<http://wikiba.se/ontology#statements>', '<http://schema.org/version>',
                      '<http://www.wikidata.org/prop/statement/P1128>', '<http://www.wikidata.org/prop/direct/P1128>',
                      '<http://www.wikidata.org/prop/statement/P2635>', '<http://www.wikidata.org/prop/direct/P2635>',
                      '<http://www.wikidata.org/prop/statement/P8687>', '<http://www.wikidata.org/prop/direct/P8687>']
# NOTE(review): the statement/direct P2046 pair used to be listed twice in this list; the
# repeated pair was dropped. If a different property was intended there, add it below.
decimal_predicates = ['<http://wikiba.se/ontology#geoLongitude>', '<http://wikiba.se/ontology#geoLatitude>',
                      '<http://www.wikidata.org/prop/statement/P2044>', '<http://www.wikidata.org/prop/direct/P2044>',
                      '<http://www.wikidata.org/prop/statement/P2046>', '<http://www.wikidata.org/prop/direct/P2046>',
                      '<http://www.wikidata.org/prop/statement/P2047>', '<http://www.wikidata.org/prop/direct/P2047>',
                      '<http://www.wikidata.org/prop/statement/P2048>', '<http://www.wikidata.org/prop/direct/P2048>',
                      '<http://www.wikidata.org/prop/statement/P2054>', '<http://www.wikidata.org/prop/direct/P2054>',
                      '<http://www.wikidata.org/prop/statement/P2067>', '<http://www.wikidata.org/prop/direct/P2067>',
                      '<http://www.wikidata.org/prop/statement/P2101>', '<http://www.wikidata.org/prop/direct/P2101>',
                      '<http://www.wikidata.org/prop/statement/P2102>', '<http://www.wikidata.org/prop/direct/P2102>',
                      '<http://www.wikidata.org/prop/statement/P2107>', '<http://www.wikidata.org/prop/direct/P2107>',
                      '<http://www.wikidata.org/prop/statement/P2119>', '<http://www.wikidata.org/prop/direct/P2119>',
                      '<http://www.wikidata.org/prop/statement/P2177>', '<http://www.wikidata.org/prop/direct/P2177>',
                      '<http://www.wikidata.org/prop/statement/P2405>', '<http://www.wikidata.org/prop/direct/P2405>',
                      '<http://www.wikidata.org/prop/statement/P2565>', '<http://www.wikidata.org/prop/direct/P2565>',
                      '<http://www.wikidata.org/prop/statement/P2854>', '<http://www.wikidata.org/prop/direct/P2854>',
                      '<http://www.wikidata.org/prop/statement/P3063>', '<http://www.wikidata.org/prop/direct/P3063>',
                      '<http://www.wikidata.org/prop/statement/P4250>', '<http://www.wikidata.org/prop/direct/P4250>']


In [4]:
# Add first occurrence of each "deleted" subj/predicate triple to an output file
# May miss a few, unique triples if the object can be multi-valued
#
# Input:  sparql-update.txt (only lines that do not start with "INSERT " are processed)
# Output: deleted_triples.nt (one line per distinct subject/predicate pair)
#
# The request text is taken apart with plain string operations, not a Turtle parser:
# statements are separated by " . ", predicate/object clauses by " ;", and the "a" keyword
# is expanded to the rdf:type IRI. A literal that contains one of those separators (or a
# "{" / "}") will therefore be split incorrectly.
triples = dict()
# Explicit UTF-8: the default text encoding depends on the platform/locale
with open('sparql-update.txt', 'r', encoding='utf-8') as sparql_update:
    for line in sparql_update:
        if line.startswith("INSERT "):   # Only processing DELETED triples
            continue
        if not line.strip():             # Tolerate blank lines (no "{" to split on)
            continue
        del_data = line.split("{")[1].split("}")[0]
        for statement in del_data.split(" . "):
            if not statement.strip():    # Empty request body such as "DELETE DATA { }"
                continue
            subj = statement.split('<')[1].split(">")[0]
            first_clause = True
            for clause in statement.split(' ;'):
                if clause.endswith(" ."):   # Last clause does not match " . "
                    clause = clause[:-2]    # Just get rid of the period
                if '> a ' in clause or clause.startswith(' a '):
                    pred = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'
                    obj = clause.split(' a ')[1]
                elif first_clause:
                    # Format is "<subj> <pred> obj_as_value_or_IRI"
                    pred = clause.split('<')[2].split('>')[0]
                    obj = clause[(clause.find('>', clause.find('>') + 1) + 2):]   # Find 2nd '>' and +2
                else:
                    # Format is " <pred> obj_as_value_or_IRI"
                    pred = clause.split('<')[1].split('>')[0]
                    obj = clause[(clause.index('>') + 2):]
                first_clause = False
                subj_pred = subj + pred
                if subj_pred in triples:    # Only the earliest triple per subject/predicate is kept
                    continue
                if f'<{pred}>' in integer_predicates:
                    triples[subj_pred] = f'<{subj}> <{pred}> "{obj}"^^<http://www.w3.org/2001/XMLSchema#integer> .'
                elif f'<{pred}>' in decimal_predicates:
                    triples[subj_pred] = f'<{subj}> <{pred}> "{obj}"^^<http://www.w3.org/2001/XMLSchema#decimal> .'
                elif "@" in obj and " , " in obj:   # Multi-valued string, Take first one
                    triples[subj_pred] = f"<{subj}> <{pred}> {obj.split(' , ')[0]} ."
                else:
                    triples[subj_pred] = f"<{subj}> <{pred}> {obj} ."
with open("deleted_triples.nt", "w", encoding='utf-8') as new:
    # dict preserves insertion order, so triples are written in order of first appearance
    new.writelines(value + "\n" for value in triples.values())

In [5]:
# To account for re-adding any new triples or deleting any inserted ones
# When operating with a complete dump of the Wikidata RDF
# Process the Updater's INSERTs/DELETEs in reverse order, and also reverse the requests such that INSERTs become DELETEs and vice-versa
#
# Input:  sparql-update.txt
# Output: restore_wikidata.txt (same number of lines, last request first)
restore_requests = []   # A plain list is enough here; nothing is shared between threads
# Explicit UTF-8: the default text encoding depends on the platform/locale
with open('sparql-update.txt', 'r', encoding='utf-8') as sparql_update:
    for line in sparql_update:
        # count=1 swaps only the first occurrence (the request keyword at the start of the
        # line), so the same words appearing later inside a literal are left untouched
        if line.startswith("DELETE "):
            restore_requests.append(line.replace("DELETE DATA", "INSERT DATA", 1))
        else:
            restore_requests.append(line.replace("INSERT DATA", "DELETE DATA", 1))
with open("restore_wikidata.txt", "w", encoding='utf-8') as new:
    new.writelines(reversed(restore_requests))

# Process the Sample RDF Used for Query Analysis Code Testing

In [6]:
# Convert CSV from the test infrastructure for query analyses, into triples
#
# Each <name>.csv needs 'subject', 'predicate' and 'object' columns. The three values are
# written verbatim (nothing is quoted or escaped here) as "subject predicate object ." lines
# in <name>.nt, so they must already be in their final serialized form.
for input_file in ('subgraphs', 'scholarly_articles', 'scholarly_articles_and_authors'):
    # newline='' is what the csv module expects for its input file;
    # explicit UTF-8 because the default text encoding depends on the platform/locale
    with open(f'{input_file}.csv', newline='', encoding='utf-8') as csvfile, \
            open(f'{input_file}.nt', 'w', encoding='utf-8') as wikidata:
        for row in csv.DictReader(csvfile):
            wikidata.write(f"{row['subject']} {row['predicate']} {row['object']} .\n")

# Batch the INSERTs/DELETEs to Send 1 Update/Sec

Also:
* Ignore an INSERT that is removed by a DELETE in the same 1 second interval
* Ignore a DELETE that has a corresponding INSERT in the same 1 second interval

In [7]:
def split_triples_remove_overlaps(rdf_data: str, is_add: bool, inserts: dict, deletes: dict):
    """Split a flattened Turtle payload into triples and merge them into the current batch.

    The payload is taken apart with plain string operations (not a Turtle parser):
    statements are separated by " . " and the predicate/object clauses of one statement
    by " ; ". A literal that contains one of those separators will be split incorrectly.

    A triple that is added while it is pending in ``deletes`` (or deleted while it is
    pending in ``inserts``) cancels the pending entry and is not recorded itself.

    Args:
        rdf_data: Payload on a single line, with subjects and predicates written as
            full IRIs in angle brackets (or the "a" keyword as predicate).
        is_add: True if the triples are being inserted, False if they are being deleted.
        inserts: Pending inserts of the batch, keyed by subject + predicate + object and
            holding the serialized triple. Updated in place.
        deletes: Pending deletes of the batch, same layout. Updated in place.

    Returns:
        None; the results are the changes made to ``inserts`` and ``deletes``.
    """
    # Split request and create s-p-o triples
    for statement in rdf_data.split(" . "):
        if not statement.strip():       # Empty payload - nothing to record
            continue
        subj = statement.split('<')[1].split(">")[0]
        first_clause = True
        for clause in statement.split(' ; '):
            if clause.endswith(" ."):   # Last clause does not match " . "
                clause = clause[:-2]    # Just get rid of the period
            if first_clause:
                # Format is "<subj> <pred> obj_as_value_or_IRI" - drop the subject
                body = clause[(clause.index('>') + 1):].lstrip()
                first_clause = False
            else:
                # Format is "<pred> obj_as_value_or_IRI"
                body = clause.lstrip()
            # Only a clause whose predicate position holds the "a" keyword is a type clause.
            # Searching the whole clause for "a " would also match text inside literals
            # (e.g. "Obama politician") and corrupt the triple.
            if body.startswith('a '):
                pred = '-a'
                pred_str = 'a'
                obj = body[2:]
            else:
                pred = body.split('<')[1].split('>')[0]
                pred_str = f'<{pred}>'
                obj = body[(body.index('>') + 2):]
            subj_pred_obj = subj + pred + obj
            if is_add and subj_pred_obj in deletes:
                # Check that inserted triple is not previously deleted (e.g., is deleted and quickly reinserted)
                # If found, remove it
                del deletes[subj_pred_obj]
                # Since no longer deleted, no need to re-insert it
                continue
            if not is_add and subj_pred_obj in inserts:
                # Check that deleted triple is not previously inserted (e.g., is inserted and quickly deleted)
                # If found, remove it
                del inserts[subj_pred_obj]
                # Since no longer inserted, no need to delete it
                continue
            # Add details to appropriate dictionary since triple is new
            target = inserts if is_add else deletes
            target[subj_pred_obj] = f'<{subj}> {pred_str} {obj} .'
    

In [8]:
# Take dump of Stream Updater data and convert batches of 1 second insert/delete updates
#
# Input:  wikidata_update_stream_6k_edits_20220531.ndjson (one JSON event per line)
# Output: sparql-update-batches.txt (one line per non-empty batch)
#
# Consecutive events with the same "event_time" value form one batch; "reconcile" events
# are skipped. NOTE(review): batches are one second long only if event_time has
# one-second resolution - confirm against the data.


def _write_batch(out_file, batch_inserts: dict, batch_deletes: dict):
    """Write one batch as a single line; write nothing if both dictionaries are empty."""
    insert_str = ' '.join(batch_inserts.values())
    delete_str = ' '.join(batch_deletes.values())
    if insert_str and delete_str:
        # NOTE(review): SPARQL 1.1 Update separates multiple operations in one request
        # with ';'. The two operations are written without it here, as before - confirm
        # what the consumer of this file expects before changing the format.
        out_file.write("INSERT DATA { " + insert_str + "} "
                       + "DELETE DATA { " + delete_str + "}\n")
    elif insert_str:
        out_file.write("INSERT DATA { " + insert_str + "}\n")
    elif delete_str:
        out_file.write("DELETE DATA { " + delete_str + "}\n")


event_time = ""   # Starting time for first Updater event
inserts = dict()
deletes = dict()

# Explicit UTF-8: the default text encoding depends on the platform/locale
with open("wikidata_update_stream_6k_edits_20220531.ndjson", "r", encoding="utf-8") as update_data:
    with open('sparql-update-batches.txt', 'w', encoding="utf-8") as sparql_update:
        for line in update_data:           # Read an entry
            if not line.strip():           # Tolerate blank lines; json.loads would raise on them
                continue
            entry = json.loads(line)
            operation = entry["operation"]
            if operation == "reconcile":   # Reconciles are not relevant at this time
                continue

            # Determine "batch"
            new_event_time = entry["event_time"]
            if new_event_time != event_time:
                # Save current "batch" if event_time is defined
                if event_time:     # Not defined for the first pass
                    _write_batch(sparql_update, inserts, deletes)
                # Start new "batch"
                event_time = new_event_time
                inserts = dict()
                deletes = dict()

            # Process the changes (True = triples are inserted, False = triples are deleted)
            for payload, is_add in (("rdf_added_data", True),
                                    ("rdf_deleted_data", False),
                                    ("rdf_linked_data", True)):
                if payload in entry:
                    split_triples_remove_overlaps(
                        entry[payload]["data"].replace("\n", " ").replace("\t", "").strip(),
                        is_add, inserts, deletes)
            # Ignore rdf_unlinked_data

        # The last batch is never followed by an event with a different event_time,
        # so it has to be written here or it would be lost
        _write_batch(sparql_update, inserts, deletes)

# Process TSV files to add test triples

In [9]:
from os import listdir
from os.path import isfile, join

# Convert every file in ./test_triples into triples in new_triples.nt
#
# Each input line is expected to hold tab-separated subject, predicate and object values.
# Lines starting with "?subject" (header) and lines with fewer than 3 fields are skipped.
# Objects of integer_predicates / decimal_predicates are written as xsd:integer /
# xsd:decimal typed literals; every other object is written verbatim.

# sorted() makes the output order reproducible; listdir() returns names in arbitrary order
tsv_files = sorted(f for f in listdir("./test_triples") if isfile(join("./test_triples", f)))

# Mode 'w' truncates a previous new_triples.nt, so no separate "reset" step is needed.
# Explicit UTF-8: the default text encoding depends on the platform/locale
with open('new_triples.nt', 'w', encoding='utf-8') as wikidata:
    for input_file in tsv_files:
        with open(f'./test_triples/{input_file}', encoding='utf-8') as infile:
            text = infile.read()
        for line in text.split('\n'):
            if line.startswith('?subject'):
                continue
            parts = line.split('\t')
            if len(parts) > 2:
                pred = parts[1]
                if pred in integer_predicates:
                    wikidata.write(f'{parts[0]} {parts[1]} "{parts[2]}"^^<http://www.w3.org/2001/XMLSchema#integer> .\n')
                elif pred in decimal_predicates:
                    wikidata.write(f'{parts[0]} {parts[1]} "{parts[2]}"^^<http://www.w3.org/2001/XMLSchema#decimal> .\n')
                else:
                    wikidata.write(f"{parts[0]} {parts[1]} {parts[2]} .\n")