
# Python BigQuery Client(s)

This notebook covers several ways of interacting with BigQuery tables from Python, including:
- methods unique to Jupyter notebooks, like magics
- pandas methods backed by BigQuery, like those provided by pandas-gbq
- the BigQuery Python client
- the BigQuery Storage API for fast, parallel reads directly from BigQuery storage


**Notes:**
- The `LIMIT 5` clause limits the number of rows returned by BigQuery to 5, but BigQuery still performs a full table scan.  For a table larger than 1 GB, when you only want a quick look at a few rows, replacing `LIMIT 5` with `TABLESAMPLE SYSTEM (1 PERCENT)` is more efficient because it limits the data scanned.  For tables under 1 GB, sampling may still read the full table.  More on [Table Sampling](https://cloud.google.com/bigquery/docs/table-sampling).
- Each of the examples below runs the same query in BigQuery.  The results of the first run are cached for up to 24 hours, so subsequent identical queries do not scan the data and instead use the cached results table.  More information on [Using cached query results](https://cloud.google.com/bigquery/docs/cached-results).
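The two notes above translate into slightly different preview queries. As a sketch only, the hypothetical helper `preview_query` below (not part of any library) builds either a `LIMIT` preview or a `TABLESAMPLE SYSTEM` preview for a fully qualified `project.dataset.table` id; only the SQL text differs.

```python
from typing import Optional


def preview_query(table_id: str, rows: int = 5, sample_percent: Optional[float] = None) -> str:
    """Return SQL that previews `table_id` (hypothetical helper).

    With sample_percent=None the query uses LIMIT, which caps the rows
    returned but still scans the whole table. With a percentage it uses
    TABLESAMPLE SYSTEM, which reads only a subset of the table's blocks.
    """
    sql = f"SELECT *\nFROM `{table_id}`"
    if sample_percent is not None:
        if not 0 < sample_percent <= 100:
            raise ValueError("sample_percent must be in (0, 100]")
        sql += f"\nTABLESAMPLE SYSTEM ({sample_percent:g} PERCENT)"
    else:
        sql += f"\nLIMIT {int(rows)}"
    return sql
```

For example, `preview_query(BQ_SOURCE, sample_percent = 1)` would produce the sampled version of the query used throughout this notebook.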



**Resources:**
- [BigQuery Python Client](https://cloud.google.com/python/docs/reference/bigquery/latest)
    - Interact with BigQuery compute to run queries
- [BigQuery Storage API Python Client](https://cloud.google.com/python/docs/reference/bigquerystorage/latest)
    - Read directly from BigQuery storage through streams that can be read in parallel
- Notebooks in this repository that use BigQuery from Python
    - [01 - Data Sources/01 - BigQuery - Table Data Sources](../01%20-%20Data%20Sources/01%20-%20BigQuery%20-%20Table%20Data%20Source.ipynb)
    - [03 - BigQuery ML (BQML)/03 - Introduction to BigQuery ML (BQML)](../03%20-%20BigQuery%20ML%20(BQML)/03%20-%20Introduction%20to%20BigQuery%20ML%20(BQML).ipynb)
    - [Applied Forecasting/BigQuery Time Series Forecasting Data Review and Preparation](../Applied%20Forecasting/BigQuery%20Time%20Series%20Forecasting%20Data%20Review%20and%20Preparation.ipynb)


---
## Setup

inputs:

In [1]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

'statmike-mlops-349915'

In [2]:
# source data
BQ_PROJECT = 'bigquery-public-data'
BQ_DATASET = 'ml_datasets'
BQ_TABLE = 'ulb_fraud_detection'

packages:

In [74]:
import timeit
import concurrent.futures

import pandas as pd
from google.cloud import bigquery
from google.cloud import bigquery_storage

clients:

In [75]:
bq = bigquery.Client(project = PROJECT_ID)
bqstorage = bigquery_storage.BigQueryReadClient()

parameters:

In [76]:
BQ_SOURCE = f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}'

---
## BigQuery From Jupyter

### BigQuery Cell Magic

See the documentation for [IPython Magics for BigQuery](https://cloud.google.com/python/docs/reference/bigquery/latest/magics).

In [77]:
%%bigquery
SELECT *
FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection` # table names cannot be query parameters, so the table is hard-coded here
LIMIT 5


,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


---
## BigQuery From Python

Use the [BigQuery Python Client](https://cloud.google.com/python/docs/reference/bigquery/latest) to query BigQuery and return results as Pandas dataframes.

### BigQuery API From Python


In [78]:
query = f"""
    SELECT * 
    FROM `{BQ_SOURCE}`
    LIMIT 5
"""
preview = bq.query(query = query).to_dataframe()
preview

,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


### BigQuery Python Client: Helper Function

In [79]:
def bq_runner(query):
    # start a query job with the client created above and return the QueryJob
    return bq.query(query = query)
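The helper above always uses the global `bq` client and default job settings. A sketch of a variant, assuming `client` behaves like `google.cloud.bigquery.Client` and `job_config` like `google.cloud.bigquery.QueryJobConfig`; `bq_runner_with_config` is a hypothetical name. Passing the client in explicitly makes the helper reusable across projects and easy to exercise with a stand-in object.

```python
from typing import Any, Optional


def bq_runner_with_config(query: str, client: Any, job_config: Optional[Any] = None) -> Any:
    """Start a query job on `client` and return the job (hypothetical helper).

    Client.query accepts job_config=None, so the default behaves exactly
    like the bq_runner helper above.
    """
    return client.query(query=query, job_config=job_config)
```

For example, `bq_runner_with_config(sql, bq, bigquery.QueryJobConfig(use_query_cache = False))` should bypass the cached results mentioned in the Notes, which can matter when timing queries.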

In [80]:
bq_runner(
    query = f"""
        SELECT * 
        FROM `{BQ_SOURCE}`
        LIMIT 5
    """
).to_dataframe()

,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


### BigQuery Python Client: Using Query Job Attributes and Methods

Query Jobs have Methods and Attributes that can benefit the Python workflow:
- Query Job [Methods](https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJob.html#google.cloud.bigquery.job.QueryJob:~:text=for%20accurate%20signature.-,Methods,-__init__(job_id%2C%C2%A0query)
- Query Job [Attributes](https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJob.html#google.cloud.bigquery.job.QueryJob:~:text=from%20a%20QueryJob-,Attributes,-allow_large_results)

BigQuery Query Job (using helper function):

In [81]:
job = bq_runner(
    query = f"""
        SELECT * 
        FROM `{BQ_SOURCE}`
        LIMIT 5
    """
)

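Using Query Job attributes to get the execution time (`job.result()` waits for the job to finish so that `started` and `ended` are populated):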

In [82]:
job.result()
(job.ended-job.started).total_seconds()

0.089
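The cell above uses only `started` and `ended`. A minimal sketch, assuming `job` is a finished `google.cloud.bigquery.QueryJob`, that also collects `job_id`, `cache_hit`, and `total_bytes_processed`, other attributes from the linked reference; `summarize_job` is a hypothetical name. Checking `cache_hit` can help explain very short timings, since a cached result is returned without scanning the table.

```python
from typing import Any, Dict


def summarize_job(job: Any) -> Dict[str, Any]:
    """Collect timing and cost details from a completed query job.

    Call job.result() first so that `started` and `ended` are populated;
    if either is still None the elapsed time is reported as None.
    """
    elapsed = None
    if job.started is not None and job.ended is not None:
        elapsed = (job.ended - job.started).total_seconds()
    return {
        "job_id": job.job_id,
        "elapsed_seconds": elapsed,
        "cache_hit": job.cache_hit,
        "total_bytes_processed": job.total_bytes_processed,
    }
```

Usage in this notebook would be `summarize_job(job)` after `job.result()`.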

Using Query Job methods to retrieve the results into a Pandas dataframe:

In [83]:
job.to_dataframe()

,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


### Indirect use with pandas-gbq

When working with [Pandas](https://pandas.pydata.org/docs/user_guide/index.html#user-guide), the methods above show the client returning data to pandas dataframes.  This section shows [pandas-gbq](https://pandas-gbq.readthedocs.io/en/latest/), a separate package that wraps the BigQuery client so that pandas can read BigQuery data directly into dataframes.  Recent pandas releases (2.2 and later) deprecate `pd.read_gbq` in favor of calling `pandas_gbq.read_gbq` directly.

References:
- [Comparison of BigQuery Client with pandas-gbq](https://cloud.google.com/bigquery/docs/pandas-gbq-migration)

#### Package Install (if needed)

In [84]:
try:
    import pandas_gbq
except ImportError:
    print('pandas-gbq is not installed; installing it now (a kernel restart may be needed before it can be imported)')
    !pip install pandas-gbq -q --user

#### Using pandas-gbq

In [85]:
query = f"""
SELECT * 
FROM `{BQ_SOURCE}`
LIMIT 5
"""
df = pd.read_gbq(query, project_id = PROJECT_ID)

In [86]:
df

,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


---

## BigQuery Storage API From Python

BigQuery has an API for reading and writing directly to storage, with no need for a query that uses the compute side of BigQuery.  It works with a single table (not a view) and reads/writes rows, with basic filtering capabilities to limit the returned records.  Since BigQuery uses columnar storage, you can boost efficiency by reading only the columns you need with these methods!


### Setup a Read Session

Set up the options for the session: filter rows and limit the columns read.

Use [bigquery_storage.types.ReadSession.TableReadOptions()](https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions)

In [132]:
read_options = bigquery_storage.types.ReadSession.TableReadOptions(
    #row_restriction = "Amount > 0",
    selected_fields = [f'V{n+1}' for n in range(28)]
)

Set up a read session for a table:

Use [bigquery_storage.types.ReadSession()](https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.types.ReadSession)

In [133]:
session = bigquery_storage.types.ReadSession(
    table = f"projects/{BQ_PROJECT}/datasets/{BQ_DATASET}/tables/{BQ_TABLE}",
    data_format = bigquery_storage.types.DataFormat.ARROW,
    read_options = read_options
)
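The `table` argument takes a resource path rather than the dotted `project.dataset.table` id used in SQL (`BQ_SOURCE`). A small sketch of a hypothetical helper, `storage_table_path`, that derives one from the other so both APIs can share a single source variable; it assumes the id has exactly three dot-separated parts.

```python
def storage_table_path(table_id: str) -> str:
    """Convert 'project.dataset.table' into the resource path used by
    bigquery_storage.types.ReadSession(table=...) (hypothetical helper).

    Ids that do not split into exactly three parts (for example project ids
    with a domain prefix such as 'example.com:proj') raise ValueError.
    """
    parts = table_id.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected 'project.dataset.table', got {table_id!r}")
    project, dataset, table = parts
    return f"projects/{project}/datasets/{dataset}/tables/{table}"
```

With it, the session above could use `table = storage_table_path(BQ_SOURCE)`.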

### Create A Read Session

Use the [BigQueryReadClient](https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.client.BigQueryReadClient) to create a read session with [create_read_session](https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.client.BigQueryReadClient#google_cloud_bigquery_storage_v1_client_BigQueryReadClient_create_read_session).

>`max_stream_count` sets the upper limit for the number of streams returned.  The actual number is determined by the table's size and storage layout and could be less than the value provided.  Specifying `0` lets the system determine the best value for `max_stream_count`, subject to a default limit of 1000.

In [134]:
read_session = bqstorage.create_read_session(
    parent = f'projects/{PROJECT_ID}',
    read_session = session,
    max_stream_count = 0
)

Check the number of streams returned.  A value of 1 means all records would be read through a single stream.  A value larger than 1 indicates that different streams would be used to read different parts of the table.

In [135]:
len(read_session.streams)

2

### Read the table: One Stream At A Time



In [137]:
def read_stream(stream):
    # setup a reader
    reader = bqstorage.read_rows(
        name = stream.name
    )

    # start timer
    start = timeit.default_timer()

    # read rows from the reader into a dataframe; note this is actually two operations: read and convert
    train = reader.to_dataframe()

    # stop timer and calculate elapsed time
    execution_time = timeit.default_timer() - start

    # report data shape and elapsed time
    print(f'The stream ({stream.name}) read {train.shape[0]} rows and {train.shape[1]} columns in {execution_time} seconds')

    return train
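The comment in `read_stream` notes that `to_dataframe()` combines reading and converting. A sketch of timing the two phases separately, assuming the reader is the `ReadRowsStream` returned by `bqstorage.read_rows` (which has a `to_arrow()` method) and that `pyarrow` is installed so the returned table has `to_pandas()`; `read_stream_timed` is a hypothetical name.

```python
import timeit
from typing import Any, Tuple


def read_stream_timed(reader: Any) -> Tuple[Any, float, float]:
    """Read a stream to Arrow, convert it to pandas, and time both phases.

    Returns (dataframe, read_seconds, convert_seconds).
    """
    start = timeit.default_timer()
    arrow_table = reader.to_arrow()      # pull all rows from the stream into an Arrow table
    read_seconds = timeit.default_timer() - start

    start = timeit.default_timer()
    df = arrow_table.to_pandas()         # convert the Arrow table to a pandas DataFrame
    convert_seconds = timeit.default_timer() - start
    return df, read_seconds, convert_seconds
```

Usage would look like `df, read_s, convert_s = read_stream_timed(bqstorage.read_rows(name = read_session.streams[0].name))`.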

In [138]:
train0 = read_stream(read_session.streams[0])

The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDGtWemZpSFNrMXRDNBoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/GgJqZBoCamYaAmlyGgJvahoCanEaAm5hGgJvcxoCb3caAmpyGgJpdxoCamMaAnB6GgJweRoCanMaAnB4GgJvdhoCamkaAmlhGgJpYxoCcGwoAg) read 142736 rows and 28 columns in 0.2528313589282334 seconds


In [139]:
train1 = read_stream(read_session.streams[1])

The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDGtWemZpSFNrMXRDNBoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/CAEaAmpkGgJqZhoCaXIaAm9qGgJqcRoCbmEaAm9zGgJvdxoCanIaAml3GgJqYxoCcHoaAnB5GgJqcxoCcHgaAm92GgJqaRoCaWEaAmljGgJwbCgC) read 142071 rows and 28 columns in 0.2890617420198396 seconds


In [140]:
train0.shape[0] + train1.shape[0]

284807

In [141]:
train = pd.concat([train0, train1])

In [142]:
train.shape

(284807, 28)
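With the Arrow format used here, each stream's DataFrame is expected to start its index at 0, so `pd.concat` as written keeps overlapping index labels even though the row count is correct. If a unique index matters downstream, `ignore_index=True` renumbers the rows. A sketch with small stand-in frames (the data is made up for illustration):

```python
import pandas as pd

# Stand-ins for two per-stream DataFrames; each has its own 0-based index.
part0 = pd.DataFrame({"V1": [0.1, 0.2, 0.3]})
part1 = pd.DataFrame({"V1": [0.4, 0.5]})

stacked = pd.concat([part0, part1])                        # index: 0, 1, 2, 0, 1
renumbered = pd.concat([part0, part1], ignore_index=True)  # index: 0, 1, 2, 3, 4

print(stacked.index.is_unique)     # False
print(renumbered.index.is_unique)  # True
```

In the notebook this would be `train = pd.concat([train0, train1], ignore_index = True)`.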

### Read the table: Concurrent Streams (Threads)

In [143]:
start = timeit.default_timer()

train = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        executor.submit(read_stream, stream): stream for stream in read_session.streams
    }
    for future in concurrent.futures.as_completed(futures):
        stream = futures[future]
        train.append(future.result())
        
print(f'The total elapsed time was: {timeit.default_timer() - start}')

The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDGtWemZpSFNrMXRDNBoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/GgJqZBoCamYaAmlyGgJvahoCanEaAm5hGgJvcxoCb3caAmpyGgJpdxoCamMaAnB6GgJweRoCanMaAnB4GgJvdhoCamkaAmlhGgJpYxoCcGwoAg) read 142736 rows and 28 columns in 0.36878692999016494 seconds
The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDGtWemZpSFNrMXRDNBoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/CAEaAmpkGgJqZhoCaXIaAm9qGgJqcRoCbmEaAm9zGgJvdxoCanIaAml3GgJqYxoCcHoaAnB5GgJqcxoCcHgaAm92GgJqaRoCaWEaAmljGgJwbCgC) read 142071 rows and 28 columns in 0.38595614093355834 seconds
The total elapsed time was: 0.5918880190001801
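`as_completed` appends results in whatever order the streams finish, and `max_workers=2` is fixed even though the stream count can vary by table. A sketch of an alternative using `executor.map`, which returns results in the same order as the input streams, with one thread per stream; `read_all_streams` is a hypothetical name, and `read_fn` would be `read_stream` in this notebook.

```python
import concurrent.futures
from typing import Callable, Iterable, List, TypeVar

import pandas as pd

T = TypeVar("T")


def read_all_streams(streams: Iterable[T], read_fn: Callable[[T], pd.DataFrame]) -> pd.DataFrame:
    """Read every stream on its own thread and stack the results in stream order."""
    stream_list = list(streams)
    if not stream_list:
        return pd.DataFrame()
    # One worker per stream; executor.map yields results in input order
    # and re-raises any exception from read_fn when its result is reached.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(stream_list)) as executor:
        frames: List[pd.DataFrame] = list(executor.map(read_fn, stream_list))
    return pd.concat(frames, ignore_index=True)
```

Usage would be `train = read_all_streams(read_session.streams, read_stream)`.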


In [144]:
len(train)

2

In [145]:
train[0].shape, train[1].shape

((142736, 28), (142071, 28))

In [146]:
train = pd.concat(train)
train.shape

(284807, 28)

### Full Example With Bigger Table

In [147]:
read_options = bigquery_storage.types.ReadSession.TableReadOptions(
    #row_restriction = "rank > 10",
    selected_fields = ['dma_id', 'term', 'week', 'score']
)

In [148]:
session = bigquery_storage.types.ReadSession(
    table = f"projects/{BQ_PROJECT}/datasets/google_trends/tables/top_terms",
    data_format = bigquery_storage.types.DataFormat.ARROW,
    read_options = read_options
)

In [149]:
read_session = bqstorage.create_read_session(
    parent = f'projects/{PROJECT_ID}',
    read_session = session,
    max_stream_count = 2
)

In [150]:
len(read_session.streams)

2

In [151]:
start = timeit.default_timer()

train = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {
        executor.submit(read_stream, stream): stream for stream in read_session.streams
    }
    for future in concurrent.futures.as_completed(futures):
        stream = futures[future]
        train.append(future.result())
        
print(f'The total elapsed time was: {timeit.default_timer() - start}')

The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDHBDZ3dBOFBneXVwcxoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/CAEaAmpkGgJqZhoCaXIaAm9qGgJqcRoCbmEaAm9zGgJvdxoCanIaAml3GgJqYxoCcHoaAnB5GgJqcxoCcHgaAm92GgJqaRoCaWEaAmljGgJwbCgC) read 21811440 rows and 4 columns in 16.728841520962305 seconds
The stream (projects/statmike-mlops-349915/locations/us/sessions/CAISDHBDZ3dBOFBneXVwcxoCamQaAmpmGgJpchoCb2oaAmpxGgJuYRoCb3MaAm93GgJqchoCaXcaAmpjGgJwehoCcHkaAmpzGgJweBoCb3YaAmppGgJpYRoCaWMaAnBs/streams/GgJqZBoCamYaAmlyGgJvahoCanEaAm5hGgJvcxoCb3caAmpyGgJpdxoCamMaAnB6GgJweRoCanMaAnB4GgJvdhoCamkaAmlhGgJpYxoCcGwoAg) read 21872404 rows and 4 columns in 17.71911338099744 seconds
The total elapsed time was: 17.871793464990333
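One way to read these timings is to compare the sum of the per-stream read times with the total wall-clock time: if the two streams overlapped perfectly, the ratio would approach 2. A small sketch of that arithmetic using the numbers printed above; `parallel_speedup` is a hypothetical name, and the values come from a single run, so they will vary.

```python
from typing import Sequence


def parallel_speedup(stream_seconds: Sequence[float], wall_seconds: float) -> float:
    """Ratio of summed per-stream time to wall-clock time.

    A value near the number of streams means the reads overlapped almost
    completely; a value near 1 means they effectively ran one after another.
    """
    if wall_seconds <= 0:
        raise ValueError("wall_seconds must be positive")
    return sum(stream_seconds) / wall_seconds


# Per-stream and total times printed by the google_trends.top_terms run above.
big = parallel_speedup([16.728841520962305, 17.71911338099744], 17.871793464990333)
print(f"{big:.2f}")  # 1.93
```

For the small `ulb_fraud_detection` table the same ratio is only about 1.3, and in those runs the threaded total (about 0.59 s) was slightly longer than reading the two streams one after the other (about 0.25 s + 0.29 s), so the benefit of concurrent streams shows up mainly on larger tables.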


In [152]:
len(train)

2

In [153]:
train[0].shape, train[1].shape

((21811440, 4), (21872404, 4))

In [154]:
train = pd.concat(train)
train.shape

(43683844, 4)