-
Notifications
You must be signed in to change notification settings - Fork 6
/
collect_data.py
74 lines (62 loc) · 2.93 KB
/
collect_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
This script can be used to run queries on BigQuery for any number of blockchains, and save the results in the
raw_block_data directory of the project.
The relevant queries must be stored in a file named 'queries.yaml' in the root directory of the project.
Attention! Before running this script, you need to generate service account credentials from Google, as described
here (https://developers.google.com/workspace/guides/create-credentials#service-account) and save your key in the
root directory of the project under the name 'google-service-account-key.json'
"""
import consensus_decentralization.helper as hlp
import google.cloud.bigquery as bq
import json
import argparse
import logging
from yaml import safe_load
from consensus_decentralization.helper import ROOT_DIR, RAW_DATA_DIR
def collect_data(ledgers, force_query):
    """
    Queries BigQuery for the given ledgers (using the queries defined in queries.yaml)
    and writes the results as JSON Lines files under RAW_DATA_DIR.

    :param ledgers: iterable of ledger names; each must have an entry in queries.yaml
    :param force_query: if True, re-run the query even when a local result file already exists
    """
    # exist_ok avoids a race between the is_dir check and mkdir; parents handles a missing tree
    RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
    with open(ROOT_DIR / "queries.yaml") as f:
        queries = safe_load(f)

    client = bq.Client.from_service_account_json(json_credentials_path=ROOT_DIR / "google-service-account-key.json")

    for ledger in ledgers:
        file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
        # Skip ledgers whose data is already cached locally, unless a re-query was forced
        if not force_query and file.is_file():
            logging.info(f'{ledger} data already exists locally. '
                         f'For querying {ledger} anyway please run the script using the flag --force-query')
            continue
        logging.info(f"Querying {ledger}..")
        query_job = client.query(queries[ledger])
        try:
            rows = query_job.result()
            logging.info(f'Done querying {ledger}')
        except Exception:  # noqa: BLE001 -- best-effort per ledger; a bad query must not stop the rest
            # logging.exception records the full traceback at ERROR level
            logging.exception(f'{ledger} query failed, please make sure it is properly defined.')
            continue
        logging.info(f"Writing {ledger} data to file..")
        # Write one JSON object per line (JSON Lines); default=str stringifies
        # non-JSON-native values such as dates returned by BigQuery
        with open(file, 'w') as f:
            for row in rows:
                f.write(json.dumps(dict(row), default=str) + "\n")
        logging.info(f'Done writing {ledger} data to file.\n')
if __name__ == '__main__':
    logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)

    default_ledgers = hlp.get_ledgers()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--ledgers',
        nargs="*",
        type=str.lower,
        default=default_ledgers,
        # Restrict inputs to the ledgers the project knows about; no need to copy the list
        choices=default_ledgers,
        help='The ledgers to collect data for.'
    )
    parser.add_argument(
        '--force-query',
        action='store_true',
        help='Flag to specify whether to query for project data regardless if the relevant data already exist.'
    )
    args = parser.parse_args()
    collect_data(ledgers=args.ledgers, force_query=args.force_query)