# BigQuery Pipeline
Google Cloud Datalab enables productionizing (i.e. scheduling and orchestrating) notebooks that accomplish ETL with BigQuery and GCS. This notebook illustrates how the 'pipeline' subcommand accomplishes this.

This requires prior setup: either Google Cloud Composer running with an active environment, or Airflow installed on a VM.

## Airflow 
Copy and execute airflow_vm_create.sh (https://github.com/googledatalab/notebooks/blob/master/samples/contrib/pipeline/airflow_vm_create.sh) in a terminal. This will create a VM in your project with Airflow running on it. 

## Google Cloud Composer 
Refer to https://cloud.google.com/composer/docs/quickstart to set up an active Composer environment.

In [66]:
%%bq pipeline --help

usage: %bq pipeline [-h] -n NAME [-e ENVIRONMENT] [-l LOCATION]
                    [-d GCS_DAG_BUCKET] [-f GCS_DAG_FILE_PATH]

Creates a pipeline to execute a SQL query to transform data using BigQuery.

optional arguments:
  -h, --help            show this help message and exit
  -n NAME, --name NAME  BigQuery pipeline name
  -e ENVIRONMENT, --environment ENVIRONMENT
                        The name of the Google Cloud Composer environment.
  -l LOCATION, --location LOCATION
                        The location of the Google Cloud Composer environment.
  -d GCS_DAG_BUCKET, --gcs_dag_bucket GCS_DAG_BUCKET
                        The Google Cloud Storage bucket for the Airflow dags.
  -f GCS_DAG_FILE_PATH, --gcs_dag_file_path GCS_DAG_FILE_PATH
                        The file path suffix for the Airflow dags.


# Building a data transformation pipeline
Define your query in BigQuery SQL along with a destination table for the results, test it using the 'execute' subcommand, and then create a pipeline that runs it on a daily schedule. The config (i.e. the cell body) for the 'pipeline' subcommand provides a 'transformation' key for referencing queries defined earlier in the notebook, and an 'output' key for specifying the destination table and the write 'mode'.

In [0]:
%%bq query --name current_year_query
SELECT * FROM `cloud-ml-dev.rajivpb_demo.the_datetime_table` WHERE CAST(EXTRACT(YEAR FROM the_datetime) AS STRING) = @_ts_year


In [0]:
results_table = 'cloud-ml-dev.rajivpb_demo.the_datetime_table_results'

In [72]:
%%bq execute --query current_year_query --table $results_table --mode overwrite

the_datetime
2017-06-22T06:51:07.708789
2017-06-24T05:50:44.783137
2017-06-24T04:38:10.811248
2017-06-24T09:11:28.942573
2017-06-24T17:10:28.700188
2017-06-24T23:24:25.651131
2017-06-25T00:09:11.087442
2017-06-24T23:50:06.981845
2017-06-25T06:48:10.197584
2017-06-21T19:00:31.495821


In [74]:
%%bq pipeline --name bq-trasform-bq --environment datalab-testing --location us-central1
transformation:
  query: current_year_query
output:
  table: $results_table
  mode: overwrite
schedule:
  start: 2017-12-01T00:00:00
  end:  2017-12-04T00:00:00
  interval: '@daily'

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T00:00:00\', \'%Y-%m-%d

# Load data from GCS into BigQuery
Schedule loading data from a Google Cloud Storage path into a BigQuery table by specifying a 'load' config section as illustrated below. The 'load' section has the following nested keys:
 - path: source GCS path
 - table: destination BigQuery table
 - schema: Required if the table does not exist and requires creation
 - mode: 'append' (default) or 'overwrite', for the BQ table
 - format: 'csv' (default) or 'json'
 
If format is csv, the following nested keys can be specified:
 - delimiter: The separator for fields in a CSV file. BigQuery converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data as raw binary (default ',').
 - strict: If True, accept rows in CSV files that are missing trailing optional columns; the missing values are treated as nulls (default False).
 - skip: A number of rows at the top of a CSV file to skip (default 0).
 - quote: The value used to quote data sections in a CSV file; default '"'.
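
To get a feel for what 'delimiter', 'skip' and 'quote' mean, the following sketch previews a small CSV sample locally with Python's standard `csv` module. It only approximates the behaviour described above: BigQuery's own parser (for example its ISO-8859-1 handling of the delimiter) is not reproduced, and `preview_csv` and the sample data are hypothetical names introduced for illustration.

```python
import csv
import io


def preview_csv(text, delimiter=',', skip=0, quote='"'):
    """Parse CSV text locally and drop the first `skip` rows.

    Hypothetical helper; it mirrors the option names from the list above
    but uses Python's csv module, not BigQuery.
    """
    reader = csv.reader(io.StringIO(text), delimiter=delimiter, quotechar=quote)
    rows = list(reader)
    return rows[skip:]


# A header row followed by two data rows, with '#' as the field separator.
# The last field of the last row is quoted because it contains the delimiter.
sample = 'prediction#col1#col2\n1#5.1#3.5\n2#4.9#"3#0"\n'

rows = preview_csv(sample, delimiter='#', skip=1)
for row in rows:
    print(row)
```

Setting `skip=1` drops the header row, and the quoted field keeps its embedded '#' instead of being split.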


In [0]:
iris_data_path='gs://cloud-ml-dev-nikhilko/iris/data/data_predict.csv'

## Parameterize GCS paths and BQ table names
GCS paths and BQ table names can be parameterized using built-in macros that expand to values derived from the pipeline's execution date.

 - '_ds': the date formatted as YYYY-MM-DD
 - '_ts': the full ISO-formatted timestamp YYYY-MM-DDTHH:MM:SS.mmmmmm
 - '_ds_nodash': the date formatted as YYYYMMDD (i.e. YYYY-MM-DD with 'no dashes')
 - '_ts_nodash': the timestamp formatted as YYYYMMDDTHHMMSSmmmmmm (i.e. the full ISO-formatted timestamp without dashes or colons)
 - '_ts_year': 4-digit year
 - '_ts_month': '1'-'12'
 - '_ts_day': '1'-'31'
 - '_ts_hour': '0'-'23'
 - '_ts_minute': '0'-'59'
 - '_ts_second': '0'-'59'

TODO(rajivpb): Very soon, one will also be able to specify user-defined macros here (https://github.com/googledatalab/pydatalab/issues/629) in addition to the above built-in macros.
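
As a rough illustration of what these macros expand to, the sketch below computes each value from an execution date and substitutes it into a `%(name)s` template with Python's %-formatting. This is an assumption-laden sketch, not pydatalab's implementation: `build_macros`, `expand`, the chosen run date and the `gs://example-bucket` path are all hypothetical.

```python
import datetime


def build_macros(execution_date):
    """Map each built-in macro name to its value for one execution date.

    Hypothetical helper for illustration; not part of pydatalab.
    """
    return {
        '_ds': execution_date.strftime('%Y-%m-%d'),
        '_ts': execution_date.strftime('%Y-%m-%dT%H:%M:%S.%f'),
        '_ds_nodash': execution_date.strftime('%Y%m%d'),
        '_ts_nodash': execution_date.strftime('%Y%m%dT%H%M%S%f'),
        '_ts_year': str(execution_date.year),
        '_ts_month': str(execution_date.month),
        '_ts_day': str(execution_date.day),
        '_ts_hour': str(execution_date.hour),
        '_ts_minute': str(execution_date.minute),
        '_ts_second': str(execution_date.second),
    }


def expand(template, execution_date):
    """Substitute %(name)s placeholders in a path or table name."""
    return template % build_macros(execution_date)


# An arbitrary execution date chosen for the example.
run_date = datetime.datetime(2017, 12, 1, 22, 48, 58)

table = expand('cloud-ml-dev.rajivpb_demo.iris_table_%(_ts_nodash)s', run_date)
path = expand('gs://example-bucket/iris_%(_ds)s.csv', run_date)
print(table)
print(path)
```

Because each run has a different execution date, a template like this yields a distinct table or file per run.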


In [0]:
iris_table='cloud-ml-dev.rajivpb_demo.iris_table_%(_ts_nodash)s'

In [92]:
%%bq pipeline --name load_pipeline --environment datalab-testing --location us-central1
load:
  path: $iris_data_path
  table: $iris_table
  mode: overwrite
  schema:
    - name: prediction
      type: INTEGER
    - name: col1
      type: FLOAT
    - name: col2
      type: FLOAT
    - name: col3
      type: FLOAT
    - name: col4
      type: FLOAT

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T22:48:58\', \'%Y-%m-%d

Alternatively, separate 'input' and 'output' sections can be used to achieve the same result.

In [93]:
%%bq pipeline --name load_pipeline_alternate --environment datalab-testing --location us-central1
input:
  path: $iris_data_path
output:
  table: $iris_table
  mode: overwrite
  schema:
    - name: prediction
      type: INTEGER
    - name: col1
      type: FLOAT
    - name: col2
      type: FLOAT
    - name: col3
      type: FLOAT
    - name: col4
      type: FLOAT

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T22:50:05\', \'%Y-%m-%d

# Extract data from BigQuery into GCS
This is similar to the previous 'load', except with table as input and path as output. 


In [0]:
iris_data_extracted_path='gs://rajivpb-airflow-testing/iris_data_extracted_path_%(_ts_nodash)s.csv'


In [97]:
%%bq pipeline --name extract_pipeline --environment datalab-testing --location us-central1
extract:
  table: $iris_table
  path: $iris_data_extracted_path
  format: csv
  csv:
    delimiter: '#'
     

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T22:51:30\', \'%Y-%m-%d

Alternatively, as with load, separate 'input' and 'output' sections can be used to achieve the same result.

In [98]:
%%bq pipeline --name extract_pipeline_alternate --environment datalab-testing --location us-central1
input:
  table: $iris_table
output:
  path: $iris_data_extracted_path
  format: csv
  csv:
    delimiter: '#'

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T22:51:35\', \'%Y-%m-%d
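
The correspondence between the combined ('load' / 'extract') and split ('input' / 'output') forms used in the cells above can be sketched as a pair of small functions. This is inferred only from the examples in this notebook, not from pydatalab's source: `split_load` and `split_extract` are hypothetical helpers, and the config values are placeholders.

```python
def split_load(load):
    """Rewrite a 'load' section as 'input' and 'output' sections.

    The GCS path becomes the input; every other key (table, mode,
    schema, ...) moves to the output, as in the examples above.
    """
    output = dict(load)
    return {'input': {'path': output.pop('path')}, 'output': output}


def split_extract(extract):
    """Rewrite an 'extract' section as 'input' and 'output' sections.

    The BigQuery table becomes the input; every other key (path,
    format, csv, ...) moves to the output.
    """
    output = dict(extract)
    return {'input': {'table': output.pop('table')}, 'output': output}


# Placeholder values standing in for $iris_data_path and $iris_table.
load_config = {'path': 'gs://example-bucket/data.csv',
               'table': 'project.dataset.table',
               'mode': 'overwrite'}
extract_config = {'table': 'project.dataset.table',
                  'path': 'gs://example-bucket/out.csv',
                  'format': 'csv',
                  'csv': {'delimiter': '#'}}

print(split_load(load_config))
print(split_extract(extract_config))
```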

# Schedule
The schedule section is specified with the following keys:
 - start: datetime (string) after which the pipeline execution will be scheduled. If this is in the past, backfill jobs will be created. Default is datetime.now()
 - end: datetime (string) after which pipeline execution will not be scheduled. Default is infinity (i.e. pipelines will be scheduled "forever")
 - interval: This can take the following values: @once, @hourly, @daily, @weekly, @monthly, @yearly, or None (if the pipeline is not to be scheduled). Default is @once.
 
## "One-off" runs  
The schedule section is optional and can be omitted. When omitted, the defaults are in effect, i.e. start = now(), end = forever, and interval = @once, so the pipeline is scheduled for execution exactly once. This is useful when debugging pipelines to verify that they produce the expected outcomes.
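
The defaulting rules described above can be sketched as a small function that fills in missing schedule keys. This is a minimal sketch under stated assumptions, not the code Datalab runs: `resolve_schedule` and `ALLOWED_INTERVALS` are hypothetical names, and `None` is used here to stand for "no end date".

```python
import datetime

# Interval values listed in the Schedule section; None means "not scheduled".
ALLOWED_INTERVALS = {'@once', '@hourly', '@daily', '@weekly',
                     '@monthly', '@yearly', None}


def resolve_schedule(schedule=None, now=None):
    """Return a schedule dict with the documented defaults applied.

    Hypothetical helper: start defaults to the current time, end to
    None (no end), and interval to '@once'.
    """
    schedule = dict(schedule or {})
    now = now or datetime.datetime.now()
    resolved = {
        'start': schedule.get('start', now),
        'end': schedule.get('end'),
        'interval': schedule.get('interval', '@once'),
    }
    if resolved['interval'] not in ALLOWED_INTERVALS:
        raise ValueError('Unknown interval: %r' % (resolved['interval'],))
    return resolved


# With no schedule section at all, the pipeline would run exactly once.
one_off = resolve_schedule()
print(one_off['interval'], one_off['end'])

# A partially specified schedule keeps its own values.
daily = resolve_schedule({'start': '2017-12-01T00:00:00', 'interval': '@daily'})
print(daily)
```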


# SQL-based data transformation pipeline for GCS data
Of special mention is the ability to directly query data in GCS by specifying 'input' as the query source. This works by using BigQuery's External Data Source: https://cloud.google.com/bigquery/external-data-sources

In [0]:
%%bq query --name foo_query
SELECT col1, col2 FROM input

In [101]:
%%bq pipeline --name gcs_transform_gcs --environment datalab-testing --location us-central1
input:
  path: $iris_data_path
  schema:
    - name: prediction
      type: INTEGER
    - name: col1
      type: FLOAT
    - name: col2
      type: FLOAT
    - name: col3
      type: FLOAT
    - name: col4
      type: FLOAT
transformation:
  query: foo_query
output:
  path: gs://rajivpb-airflow-testing/gcs_transform_gcs_output_%(_ts_nodash)s.csv
schedule:
  start: 2017-12-01T00:00:00
  end:  2017-12-04T00:00:00
  interval: '@daily'

'\nimport datetime\nfrom airflow import DAG\nfrom airflow.operators.bash_operator import BashOperator\nfrom airflow.contrib.operators.bigquery_operator import BigQueryOperator\nfrom airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator\nfrom airflow.contrib.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator\nfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator\nfrom airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator\nfrom google.datalab.contrib.bigquery.operators._bq_load_operator import LoadOperator\nfrom google.datalab.contrib.bigquery.operators._bq_execute_operator import ExecuteOperator\nfrom google.datalab.contrib.bigquery.operators._bq_extract_operator import ExtractOperator\nfrom datetime import timedelta\n\ndefault_args = {\n    \'owner\': \'Google Cloud Datalab\',\n    \'email\': [],\n    \'start_date\': datetime.datetime.strptime(\'2017-12-01T00:00:00\', \'%Y-%m-%d