In [1]:
from pathlib import Path
import requests
from time import sleep
import json
import csv
import os
import sys # Read CLI arguments
import urllib.parse

# ----------------
# Configuration settings
# ----------------

# Input CSV: taken from the command line when exactly one argument was passed
# (sys.argv[0] is the script name); otherwise the default file name is used.
file_path = sys.argv[1] if len(sys.argv) == 2 else 'act.csv'

# CSV read by the one-time cleanup script later in the notebook
commons_urls = 'act_CopyrightStatement-wikimedia_202108181034.csv'

sparql_sleep = 0.1 # number of seconds to wait between queries to SPARQL endpoint
home = str(Path.home()) # path to the home directory, as a string
endpoint = 'https://query.wikidata.org/sparql'
accept_media_type = 'application/json'
commons_prefix = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
commons_page_prefix = 'https://commons.wikimedia.org/wiki/File:'
# extensions used by the cleanup script to cut off text that follows the file extension
extensions_list = ['.jpg', '.JPG', '.png', '.PNG', '.tif']

# ----------------
# Utility functions
# ----------------

# Best to send a user-agent header because some Wikimedia servers don't like unidentified clients
def generate_header_dictionary(accept_media_type):
    """Build the HTTP request headers used for SPARQL queries.

    accept_media_type: value to send in the Accept header.
    Returns a dictionary holding the Accept, Content-Type and User-Agent headers.
    """
    return {
        'Accept': accept_media_type,
        'Content-Type': 'application/sparql-query',
        'User-Agent': 'VanderBot/1.6.1 (https://github.com/HeardLibrary/linked-data/tree/master/vanderbot; mailto:steve.baskauf@vanderbilt.edu)',
    }

# Headers passed with the SPARQL requests sent by the query cells of this notebook
requestheader = generate_header_dictionary(accept_media_type)

# read from a CSV file into a list of dictionaries
def read_dict(filename):
    """Load a UTF-8 CSV file with a header row.

    filename: path of the CSV file to read.
    Returns a list with one dictionary per data row, keyed by the column headers.
    """
    with open(filename, 'r', newline='', encoding='utf-8') as csv_file:
        return list(csv.DictReader(csv_file))

# write a list of dictionaries to a CSV file
def write_dicts_to_csv(table, filename, fieldnames):
    """Save a list of dictionaries as a UTF-8 CSV file.

    table: iterable of dictionaries, one per output row.
    filename: path of the file to create or overwrite.
    fieldnames: column names, in output order; written as the header row.
    """
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(table)

# extracts the qNumber from a Wikidata IRI
def extract_qnumber(iri):
    """Return the Q ID at the end of a Wikidata entity IRI.

    Expected pattern: http://www.wikidata.org/entity/Q6386232
    """
    # splitting on '/' puts the local name at index 4
    return iri.split('/')[4]

# extracts the UUID and qId from a statement IRI
def extract_statement_uuid(iri):
    """Split a Wikidata statement IRI into its UUID and Q ID.

    Expected pattern:
    http://www.wikidata.org/entity/statement/Q7552806-8B88E0CA-BCC8-49D5-9AC2-F1755464F1A2
    Returns a (uuid, qid) tuple.
    """
    # local name of the IRI, broken at hyphens: the Q ID followed by the five UUID groups
    parts = iri.split('/')[5].split('-')
    uuid = '-'.join([parts[1], parts[2], parts[3], parts[4], parts[5]])
    return uuid, parts[0]

# function to use in sort
def sort_funct(row):
    """Sort key: the value of the row's 'filename' field."""
    filename = row['filename']
    return filename

def commons_url_to_filename(url):
    """Convert a Special:FilePath URL into the unencoded Commons filename.

    Form of URL: http://commons.wikimedia.org/wiki/Special:FilePath/Castle%20De%20Haar%20%281892-1913%29%20-%20360%C2%B0%20Panorama%20of%20Castle%20%26%20Castle%20Grounds.jpg
    """
    # text after the prefix is the URL-encoded local name
    encoded_name = url.split(commons_prefix)[1]
    return urllib.parse.unquote(encoded_name)

def filename_to_commons_url(filename):
    """Build the Special:FilePath URL for a Commons filename by URL-encoding it and adding the prefix."""
    return commons_prefix + urllib.parse.quote(filename)

def commons_page_url_to_filename(url):
    """Convert a Commons file page URL into the unencoded filename.

    Form of URL: https://commons.wikimedia.org/wiki/File:Castle_De_Haar_(1892-1913)_-_360%C2%B0_Panorama_of_Castle_%26_Castle_Grounds.jpg
    """
    # local name after the prefix, with underscores turned into spaces before decoding
    local_name = url.split(commons_page_prefix)[1].replace('_', ' ')
    return urllib.parse.unquote(local_name)

def filename_to_commons_page_url(filename):
    """Build the Commons file page URL for a filename.

    Spaces become underscores, the name is URL-encoded, and the escapes for
    parentheses and commas are turned back into the literal characters.
    """
    url = commons_page_prefix + urllib.parse.quote(filename.replace(' ', '_'))
    for escaped, literal in (('%28', '('), ('%29', ')'), ('%2C', ',')):
        url = url.replace(escaped, literal)
    return url


# original script (spring 2021?)

This script was used to clean up IDs and to query for items already in Wikidata.

In [23]:
# Read the source table and build, for every record, a dictionary holding the
# ACT ID, the cleaned filename and the Special:FilePath URL.
data = read_dict(file_path)
input_list = []
iri_lines = []  # one '<url>' entry per record for the VALUES list of the query
for record in data:
    act_id = record['RecordNumber']
    # some records have spaces with other junk after them; keep only the text before the first space
    clean_name = record['filename'].split(' ')[0]
    # to generate the IRIs, the underscores need to be replaced with escaped spaces (%20)
    url = 'http://commons.wikimedia.org/wiki/Special:FilePath/' + clean_name.replace('_', '%20')
    input_list.append({'act_id': act_id, 'filename': clean_name, 'url': url})
    iri_lines.append('<' + url + '>')

# newline-separated entries, with no trailing newline
iri_values = '\n'.join(iri_lines)

In [None]:
# Show the records built from the input file. output_list is not created until
# the matching cell later in the notebook, so printing it here fails on a fresh kernel.
print(json.dumps(input_list, indent=2))

In [None]:
# Show the newline-separated '<iri>' entries that go into the VALUES list of the query
print(iri_values)

In [None]:
# Query for the items whose image (P18) value is one of the listed IRIs
query = '''
select distinct ?qid ?iri
where {
      VALUES ?iri
    {
    ''' + iri_values + '''
    }
?qid wdt:P18 ?iri.
}'''
print(query)

In [None]:
# ----------------
# send request to Wikidata Query Service
# ----------------

print('querying SPARQL endpoint to acquire item metadata')
response = requests.post(endpoint, data=query.encode('utf-8'), headers=requestheader)
# Stop with the HTTP status if the endpoint rejected the request, instead of
# failing later with a JSON decoding error on an error page.
response.raise_for_status()
#print(response.text)
data = response.json()

# extract the values from the response JSON
results = data['results']['bindings']

print('done retrieving data')
print(json.dumps(results, indent=2))


In [None]:
# ----------------
# extract Q IDs from the results and match them with the ACT IDs
# ----------------

# Index the query results by image IRI so each record needs a single lookup
# instead of a scan over all results. setdefault keeps the first result
# returned for an IRI when several items share the same image.
qid_iri_by_image = {}
for result in results:
    qid_iri_by_image.setdefault(result['iri']['value'], result['qid']['value'])

output_list = []
for record in input_list:
    qid_iri = qid_iri_by_image.get(record['url'])
    # records with no matching item get an empty string as their Q ID
    record['qid'] = '' if qid_iri is None else extract_qnumber(qid_iri)
    output_list.append(record)
print(json.dumps(output_list, indent = 2))

In [None]:
# Order the rows by their filename field, then save them with a fixed column order
output_list.sort(key=lambda row: row['filename'])
fieldnames = ['act_id', 'qid', 'filename', 'url']
write_dicts_to_csv(output_list, 'output.csv', fieldnames)
print('done')

# Script for cleaned up IRIs (fall 2021)

Uses new functions designed for converting between various forms of Commons IRIs. See [the development script](https://github.com/HeardLibrary/vandycite/blob/master/commons_test/commons_identifier_conversion.ipynb) for details.

Configuration section

In [34]:
import csv
import requests
import json

# SPARQL endpoint that the query cells post to
endpoint = 'https://query.wikidata.org/sparql'
# value sent in the Accept header of the requests
accept_media_type = 'application/json'
# prefix of Special:FilePath URLs
commons_prefix = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
# prefix of Commons file page URLs
commons_page_prefix = 'https://commons.wikimedia.org/wiki/File:'

def commons_url_to_filename(url):
    """Convert a Special:FilePath URL into the unencoded Commons filename.

    Form of URL: http://commons.wikimedia.org/wiki/Special:FilePath/Castle%20De%20Haar%20%281892-1913%29%20-%20360%C2%B0%20Panorama%20of%20Castle%20%26%20Castle%20Grounds.jpg
    """
    # text after the prefix is the URL-encoded local name
    encoded_name = url.split(commons_prefix)[1]
    return urllib.parse.unquote(encoded_name)

def filename_to_commons_url(filename):
    """Build the Special:FilePath URL for a Commons filename by URL-encoding it and adding the prefix."""
    return commons_prefix + urllib.parse.quote(filename)

def commons_page_url_to_filename(url):
    """Convert a Commons file page URL into the unencoded filename.

    Form of URL: https://commons.wikimedia.org/wiki/File:Castle_De_Haar_(1892-1913)_-_360%C2%B0_Panorama_of_Castle_%26_Castle_Grounds.jpg
    """
    # local name after the prefix, with underscores turned into spaces before decoding
    local_name = url.split(commons_page_prefix)[1].replace('_', ' ')
    return urllib.parse.unquote(local_name)

def filename_to_commons_page_url(filename):
    """Build the Commons file page URL for a filename.

    Spaces become underscores, the name is URL-encoded, and the escapes for
    parentheses and commas are turned back into the literal characters.
    """
    url = commons_page_prefix + urllib.parse.quote(filename.replace(' ', '_'))
    for escaped, literal in (('%28', '('), ('%29', ')'), ('%2C', ',')):
        url = url.replace(escaped, literal)
    return url

# write a list of dictionaries to a CSV file
def write_dicts_to_csv(table, filename, fieldnames):
    """Save a list of dictionaries as a UTF-8 CSV file.

    table: iterable of dictionaries, one per output row.
    filename: path of the file to create or overwrite.
    fieldnames: column names, in output order; written as the header row.
    """
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(table)

# read from a CSV file into a list of dictionaries
def read_dict(filename):
    """Load a UTF-8 CSV file with a header row.

    filename: path of the CSV file to read.
    Returns a list with one dictionary per data row, keyed by the column headers.
    """
    with open(filename, 'r', newline='', encoding='utf-8') as csv_file:
        return list(csv.DictReader(csv_file))

# Best to send a user-agent header because some Wikimedia servers don't like unidentified clients
def generate_header_dictionary(accept_media_type):
    """Build the HTTP request headers used for SPARQL queries.

    accept_media_type: value to send in the Accept header.
    Returns a dictionary holding the Accept, Content-Type and User-Agent headers.
    """
    return {
        'Accept': accept_media_type,
        'Content-Type': 'application/sparql-query',
        'User-Agent': 'VanderBot/1.8 (https://github.com/HeardLibrary/linked-data/tree/master/vanderbot; mailto:steve.baskauf@vanderbilt.edu)',
    }

# Headers passed with the SPARQL requests sent by the query cells of this notebook
requestheader = generate_header_dictionary(accept_media_type)

# extracts the qNumber from a Wikidata IRI
def extract_qnumber(iri):
    """Return the Q ID at the end of a Wikidata entity IRI.

    Expected pattern: http://www.wikidata.org/entity/Q6386232
    """
    # splitting on '/' puts the local name at index 4
    return iri.split('/')[4]

# function to use in sort
def sort_funct(row):
    """Sort key: the value of the row's 'filename' field."""
    filename = row['filename']
    return filename



The following script was actually used to do the cleanup.

RUN ONE TIME ONLY!

In [14]:
# One-time cleanup: normalize the Commons page URL of every source row and derive
# the filename and the Special:FilePath URI from it.
# NOTE: commons_urls and extensions_list are assigned in the first configuration
# cell of the notebook, so that cell has to be run before this one.
commons_items = read_dict(commons_urls)
output_table = []
for item_number, item in enumerate(commons_items):
    commons_url = item['CopyrightStatement'].strip()
    # bring the URL into the form expected by commons_page_url_to_filename:
    # https scheme, '/File' instead of '/Image', and the non-mobile host
    commons_url = commons_url.replace('http:', 'https:')
    commons_url = commons_url.replace('/Image', '/File')
    commons_url = commons_url.replace('https://commons.m.wikimedia.org/wiki/File:', commons_page_prefix)
    try:
        filename = commons_page_url_to_filename(commons_url)
        # Some of the strings have file owner names included after the file extension, or have fragment identifiers that need to be removed
        # NOTE(review): '.tif' also matches inside '.tiff', which would shorten such
        # names to '.tif' -- confirm whether any source rows use '.tiff'.
        for extension in extensions_list:
            if extension in filename:
                # extract what's before the extension, then put the extension back on
                filename = filename.split(extension)[0] + extension
            # if the extension isn't on the list, nothing will happen
        metadata = {
            'RecordNumber': item['RecordNumber'],
            'commons_page_url': commons_url,
            'filename': filename,
            'commons_uri': filename_to_commons_url(filename),
        }
        output_table.append(metadata)
    except Exception:
        # Best effort: report the row that could not be converted and keep going.
        # Exception (rather than a bare except) lets KeyboardInterrupt and SystemExit through.
        print(item_number, 'error:', commons_url)
write_dicts_to_csv(output_table, 'clean_ids.csv', ['RecordNumber', 'filename', 'commons_page_url', 'commons_uri'])
print('done')

done


Start of script for check

In [50]:
# ----------------
# Load the cleaned-up records from the CSV file written by the cleanup script
# ----------------

file_path = 'clean_ids.csv'

# list of dictionaries, one per row of the CSV file
data = read_dict(file_path)


In [51]:
# ----------------
# Construct query using list of IRIs
# ----------------

# VALUES list for query: one '<iri>' entry per record, separated by newlines
# (no trailing newline)
iri_values = '\n'.join('<' + record['commons_uri'] + '>' for record in data)

# Create query to find items having the IRI as their image
query = '''
select distinct ?qid ?iri
where {
      VALUES ?iri
    {
    ''' + iri_values + '''
    }
?qid wdt:P18 ?iri.
}'''
print('done')

done


In [52]:
# ----------------
# send request to Wikidata Query Service
# ----------------

print('querying SPARQL endpoint to acquire item metadata')
response = requests.post(endpoint, data=query.encode('utf-8'), headers=requestheader)
# Stop with the HTTP status if the endpoint rejected the request, instead of
# failing later with a JSON decoding error on an error page.
response.raise_for_status()
#print(response.text)
respons_json = response.json()

# extract the values from the response JSON
results = respons_json['results']['bindings']

print('done retrieving data')
#print(json.dumps(results, indent=2))


querying SPARQL endpoint to acquire item metadata
done retrieving data


In [53]:
# ----------------
# extract Q IDs from the results and match them with the ACT IDs
# ----------------

# Index the query results by image IRI so each record needs a single lookup
# instead of a scan over all results. setdefault keeps the first result
# returned for an IRI when several items share the same image.
qid_iri_by_image = {}
for result in results:
    qid_iri_by_image.setdefault(result['iri']['value'], result['qid']['value'])

for record in data:
    qid_iri = qid_iri_by_image.get(record['commons_uri'])
    # records with no matching item get an empty string as their Q ID
    record['qid'] = '' if qid_iri is None else extract_qnumber(qid_iri)
print('done')

done


In [54]:
# Order the rows by their filename field, then save them with a fixed column order
data.sort(key=lambda row: row['filename'])
fieldnames = ['RecordNumber', 'qid', 'filename', 'commons_uri', 'commons_page_url']
write_dicts_to_csv(data, 'output.csv', fieldnames)
print('done')

done
