# Quality control script prior to phase 2b

2022-02-01

This quality control script is run after finishing the creation of artwork items for works where ACT IDs were mis-assigned to non-artwork items. It checks for duplicates and makes sure that the list of items to be added is actually correct.

In [None]:
# (c) 2022 Vanderbilt University. This program is released under a GNU General Public License v3.0 http://www.gnu.org/licenses/gpl-3.0
# Author: Steve Baskauf

# Import modules
import json
import csv
import math
import datetime
import urllib
from time import sleep
import requests
import re # regex
# Pandas for data frame management
import pandas as pd
# Fuzzy string matching
from fuzzywuzzy import fuzz # fuzzy logic matching
# Web scraping library
from bs4 import BeautifulSoup # web-scraping library, use PIP to install beautifulsoup4 (included in Anaconda)

# Settings for requests to the Wikidata Query Service and to Commons
accept_media_type = 'application/json'
endpoint = 'https://query.wikidata.org/sparql'
user_agent_header = 'act_disambiguation/0.1 (mailto:steve.baskauf@vanderbilt.edu)'
sparql_sleep = 0.1 # wait 0.1 seconds after each SPARQL query
request_sleep = 0.1 # wait 0.1 seconds between each HTTP request
default_language = 'en' # language tag used to filter labels and descriptions in SPARQL queries

# Load data
# na_filter=False and dtype=str keep every cell as a string, with empty cells as empty strings rather than NaN
act_data = pd.read_csv('../processed_lists/act_all_202109241353_repaired.csv', na_filter=False, dtype = str)
ids = pd.read_csv('clean_ids.csv', na_filter=False, dtype = str)
duplicates = pd.read_csv('../processed_lists/duplicates_of_existing_commons_ids.csv', na_filter=False, dtype = str)

# NOTE(review): these two mapping tables are not referenced by any cell visible in this notebook section
country_mappings = pd.read_csv('country_mappings.csv', na_filter=False, dtype = str)
collections_mappings = pd.read_csv('collections.csv', na_filter=False, dtype = str)

# For testing purposes, just use the first few rows
#test_rows = 10
#ids = ids.head(test_rows).copy()

# --------------------
# Low-level functions
# --------------------

# read from a CSV file into a list of dictionaries
def read_dict(filename):
    """Load a UTF-8 CSV file as a list of dictionaries, one per data row.

    filename: path of the CSV file; its first row supplies the dictionary keys
    """
    with open(filename, 'r', newline='', encoding='utf-8') as csv_file:
        # Materialize every row while the file is still open
        return list(csv.DictReader(csv_file))

# write a list of dictionaries to a CSV file
def write_dicts_to_csv(table, filename, fieldnames):
    """Write row dictionaries to a UTF-8 CSV file, creating or overwriting it.

    table: iterable of dictionaries, one per output row
    filename: path of the CSV file to write
    fieldnames: column names; written as the header row, in this order
    """
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        dict_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(table)

# Extracts the local name part of an IRI, e.g. a qNumber from a Wikidata IRI
def extract_local_name(iri):
    """Return the text after the last slash of an IRI.

    Example: 'http://www.wikidata.org/entity/Q6386232' gives 'Q6386232'.
    A string with no slash is returned unchanged.
    """
    return iri.split('/')[-1]

# NOTE: there are still some issues that have not been worked out with quotation marks in query strings.
# Still working on this; see also the send_sparql_query() below.
def generate_sparql_header_dictionary(accept_media_type, user_agent_header):
    """Build the HTTP request headers used when POSTing a SPARQL query.

    accept_media_type: media type requested for the response (Accept header)
    user_agent_header: string identifying this script (User-Agent header)
    """
    return {
        'Accept': accept_media_type,
        # send_sparql_query() posts the query as a form field, hence the form-encoded content type.
        # The alternative content type 'application/sparql-query' is not used here.
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': user_agent_header,
    }

# Build the SPARQL request header once; send_sparql_query() (defined below) reads it as a global variable
sparql_request_header = generate_sparql_header_dictionary(accept_media_type, user_agent_header)


# Functions to interconvert various forms of Commons identifiers
# Prefix of the Special:FilePath form of a Commons file URL
commons_prefix = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
# Prefix of the URL of the Commons web page for a file
commons_page_prefix = 'https://commons.wikimedia.org/wiki/File:'

def commons_url_to_filename(url):
    """Convert a Special:FilePath URL into the unencoded Commons file name.

    Example input:
    http://commons.wikimedia.org/wiki/Special:FilePath/Castle%20De%20Haar%20%281892-1913%29%20-%20360%C2%B0%20Panorama%20of%20Castle%20%26%20Castle%20Grounds.jpg
    Raises IndexError if the URL does not contain commons_prefix.
    """
    # The piece following the prefix is the URL-encoded file name
    encoded_name = url.split(commons_prefix)[1]
    # Reverse the URL-encoding
    return urllib.parse.unquote(encoded_name)

def filename_to_commons_url(filename):
    """Convert a Commons file name into its URL-encoded Special:FilePath URL."""
    return commons_prefix + urllib.parse.quote(filename)

def commons_page_url_to_filename(url):
    """Convert the URL of a Commons file page into the unencoded file name.

    Example input:
    https://commons.wikimedia.org/wiki/File:Castle_De_Haar_(1892-1913)_-_360%C2%B0_Panorama_of_Castle_%26_Castle_Grounds.jpg
    Raises IndexError if the URL does not contain commons_page_prefix.
    """
    # The piece following the prefix is the file name with underscores and URL-encoding
    local_part = url.split(commons_page_prefix)[1]
    # Underscores become spaces first, then the URL-encoding is reversed
    return urllib.parse.unquote(local_part.replace('_', ' '))

def filename_to_commons_page_url(filename):
    """Convert a Commons file name into the URL of its Commons file page.

    Spaces become underscores and the name is URL-encoded, except that
    parentheses and commas are left as literal characters.
    """
    encoded_name = urllib.parse.quote(filename.replace(' ', '_'))
    page_url = commons_page_prefix + encoded_name
    # Restore the characters that are kept unescaped in the page URL
    for escaped, literal in (('%28', '('), ('%29', ')'), ('%2C', ',')):
        page_url = page_url.replace(escaped, literal)
    return page_url

# ----------------------------
# Intermediate-level functions
# ----------------------------

# Sends a query to the query service endpoint and returns the result bindings.
# NOTE: endpoint, sparql_request_header, and sparql_sleep are global variables defined earlier in the script
def send_sparql_query(query_string):
    """POST a SPARQL query to the endpoint and return its result bindings.

    query_string: text of the SPARQL query
    Returns the list found under results -> bindings in the JSON response.
    Raises requests.HTTPError if the endpoint answers with an error status.
    """
    # The query travels as the "query" form field, matching the form-encoded Content-Type in the header
    response = requests.post(endpoint, data={'query': query_string}, headers=sparql_request_header)
    #print(response.text) # uncomment to view the raw response, e.g. if you are getting an error

    # Report a rejected or failed query by its HTTP status instead of letting
    # the JSON parsing below fail on a non-JSON error page
    response.raise_for_status()

    # Extract the values from the response JSON
    results = response.json()['results']['bindings']

    print('done retrieving data')
    # print(json.dumps(results, indent=2))

    sleep(sparql_sleep) # delay to avoid hitting the Query Service too fast
    return results



## Check for duplicates in the clean_ids.csv file


In [None]:
# Check for duplicate ACT IDs (none found)
# Flag every row whose RecordNumber repeats one from an earlier row, then keep only the flagged rows
repeats_earlier_act_id = ids.duplicated(['RecordNumber'])
duplicate_act_ids = ids[repeats_earlier_act_id]
duplicate_act_ids

In [None]:
# Check for duplicate filenames

# Keep only the rows whose filename repeats one from an earlier row
repeats_earlier_filename = ids.duplicated(['filename'])
duplicate_filename_rows = ids[repeats_earlier_filename].copy()
# Pull out just the filename column of those rows
duplicated_filenames_series = duplicate_filename_rows.filename
# Reduce to the distinct filenames, as a sorted list
filenames = sorted(duplicated_filenames_series.unique())
filenames

In [None]:
# Check whether the discovered duplicates are in the hand-curated duplicates listing.
# Any undiscovered duplicates should be added to the duplicates listing and removed from
# the cleaned_output.csv file.

# Build the lookups once instead of rescanning the data frame columns for every ACT ID
original_act_ids = set(duplicates.actId_of_original)
copy_act_ids = set(duplicates.RecordNumber)

for filename in filenames:
    print(filename)
    # look up the filename in the ids file to get all of the ACT IDs for those files
    act_ids = ids.loc[ids.filename == filename, 'RecordNumber'].values
    for act_id in act_ids:
        if act_id in original_act_ids:
            print(act_id, 'is original')
        elif act_id in copy_act_ids:
            print(act_id, 'is copy')
        else:
            # Not in the curated listing: print the ACT link and title so the record can be checked by hand
            print('https://diglib.library.vanderbilt.edu//act-imagelink.pl?RC=' + act_id)
            titles = act_data.loc[act_data.RecordNumber == act_id, 'Title'].values
            if len(titles) > 0:
                description = titles[0]
                print(description)
            else:
                # Keep going rather than aborting the whole check on an ACT ID missing from act_data
                print('(no record for this ACT ID in act_data)')
    print()

## Check that Commons URLs still dereference

The following cell is copied from the original `act.ipynb` script.

In [None]:
# On 2022-02-01 there were three works that didn't dereference. See https://github.com/HeardLibrary/vandycite/issues/53#issuecomment-1027858980

# Requests the Commons page URL of every record in cleaned_output.csv and saves the
# HTTP status and the final (post-redirect) URL of each response to dereference_test.csv.

file_path = '../processed_lists/cleaned_output.csv'

output_list = []
data = read_dict(file_path)
# To test on a few rows only, loop over enumerate(data[1530:1535], start=1530) instead
for record_number, record in enumerate(data):
    if record_number % 10 == 0: # print the row number every 10 requests
        print(record_number)
    requested_url = record['commons_page_url']
    try:
        # Identify the script to the server, as the SPARQL requests already do
        response = requests.get(requested_url, headers={'User-Agent': user_agent_header})
    except requests.RequestException as error:
        # Record the failure and keep going, so one network error doesn't discard the results collected so far
        output_list.append({'status': 'error: ' + type(error).__name__, 'requested_url': requested_url, 'response_url': ''})
    else:
        output_list.append({'status': response.status_code, 'requested_url': requested_url, 'response_url': response.url})
    sleep(request_sleep)

fieldnames = ['status', 'requested_url', 'response_url']
write_dicts_to_csv(output_list, 'dereference_test.csv', fieldnames)
print('done')

## Retrieve data about items with ACT ID statement

In [None]:
# Find every item that has both an ACT ID (P9092) statement and an image (P18) statement.
# The label and description in the default language are optional, so either may be absent from a result.
query_string = '''
select distinct ?qid ?act_id ?label ?description ?image_iri where {
?qid wdt:P9092 ?act_id.
?qid wdt:P18 ?image_iri.
OPTIONAL {
?qid rdfs:label ?label.
FILTER(lang(?label)="'''+ default_language + '''")
  }
OPTIONAL {
?qid schema:description ?description.
FILTER(lang(?description)="'''+ default_language + '''")
  }
}
'''
#print(query_string)
results = send_sparql_query(query_string)

output_list = []
for result in results:
    output_list.append({
        'qid': extract_local_name(result['qid']['value']),
        'act_id': result['act_id']['value'],
        # OPTIONAL variables are simply missing from a binding when unmatched; fall back to an empty string
        'label': result.get('label', {}).get('value', ''),
        'description': result.get('description', {}).get('value', ''),
        'image_filename': commons_url_to_filename(result['image_iri']['value']),
    })

print('There are', len(output_list), 'works already in Wikidata.')

print(json.dumps(output_list[:5], indent=2))

fieldnames = ['qid', 'act_id', 'label', 'description', 'image_filename']
write_dicts_to_csv(output_list, '../processed_lists/works_already_in_wikidata.csv', fieldnames)
print('done')

## Remove works already in Wikidata from the cleaned output


In [None]:
# Copies cleaned_output.csv to add_to_wikidata.csv, leaving out every record whose
# ACT ID is listed in works_already_in_wikidata.csv.
already_in = pd.read_csv('../processed_lists/works_already_in_wikidata.csv', na_filter=False, dtype = str)

file_path = '../processed_lists/cleaned_output.csv'

# Build the lookup once instead of scanning the column for every record
act_ids_in_wikidata = set(already_in.act_id)

data = read_dict(file_path)
# Keep a record only if its ACT ID is not one that's already in Wikidata
output_list = [record for record in data if record['RecordNumber'] not in act_ids_in_wikidata]

fieldnames = ['RecordNumber', 'actId', 'qid', 'filename', 'commons_uri', 'commons_page_url']
write_dicts_to_csv(output_list, '../processed_lists/add_to_wikidata.csv', fieldnames)
print('done')

## Remove works found during search of artwork labels

This is basically a hack of the previous cell: it applies the same filter, but removes the ACT IDs listed in `artwork_matches.csv`.

In [None]:
# Copies add_to_wikidata.csv (the one in this directory) to add_to_wikidata_new.csv,
# leaving out every record whose ACT ID is listed in artwork_matches.csv.
already_in = pd.read_csv('artwork_matches.csv', na_filter=False, dtype = str)

file_path = 'add_to_wikidata.csv'

# Build the lookup once instead of scanning the column for every record
matched_act_ids = set(already_in.act_id)

data = read_dict(file_path)
# Keep a record only if its ACT ID is not one of the matched ACT IDs
# Note: the add_to_wikidata.csv file in this directory had the column header "act_id"
output_list = [record for record in data if record['act_id'] not in matched_act_ids]

fieldnames = ['act_id', 'filename', 'commons_uri', 'commons_page_url']
write_dicts_to_csv(output_list, 'add_to_wikidata_new.csv', fieldnames)
print('done')

## Re-run the check for the little Wikidata flag on the works we think we need to write

Best to do this before the manual work part, since it will save us from processing those that just need to be linked.

Code cell hacked from https://github.com/HeardLibrary/linked-data/blob/77052c1dd0e761c58f8ba7e4395134f6255e1cfb/commonsbot/commons_data.ipynb

In [None]:
# ---------------------------
# Look for a Wikidata link on the image page
# ---------------------------

# This script finds the link for the tiny little Wikidata logo found on many pages that use the artwork template.
# It's significant because this links to the abstract artwork even if the file represented on the Commons page
# isn't the one used as the value of the image (P18) property in Wikidata.

# For references on art in Wikidata, see https://www.wikidata.org/wiki/Wikidata:WikiProject_sum_of_all_paintings
# https://www.wikidata.org/wiki/Wikidata:WikiProject_Visual_arts/Item_structure

# Compile the patterns once rather than for every page
artwork_table_pattern = re.compile('fileinfotpl-type-artwork')
wikidata_title_pattern = re.compile('wikidata:')

def _find_wikidata_qids(page_html):
    """Return the Q IDs linked from the header row of the artwork table on a Commons file page.

    page_html: HTML text of the Commons file page
    Returns a list with one Q ID per qualifying th element; empty when the page has
    no artwork table, the table has no rows, or the header row has no Wikidata link.
    """
    soup = BeautifulSoup(page_html, 'html.parser')
    # Restrict only to links found in the artwork file info table
    tables = soup.find_all('table', class_=artwork_table_pattern)
    if not tables:
        return []
    rows = tables[0].find_all('tr')
    if not rows:
        return []

    qids = []
    # The header row of the table actually contains a nested th element that isn't inside a td element
    # When the header row is missing, the first row contains two td elements and the second one has a th
    # So the th must be directly inside the tr, not inside a td inside the tr.
    for tag in rows[0].children:
        if tag.name != 'th':
            continue
        # The link to the Wikidata item will be in an href in the th element.
        # Sometimes the artist link is to a Wikidata item, so can't screen on subdomain.
        anchors = tag.find_all('a', title=wikidata_title_pattern)
        if anchors:
            qids.append(extract_local_name(anchors[0]['href']))
    return qids

# NOTE(review): this reads add_to_wikidata.csv, not the add_to_wikidata_new.csv written by the
# cell that removes the artwork label matches -- confirm that this is the intended input
file_path = 'add_to_wikidata.csv'
file_data = read_dict(file_path)

# File names that were used for testing by assigning them to image_filename:
# 'Christ sur la mer de Galilée (Delacroix) Walters Art Museum 37.186.jpg'
# 'Ingobertus_001.jpg' # should not work as of 2022-01-13
# 'Fra_Filippo_Lippi_-_Madonna_and_Child_with_two_Angels_-_Uffizi.jpg'
# 'Drawing of Abbie Sweetwine treating injured.jpg' # should produce nothing
# 'A_Walk_along_a_Path_at_Sunset_by_Hermann_Herzog.jpg' # should be screened out
# 'Andrea_Mantegna_015.jpg'

output_list = []
for record in file_data:
    print(record['filename'])

    # Retrieve the page HTML
    image_filename = record['filename']
    # Build a URL-encoded page URL so that characters such as "?" or "%" in a file name can't alter the request
    page_url = filename_to_commons_page_url(image_filename)
    # Identify the script to the server, as the SPARQL requests already do
    response = requests.get(page_url, headers={'User-Agent': user_agent_header})

    for qid in _find_wikidata_qids(response.text):
        print(qid)
        retrieved_data = {'act_id': record['act_id'], 'qid': qid, 'filename': record['filename']}
        output_list.append(retrieved_data)
    sleep(request_sleep) # Don't hit the server too fast

fieldnames = ['act_id', 'qid', 'filename']
write_dicts_to_csv(output_list, 'wikidata_found.csv', fieldnames)

print('done')
