/
id_to_qid.py
executable file
·150 lines (116 loc) · 3.62 KB
/
id_to_qid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
If query.sparql and the input data file are downloaded,
the script can simply be called as:
python id_to_qid.py
"""
import argparse
import os
import time
import pandas as pd
import requests
from requests.exceptions import HTTPError
from tqdm import tqdm, trange
HEADERS = {"User-Agent": "ID-to-QID"}
def _quoteSparqlLiteral(value):
    """Return value as a single-quoted SPARQL string literal.

    Backslashes and single quotes are escaped so that an ID containing
    either character cannot terminate the literal early and break (or
    alter) the query it is substituted into.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def main(
    test: bool = False,
    inputFile: str = "github.csv",
    pageSize: int = 125000,
    batchSize: int = 100,
    inputList=None,
    batch="",
    startIndex: int = 4500000,
):
    """Look up IDs with the SPARQL query template and write the matches to CSV.

    The IDs are processed in pages of ``pageSize``. Each page is queried in
    batches of ``batchSize`` IDs via ``getData`` and then written to one CSV
    file with the columns ``qid``, ``doi`` and ``wdLicenseQID`` under
    ``crossref/data/``.

    Args:
        test: If true, only the first batch of each page is queried before
            that page's CSV is written.
        inputFile: Path of a text file with one ID per line. Only read when
            ``inputList`` is empty or not given.
        pageSize: Number of IDs covered by one output CSV file.
        batchSize: Number of IDs sent in a single query.
        inputList: Optional list of IDs to use instead of ``inputFile``.
        batch: If non-empty, the output file is named
            ``qid-doi-<batch>.csv`` instead of ``qid-doi-<start>-<end>.csv``.
            NOTE(review): with more than one page every page is written to
            that same file name, so only the last page survives.
        startIndex: Pages whose first index is below this value are skipped.
            The default of 4500000 reproduces the offset that used to be
            hard-coded here; pass 0 to process the whole list.

    Raises:
        SystemExit: With exit status 1 if ``inputFile`` has to be read but
            does not exist.
    """
    with open("query.sparql", encoding="utf-8") as r:
        query = r.read()

    if not inputList:
        if not os.path.isfile(inputFile):
            print(inputFile + " does not exist")
            # Non-zero status so that shell callers can detect the failure.
            raise SystemExit(1)
        with open(inputFile, encoding="utf-8") as f:
            inputList = [line.strip() for line in f]

    # Create the output directory up front: otherwise a missing directory is
    # only noticed after a whole page of queries has already been run.
    outputDir = "crossref/data"
    os.makedirs(outputDir, exist_ok=True)

    total = len(inputList)
    for start in range(0, total, pageSize):
        if start < startIndex:
            continue
        # `end` is deliberately not clamped: it is part of the file name.
        end = start + pageSize
        data = []
        # Clamp the progress bar so it does not run past the end of the list.
        for i in trange(start, min(end, total), batchSize):
            IDs = inputList[i : i + batchSize]
            IDstring = " ".join(_quoteSparqlLiteral(q) for q in IDs)
            data += getData(query, IDstring)
            if test:
                break
        df = pd.DataFrame(data, columns=["qid", "doi", "wdLicenseQID"])
        if batch:
            outputPath = outputDir + "/qid-doi-" + batch + ".csv"
        else:
            outputPath = (
                outputDir + "/qid-doi-" + str(start) + "-" + str(end) + ".csv"
            )
        df.to_csv(outputPath, index=False)
    # TODO: optionally report input IDs that produced no match (an earlier,
    # disabled draft wrote them to missing-dois.txt).
def getData(query, IDstring):
    """Run the query for one batch of IDs and flatten the result into rows.

    Args:
        query: Query template; ``IDstring`` is substituted for its
            ``values`` placeholder with ``str.format``.
        IDstring: The already formatted IDs to insert into the template.

    Returns:
        A list with one ``[QID, id, license]`` list per result binding.
        ``license`` is ``None`` when the binding has no ``license`` key.
    """
    response = runQuery(query.format(values=IDstring))

    rows = []
    for binding in response["results"]["bindings"]:
        # Drop the first 31 characters of the item value, presumably the
        # "http://www.wikidata.org/entity/" prefix (verify against the query).
        qid = binding["item"]["value"][31:]
        matchedId = binding["id"]["value"]
        licenseValue = binding["license"]["value"] if "license" in binding else None
        rows.append([qid, matchedId, licenseValue])
    return rows
def runQuery(query):
    """Send a SPARQL query to query.wikidata.org and return the decoded JSON.

    Args:
        query: The complete SPARQL query text.

    Returns:
        The parsed JSON body of the response. If the server answers with an
        HTTP error status, the response body and the query are printed and
        an empty result, ``{"results": {"bindings": []}}``, is returned so
        that the caller can carry on with the next query.

    Raises:
        BaseException: Any other failure (connection problem, body that is
            not valid JSON, interruption, ...) is re-raised after the query
            and the error have been printed.
    """
    url = "https://query.wikidata.org/sparql"
    params = {"query": query, "format": "json"}
    try:
        response = requests.get(url, params=params, headers=HEADERS)
        # requests does not raise for 4xx/5xx responses by itself; without
        # this call the HTTPError handler below can never run and an error
        # page would instead fail inside response.json().
        response.raise_for_status()
        return response.json()
    except HTTPError as e:
        print(e.response.text)
        print(query)
        return {"results": {"bindings": []}}
    except BaseException as err:
        # Only for diagnostics: show the offending query, then propagate.
        print(query)
        print(f"Unexpected {err=}, {type(err)=}")
        raise
def timer(tick, msg=""):
    """Print the time elapsed since ``tick`` and return a new timestamp.

    Args:
        tick: A start time as returned by ``time.time()``.
        msg: Optional label printed in front of the elapsed seconds.

    Returns:
        The current ``time.time()`` value, read after printing.
    """
    elapsed = time.time() - tick
    print("--- %s %.3f seconds ---" % (msg, elapsed))
    return time.time()
def defineArgParser():
    """Create the parser for the command line arguments.

    The module docstring is used as the description and is shown unwrapped
    (``RawDescriptionHelpFormatter``).

    Returns:
        An ``argparse.ArgumentParser`` with a single boolean flag,
        ``-t``/``--test``, which is false unless given.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "-t",
        "--test",
        # The help text used to be empty, leaving --help without any
        # explanation of the only option.
        help="test mode: query only the first batch of each page",
        action="store_true",
    )
    return parser
if __name__ == "__main__":
    # Parse the command line first so that the reported run time covers
    # only the call to main().
    clArgs = defineArgParser().parse_args()
    tick = time.time()
    main(test=clArgs.test)
    timer(tick)