Easily integrate data in BigQuery
```python
from pygbq import Client
import requests

client = Client()
token = client.get_secret('secret_name')
headers = {'Authorization': f'Bearer {token}'}
url = ...
data = requests.get(url, headers=headers).json()
response = client.update_table_using_temp(data=data, table_id='mydataset.mytable', how=['id'])
```
This snippet gets some data from a URL and [merges](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement) ([upserts](https://en.wikipedia.org/wiki/Merge_(SQL))) it into the table `mytable` in the dataset `mydataset` on the `id` column.
```
pip install pygbq
```
Set up the authentication.
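As a rough sketch, authentication can be configured either through application default credentials or through the `path_to_key` parameter documented below (the key file name here is a placeholder):

```python
from pygbq import Client

# Option 1: rely on application default credentials.
# google.auth.default() picks them up, e.g. after `gcloud auth application-default login`
# or when GOOGLE_APPLICATION_CREDENTIALS points to a service account key file.
client = Client()

# Option 2: point PyGBQ at a service account key file explicitly.
client = Client(path_to_key="key.json")
```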
When `how` is a list of key columns, PyGBQ generates one or more temporary tables that are merged into the target table. During the merge, all the columns of the target table are updated. Here's what that looks like:
- Split `data` into batches.
- For every batch, create `mydataset.mytable_tmp_SOMERANDOMPOSTFIX`, insert the batch into it, and run:

```sql
MERGE myproject.mydataset.mytable T
USING myproject.mydataset.mytable_tmp_SOMERANDOMPOSTFIX S
ON T.column1 = S.column1 AND T.column2 = S.column2
WHEN NOT MATCHED THEN
    INSERT ROW
WHEN MATCHED THEN
    UPDATE SET
        column1 = S.column1,
        column2 = S.column2,
        column3 = S.column3,
        column4 = S.column4
        ...
```
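For instance, a merge like the one above, matching on two columns, would correspond to a call along these lines (assuming, as in the snippet at the top, that the key columns are passed as a list of names; `column1` and `column2` stand in for your own columns):

```python
response = client.update_table_using_temp(
    data=data,
    table_id='mydataset.mytable',
    how=['column1', 'column2'],  # columns used in the MERGE ... ON clause
)
```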
`how='replace'`:
- Creates the table `mydataset.mytable` with a schema automatically generated by `bigquery-schema-generator`.
- Splits `data` into batches and inserts them into `mydataset.mytable`.
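As a sketch, recreating the table from scratch could look like this (data and table names follow the earlier snippet):

```python
# Drop-and-recreate mydataset.mytable, then load data into it
client.update_table_using_temp(data=data, table_id='mydataset.mytable', how='replace')
```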
Identical to `how='replace'` except that it fails if `mydataset.mytable` already exists.
Splits `data` into batches and inserts (appends) it to `mydataset.mytable`.
For more details, see the Documentation section.
Here's the documentation; default parameter values are shown in the signatures.
```python
from pygbq import Client

client = Client(default_dataset=None, path_to_key=None)
```
Initializes a client. You can specify:
- `default_dataset` - (str) default dataset that the client will use to reference tables
- `path_to_key` - (str) by default, PyGBQ uses `from google.auth import default` to get credentials, but you can specify this parameter if you wish to use `from google.auth import load_credentials_from_file` instead
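For example, with a default dataset set you can refer to tables by bare name (a sketch; `mydataset` and `mytable` follow the naming used above):

```python
client = Client(default_dataset='mydataset')

# Equivalent to table_id='mydataset.mytable'
client.update_table_using_temp(data=data, table_id='mytable', how=['id'])
```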
```python
client.update_table_using_temp(data, table_id, how, schema: Union[str, List[dict]] = None, expiration=1, max_insert_num_rows=4000)
```
Updates a table.
- `data` - list of dict
- `table_id` - (str) table id, which can have one of the following forms:
  - `table_name` if `default_dataset` is set
  - `dataset_name.table_name`
  - `project_id.dataset_name.table_name`
- `how` - (str or List[dict]) see the How it works section
- `expiration` - (float) expiration time of the temporary tables, in hours
- `max_insert_num_rows` - (int) maximum number of rows inserted per temporary table
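To tie the parameters together, here's an illustrative call (the values for `expiration` and `max_insert_num_rows` are arbitrary examples):

```python
response = client.update_table_using_temp(
    data=data,                               # list of dicts
    table_id='myproject.mydataset.mytable',  # fully qualified table id
    how=['id'],                              # merge on the id column
    expiration=2,                            # temporary tables expire after 2 hours
    max_insert_num_rows=1000,                # at most 1000 rows per temporary table
)
```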
```python
client.get_secret(secret_id, version="latest")
```
Gets a secret stored in Secret Manager.
- `secret_id` - (str) secret name
- `version` - secret version
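A brief usage sketch (the secret name is a placeholder; passing a numeric version as a string follows Secret Manager's versioning and is an assumption here):

```python
token = client.get_secret('secret_name')                  # latest version
previous = client.get_secret('secret_name', version='1')  # assumed: a specific version by number
```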
```python
client.get_secret(secret_id, version="latest")
```
Adds a new secret version in Secret Manager.
- `secret_id` - (str) secret name
- `data` - (str) secret value
```python
from pygbq import read_jsonl

read_jsonl(name: str = "data.jsonl")
```
Reads a newline-delimited JSON file.
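For example, assuming `read_jsonl` returns the rows as a list of dicts (matching the `data` parameter above), loading a local file into a table could look like:

```python
from pygbq import Client, read_jsonl

client = Client()
data = read_jsonl("data.jsonl")  # assumed: one dict per JSON line
client.update_table_using_temp(data=data, table_id='mydataset.mytable', how=['id'])
```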