In [61]:
"""
Helper class to simplify common read-only BigQuery tasks.
"""
import pandas as pd

from google.cloud import bigquery
import time


class BigQueryHelper(object):
    """
    Helper class to simplify common BigQuery tasks like executing queries,
    showing table schemas, etc without worrying about table or dataset pointers.

    The dataset and table objects are fetched lazily and cached on the
    instance, so callers that only run queries never request dataset metadata.

    See the BigQuery docs for details of the steps this class lets you skip:
    https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/reference.html
    """

    BYTES_PER_GB = 2**30

    def __init__(self, active_project, dataset_name, max_wait_seconds=180):
        """
        Create a helper bound to one dataset.

        Args:
            active_project: name of the project that owns the dataset.
            dataset_name: name of the dataset inside `active_project`.
            max_wait_seconds: how long `query_to_pandas` polls a running
                query before cancelling it.
        """
        self.project_name = active_project
        self.dataset_name = dataset_name
        self.max_wait_seconds = max_wait_seconds
        self.client = bigquery.Client()
        self.__dataset_ref = self.client.dataset(self.dataset_name, project=self.project_name)
        self.dataset = None
        self.tables = dict()  # {table name (str): table object}
        self.__table_refs = dict()  # {table name (str): table reference}
        # Running total of gigabytes billed by queries run via query_to_pandas.
        self.total_gb_used_net_cache = 0

    def __fetch_dataset(self):
        # Lazy loading of dataset. For example,
        # if the user only calls `self.query_to_pandas` then the
        # dataset never has to be fetched.
        if self.dataset is None:
            self.dataset = self.client.get_dataset(self.__dataset_ref)

    def __fetch_table(self, table_name):
        # Lazy loading of table: the reference and the table object are each
        # created once and then served from the instance caches.
        self.__fetch_dataset()
        if table_name not in self.__table_refs:
            self.__table_refs[table_name] = self.dataset.table(table_name)
        if table_name not in self.tables:
            self.tables[table_name] = self.client.get_table(self.__table_refs[table_name])

    def table_schema(self, table_name):
        """
        Get the schema for a specific table from a dataset
        """
        self.__fetch_table(table_name)
        return self.tables[table_name].schema

    def list_tables(self):
        """
        List the names of the tables in a dataset
        """
        self.__fetch_dataset()
        return [x.table_id for x in self.client.list_tables(self.dataset)]

    def estimate_query_size(self, query):
        """
        Estimate gigabytes scanned by query.
        Does not consider if there is a cached query table.
        See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.dryRun
        """
        my_job_config = bigquery.job.QueryJobConfig()
        my_job_config.dry_run = True
        my_job = self.client.query(query, job_config=my_job_config)
        return my_job.total_bytes_processed / self.BYTES_PER_GB

    def query_to_pandas(self, query):
        """
        Take a SQL query & return a pandas dataframe

        Polls the job until it finishes. If it is still running after
        `self.max_wait_seconds`, the job is cancelled and None is returned.
        Gigabytes billed are added to `self.total_gb_used_net_cache`.
        """
        my_job = self.client.query(query)
        start_time = time.time()
        while not my_job.done():
            if (time.time() - start_time) > self.max_wait_seconds:
                print("Max wait time elapsed, query cancelled.")
                # Cancel through the job object so the job's own project and
                # location are used for the request.
                my_job.cancel()
                return None
            time.sleep(0.1)
        # Queries that hit errors will return an exception type.
        # Those exceptions don't get raised until we call my_job.to_dataframe()
        # In that case, my_job.total_bytes_billed can be called but is None
        if my_job.total_bytes_billed:
            self.total_gb_used_net_cache += my_job.total_bytes_billed / self.BYTES_PER_GB
        return my_job.to_dataframe()

    def query_to_pandas_safe(self, query, max_gb_scanned=1):
        """
        Execute a query if it's smaller than a certain number of gigabytes

        Returns the result of `query_to_pandas` when the estimated size is
        within `max_gb_scanned`; otherwise prints a message and returns None.
        """
        query_size = self.estimate_query_size(query)
        if query_size <= max_gb_scanned:
            return self.query_to_pandas(query)
        msg = "Query cancelled; estimated size of {0} exceeds limit of {1} GB"
        print(msg.format(query_size, max_gb_scanned))
        return None

    def head(self, table_name, num_rows=5, start_index=None, selected_columns=None):
        """
        Get the first n rows of a table as a DataFrame

        Args:
            table_name: name of a table in this helper's dataset.
            num_rows: maximum number of rows to fetch.
            start_index: passed to `list_rows` as the row offset to start at.
            selected_columns: optional collection of column names; schema
                fields whose name is not in it are left out.

        Returns:
            A DataFrame whose columns follow the order of the requested schema
            fields. When no rows come back the frame is empty but still
            carries those column names.
        """
        self.__fetch_table(table_name)
        active_table = self.tables[table_name]
        schema_subset = None
        if selected_columns:
            schema_subset = [col for col in active_table.schema if col.name in selected_columns]
        results = self.client.list_rows(active_table, selected_fields=schema_subset,
            max_results=num_rows, start_index=start_index)
        rows = list(results)
        # Column names come from the schema fields that were requested rather
        # than from the first row, so an empty result no longer raises
        # IndexError and no private Row attribute is needed.
        fields = active_table.schema if schema_subset is None else schema_subset
        column_names = [field.name for field in fields]
        return pd.DataFrame(
            data=[list(row.values()) for row in rows], columns=column_names)

In [62]:
import os

# Point the Google client libraries at the service-account key file so the
# bigquery.Client() created inside BigQueryHelper can pick it up.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "DataVizMgmt-1e99d9535545.json"

# Helper bound to the "new_york" dataset of the public-data project.
nyc = BigQueryHelper(
    active_project="bigquery-public-data",
    dataset_name="new_york",
)

In [63]:
# Preview the first five rows of the 2016 yellow-taxi trips table.
nyc.head(table_name="tlc_yellow_trips_2016", num_rows=5)

# To inspect the column definitions instead:
# nyc.table_schema("tlc_yellow_trips_2016")

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,imp_surcharge,total_amount
0,1,2016-04-03 10:57:51+00:00,2016-04-03 11:18:59+00:00,2,7.2,-73.997177,40.716515,1,N,-73.954987,40.777653,1,23.0,0.0,0.5,4.75,0.0,0.3,28.55
1,1,2016-07-05 00:11:29+00:00,2016-07-05 00:33:31+00:00,1,8.8,,,1,N,,,1,27.0,0.5,0.5,5.65,0.0,0.3,33.95
2,2,2016-02-29 08:52:02+00:00,2016-02-29 09:30:42+00:00,2,5.84,-73.970963,40.790916,1,N,-73.991661,40.731342,1,27.0,0.0,0.5,2.78,0.0,0.3,30.58
3,2,2016-01-17 16:33:57+00:00,2016-01-17 17:01:37+00:00,5,6.92,-73.956718,40.78096,1,N,-74.00174,40.730331,1,25.0,0.0,0.5,5.16,0.0,0.3,30.96
4,1,2016-01-30 00:42:39+00:00,2016-01-30 01:08:33+00:00,1,6.4,-74.005386,40.720814,1,N,-73.929634,40.685833,1,23.0,0.5,0.5,6.05,0.0,0.3,30.35
