# Create a table snapshot in BigQuery

Selects all of the data from a given source table and appends it to a destination table, stamping each row with the current timestamp. Schema changes are handled by adding any columns that did not previously exist to the destination table.

In [None]:
import pandas as pd
from google.cloud import bigquery

In [None]:
# Look up the 'client' entry of the 'Default Warehouse' connection exposed by
# stitch_context. The cells below call dataset(), get_table() and query() on it,
# so it is presumably a google.cloud.bigquery.Client -- TODO confirm.
conn = stitch_context.connections['Default Warehouse']['client']

In [None]:
# # RUN THIS CELL ONLY ONCE (uncomment it first)
# # Creates a new dataset to hold your daily snapshots

# dataset_ref = conn.dataset('snapshots_dataset')

# dataset = bigquery.Dataset(dataset_ref)
# dataset.location = 'US'
# conn.create_dataset(dataset)

In [None]:
# # RUN THIS CELL ONLY ONCE (uncomment it first)
# # Creates the new, empty destination table (requires dataset_ref to be defined)

# table_ref = dataset_ref.table('shakespeare_daily')
# table = bigquery.Table(table_ref)
# table = conn.create_table(table)  # API request

# assert table.table_id == 'shakespeare_daily'

In [None]:
# Name of the table that accumulates the snapshots.
table_id = 'shakespeare_daily'

# Build references to the destination dataset and to the table inside it.
dataset_ref = conn.dataset('snapshots_dataset')
table_ref = dataset_ref.table(table_id)

# Fetch the destination table and log how many columns its schema has
# before the snapshot is appended.
table = conn.get_table(table_ref)
log.info(
    "Table {} contains {} columns.".format(table_id, len(table.schema))
)

In [None]:
# Configure the query job so that its results are appended to the destination
# table, allowing the query to add new columns to that table's schema.
try:
    job_config = bigquery.QueryJobConfig()
    job_config.schema_update_options = [
        bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
    ]
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
except Exception as e:
    # Log and re-raise. Swallowing the error here would leave job_config
    # undefined, and the query call below would then fail with an unrelated
    # NameError that hides the real cause.
    log.error(e)
    raise

# Select everything from the source table and add a created_at column holding
# the current datetime (CURRENT_DATETIME() uses UTC when no time zone is given).
# The location must match that of the dataset(s) referenced in the query and
# of the destination table.
query_job = conn.query(
    'SELECT *, CURRENT_DATETIME() as created_at from `bigquery-public-data.samples.shakespeare`;',
    location='US',
    job_config=job_config,
)

In [None]:
# Wait for the query job to finish; result() raises if the job failed.
try:
    query_job.result()
except Exception as e:
    # Log the failure and re-raise so the notebook stops here instead of
    # going on to report on a snapshot that was never written.
    log.error(e)
    raise
else:
    print("Query job {} complete.".format(query_job.job_id))
    log.info("Query job {} complete.".format(query_job.job_id))

# Re-fetch the destination table so that its schema reflects any columns
# added by the append, then log the new column count.
table = conn.get_table(table)
log.info(
    "Table {} now contains {} columns.".format(table_id, len(table.schema))
)

# List up to ten distinct snapshot timestamps from the destination table.
sql = """
    select distinct created_at
    from snapshots_dataset.shakespeare_daily
    limit 10
"""

# Run the query and leave the resulting DataFrame as the cell's final
# expression so the notebook displays it.
timestamps_job = conn.query(sql)
timestamps_job.to_dataframe()