# Template code for interacting with Wikidata

This notebook is a collection of functions and code blocks to use when interacting with Wikidata or other instances of Wikibase.

In [None]:
# (c) 2022 Vanderbilt University, except for Sparqler class, which is (c) 2022 Steven J. Baskauf
# This program is released under a GNU General Public License v3.0 http://www.gnu.org/licenses/gpl-3.0
# Author: Steve Baskauf
# 2022-06-03

import requests
import json
import csv
import sys # Read CLI arguments
import datetime
from pathlib import Path
#import os
from time import sleep

# Stand-alone script usage: a single command-line argument (e.g. a file path) overrides the default value.
# sys.argv[0] is the script name, so "exactly one argument passed" means len(sys.argv) == 2.
# With no argument (or with more than one) the default 'file.csv' is used.

file_path = sys.argv[1] if len(sys.argv) == 2 else 'file.csv'


# ----------------
# File IO
# ----------------

# Many functions operate on a list of dictionaries, where each item in the list represents a spreadsheet row
# and each column is identified by a dictionary item whose key is the column header in the spreadsheet.
# The first two functions read and write from files into this data structure.

# Read from a CSV file into a list of dictionaries
def read_dicts_from_csv(filename):
    """Read a UTF-8 CSV file and return its rows as a list of dictionaries.

    The header row supplies the dictionary keys (csv.DictReader behavior); every
    following row becomes one dictionary in the returned list.
    """
    with open(filename, 'r', newline='', encoding='utf-8') as csv_file:
        return list(csv.DictReader(csv_file))

# Write a list of dictionaries to a CSV file
# The fieldnames object is a list of strings whose items are the keys in the row dictionaries that are chosen
# to be the columns in the output spreadsheet. The order in the list determines the order of the columns.
# To determine the field names from the first dict in the list, use this code:
'''
fieldnames = table[0].keys()
'''
# The column headers of the output will be in the order in which they occur in the dict (usually the order they were added)
def write_dicts_to_csv(table, filename, fieldnames):
    """Write a list of row dictionaries to a UTF-8 CSV file.

    Parameters
    ----------
    table : list of dict
        One dictionary per output row.
    filename : str
        Path of the file to create (an existing file is overwritten).
    fieldnames : list of str
        Keys to write as columns, in output column order. A header row is always written.
    """
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(table)


# If configuration or other data are stored in a file as JSON, this function loads them into a Python data structure

# Load JSON file data from local drive into a Python data structure
def load_json_into_data_struct(path):
    """Parse a UTF-8 encoded JSON file on the local drive and return the resulting Python data structure."""
    with open(path, 'rt', encoding='utf-8') as json_file:
        structure = json.load(json_file)
    # To view the loaded data, uncomment the following line
    # print(json.dumps(structure, indent=2))
    return structure

# Load JSON file data from GitHub into a Python data structure
# NOTE: requires the requests module to be installed!
# raw_file_url must be the URL of the raw file, not the web page about the file
def load_github_json_into_data_struct(raw_file_url):
    """Download a JSON document with HTTP GET and return it as a Python data structure.

    raw_file_url must point at the raw file itself, not at the web page describing the file.
    The body is parsed from the text of the response, whatever the HTTP status code was.
    """
    file_text = requests.get(raw_file_url).text
    # To view the loaded data, print json.dumps() of the return value with indent=2
    return json.loads(file_text)

# This function loads a credential from a text file located either in the home directory or in the current working directory
# The value of the directory variable should be either 'home' or 'working'
# Keeping the credential in the home directory prevents accidentally uploading it with the notebook.
# The function returns a single string, so if there is more than one credential (e.g. key plus secret), additional
# parsing of the return value may be required. 
def load_credential(filename, directory):
    """Read a credential from a text file and return the whole file content as one string.

    Parameters
    ----------
    filename : str
        Name of the credential file.
    directory : str
        'home' to look for the file in the user's home directory. Any other value is treated
        as 'working': filename is opened as given, i.e. relative to the current working directory.

    Returns
    -------
    str
        The unmodified file content (UTF-8 decoded). If the file holds several credentials,
        the caller has to parse them out of this string.

    Raises
    ------
    SystemExit
        If the file cannot be opened or read (a hint is printed first).
        Other errors, such as invalid UTF-8 content, are not disguised as "file not found"
        and propagate unchanged.
    """
    if directory == 'home':
        credential_path = Path.home() / filename  # works on Windows, Mac and Linux
    else:
        directory = 'working'  # normalize so the message below names the directory actually searched
        credential_path = Path(filename)
    try:
        with open(credential_path, 'rt', encoding='utf-8') as file_object:
            return file_object.read()
    except OSError:
        print(filename + ' file not found - is it in your ' + directory + ' directory?')
        # SystemExit is a builtin, unlike exit(), which is only present when the site module is loaded
        raise SystemExit

# ----------------
# Code for interacting with a Wikibase query interface (SPARQL endpoint). Typically, it's the Wikidata Query Service
# ----------------

class Sparqler:
    """Send SPARQL queries and updates to an endpoint.

    Parameters
    -----------
    useragent : str
        Required if using the Wikidata Query Service, otherwise optional.
        Use the form: appname/v.v (URL; mailto:email@domain.com)
        See https://meta.wikimedia.org/wiki/User-Agent_policy
    endpoint: URL
        Defaults to Wikidata Query Service if not provided.
    method: str
        Possible values are "post" (default) or "get". Use "get" if read-only query endpoint.
        Must be "post" for update endpoint.
    sleep: float
        Number of seconds to wait after each request. Defaults to 0.1

    Attributes
    ----------
    response : str
        Raw text of the most recent HTTP response. Only set after query() or update() was called.
    requestheader : dict
        HTTP request headers. The Accept header is overwritten by every query() and update() call.

    Required modules:
    -------------
    import requests
    import datetime
    from time import sleep
    """
    def __init__(self, **kwargs):
        wikidata_endpoint = 'https://query.wikidata.org/sparql'

        # attributes for all methods
        self.http_method = kwargs.get('method', 'post') # default to POST
        self.endpoint = kwargs.get('endpoint', wikidata_endpoint) # default to Wikidata endpoint
        if 'useragent' in kwargs:
            self.useragent = kwargs['useragent']
        elif self.endpoint == wikidata_endpoint:
            print('You must provide a value for the useragent argument when using the Wikidata Query Service.')
            print()
            raise KeyboardInterrupt # Use keyboard interrupt instead of sys.exit() because it works in Jupyter notebooks
        else:
            self.useragent = ''
        self.sleep = kwargs.get('sleep', 0.1) # default throttling of 0.1 seconds

        self.requestheader = {}
        if self.useragent:
            self.requestheader['User-Agent'] = self.useragent

        if self.http_method == 'post':
            self.requestheader['Content-Type'] = 'application/x-www-form-urlencoded'

    def query(self, query_string, **kwargs):
        """Sends a SPARQL query to the endpoint.

        Parameters
        ----------
        query_string : str
            The SPARQL query.
        form : str
            The SPARQL query form.
            Possible values are: "select" (default), "ask", "construct", and "describe".
        mediatype: str
            The response media type (MIME type) of the query results, sent as the Accept header.
            Some possible values for "select" and "ask" are: "application/sparql-results+json" and
            "application/sparql-results+xml". Some possible values for "construct" and "describe" are:
            "text/turtle" and "application/rdf+xml".
            If omitted, "text/turtle" is used for the "construct" form and
            "application/sparql-results+json" for every other form (including "describe").
        verbose: bool
            Prints status when True. Defaults to False.
        default: list of str
            The graphs to be merged to form the default graph. List items must be URIs in string form.
            If omitted, no graphs will be specified and default graph composition will be controlled by FROM clauses
            in the query itself.
            See https://www.w3.org/TR/sparql11-query/#namedGraphs and https://www.w3.org/TR/sparql11-protocol/#dataset
            for details.
        named: list of str
            Graphs that may be specified by IRI in a query. List items must be URIs in string form.
            If omitted, named graphs will be specified by FROM NAMED clauses in the query itself.

        Returns
        -------
        If the form is "select" and mediatype is "application/sparql-results+json", a list of dictionaries
        containing the data (the bindings of the results).
        If the form is "ask" and mediatype is "application/sparql-results+json", a boolean is returned.
        If the mediatype is "application/sparql-results+json" and the response body cannot be parsed as JSON,
        None is returned.
        For the "construct" and "describe" forms and for all other mediatypes, the raw response text is returned.

        Notes
        -----
        To get UTF-8 text in the SPARQL queries to work properly, send URL-encoded text rather than raw text.
        That is done automatically by the requests module for GET. I guess it also does it for POST when the
        data are sent as a dict with the urlencoded header.
        See SPARQL 1.1 protocol notes at https://www.w3.org/TR/sparql11-protocol/#query-operation
        """
        json_results_type = 'application/sparql-results+json'

        query_form = kwargs.get('form', 'select') # default to SELECT query form
        if 'mediatype' in kwargs:
            media_type = kwargs['mediatype']
        elif query_form == 'construct':
            media_type = 'text/turtle'
        else:
            media_type = json_results_type # default for SELECT and ASK query forms
        self.requestheader['Accept'] = media_type
        verbose = kwargs.get('verbose', False) # default to no printouts

        # Build the payload dictionary (query and graph data) to be sent to the endpoint
        payload = {'query' : query_string}
        if 'default' in kwargs:
            payload['default-graph-uri'] = kwargs['default']
        if 'named' in kwargs:
            payload['named-graph-uri'] = kwargs['named']

        if verbose:
            print('querying SPARQL endpoint')

        start_time = datetime.datetime.now()
        if self.http_method == 'post':
            response = requests.post(self.endpoint, data=payload, headers=self.requestheader)
        else:
            response = requests.get(self.endpoint, params=payload, headers=self.requestheader)
        elapsed_time = (datetime.datetime.now() - start_time).total_seconds()
        self.response = response.text
        sleep(self.sleep) # Throttle as a courtesy to avoid hitting the endpoint too fast.

        if verbose:
            print('done retrieving data in', int(elapsed_time), 's')

        # Graph-producing forms and non-JSON media types are handed back unparsed
        if query_form == 'construct' or query_form == 'describe':
            return response.text
        if media_type != json_results_type:
            return response.text

        try:
            data = response.json()
        except ValueError: # body is not JSON, e.g. a plain text or HTML error message
            return None

        if query_form == 'select':
            # Extract the values from the response JSON
            return data['results']['bindings']
        return data['boolean'] # True or False result from ASK query

    def update(self, request_string, **kwargs):
        """Sends a SPARQL update to the endpoint. The request is always sent by HTTP POST.

        Parameters
        ----------
        request_string : str
            The SPARQL update request.
        mediatype : str
            The response media type (MIME type) from the endpoint after the update.
            Default is "application/json"; probably no need to use anything different.
        verbose: bool
            Prints status when True. Defaults to False.
        default: list of str
            The graphs to be merged to form the default graph. List items must be URIs in string form.
            If omitted, no graphs will be specified and default graph composition will be controlled by USING
            clauses in the query itself.
            See https://www.w3.org/TR/sparql11-update/#deleteInsert
            and https://www.w3.org/TR/sparql11-protocol/#update-operation for details.
        named: list of str
            Graphs that may be specified by IRI in the graph pattern. List items must be URIs in string form.
            If omitted, named graphs will be specified by USING NAMED clauses in the query itself.

        Returns
        -------
        If the mediatype is "application/json", the parsed JSON response, or None if the response body
        cannot be parsed as JSON (e.g. plain text). For other mediatypes, the raw response text.
        """
        media_type = kwargs.get('mediatype', 'application/json') # default response type after update
        self.requestheader['Accept'] = media_type
        verbose = kwargs.get('verbose', False) # default to no printouts

        # Build the payload dictionary (update request and graph data) to be sent to the endpoint
        payload = {'update' : request_string}
        if 'default' in kwargs:
            payload['using-graph-uri'] = kwargs['default']
        if 'named' in kwargs:
            payload['using-named-graph-uri'] = kwargs['named']

        if verbose:
            print('beginning update')

        start_time = datetime.datetime.now()
        response = requests.post(self.endpoint, data=payload, headers=self.requestheader)
        elapsed_time = (datetime.datetime.now() - start_time).total_seconds()
        self.response = response.text
        sleep(self.sleep) # Throttle as a courtesy to avoid hitting the endpoint too fast.

        if verbose:
            print('done updating data in', int(elapsed_time), 's')

        if media_type != 'application/json':
            return response.text
        try:
            return response.json()
        except ValueError: # body is not JSON (e.g. plain text)
            return None

    def load(self, file_location, graph_uri, **kwargs):
        """Loads an RDF document into a specified graph by sending a SPARQL LOAD request through update().

        Parameters
        ----------
        file_location : str
            URL of the file to load or, when s3 is given, the location of the file within the bucket.
        graph_uri : str
            URI of the graph to load the data into.
        s3 : str
            Name of an AWS S3 bucket containing the file. Omit to load from a generic URL.
        verbose: bool
            Prints status when True. Defaults to False.

        Returns
        -------
        The return value of update() for the LOAD request.

        Notes
        -----
        The triplestore may or may not rely on receiving a correct Content-Type header with the file to
        determine the type of serialization. Blazegraph requires it, AWS Neptune does not and apparently
        interprets serialization based on the file extension.
        """
        s3 = kwargs.get('s3', '')
        verbose = kwargs.get('verbose', False) # default to no printouts

        if s3:
            file_iri = 'https://' + s3 + '.s3.amazonaws.com/' + file_location
        else:
            file_iri = file_location
        request_string = 'LOAD <' + file_iri + '> INTO GRAPH <' + graph_uri + '>'

        if verbose:
            print('Loading file:', file_location, ' into graph: ', graph_uri)
        return self.update(request_string, verbose=verbose)

    def drop(self, graph_uri, **kwargs):
        """Drop a specified graph by sending a SPARQL DROP GRAPH request through update().

        Parameters
        ----------
        graph_uri : str
            URI of the graph to drop.
        verbose: bool
            Prints status when True. Defaults to False.

        Returns
        -------
        The return value of update() for the DROP request.
        """
        verbose = kwargs.get('verbose', False) # default to no printouts

        request_string = 'DROP GRAPH <' + graph_uri + '>'

        if verbose:
            print('Deleting graph:', graph_uri)
        return self.update(request_string, verbose=verbose)

# ----------------
# Utility code
# ----------------

# Generate the current UTC xsd:date
def generate_utc_date():
    """Return the current UTC date as an xsd:date string of the form YYYY-MM-DD (e.g. 2019-12-05)."""
    # datetime.utcnow() is deprecated since Python 3.12; a timezone-aware UTC datetime yields the same calendar date
    return datetime.datetime.now(datetime.timezone.utc).date().isoformat()

# Extracts the local name part of an IRI, e.g. a qNumber from a Wikidata IRI
def extract_local_name(iri):
    """Return the part of the IRI after its last slash, or the whole string if it has no slash."""
    # e.g. http://www.wikidata.org/entity/Q6386232 yields Q6386232
    return iri.rsplit('/', 1)[-1]

# Extracts the UUID and qId from a statement IRI and returns them as a tuple
def extract_statement_uuid(iri):
    """Split a statement IRI into its UUID and Q ID and return them as the tuple (uuid, q_id).

    The statement ID is the sixth slash-separated piece of the IRI and is itself hyphen-separated,
    pattern: http://www.wikidata.org/entity/statement/Q7552806-8B88E0CA-BCC8-49D5-9AC2-F1755464F1A2
    An IndexError is raised if the IRI has fewer slash or hyphen separated pieces than that.
    """
    statement_id = iri.split('/')[5]
    parts = statement_id.split('-')
    q_id = parts[0]
    # The five hyphen-separated groups after the Q ID make up the UUID
    uuid = '-'.join(parts[index] for index in range(1, 6))
    return uuid, q_id

# To sort a list of dictionaries by a particular dictionary key's values, define the following function
# then invoke the sort using the code that follows

# function to use in sort
def sort_funct(row):
    """Sort key function: return the value stored under the 'filename' key of a row dictionary."""
    sort_key = 'filename'
    return row[sort_key]

'''
output_list.sort(key = sort_funct) # sort by the filename field
'''


## Test querying

Query WDQS

In [None]:
# Label strings to look up, each with the language tag to search under
labels = [
    {'string': '尼可罗·马基亚维利', 'language_code': 'zh'},
    {'string': '"I Hate You For Hitting My Mother," Minneapolis', 'language_code': 'en'},
    {'string': "A Picture from an Outline of Women's Manners - The Wedding Ceremony", 'language_code': 'en'}
]

# Build the body of the VALUES clause: one language-tagged literal per line.
# The literals are triple-quoted so that the label strings may contain single and double quotes.
values = ''.join(
    "'''" + label['string'] + "'''@" + label['language_code'] + '\n'
    for label in labels
)

# Find items having one of the strings as label or alias, and return their English labels
query_string = '''select distinct ?item ?label where {
  VALUES ?value
  {
  ''' + values + '''}
?item rdfs:label|skos:altLabel ?value.
?item rdfs:label ?label.
FILTER(lang(?label)='en')
  }
'''
#print(query_string)

user_agent = 'VanderBot/1.9 (https://github.com/HeardLibrary/linked-data/tree/master/publications; mailto:steve.baskauf@vanderbilt.edu)'
wdqs = Sparqler(useragent=user_agent)
data = wdqs.query(query_string)
# A JSON results document starts with "{". startswith() is used rather than indexing [0]
# so that an empty response body is reported as an error instead of raising IndexError.
if wdqs.response.startswith('{'):
    print('no error')
else:
    print('error')
print()
print(json.dumps(data, indent=2))
# print(wdqs.response)


Query public endpoint of Neptune

In [None]:
# http://nomenclature_2022-02-02
# http://AATOut_2Terms

# Alternative query (not sent below): list the named graphs that contain at least one triple
query_string1 = '''select distinct ?graph where {
graph ?graph {?s ?o ?p.}
}'''

# Retrieve five triples as a sample
query_string = '''select distinct ?s ?o ?p where {
?s ?o ?p.
}
limit 5'''

# Graphs to be merged into the default graph for the query
from_graphs = ['http://bluffton']
endpoint_url = 'https://5j6diw4i0h.execute-api.us-east-1.amazonaws.com/sparql'
# No useragent is given; Sparqler only insists on one for the Wikidata Query Service endpoint
sve = Sparqler(endpoint=endpoint_url, method='get')
#data = sve.query(query_string)
data = sve.query(query_string, default=from_graphs)
# A JSON results document starts with "{". startswith() is used rather than indexing [0]
# so that an empty response body is reported as an error instead of raising IndexError.
if sve.response.startswith('{'):
    print('no error')
else:
    print('error')
print()
print(json.dumps(data, indent=2))


## Test SPARQL Update using SSH tunnel/Neptune write endpoint

Insert data (one triple)

In [None]:
# Insert a single test triple using a SPARQL Update request
request_string = 'INSERT DATA { <https://test.com/s> <https://test.com/p> <https://test.com/o> . }'
endpoint_url = 'https://triplestore1.cluster-cml0hq81gymg.us-east-1.neptune.amazonaws.com:8182/sparql'
# No useragent is given; Sparqler only insists on one for the Wikidata Query Service endpoint.
# sleep=0 removes the pause that otherwise follows each request.
neptune = Sparqler(endpoint=endpoint_url, sleep=0)
data = neptune.update(request_string, verbose=True)
# update() returns the parsed JSON response, or None if the response body was not JSON
print(json.dumps(data, indent=2))


Query to see if the triple is there

In [None]:
# Retrieve everything stated about the test subject inserted above
query_string = '''select distinct ?o ?p where {
<https://test.com/s> ?o ?p.
}'''

endpoint_url = 'https://5j6diw4i0h.execute-api.us-east-1.amazonaws.com/sparql'
# No useragent is given; Sparqler only insists on one for the Wikidata Query Service endpoint
sve = Sparqler(endpoint=endpoint_url, method='get')
data = sve.query(query_string)
#data = sve.query(query_string, default=from_graphs)
# A JSON results document starts with "{". startswith() is used rather than indexing [0]
# so that an empty response body is reported as an error instead of raising IndexError.
if sve.response.startswith('{'):
    print('no error')
else:
    print('error')
print()
print(json.dumps(data, indent=2))


Remove the triple

In [None]:
# Delete the test triple again.
# This reuses the neptune Sparqler instance created in the insert cell, so that cell must have been run first.
request_string = 'DELETE DATA { <https://test.com/s> <https://test.com/p> <https://test.com/o> . }'
data = neptune.update(request_string, verbose=True)
# update() returns the parsed JSON response, or None if the response body was not JSON
print(json.dumps(data, indent=2))


## Test file load from S3 bucket, then drop it

Load the graph

In [None]:
# Load a file from an S3 bucket into a named graph
s3_bucket_name = 'triplestore-upload'
filename = 'bluffton.ttl'
# file_url is only used by the commented-out generic URL variant of the load() call below
file_url = 'https://triplestore-upload.s3.amazonaws.com/bluffton.ttl'
graph_name = 'http://bluffton'
endpoint_url = 'https://triplestore1.cluster-cml0hq81gymg.us-east-1.neptune.amazonaws.com:8182/sparql'
# sleep=0 removes the pause that otherwise follows each request
neptune = Sparqler(endpoint=endpoint_url, sleep=0)
# With the s3 argument, load() builds the file URL from the bucket name and the file name
data = neptune.load(filename, graph_name, s3=s3_bucket_name, verbose=True)
#data = neptune.load(file_url, graph_name, verbose=True)
print(json.dumps(data, indent=2))


In [None]:
# Drop the graph that was loaded in the previous cell.
# This reuses the neptune Sparqler instance, so a cell that creates it must have been run first.
graph_name = 'http://bluffton'
data = neptune.drop(graph_name, verbose=True)
print(json.dumps(data, indent=2))
