-
Notifications
You must be signed in to change notification settings - Fork 0
/
bq_exporter.py
91 lines (73 loc) · 2.91 KB
/
bq_exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
'''
WARNING: for the sake of expediency, this script is expected to run from Google Colab
'''
try:
    # Colab-only dependencies: if any of these imports fails, the
    # `except ImportError` at the bottom of the file prints a warning
    # instead of crashing with a traceback.
    from google.colab import auth
    from google.cloud import bigquery
    import argparse
    import os
    import json
def load_dict_into_bq(project, dataset, nl_json_filename):
print(f"Exporting {nl_json_filename}")
# Construct a BigQuery client object.
client = bigquery.Client(project=project)
table = nl_json_filename.split("/")[-1].split(".")[-2]
format = nl_json_filename.split("/")[-1].split(".")[-1]
if format == "json":
source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
elif format == "csv":
source_format = bigquery.SourceFormat.CSV
else:
print(f"Failed to recognize input format")
return
job_config = bigquery.LoadJobConfig(
source_format=source_format, autodetect=True,
)
table_id = f"{project}.{dataset}.{table}"
with open(nl_json_filename, "rb") as source_file:
job = client.load_table_from_file(source_file, table_id, job_config=job_config)
try:
job.result() # Waits for the job to complete.
table = client.get_table(table_id) # Make an API request.
print(
"Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id
)
)
except:
print(f"Failed to import into BQ")
def main():
parser = argparse.ArgumentParser(
usage="%(prog)s --output-dataset <BQ dataset> --paying <project ID> --input-dir <dir>\n\n"
"This tool will import newline delimited JSONs from a folder into a BQ dataset"
)
parser.add_argument(
"--output-dataset",
"-b",
dest="output_dataset",
help="BigQuery dataset to store resulting dictionary tables",
required=True
)
parser.add_argument(
"--paying",
"-p",
dest="paying",
help="ID of the GCP project that will be used to charge for data egress (free if within the same region)",
required=True
)
parser.add_argument(
"--input-dir",
"-d",
dest="input_dir",
help="Directory that contains New Line delimited JSON content to be imported into BQ",
required=True
)
args = parser.parse_args()
for filename in os.listdir(args.input_dir):
if not filename.endswith(".json"):
continue
load_dict_into_bq(args.paying, args.output_dataset, os.path.join(args.input_dir,filename))
if __name__ == "__main__":
main()
except ImportError as e:
print("WARNING: for the sake of expediency, this script can only be used from Google Colab!")