# GBQ Datapipeline

This notebook queries Google BigQuery (GBQ) from Python. 
<br> It sends SQL queries to GBQ and returns the result as a pandas DataFrame.

## 1. Authenticate with BigQuery and create a client

In [5]:
# import the google cloud bigquery library
from google.cloud import bigquery

# IPython magic (not plain Python): sets GOOGLE_APPLICATION_CREDENTIALS for this
# kernel. The path is relative, so the key file must sit in the notebook's
# working directory (presumably a service-account JSON key — confirm).
%env GOOGLE_APPLICATION_CREDENTIALS=GBQ_python_credentials.json

# initiate a client to Google BigQuery; no explicit credentials are passed, so it
# relies on the environment variable set above
client = bigquery.Client()

env: GOOGLE_APPLICATION_CREDENTIALS=GBQ_python_credentials.json


## 2. Functions to query GBQ and store the result as a pandas DataFrame (with an optional query-size limit)

In [10]:
# Function 1 - queries gbq and stores result in df
def query_to_pandas(client, query, timeout=30):
    """
    Send a query to GBQ and return the result as a pandas DataFrame.
    Also prints the in-memory size of the resulting DataFrame in bytes.

    arguments:
    client: gbq client object (its `query(sql)` must return a job whose
        `result(timeout=...)` yields row objects with `keys()`/`values()`)
    query: (str) sql query
    timeout: (float) seconds passed to `result()` while waiting for the rows;
        defaults to 30, the value previously hard-coded

    returns:
    pandas DataFrame with one row per result row. An empty result gives an
    empty DataFrame (columns taken from the iterator's `schema` when it has
    one) instead of raising IndexError.
    """
    # local import keeps this notebook cell self-contained
    import pandas as pd

    # call client object with query, then block until rows are available
    job = client.query(query)
    iterator = job.result(timeout=timeout)
    rows = list(iterator)

    if rows:
        columns = list(rows[0].keys())
    else:
        # No row to read the column names from: fall back to the iterator's
        # schema when present, otherwise build a DataFrame without columns.
        schema = getattr(iterator, 'schema', None) or []
        columns = [field.name for field in schema]

    # Transform the rows into a pandas dataframe
    df = pd.DataFrame(data=[list(row.values()) for row in rows], columns=columns)

    # compute memory usage of df
    memory_usage = str(df.memory_usage(index=True).sum())
    print('size of resulting df (bytes): ' + memory_usage)

    return df


# Function 2 - estimates query limit
def estimate_query_size(client, query):
    """
    Return how many GB (bytes / 2**30) `query` would scan, using a dry run.

    The job is submitted with `dry_run=True`, so only the estimate
    `total_bytes_processed` is read back. A cached query table is not
    taken into account.
    See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.dryRun
    """
    dry_run_config = bigquery.job.QueryJobConfig(dry_run=True)
    dry_run_job = client.query(query, job_config=dry_run_config)
    bytes_per_gb = 2**30
    return dry_run_job.total_bytes_processed / bytes_per_gb



# Function 3 - calls function 1 after checking query limit provided by function 2
def query_to_pandas_safe(client, query, query_limit_gb):
    """
    Run `query` only when its estimated size fits within `query_limit_gb`.

    Steps:
        (1) `estimate_query_size` estimates the scan size in GB (dry run)
        (2) if that estimate is <= `query_limit_gb`, `query_to_pandas` runs
            the query and its DataFrame is returned

    Otherwise nothing is executed: a "Query cancelled" message is printed and
    None is returned (no exception is raised).
    """
    estimated_gb = estimate_query_size(client, query)
    fits_limit = estimated_gb <= query_limit_gb
    if fits_limit:
        return query_to_pandas(client, query)

    template = "Query cancelled; estimated size of {0} exceeds limit of {1} GB"
    print(template.format(estimated_gb, query_limit_gb))
    return None



## 3. Queries

### 3.1 Daily transactions per dApp

In [11]:
# compute daily transaction metrics per dApp
# SQL query (BigQuery standard SQL):
#   - `dapps`: dApp name (string_field_0) -> contract address (string_field_1),
#     read from the project's own dapps_contract_link table
#   - `txn2dapp`: public Ethereum transactions whose to_address equals a dApp
#     contract address (the dApp side is lower-cased before the join)
#   - final SELECT: per (day, dApp) the transaction count plus avg/min/max of
#     gas, gas_price, value and receipt_gas_used, ordered by day
# NOTE(review): there is no WHERE/date filter on the public transactions table,
# so this presumably scans every row of the referenced columns — check the cost
# with estimate_query_size before running it.
query = """
WITH dapps AS
        (SELECT 
            string_field_0 d_name,
            string_field_1 c_address
         FROM `eth-transactions.dapps_contract_link.dapps_contract_link` dapp_contract_link 
        ),
    txn2dapp AS  #contains only transactions to dapps 
        (SELECT
            txn.from_address sender,
            txn.to_address ,
            dapps.d_name d_name,
            txn.value value,
            txn.gas gas,
            txn.gas_price gas_price,
            txn.receipt_gas_used, 
            txn.block_timestamp block_timestamp
         FROM `bigquery-public-data.crypto_ethereum.transactions` AS txn
         INNER JOIN dapps
         ON txn.to_address = LOWER(dapps.c_address)
        )
SELECT 
    DATE(block_timestamp) day,
    d_name, 
    COUNT(*) txn_count,
    AVG(gas) avg_gas,
    MIN(gas) min_gas,
    MAX(gas) max_gas,
    AVG(gas_price) avg_gas_price,
    MIN(gas_price) min_gas_price,
    MAX(gas_price) max_gas_price,
    AVG(value) avg_value,
    MIN(value) min_value,
    MAX(value) max_value,
    AVG(receipt_gas_used) avg_receipt_gas_used,
    MIN(receipt_gas_used) min_receipt_gas_used,
    MAX(receipt_gas_used) max_receipt_gas_used
FROM txn2dapp
GROUP BY 1,2 
ORDER BY 1
"""

In [12]:
# run the query without a size check (query_to_pandas_safe would enforce a
# GB limit); query_to_pandas also prints the DataFrame's size in bytes
daily_txn_metrics = query_to_pandas(client, query)
# store the result in a local JSON file in the working directory
daily_txn_metrics.to_json(r'daily_dapp_txn_metrics.json')

size of resulting df (bytes): 39909608


# Google BigQuery Helper Functions and Additional Examples

### Google BigQuery Helper Functions
https://github.com/SohierDane/BigQuery_Helper

In [16]:
"""
Helper class to simplify common read-only BigQuery tasks.
"""


import pandas as pd
import time

from google.cloud import bigquery


class BigQueryHelper(object):
    """
    Helper class to simplify common BigQuery tasks like executing queries,
    showing table schemas, etc without worrying about table or dataset pointers.
    See the BigQuery docs for details of the steps this class lets you skip:
    https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/reference.html

    Args:
        active_project: ID of the project that owns the dataset
            (for the public datasets: "bigquery-public-data").
        dataset_name: ID of the dataset inside `active_project`
            (e.g. "crypto_ethereum"), without a project prefix.
        max_wait_seconds: how long `query_to_pandas` waits for a query job
            before cancelling it (default 180).
    """

    def __init__(self, active_project, dataset_name, max_wait_seconds=180):
        self.project_name = active_project
        self.dataset_name = dataset_name
        self.max_wait_seconds = max_wait_seconds
        self.client = bigquery.Client()
        # `Client.dataset()` is deprecated in google-cloud-bigquery; building the
        # DatasetReference directly is its documented replacement.
        self.__dataset_ref = bigquery.DatasetReference(self.project_name, self.dataset_name)
        self.dataset = None  # fetched lazily by __fetch_dataset
        self.tables = dict()  # {table name (str): table object}
        self.__table_refs = dict()  # {table name (str): table reference}
        # running total of GB billed (job.total_bytes_billed) by query_to_pandas
        self.total_gb_used_net_cache = 0
        self.BYTES_PER_GB = 2**30

    def __fetch_dataset(self):
        """
        Lazy loading of dataset. For example,
        if the user only calls `self.query_to_pandas` then the
        dataset never has to be fetched.
        """
        if self.dataset is None:
            self.dataset = self.client.get_dataset(self.__dataset_ref)

    def __fetch_table(self, table_name):
        """
        Lazy loading of table: fetch (and cache in `self.tables`) the table
        object for `table_name`, fetching the dataset first if needed.
        """
        self.__fetch_dataset()
        if table_name not in self.__table_refs:
            self.__table_refs[table_name] = self.dataset.table(table_name)
        if table_name not in self.tables:
            self.tables[table_name] = self.client.get_table(self.__table_refs[table_name])

    def __handle_record_field(self, row, schema_details, top_level_name=''):
        """
        Append one schema field, and recursively its nested fields, to
        `schema_details`.

        Args:
            row: the field's API representation. For top-level fields this is
                a pandas Series row; for nested fields it is a plain dict.
            schema_details: list that collects one dict per field with keys
                name/type/mode/description; mutated in place.
            top_level_name: dotted path of the parent RECORD field, '' at the
                top level.
        """
        name = row['name']
        if top_level_name != '':
            name = top_level_name + '.' + name
        # `.get` so that a field whose API representation lacks `mode` or
        # `description` yields None instead of raising KeyError.
        schema_details.append({
            'name': name,
            'type': row['type'],
            'mode': row.get('mode'),
            'description': row.get('description'),
        })
        # Only RECORD fields carry a list of sub-fields. Otherwise the value
        # is absent (nested dicts) or NaN (DataFrame rows padded by pandas).
        nested_fields = row.get('fields')
        if not isinstance(nested_fields, (list, tuple)):
            return None
        for entry in nested_fields:
            self.__handle_record_field(entry, schema_details, name)

    def __unpack_all_schema_fields(self, schema):
        """
        Unrolls nested schemas. Returns dataframe with one row per field,
        and the field names in the format accepted by the API.
        Results will look similar to the website schema, such as:
            https://bigquery.cloud.google.com/table/bigquery-public-data:github_repos.commits?pli=1
        Args:
            schema: DataFrame derived from api repr of raw table.schema
        Returns:
            Dataframe of the unrolled schema (columns name/type/mode/description).
        """
        schema_details = []
        for _, row in schema.iterrows():
            self.__handle_record_field(row, schema_details)
        return pd.DataFrame(schema_details, columns=['name', 'type', 'mode', 'description'])

    def table_schema(self, table_name):
        """
        Get the schema for a specific table from a dataset.
        Unrolls nested field names into the format that can be copied
        directly into queries. For example, for the `github.commits` table,
        this will return `committer.name`.
        This is a very different return signature than BigQuery's table.schema.
        Returns a DataFrame with columns name, type, mode, description.
        """
        self.__fetch_table(table_name)
        raw_schema = self.tables[table_name].schema
        schema = pd.DataFrame([field.to_api_repr() for field in raw_schema])
        # the api_repr only has the fields column for tables with nested data
        if 'fields' in schema.columns:
            schema = self.__unpack_all_schema_fields(schema)
        # Set the column order; reindex (rather than [[...]]) turns a column
        # that no field provides (e.g. description) into NaN instead of KeyError.
        return schema.reindex(columns=['name', 'type', 'mode', 'description'])

    def list_tables(self):
        """
        List the names of the tables in a dataset
        """
        self.__fetch_dataset()
        return [x.table_id for x in self.client.list_tables(self.dataset)]

    def estimate_query_size(self, query):
        """
        Estimate gigabytes scanned by query.
        Does not consider if there is a cached query table.
        See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.dryRun
        """
        my_job_config = bigquery.job.QueryJobConfig()
        my_job_config.dry_run = True
        my_job = self.client.query(query, job_config=my_job_config)
        return my_job.total_bytes_processed / self.BYTES_PER_GB

    def query_to_pandas(self, query):
        """
        Execute a SQL query & return a pandas dataframe.
        Polls the job every 0.1 s; if it is not done within
        `self.max_wait_seconds` the job is cancelled and None is returned.
        """
        my_job = self.client.query(query)
        start_time = time.time()
        while not my_job.done():
            if (time.time() - start_time) > self.max_wait_seconds:
                print("Max wait time elapsed, query cancelled.")
                self.client.cancel_job(my_job.job_id)
                return None
            time.sleep(0.1)
        # Queries that hit errors will return an exception type.
        # Those exceptions don't get raised until we call my_job.to_dataframe()
        # In that case, my_job.total_bytes_billed can be called but is None
        if my_job.total_bytes_billed:
            self.total_gb_used_net_cache += my_job.total_bytes_billed / self.BYTES_PER_GB
        return my_job.to_dataframe()

    def query_to_pandas_safe(self, query, max_gb_scanned=1):
        """
        Execute a query, but only if the query would scan at most `max_gb_scanned` GB.
        Otherwise prints a "Query cancelled" message and returns None.
        """
        query_size = self.estimate_query_size(query)
        if query_size <= max_gb_scanned:
            return self.query_to_pandas(query)
        msg = "Query cancelled; estimated size of {0} exceeds limit of {1} GB"
        print(msg.format(query_size, max_gb_scanned))
        return None

    def head(self, table_name, num_rows=5, start_index=None, selected_columns=None):
        """
        Get the first n rows of a table as a DataFrame.
        Does not perform a full table scan; should use a trivial amount of data as long as n is small.
        An empty result yields an empty DataFrame with the requested columns.
        """
        self.__fetch_table(table_name)
        active_table = self.tables[table_name]
        schema_subset = None
        if selected_columns:
            schema_subset = [col for col in active_table.schema if col.name in selected_columns]
        rows = list(self.client.list_rows(active_table, selected_fields=schema_subset,
                                          max_results=num_rows, start_index=start_index))
        if rows:
            columns = list(rows[0].keys())
        else:
            # No row to read keys from (empty table or start_index past the
            # end): take the column names from the requested schema instead.
            fields = schema_subset if schema_subset is not None else active_table.schema
            columns = [field.name for field in fields]
        return pd.DataFrame(data=[list(row.values()) for row in rows], columns=columns)

In [33]:
# BigQueryHelper(<project that owns the dataset>, <dataset id inside that project>).
# The public Ethereum data is dataset `crypto_ethereum` in project
# `bigquery-public-data`; passing the dotted "project.dataset" string as the
# dataset id (as before) gives an invalid dataset reference, which breaks
# list_tables / head / table_schema.
bq_assistant = BigQueryHelper("bigquery-public-data", "crypto_ethereum")

In [None]:
# Querying GBQ with BigQueryHelper
# Reference cell: demonstrates the helper API. "table_name" is a placeholder to
# replace with a real table id (see list_tables()).
# NOTE(review): in a notebook only the cell's last expression is displayed by
# default, so wrap intermediate calls in print()/display() to see their results.

# --- BigQueryHelper functions ---
# Query the dataset and return a pandas DataFrame
df = bq_assistant.query_to_pandas(query)

# Query the dataset and return a pandas DataFrame, but only if the dry-run
# estimate is <= max_gb_scanned GB; otherwise prints a message and returns None
df = bq_assistant.query_to_pandas_safe(query, max_gb_scanned=40)

# List all tables in the dataset
bq_assistant.list_tables()

# Show the first rows of a specific table
bq_assistant.head("table_name", num_rows=3)

# Show details about the columns (name, type, mode, description)
bq_assistant.table_schema("table_name")

# Check the estimated size (GB) of a query
bq_assistant.estimate_query_size(query)

# --- Other useful snippets ---
# Print the size of the DataFrame (deep=True also counts object contents)
# NOTE(review): df is None if query_to_pandas_safe cancelled the query above,
# in which case the next line raises AttributeError.
print('Size of dataframe: {} Bytes'.format(int(df.memory_usage(index=True, deep=True).sum())))

### Example queries 

In [3]:
# Example: query the Shakespeare sample table and print the 10 corpora (works)
# with the highest COUNT(word), i.e. the most non-NULL `word` values
# (presumably one row per distinct word in this sample table — hence the alias).

from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT corpus AS title, COUNT(word) AS unique_words
    FROM `bigquery-public-data.samples.shakespeare`
    GROUP BY title
    ORDER BY unique_words
    DESC LIMIT 10
"""
# client.query() returns a job; iterating over it yields the result rows
results = client.query(query)

# rows support access by column name (alias); left-align titles to 20 chars
for row in results:
    title = row['title']
    unique_words = row['unique_words']
    print(f'{title:<20} | {unique_words}')

hamlet               | 5318
kinghenryv           | 5104
cymbeline            | 4875
troilusandcressida   | 4795
kinglear             | 4784
kingrichardiii       | 4713
2kinghenryvi         | 4683
coriolanus           | 4653
2kinghenryiv         | 4605
antonyandcleopatra   | 4582


In [None]:
# Example: query the public GitHub commits table for the 10 most frequent
# commit subjects (number of commits sharing each subject).
# NOTE(review): unlike the Shakespeare example, the table path is not wrapped
# in backticks; confirm BigQuery accepts the hyphenated project id unquoted here.
# NOTE(review): this scans the `subject` column of the whole commits table —
# check the cost with a dry run before running it.

from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT subject AS subject, COUNT(*) AS num_duplicates
    FROM bigquery-public-data.github_repos.commits
    GROUP BY subject
    ORDER BY num_duplicates
    DESC LIMIT 10
"""
results = client.query(query)

# `:>9,` right-aligns the count with thousands separators.
# NOTE(review): if a subject is NULL (None), the `:<20` format spec raises
# TypeError — consider `subject or ''` if that can happen.
for row in results:
    subject = row['subject']
    num_duplicates = row['num_duplicates']
    print(f'{subject:<20} | {num_duplicates:>9,}')

In [None]:
# Example: run the GitHub query with the query cache disabled, then print
# statistics read from the job object.

from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT subject AS subject, COUNT(*) AS num_duplicates
    FROM bigquery-public-data.github_repos.commits
    GROUP BY subject
    ORDER BY num_duplicates
    DESC LIMIT 10
"""
# disable caching -> use_query_cache=False forces a fresh execution
job_config = bigquery.job.QueryJobConfig(use_query_cache=False)
results = client.query(query, job_config=job_config)

# NOTE(review): a NULL subject would make the `:<20` format spec raise TypeError.
for row in results:
    subject = row['subject']
    num_duplicates = row['num_duplicates']
    print(f'{subject:<20} | {num_duplicates:>9,}')

# query statistics -> the statistics are provided by the job object
# (`results` is the job returned by client.query, not a row list)
print('-'*60)
print(f'Created: {results.created}')
print(f'Ended:   {results.ended}')
print(f'Bytes:   {results.total_bytes_processed:,}')