# VanderBot, a script for writing CSV data to a Wikibase API. vanderbot.py
version = '1.9.7'
created = '2023-11-06'
# (c) 2023 Vanderbilt University. This program is released under a GNU General Public License v3.0 http://www.gnu.org/licenses/gpl-3.0
# Author: Steve Baskauf
# For more information, see https://github.com/HeardLibrary/linked-data/tree/master/vanderbot
# See http://baskauf.blogspot.com/2020/02/vanderbot-python-script-for-writing-to.html
# for a series of blog posts about VanderBot.
# See http://baskauf.blogspot.com/2019/06/putting-data-into-wikidata-using.html
# for a general explanation about writing to the Wikidata API
# See https://github.com/HeardLibrary/digital-scholarship/blob/master/code/wikibase/api/write-statements.py
# for details of how to write to a Wikibase API and comments on the authentication functions
# The most important reference for formatting the data JSON to be sent to the API is:
# https://www.mediawiki.org/wiki/Wikibase/DataModel/JSON
# By default, this script uses the file csv-metadata.json as a schema to map the columns of the input CSV file to the
# Wikidata data model, specifically in the form of RDF/Linked Data. The response data from the Wikidata API is used
# to update the input file as a record that the write operations have been successfully carried out.
# Important note: This script only handles the following value types: URI, plain string, times, globe coordinates,
# quantities, and monolingual strings. It does not handle some of the more esoteric types like novalues.
# The script handles aliases, but in a very kludgy way. Hopefully this will improve later.
# NOTE: it doesn't add aliases, it replaces. See notes in code!
# -----------------------------------------
# Version 1.1 change notes:
# - No changes
# -----------------------------------------
# Version 1.2 change notes (2020-07-18):
# - The data type for dates was changed from 'date' to 'dateTime' since all dates in Wikidata are converted into datetimes.
# This prevents generating an error if the schema is used to convert the CSV data directly to RDF.
# - The method of indicating that a value is a URL was changed from providing an anyURI datatype in the schema to using
# a string datatype and a valueUrl where the entire string is substituted within the curly brackets. This situation is
# detected when the first character in the valueUrl is '{'. This change was necessary in order to make the csv2rdf schema
# correctly generate RDF that matches the RDF provided by the SPARQL endpoint. Previously, the generated RDF would
# have a literal value datatyped as 'anyURI', while the SPARQL endpoint would have a non-literal value.
# - The leading + required for dateTime values by the Wikidata API has been removed from the data in the CSV table and added
# or removed as necessary by the software prior to interactions with the API.
# - The requirement that there be a value for every reference and qualifier property was removed.
# - Changed handling of the alias column so that the JSON schema will produce valid RDF consistent with the Wikibase model.
# -----------------------------------------
# Version 1.3 change notes (2020-08-05):
# - Change GET request to the SPARQL endpoint to POST to avoid size limitations of the query based on URL length
# - This requires adding the correct Content-Type header (application/sparql-query)
# - Correct the form of the IRI for statements (add Q ID before UUID in IRI). This required a slight modification in the
# part of the script that searches the mapping template for statements (look for -} instead of just } )
# -----------------------------------------
# Version 1.4 change notes (2020-08-17):
# - In csv-metadata.json, replace wdt: namespace properties with ps: properties,
# e.g. https://github.com/HeardLibrary/linked-data/blob/v1-4/vanderbot/csv-metadata.json#L187
# - Modify vb6_upload_wikidata.py (this script) to find those ps: properties instead of the wdt: ones.
# -----------------------------------------
# Version 1.5 change notes (2020-08-30):
# - Correct two bugs involving downloading existing descriptions and aliases
# - Add code to determine rows with dates based on new metadata mapping schema format
# - Add code to convert non-standard Wikibase date forms into standard format with precision numbers
# -----------------------------------------
# Version 1.6 change notes (2020-11-13):
# - Add support for globecoordinate, quantity, and monolingual text. Due to limitations in the W3C csv2rdf Recommendation, it isn't
# possible to have the language of monolingualtext strings in a table column. Unfortunately, it has to be hard-coded in the schema.
# This imposes limitations on including two monolingualtext string properties in the same table, since they would have the same property
# QID. That would make it impossible to differentiate among them in the JSON returned from the API. So they have to be in separate tables.
# - Fix some outstanding issues related to negative dates.
# -----------------------------------------
# Version 1.6.1 change notes (2020-11-25):
# - Bug fixes including a problem that prevented the language of a monolingual string from being assigned properly, ambiguity about property columns
# when one property ID was a subset of another (e.g. P17 and P170), and an error generated when a statement had a reference column, but the
# item in Wikidata did not have any value assigned.
# -----------------------------------------
# Version 1.6.2 change notes (2020-12-01):
# - Fixes a bug where an error was raised when a reference property did not have a value.
# -----------------------------------------
# Version 1.6.4 change notes (2021-01-27):
# - contains a bug fix that explicitly encodes all HTTP POST bodies as UTF-8. Previously, problems occurred if strings being sent as
# part of a SPARQL query contained non-Latin characters.
# -----------------------------------------
# Version 1.7 change notes (2021-03-01):
# - enable options different from the default values using command line options
# - enable logging of some errors to be displayed (and saved to the log file if used): label/description fault, date fault
# - prior to writing new items, check that there are no existing items with the same labels and descriptions
# - move mutable configuration variables to the top of the script
# -----------------------------------------
# Version 1.7.1 change notes (2021-04-06):
# - enable --version option.
# - add more complete error trapping for dates
# -----------------------------------------
# Version 1.8 change notes (2021-08-17):
# - enable --apisleep option to limit API write rate for newbies
# - add error trapping for errors not allowed by API
# - add special handling for Commons media when P18 is used
# -----------------------------------------
# Version 1.8.1 change notes (2021-10-24):
# - fix bug where Commons image URLs are created when there is no value
# -----------------------------------------
# Version 1.8.2 change notes (2021-12-08):
# - fixed slow speed of processing unchanged rows caused by writing the data file for every row in the date and image reformatting section.
# -----------------------------------------
# Version 1.9 change notes (2022-01-14):
# - added support for somevalue snaks (required to handle anonymous creators and authors). Fixed error when property had a somevalue snak.
# - trap API error caused when labels or descriptions have leading or trailing whitespace
# -----------------------------------------
# Version 1.9.1 change notes (2022-06-03):
# - fixed error that caused SPARQL queries to fail if they contained double quotes (solution: triple double quotes).
# - switched SPARQL queries to use POST with the application/x-www-form-urlencoded Content-Type. This fixed the problem where
# strings containing non-ASCII UTF-8 characters were not matching with existing identical labels and descriptions in Wikidata.
# ----------------------------------------
# Version 1.9.2 change notes (2022-09-03)
# - add option for "terse" mode to suppress displaying progress. (Reporting of errors at end and logging not affected.)
# ----------------------------------------
# Version 1.9.3 change notes (2022-09-04)
# - add option for "terse" mode to suppress displaying progress. (Reporting of errors at end and logging not affected.)
# ----------------------------------------
# Version 1.9.4 change notes (2023-02-06)
# - fix error in SPARQL query when labels or descriptions end in a double or single quote.
# - remove hard-coded references to the www.wikidata.org subdomain and make them configurable.
# - allow suppression of duplicate label/description checking for situations where the wikibase SPARQL endpoint is slow
# - enforce 1.25 s throttling for only wikidata.org and wikimedia.org domain names (shorter sleep times allowed for other wikibase instances)
# - minor bug fixes
# ----------------------------------------
# Version 1.9.5 change notes (2023-02-08)
# Skip checking the SPARQL endpoint for labels and descriptions if allow_label_description_changes is false.
# Add retry code for cases where the server doesn't give a response.
# Strip leading and trailing whitespace from values of properties in the CSV file (causes API errors).
# Warn if any values are changed but don't log an error.
# Print out elapsed time at end of script.
# ----------------------------------------
# Version 1.9.6 change notes (2023-02-11)
# Add special little hack to handle responses from the Structured Data on Commons API (https://commons.wikimedia.org).
# Their response JSON uses the key "statements" instead of "claims" for no apparent reason.
# ----------------------------------------
# Version 1.9.7 change notes (2023-11-06)
# Minor bug fix: Write error message to log when value doesn't appear in API response. This is a rare case that can happen when the Commons API
# changes a file name from the putative name that was uploaded (e.g. removing double spaces).
import json
import requests
import csv
from pathlib import Path
import time
from time import sleep
import sys
import uuid
import re
from datetime import datetime
import urllib.parse
from typing import List, Dict, Tuple, Optional, Any
# Change the following lines to hard-code different defaults if not running from the command line.
# Set script-wide variable values. Assign default values, then override if passed in as command line arguments
log_path = '' # path to log file, default to none
log_object = sys.stdout # log output defaults to the console screen
allow_label_description_changes = False # labels and descriptions in the local CSV file that differ from existing Wikidata items are not automatically written
endpoint = 'https://query.wikidata.org/sparql' # default to the Wikidata Query Service endpoint
sparqlSleep = 0.1 # delay time between calls to SPARQL endpoint
json_metadata_description_file = 'csv-metadata.json' # "Generating RDF from Tabular Data on the Web" metadata description file (mapping schema)
credentials_path_string = 'home' # value is "home", "working", "gdrive", or a relative or absolute path with trailing "/"
credentials_filename = 'wikibase_credentials.txt' # name of the API credentials file
commons_prefix = 'http://commons.wikimedia.org/wiki/Special:FilePath/' # prepended to URL-encoded Commons media filenames
terse_string = 'false' # True suppresses display of progress output. False displays information about the current line being processed
duplicate_check_string = 'true' # True allows checking for duplicate labels and descriptions when creating new items. False suppresses checking
calendar_model = 'Q1985727' # Default to Wikidata gregorian calendar
globe_value = 'Q2' # the Earth; globe to be used for globe-coordinate datatypes
# This is the format of the API credentials file. Username and password are for a bot that you've created
# (the example below is not real). Save file in the directory specified by the credentials_path_string.
'''
endpointUrl=https://test.wikidata.org
username=User@bot
password=465jli90dslhgoiuhsaoi9s0sj5ki3lo
'''
arg_vals = sys.argv[1:]
# see https://www.gnu.org/prep/standards/html_node/_002d_002dversion.html
if '--version' in arg_vals or '-V' in arg_vals: # provide version information according to GNU standards
# Remove version argument to avoid disrupting pairing of other arguments
# Not really necessary here, since the script terminates, but use in the future for other no-value arguments
if '--version' in arg_vals:
arg_vals.remove('--version')
if '-V' in arg_vals:
arg_vals.remove('-V')
print('VanderBot', version)
print('Copyright ©', created[:4], 'Vanderbilt University')
print('License GNU GPL version 3.0 <http://www.gnu.org/licenses/gpl-3.0>')
print('This is free software: you are free to change and redistribute it.')
print('There is NO WARRANTY, to the extent permitted by law.')
print('Author: Steve Baskauf')
print('Revision date:', created)
sys.exit()
if '--help' in arg_vals or '-H' in arg_vals: # provide help information according to GNU standards
# needs to be expanded to include brief info on invoking the program
print('For help, see the VanderBot landing page at https://github.com/HeardLibrary/linked-data/blob/master/vanderbot/README.md')
print('Report bugs to: steve.baskauf@vanderbilt.edu')
sys.exit()
# Code from https://realpython.com/python-command-line-arguments/#a-few-methods-for-parsing-python-command-line-arguments
opts = [opt for opt in arg_vals if opt.startswith('-')]
args = [arg for arg in arg_vals if not arg.startswith('-')]
if '--log' in opts: # set output to specified log file or path including file name
log_path = args[opts.index('--log')]
log_object = open(log_path, 'wt', encoding='utf-8') # direct output sent to log_object to log file instead of sys.stdout
if '-L' in opts: # set output to specified log file or path including file name
log_path = args[opts.index('-L')]
log_object = open(log_path, 'wt', encoding='utf-8') # direct output sent to log_object to log file instead of sys.stdout
if '--update' in opts: # allow labels and descriptions that differ locally from existing Wikidata items to be updated
if args[opts.index('--update')] == 'allow':
allow_label_description_changes = True
if '-U' in opts: # allow labels and descriptions that differ locally from existing Wikidata items to be updated
if args[opts.index('-U')] == 'allow':
allow_label_description_changes = True
if '--endpoint' in opts: # specifies a Wikibase SPARQL endpoint different from the Wikidata Query Service
endpoint = args[opts.index('--endpoint')]
if '-E' in opts: # specifies a Wikibase SPARQL endpoint different from the Wikidata Query Service
endpoint = args[opts.index('-E')]
if '--sleep' in opts: # specifies a delay value (in seconds) between requests to the Query Service that is different from the default
sparqlSleep = float(args[opts.index('--sleep')]) # convert the string argument to a number so it can be passed to sleep()
if '-S' in opts: # specifies a delay value (in seconds) between requests to the Query Service that is different from the default
sparqlSleep = float(args[opts.index('-S')]) # convert the string argument to a number so it can be passed to sleep()
# Specifies a different file path for the metadata description file that maps the columns in the CSV
# May be a different filename in the same directory as the script or a full or relative path.
if '--json' in opts:
json_metadata_description_file = args[opts.index('--json')]
if '-J' in opts:
json_metadata_description_file = args[opts.index('-J')]
if '--path' in opts: # specifies the location of the credentials file.
credentials_path_string = args[opts.index('--path')] # include trailing slash if relative or absolute path
if '-P' in opts: # specifies the location of the credentials file.
credentials_path_string = args[opts.index('-P')] # include trailing slash if relative or absolute path
if '--credentials' in opts: # specifies the name of the credentials file.
credentials_filename = args[opts.index('--credentials')]
if '-C' in opts: # specifies the name of the credentials file.
credentials_filename = args[opts.index('-C')]
if '--terse' in opts: # terse output boolean.
terse_string = args[opts.index('--terse')]
if '-T' in opts: # terse output boolean.
terse_string = args[opts.index('-T')]
if terse_string == 'true':
terse = True
else:
terse = False
if '--dupcheck' in opts: # specifies whether to check the Query Service for duplicate label/description combinations.
duplicate_check_string = args[opts.index('--dupcheck')]
if '-D' in opts: # specifies whether to check the Query Service for duplicate label/description combinations
duplicate_check_string = args[opts.index('-D')]
if duplicate_check_string == 'false':
duplicate_check = False
else:
duplicate_check = True
# NOTE: As of 2023-02-06 wikibase.cloud APIs will throw an error for any calendar model that is not in the Wikidata
# namespace, regardless of the namespace of the Wikibase instance.
# Thus, the option is not given to specify the full URL, just the Q ID.
# The API does not check whether the Q ID is a valid calendar model.
if '--calmodel' in opts: # specifies the calendar model to be used in time data types.
calendar_model = args[opts.index('--calmodel')]
if '-M' in opts: # specifies the calendar model to be used in time data types.
calendar_model = args[opts.index('-M')]
# NOTE: As with the calendar model, the API validator requires a value that is in the Wikidata namespace,
# regardless of the namespace of the Wikibase instance.
if '--globe' in opts: # specifies the globe to be used in globe-coordinate data types.
globe_value = args[opts.index('--globe')]
if '-G' in opts: # specifies the globe to be used in globe-coordinate data types.
globe_value = args[opts.index('-G')]
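# Illustrative example (hypothetical file names and endpoint URL, not defaults shipped with the script) of an
# invocation using the options parsed above:
# python vanderbot.py --log error_log.txt --endpoint https://example.wikibase.cloud/query/sparql --terse true --dupcheck false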
google_drive_root = '/content/drive/My Drive/'
if credentials_path_string == 'home': # credential file is in home directory
home = str(Path.home()) # gets path to home directory; works for both Win and Mac
credentials_path = home + '/' + credentials_filename
elif credentials_path_string == 'working': # credential file is in current working directory
credentials_path = credentials_filename
# Note: as of 2021 the script will not run from Google Colab due to IP blocking, so this option isn't useful
elif credentials_path_string == 'gdrive': # credential file is in the root of the Google Drive
credentials_path = google_drive_root + credentials_filename
else: # credential file is in a directory whose path was specified by the credential_path_string
credentials_path = credentials_path_string + credentials_filename
# The limit for bots without a bot flag seems to be 50 writes per minute. That's 1.2 s between writes.
# To be safe and avoid getting blocked, leave the api_sleep value at its default: 1.25 s.
# The option to increase the delay is offered if the user is a "newbie", defined as having an
# account less than four days old and with fewer than 50 edits. The newbie limit is 8 edits per minute.
# Therefore, newbies should set the API sleep value to 8 to avoid getting blocked.
api_sleep = 1.25
if '--apisleep' in opts: # delay between API POSTs. Used by newbies to slow writes to within limits.
api_sleep = int(args[opts.index('--apisleep')]) # Number of seconds between API calls. Numeric only, do not include "s"
if '-A' in opts:
api_sleep = int(args[opts.index('-A')])
# See https://meta.wikimedia.org/wiki/User-Agent_policy
user_agent_header = 'VanderBot/' + version + ' (https://github.com/HeardLibrary/linked-data/tree/master/vanderbot; mailto:steve.baskauf@vanderbilt.edu)'
# Set the value of the maxlag parameter to back off when the server is lagged
# see https://www.mediawiki.org/wiki/Manual:Maxlag_parameter
# The recommended value is 5 seconds.
# To not use maxlag, set the value to 0
# To test the maxlag handler code, set maxlag to a very low number like .1
# If you don't know what you are doing, leave this value alone. In any case, it is rude to use a value greater than 5.
maxlag = 5
accept_media_type = 'application/json'
# The following code generates a request header dictionary suitable for sending to a SPARQL endpoint.
# If the query is SELECT, use the JSON media type above. For CONSTRUCT queries use text/turtle to get RDF/Turtle
# Best to send a user-agent header because some Wikimedia servers don't like unidentified clients
# NOTE: This header has the wrong Content-Type for SPARQL UPDATE, which needs type application/sparql-update
def generate_header_dictionary(accept_media_type,user_agent_header):
request_header_dictionary = {
'Accept' : accept_media_type,
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': user_agent_header
}
return request_header_dictionary
# Generate the request header using the function above
request_header = generate_header_dictionary(accept_media_type,user_agent_header)
# -----------------------------------------------------------------
# function definitions
def retrieveCredentials(path):
with open(path, 'rt') as fileObject:
lineList = fileObject.read().split('\n')
domain_name = lineList[0].split('=')[1]
username = lineList[1].split('=')[1]
password = lineList[2].split('=')[1]
#userAgent = lineList[3].split('=')[1]
return domain_name, username, password
def getLoginToken(apiUrl):
parameters = {
'action':'query',
'meta':'tokens',
'type':'login',
'format':'json'
}
r = session.get(url=apiUrl, params=parameters)
data = r.json()
return data['query']['tokens']['logintoken']
def logIn(apiUrl, token, username, password):
parameters = {
'action':'login',
'lgname':username,
'lgpassword':password,
'lgtoken':token,
'format':'json'
}
r = session.post(apiUrl, data=parameters)
data = r.json()
return data
def getCsrfToken(apiUrl):
parameters = {
"action": "query",
"meta": "tokens",
"format": "json"
}
r = session.get(url=apiUrl, params=parameters)
data = r.json()
return data["query"]["tokens"]["csrftoken"]
# read a CSV into a list of dictionaries
def readDict(filename):
fileObject = open(filename, 'r', newline='', encoding='utf-8')
dictObject = csv.DictReader(fileObject)
array = []
for row in dictObject:
array.append(row)
fileObject.close()
return array
# write the data to a file
def writeToFile(tableFileName, fieldnames, tableData):
with open(tableFileName, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for writeRowNumber in range(0, len(tableData)):
writer.writerow(tableData[writeRowNumber])
# function to get local name from an IRI
def extractFromIri(iri, numberPieces):
# with pattern like http://www.wikidata.org/entity/Q6386232 there are 5 pieces with qId as number 4
pieces = iri.split('/')
return pieces[numberPieces]
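# Illustrative example: the entity IRI pattern mentioned above splits into
# ['http:', '', 'www.wikidata.org', 'entity', 'Q6386232'], so piece 4 is the Q ID.
# extractFromIri('http://www.wikidata.org/entity/Q6386232', 4)  ->  'Q6386232'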
def remove_leading_trailing_whitespace(string: str) -> Tuple[str, bool]:
"""Removes leading and trailing whitespace from a string and prints a warning if the string is changed."""
error = False
if string != string.strip():
error = True
print('WARNING: leading or trailing whitespace removed from "' + string + '"')
return string.strip(), error
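# Illustrative example (hypothetical cell value): a padded value is stripped and flagged so the caller can warn.
# remove_leading_trailing_whitespace('  Jane Doe ')  ->  ('Jane Doe', True), after printing the warning
# remove_leading_trailing_whitespace('Jane Doe')     ->  ('Jane Doe', False)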
def safe_quotes(label: str) -> str:
"""Encloses a string in appropriate triple quotes to prevent malformed SPARQL query.
Note
----
If triple quotes (double or single) are used to enclose a string in a SPARQL query, the string can
include any combination of double or single quotes anywhere in the string except for the last position.
The last position cannot be the same type of quote that is used (in triplicate) to enclose the string.
When that happens, the first three quote characters are considered to close the string. The remaining quote
is then an unexpected character, which causes an error in the SPARQL processor.
"""
if label[-1]=='"':
enclosed = "'''" + label + "'''"
else:
enclosed = '"""' + label + '"""'
return enclosed
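# Illustrative examples (hypothetical labels) of the quoting rule described in the docstring:
# safe_quotes('say "cheese" please') wraps the label in triple double quotes: """say "cheese" please"""
# safe_quotes('he said "cheese"') ends in a double quote, so it is wrapped in triple single quotes instead:
# '''he said "cheese"'''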
# convert a Commons URL to an unencoded filename string
def commons_url_to_filename(url):
# form of URL is: http://commons.wikimedia.org/wiki/Special:FilePath/Castle%20De%20Haar%20%281892-1913%29%20-%20360%C2%B0%20Panorama%20of%20Castle%20%26%20Castle%20Grounds.jpg
string = url.split(commons_prefix)[1] # get local name file part of URL
filename = urllib.parse.unquote(string) # reverse URL-encode the string
return filename
# convert an unencoded filename string to a URL-encoded Commons URL; inverse of previous function
def filename_to_commons_url(filename):
# form of filename is 'Castle De Haar (1892-1913) - 360° Panorama of Castle & Castle Grounds.jpg'
encoded_filename = urllib.parse.quote(filename)
url = commons_prefix + encoded_filename
return url
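# Illustrative example (hypothetical filename) of round-tripping through the two functions above:
# filename_to_commons_url('Castle De Haar (1892-1913).jpg')
#   -> 'http://commons.wikimedia.org/wiki/Special:FilePath/Castle%20De%20Haar%20%281892-1913%29.jpg'
# commons_url_to_filename(filename_to_commons_url('Castle De Haar (1892-1913).jpg'))
#   -> 'Castle De Haar (1892-1913).jpg'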
# SPARQL ASK query used to determine whether labels and descriptions already exist in Wikidata
def ask_query(graph_pattern):
query_string = '''ask where {
'''+ graph_pattern + '''
}'''
#print(query_string)
#response = requests.post(endpoint, data=query_string.encode('utf-8'), headers=request_header)
response = requests.post(endpoint, data=dict(query=query_string), headers=request_header)
#print(response.text) # uncomment to view the raw response, e.g. if you are getting an error
data = response.json() # NOTE: the conversion from JSON to Python data structure turns JSON true into Python True
#print(json.dumps(data, indent=2))
results = data['boolean']
#print(results)
return results
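# Illustrative example (hypothetical label) of a complete ASK query built by the function above when it is
# called with the label graph pattern generated in check_for_only_label below:
# ask where {
#  ?entity rdfs:label """Mary Smith"""@en.
# }
# The endpoint answers with a JSON "boolean" of true or false, returned here as Python True or False.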
# The following two functions use the ASK function above for cases where only a label or only a description is present
def check_for_only_label(label_string, language):
# Label must exist
graph_pattern = ' ?entity rdfs:label ' + safe_quotes(label_string) + '@' + language + '.'
#print('Checking ' + language + ' label: "' + label_string)
#print(graph_pattern)
label_exists = ask_query(graph_pattern)
sleep(sparqlSleep)
#print(label_exists)
#print()
# Also label must exist with NO description
graph_pattern = ' ?entity rdfs:label ' + safe_quotes(label_string) + '@' + language + '''.
?entity schema:description ?desc.
filter(lang(?desc)="''' + language + '")'
#print('Checking ' + language + ' label: "' + label_string + '" with description')
#print(graph_pattern)
label_with_description = ask_query(graph_pattern)
sleep(sparqlSleep)
#print(label_with_description)
#print()
# A True indicates that there is a label with no description for that language
return label_exists and not(label_with_description)
def check_for_only_description(description_string, language):
# Description must exist
graph_pattern = ' ?entity schema:description ' + safe_quotes(description_string) + '@' + language + '.'
#print('Checking ' + language + ' description: "' + description_string + '"')
#print(graph_pattern)
description_exists = ask_query(graph_pattern)
sleep(sparqlSleep)
#print(description_exists)
#print()
# Also description must exist with NO label
graph_pattern = ' ?entity schema:description ' + safe_quotes(description_string) + '@' + language + '''.
?entity rdfs:label ?label.
filter(lang(?label)="''' + language + '")'
#print('Checking ' + language + ' description: "' + description_string + '" with label')
#print(graph_pattern)
description_with_record = ask_query(graph_pattern)
sleep(sparqlSleep)
#print(description_with_record)
#print()
# A True indicates that there is a description with no label for that language
return description_exists and not(description_with_record)
# search for any of the "label" types: label, alias, description
def searchLabelsDescriptionsAtWikidata(qIds, labelType, language):
# create a string for all of the Wikidata item IDs to be used as subjects in the query
alternatives = ''
for qId in qIds:
alternatives += 'wd:' + qId + '\n'
if labelType == 'label':
predicate = 'rdfs:label'
elif labelType == 'alias':
predicate = 'skos:altLabel'
elif labelType == 'description':
predicate = 'schema:description'
else:
predicate = 'rdfs:label'
# create a string for the query
query = ''
# SPARQL queries to wikibases other than Wikidata won't necessarily have wd: defined
# automatically. In those cases, the prefix must be defined in a preamble to the query.
if DOMAIN_NAME != 'http://www.wikidata.org':
query += 'PREFIX wd: <' + DOMAIN_NAME + '/entity/>\n'
query += 'select distinct ?id ?string '
query += '''where {
VALUES ?id
{
''' + alternatives + '''}
?id '''+ predicate + ''' ?string.
filter(lang(?string)="''' + language + '''")
}'''
#print(query)
returnValue = []
#r = requests.post(endpoint, data=query.encode('utf-8'), headers=request_header)
r = requests.post(endpoint, data=dict(query=query), headers=request_header)
data = r.json()
results = data['results']['bindings']
for result in results:
# remove wd: 'http://www.wikidata.org/entity/'
qNumber = extractFromIri(result['id']['value'], 4)
string = result['string']['value']
resultsDict = {'qId': qNumber, 'string': string}
returnValue.append(resultsDict)
# delay to avoid hitting the SPARQL endpoint too rapidly
sleep(sparqlSleep)
return returnValue
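# Illustrative example (Q IDs and returned strings are hypothetical):
# searchLabelsDescriptionsAtWikidata(['Q101', 'Q102'], 'description', 'en')
#   might return [{'qId': 'Q101', 'string': 'academic librarian'}, {'qId': 'Q102', 'string': 'software developer'}]
# Items with no value of the requested label type in that language simply do not appear in the list.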
# Generate a UUID for the value node identifier when there isn't already one
def generateNodeId(rowData, columnNameRoot):
changed = False
# Only do something in the case where there is a value. Missing values and blank nodes should be skipped.
if rowData[columnNameRoot + '_val'] != '' and rowData[columnNameRoot + '_val'][:2] != '_:':
# If there is no UUID in the _nodeId column, generate one
if rowData[columnNameRoot + '_nodeId'] == '':
rowData[columnNameRoot + '_nodeId'] = str(uuid.uuid4())
changed = True
return rowData, changed
def generate_uuid_bnode(string):
"""If string is bnode abbreviation, add UUID and return as string."""
if string == '_:':
return True, '_:' + str(uuid.uuid4())
else:
return False, string
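# Illustrative example: only the bare blank-node abbreviation gets a generated UUID appended.
# generate_uuid_bnode('_:')   ->  (True, '_:' followed by a newly generated UUID)
# generate_uuid_bnode('Q42')  ->  (False, 'Q42')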
# Function to check for the particular form of xsd:dateTime required for full dates in Wikidata
# See https://stackoverflow.com/questions/41129921/validate-an-iso-8601-datetime-string-in-python
regex = r'^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[0-9])-(3[01]|0[0-9]|[12][0-9])T([0][0]):([0][0]):([0][0])(Z)$'
match_iso8601 = re.compile(regex).match
def validate_iso8601(str_val):
try:
if match_iso8601( str_val ) is not None:
return True
except:
pass
return False
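# Illustrative examples: only full dateTimes of the exact form required by the Wikidata API pass.
# validate_iso8601('1871-05-12T00:00:00Z')  ->  True
# validate_iso8601('-0044-03-15T00:00:00Z') ->  True   (negative/BCE years are allowed by the regex)
# validate_iso8601('1871-05-12')            ->  False  (abbreviated dates are handled by validate_time below)
# validate_iso8601('1871-05-12T14:30:00Z')  ->  False  (only 00:00:00 times are accepted)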
# Function to check for valid abbreviated dates
def validate_time(date_text):
try:
if date_text != datetime.strptime(date_text, "%Y-%m-%d").strftime("%Y-%m-%d"):
raise ValueError
form = 'day'
except ValueError:
try:
if date_text != datetime.strptime(date_text, "%Y-%m").strftime('%Y-%m'):
raise ValueError
form = 'month'
except ValueError:
try:
if date_text != datetime.strptime(date_text, "%Y").strftime('%Y'):
raise ValueError
form = 'year'
except ValueError:
form ='none'
return form
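# Illustrative examples of the abbreviated date forms recognized above:
# validate_time('1871-05-12')  ->  'day'
# validate_time('1871-05')     ->  'month'
# validate_time('1871')        ->  'year'
# validate_time('05/12/1871')  ->  'none'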
# Function to convert times to the format required by Wikidata
def convertDates(rowData, dateColumnNameRoot):
error = False
changed = False
# Only do something in the case where there is a date. Missing values and blank nodes should be skipped.
if rowData[dateColumnNameRoot + '_val'] != '' and rowData[dateColumnNameRoot + '_val'][:2] != '_:':
# Assume that if the precision column is empty that the dates need to be converted
if rowData[dateColumnNameRoot + '_prec'] == '':
changed = True
#print(dateColumnNameRoot, rowData[dateColumnNameRoot + '_val'])
# set these two to default to the existing values
# precisionNumber = int(rowData[dateColumnNameRoot + '_prec']) # not necessary since conditional on value of ''
timeString = rowData[dateColumnNameRoot + '_val']
value = rowData[dateColumnNameRoot + '_val']
date_type = validate_time(value)
# date is YYYY-MM-DD
if date_type == 'day':
timeString = value + 'T00:00:00Z'
precisionNumber = 11 # precision to days
# date is YYYY-MM
elif date_type == 'month':
timeString = value + '-00T00:00:00Z'
precisionNumber = 10 # precision to months
# date is YYYY
elif date_type == 'year':
timeString = value + '-00-00T00:00:00Z'
precisionNumber = 9 # precision to years
# date does not conform to any of the tested options
else:
# date is xsd:dateTime and doesn't need adjustment
if validate_iso8601(value):
timeString = value
precisionNumber = 11 # assume precision to days since Wikibase doesn't support greater resolution than that
# date form unknown, don't adjust
else:
#print('Warning: date for ' + dateColumnNameRoot + '_val:', rowData[dateColumnNameRoot + '_val'], 'does not conform to any standard format! Check manually.')
error = True
precisionNumber = '' # must have a value to prevent an error, will be ignored since the write and save will be killed
# assign the changed values back to the dict
rowData[dateColumnNameRoot + '_val'] = timeString
rowData[dateColumnNameRoot + '_prec'] = precisionNumber
else:
# Check that a pre-existing value for the date string conforms to the Wikidata format requirements
if validate_iso8601(rowData[dateColumnNameRoot + '_val']):
# a pre-existing precisionNumber must be an integer when written to the API
try:
rowData[dateColumnNameRoot + '_prec'] = int(rowData[dateColumnNameRoot + '_prec'])
except: # throw an error if characters can't be converted to an integer
error = True
else:
error = True
return rowData, error, changed
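# Illustrative example (hypothetical column root 'birthDate'): a row containing
#   {'birthDate_val': '1871-05', 'birthDate_prec': ''}
# is returned by convertDates(rowData, 'birthDate') as
#   {'birthDate_val': '1871-05-00T00:00:00Z', 'birthDate_prec': 10}, error=False, changed=True
# because the empty precision column triggered conversion of the abbreviated date.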
# Find the column with the UUID for the statement
def findPropertyUuid(propertyId, columns):
statementUuidColumn = '' # start value as empty string in case no UUID column
nUuidColumns = 0
for column in columns:
if not('suppressOutput' in column):
# find the valueUrl in the column for which the value of the statement has the prop version of the property as its propertyUrl
valueString = column['propertyUrl'].partition('prop/')[2] # This will pick up all kinds of properties, but only p: properties will have PID directly after 'prop/'
if propertyId == valueString:
nUuidColumns += 1
temp = column['valueUrl'].partition('-{')[2]
statementUuidColumn = temp.partition('}')[0] # in the event of two columns with the same property ID, the last one is used
#print(statementUuidColumn)
# Give a warning if there isn't any UUID column for the property
if statementUuidColumn == '':
print('Warning: No UUID column for property ' + propertyId)
if nUuidColumns > 1:
print('Warning: there are', nUuidColumns, 'UUID columns for property', propertyId)
return statementUuidColumn
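# Illustrative (hypothetical) csv-metadata.json column mapping that the function above would match for property P108:
# {
#   "titles": "employer_uuid",
#   "name": "employer_uuid",
#   "datatype": "string",
#   "propertyUrl": "http://www.wikidata.org/prop/P108",
#   "valueUrl": "http://www.wikidata.org/entity/statement/{qid}-{employer_uuid}"
# }
# Partitioning the propertyUrl on 'prop/' yields 'P108', and partitioning the valueUrl on '-{' and '}'
# yields 'employer_uuid' as the name of the column holding the statement UUID.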
# Each property can have zero to many references. This function searches the column headers to find all of
# the columns that are references for a particular property used in statements
def findReferencesForProperty(statementUuidColumn, columns):
# build up a list of dictionaries about references to associate with the property
referenceList = []
# Step through the columns looking for references associated with the property
for column in columns:
if not('suppressOutput' in column):
# check if the aboutUrl for the column has the statement subject UUID column as the about value and that the propertyUrl value is wasDerivedFrom
if ('prov:wasDerivedFrom' in column['propertyUrl']) and (statementUuidColumn in column['aboutUrl']):
temp = column['valueUrl'].partition('{')[2]
refHashColumn = temp.partition('}')[0]
#print(refHashColumn)
# These are the lists that will accumulate data about each property of the reference
refPropList = [] # P ID for the property
refValueColumnList = [] # column header string for the reference property's value
refEntityOrLiteral = [] # values: entity or literal, determined by presence of a valueUrl key for the column
refTypeList = [] # the datatype of the property's value: url, time, or string
refValueTypeList = [] # the specific type of a string: time or string
refLangList = [] # the language of monolingualtext
# The kind of value in the column (dateTime, string) can be retrieved directly from the column 'datatype' value
# Now step through the columns looking for each of the properties that are associated with the reference
for propColumn in columns:
if not('suppressOutput' in propColumn):
# Find the columns that have the refHash column name in the aboutUrl
if refHashColumn in propColumn['aboutUrl']:
# Determine whether the value of the reference is a value node (e.g. dates) or a direct value
valueString = propColumn['propertyUrl'].partition('prop/reference/')[2]
if "value" in valueString: # e.g. value/P813
# The property IRI namespace for references with value nodes is http://www.wikidata.org/prop/reference/value/
refPropList.append(valueString.partition('value/')[2])
# The column title will be something like employer_ref1_retrieved_nodeId,
# so get the root of the string to the left of "_nodeId"
refValueColumnList.append(propColumn['titles'].partition('_nodeId')[0])
refLangList.append('')
# Find out what kind of value node it is. Currently supported is date; future: globe coordinate value and quantities
for testColumn in columns:
try:
if propColumn['titles'] in testColumn['aboutUrl']:
if 'timeValue' in testColumn['propertyUrl']: # value is a date
refEntityOrLiteral.append('value')
refTypeList.append('time')
refValueTypeList.append('time')
elif 'geoLatitude' in testColumn['propertyUrl']: # value is a globe coordinate value
refEntityOrLiteral.append('value')
refTypeList.append('globe-coordinate')
refValueTypeList.append('globecoordinate')
elif 'quantityAmount' in testColumn['propertyUrl']: # value is a quantity
refEntityOrLiteral.append('value')
refTypeList.append('quantity')
refValueTypeList.append('quantity')
else:
continue
except:
pass
else: # e.g. P854
# The property IRI namespace for references with direct values is http://www.wikidata.org/prop/reference/
refPropList.append(valueString)
# Just use the whole column title
refValueColumnList.append(propColumn['titles'])
if 'valueUrl' in propColumn:
refLangList.append('')
# URIs are detected when there is a valueUrl whose value has a first character of "{"
if propColumn['valueUrl'][0] == '{':
refEntityOrLiteral.append('literal')
refTypeList.append('url')
refValueTypeList.append('string')
else:
refEntityOrLiteral.append('entity')
refTypeList.append('wikibase-item')
refValueTypeList.append('wikibase-entityid')
else:
# monolingualtext detected by language tag
if 'lang' in propColumn:
refEntityOrLiteral.append('monolingualtext')
refTypeList.append('monolingualtext')
refValueTypeList.append('monolingualtext')
refLangList.append(propColumn['lang'])
# plain text string
else:
refEntityOrLiteral.append('literal')
refTypeList.append('string')
refValueTypeList.append('string')
refLangList.append('')
# After all of the properties have been found and their data have been added to the lists,
# insert the lists into the reference list as values in a dictionary
referenceList.append({'refHashColumn': refHashColumn, 'refPropList': refPropList, 'refValueColumnList': refValueColumnList, 'refEntityOrLiteral': refEntityOrLiteral, 'refTypeList': refTypeList, 'refValueTypeList': refValueTypeList, 'refLangList': refLangList})
# After every column has been searched for references associated with the property, return the reference list
#print('References: ', json.dumps(referenceList, indent=2))
return referenceList
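# Illustrative example (hypothetical column names) of one entry in the returned referenceList, describing a
# reference with a reference URL (P854, direct value) and a retrieved date (P813, value node):
# {
#   'refHashColumn': 'employer_ref1_hash',
#   'refPropList': ['P854', 'P813'],
#   'refValueColumnList': ['employer_ref1_referenceUrl', 'employer_ref1_retrieved'],
#   'refEntityOrLiteral': ['literal', 'value'],
#   'refTypeList': ['url', 'time'],
#   'refValueTypeList': ['string', 'time'],
#   'refLangList': ['', '']
# }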
# Each property can have zero to many qualifiers. This function searches the column headers to find all of
# the columns that are qualifiers for a particular property
def findQualifiersForProperty(statementUuidColumn, columns):
# These are the lists that will accumulate data about each qualifier
qualPropList = [] # P ID for the property
qualValueColumnList = [] # column header string for the reference property's value
qualEntityOrLiteral = [] # values: entity or literal, determined by presence of a valueUrl key for the column
qualTypeList = [] # the datatype of the qualifier's value: url, time, or string
qualValueTypeList = [] # the specific type of a string: time or string
qualLangList = [] # the language of monolingualtext
# The kind of value in the column (dateTime, string) can be retrieved directly from the column 'datatype' value
for column in columns:
if not('suppressOutput' in column):
# find the column that has the statement UUID in the about
# and the property is a qualifier property
if (statementUuidColumn in column['aboutUrl']) and ('qualifier' in column['propertyUrl']):
# Determine whether the value of the qualifier is a value node (e.g. dates) or a direct value
valueString = column['propertyUrl'].partition('prop/qualifier/')[2]
if "value" in valueString: # e.g. value/P580
qualLangList.append('')
# The property IRI namespace for qualifiers with value nodes is http://www.wikidata.org/prop/qualifier/value/
qualPropList.append(valueString.partition('value/')[2])
# The column title will be something like employer_startDate_nodeId,
# so get the root of the string to the left of "_nodeId"
qualValueColumnList.append(column['titles'].partition('_nodeId')[0])
# Find out what kind of value node it is.
for testColumn in columns:
try:
if column['titles'] in testColumn['aboutUrl']:
if 'timeValue' in testColumn['propertyUrl']: # value is a date
qualEntityOrLiteral.append('value')
qualTypeList.append('time')
qualValueTypeList.append('time')
elif 'geoLatitude' in testColumn['propertyUrl']: # value is a globe coordinate value
qualEntityOrLiteral.append('value')
qualTypeList.append('globe-coordinate')
qualValueTypeList.append('globecoordinate')
elif 'quantityAmount' in testColumn['propertyUrl']: # value is a quantity
qualEntityOrLiteral.append('value')
qualTypeList.append('quantity')
qualValueTypeList.append('quantity')
else:
continue
except:
pass
else: # e.g. P1545
# The property IRI namespace for qualifiers with direct values is http://www.wikidata.org/prop/qualifier/
qualPropList.append(valueString)
# Just use the whole column title
qualValueColumnList.append(column['titles'])
# determine whether the qualifier is an entity/URI or string
if 'valueUrl' in column:
qualLangList.append('')
# URIs are detected when there is a valueUrl whose value has a first character of "{"
if column['valueUrl'][0] == '{':
qualEntityOrLiteral.append('literal')
qualTypeList.append('url')
qualValueTypeList.append('string')
else:
qualEntityOrLiteral.append('entity')
qualTypeList.append('wikibase-item')
qualValueTypeList.append('wikibase-entityid')
else:
# monolingualtext detected by language tag
if 'lang' in column:
qualEntityOrLiteral.append('monolingualtext')
qualTypeList.append('monolingualtext')
qualValueTypeList.append('monolingualtext')
qualLangList.append(column['lang'])
# plain text string
else:
qualEntityOrLiteral.append('literal')
qualTypeList.append('string')
qualValueTypeList.append('string')
qualLangList.append('')
# After all of the qualifier columns are found for the property, create a dictionary to pass back
qualifierDictionary = {'qualPropList': qualPropList, 'qualValueColumnList': qualValueColumnList, "qualEntityOrLiteral": qualEntityOrLiteral, 'qualTypeList': qualTypeList, 'qualValueTypeList': qualValueTypeList, 'qualLangList': qualLangList}
#print('Qualifiers: ', json.dumps(qualifierDictionary, indent=2))
return(qualifierDictionary)
# The form of snaks is the same for references and qualifiers, so they can be generated systematically
# Although the variable names include "ref", they apply the same to the analogous "qual" variables.
def generateSnaks(snakDictionary, require_references, refValue, refPropNumber, refPropList, refValueColumnList, refValueTypeList, refTypeList, refEntityOrLiteral):
if not(refValue): # evaluates both empty strings for direct values or empty dict for node-valued values
if require_references: # Do not write the record if it's missing a reference.
print('Reference value missing! Cannot write the record.')
sys.exit()
else:
if refEntityOrLiteral[refPropNumber] == 'value':
if refTypeList[refPropNumber] == 'time':
# Wikibase model requires leading + sign for dates
if refValue['timeValue'][0] != '-':
refValue['timeValue'] = '+' + refValue['timeValue']
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber],
'datavalue':{
'value': {
'time': refValue['timeValue'],
'timezone': 0,
'before': 0,
'after': 0,
'precision': refValue['timePrecision'],
'calendarmodel': 'http://www.wikidata.org/entity/' + calendar_model
},
'type': 'time'
},
'datatype': 'time'
}
]
elif refTypeList[refPropNumber] == 'quantity':
if refValue['amount'][0] != '-':
refValue['amount'] = '+' + refValue['amount']
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber], # use the reference/qualifier property, consistent with the time case above
'datavalue':{
'value':{
'amount': refValue['amount'], # a string for a decimal number; must have leading + or -
# NOTE: the wikibase.cloud API does not enforce the Wikidata namespace for units.
# So any IRI can be used. We assume that the units are defined in the Wikibase instance.
'unit': DOMAIN_NAME + '/entity/' + refValue['unit'] # IRI as a string
},
'type': 'quantity',
},
'datatype': 'quantity'
}
]
elif refTypeList[refPropNumber] == 'globe-coordinate':
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber], # use the reference/qualifier property, consistent with the time case above
'datavalue': {
'value': {
'latitude': float(refValue['latitude']), # latitude; decimal number
'longitude': float(refValue['longitude']), # longitude; decimal number
'precision': float(refValue['precision']), # precision; decimal number
'globe': 'http://www.wikidata.org/entity/' + globe_value # defaults to the earth
},
'type': 'globecoordinate'
},
'datatype': 'globe-coordinate'
}
]
# other unsupported types
else:
pass
elif refEntityOrLiteral[refPropNumber] == 'entity':
# case where the value is an entity
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber],
'datavalue': {
'value': {
'id': refValue
},
'type': 'wikibase-entityid'
},
'datatype': 'wikibase-item'
}
]
elif refEntityOrLiteral[refPropNumber] == 'monolingualtext':
# language-tagged literals
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber],
'datavalue': {
'value': {
'text': refValue['text'],
'language': refValue['language']
},
'type': 'monolingualtext'
},
'datatype': 'monolingualtext'
}
]
else:
# case where value is a string of some kind
snakDictionary[refPropList[refPropNumber]] = [
{
'snaktype': 'value',
'property': refPropList[refPropNumber],
'datavalue': {
'value': refValue,
'type': refValueTypeList[refPropNumber]
},
'datatype': refTypeList[refPropNumber]
}
]
return snakDictionary
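# Illustrative example (hypothetical values) of what generateSnaks adds for a plain string reference
# property such as a reference URL, handled by the final "string of some kind" branch above:
# snakDictionary['P854'] = [
#     {
#         'snaktype': 'value',
#         'property': 'P854',
#         'datavalue': {
#             'value': 'https://example.org/page',
#             'type': 'string'
#         },
#         'datatype': 'url'
#     }
# ]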
# If there are references for a statement, return a reference list
def createReferences(referenceListForProperty, rowData):
referenceListToReturn = []
for referenceDict in referenceListForProperty:
refPropList = referenceDict['refPropList']
refValueColumnList = referenceDict['refValueColumnList']
refValueTypeList = referenceDict['refValueTypeList']
refTypeList = referenceDict['refTypeList']
refEntityOrLiteral = referenceDict['refEntityOrLiteral']
refLangList = referenceDict['refLangList']
snakDictionary = {}
for refPropNumber in range(0, len(refPropList)):
if refEntityOrLiteral[refPropNumber] == 'value':
# value nodes with no nodeId should be considered to have no value