<h1> OPTIONAL: Scalable retrieval of millions of rows of data from BigQuery </h1>

---
Before you start, **make sure that you are logged in with your student account**. Otherwise you may incur Google Cloud charges for using this notebook. 

---

In [0]:
!pip install apache-beam[gcp] google-apitools

Make sure to **restart the runtime** as instructed in the previous cell output before proceeding. 

**IMPORTANT:** Make sure that you **DO NOT reset the runtime**. Restarting and resetting the runtime are different operations: resetting the runtime wipes out the file system and then restarts the runtime.

In [0]:
#@markdown Copy-paste your GCP Project ID in the following field:

PROJECT = "" #@param {type: "string"}


#@markdown When running this cell you will need to **uncheck "Reset all runtimes before running"** as shown on the following screenshot:
#@markdown ![](https://i.imgur.com/9dgw0h0.png)
#@markdown Next, use Shift-Enter to run this cell and to complete authentication.

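try:
  from google.colab import auth
  auth.authenticate_user()
  print("AUTHENTICATED")
except Exception as e:
  # report why authentication failed instead of silently swallowing the error
  print("FAILED to authenticate: {}".format(e))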
  
REGION = "us-central1"
BUCKET = "kmo-us-central1-misc"  # replace with the name of a bucket you can write to in your project

# Copy taxi-*.csv files from github if they are missing from the runtime.
!wget -nc --quiet https://github.com/osipov/training-data-analyst/raw/master/bootcamps/serverless_ml/taxi-11k-datasets.zip  
!unzip -q -n taxi-11k-datasets.zip  

<h2> Environment variables for project and bucket </h2>



In [0]:
# export settings as environment variables so that the %%bash cells can use them
import os
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TF_VERSION'] = '1.12' 

In [0]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

In [0]:
import tensorflow as tf
import apache_beam as beam
import shutil
print(tf.__version__)

<h2>Specifying query to pull the data </h2>

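In addition to the raw coordinates and passenger count, let's extract two extra features from the pickup timestamp: the day of the week and the hour of the day.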

In [0]:
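def create_query(phase, EVERY_N):
  """
  phase: 1=train 2=valid
  EVERY_N: None to use the full dataset (about 50% train, 25% valid),
    or an integer to sample about one out of every EVERY_N rows per phase
  """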
  base_query = """
    SELECT
      (tolls_amount + fare_amount) AS fare_amount,
      
      CONCAT( STRING(pickup_datetime), 
              CAST(pickup_longitude AS STRING), 
              CAST(pickup_latitude AS STRING),
              CAST(dropoff_latitude AS STRING), 
              CAST(dropoff_longitude AS STRING)) AS key,
              
      EXTRACT(DAYOFWEEK FROM pickup_datetime) AS dayofweek,
      EXTRACT(HOUR FROM pickup_datetime) AS hourofday,
      pickup_longitude AS pickuplon,
      pickup_latitude AS pickuplat,
      dropoff_longitude AS dropofflon,
      dropoff_latitude AS dropofflat,
      passenger_count*1.0 AS passengers
    FROM
      `nyc-tlc.yellow.trips`
    WHERE
      {}
      AND trip_distance > 0
      AND fare_amount >= 2.5
      AND pickup_longitude > -78
      AND pickup_longitude < -70
      AND dropoff_longitude > -78
      AND dropoff_longitude < -70
      AND pickup_latitude > 37
      AND pickup_latitude < 45
      AND dropoff_latitude > 37
      AND dropoff_latitude < 45
      AND passenger_count > 0
  """
  if EVERY_N is None:
    if phase < 2:
      # training
      selector = "MOD(ABS(FARM_FINGERPRINT(STRING(pickup_datetime))), 4) < 2"
    else:
      # validation
      selector = "MOD(ABS(FARM_FINGERPRINT(STRING(pickup_datetime))), 4) = 2"
  else:
    selector = "MOD(ABS(FARM_FINGERPRINT(STRING(pickup_datetime))), %d) = %d" % (EVERY_N, phase)
    
  query = base_query.format(selector)

  return query

sql = create_query(2, 100000)
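The `WHERE` selector makes the train/validation split deterministic: it hashes the pickup timestamp and keeps rows by the remainder modulo 4 (remainders 0 and 1, about 50% of rows, for training; remainder 2, about 25%, for validation; remainder 3 unused) or, when `EVERY_N` is set, keeps only the rows whose remainder modulo `EVERY_N` equals `phase`. A minimal Python sketch of that logic follows. It assumes `hashlib.md5` as a stand-in for BigQuery's `FARM_FINGERPRINT`, so the actual bucket assignments differ from BigQuery's; `fingerprint` and `assign_split` are hypothetical helpers, not part of the notebook.

```python
import hashlib

def fingerprint(key: str) -> int:
    # Stand-in for BigQuery's FARM_FINGERPRINT: a stable 64-bit hash of the key.
    # It is a different hash function, so buckets will not match BigQuery's.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")

def assign_split(pickup_datetime: str, every_n=None):
    """Hypothetical helper mirroring the selector logic in create_query.

    Returns 1 (train), 2 (valid) or None (row not selected).
    """
    if every_n is None:
        bucket = fingerprint(pickup_datetime) % 4
        if bucket < 2:
            return 1      # remainders 0-1: about 50% of rows
        if bucket == 2:
            return 2      # remainder 2: about 25% of rows
        return None       # remainder 3 is left out
    # With sampling, phase p keeps only rows whose remainder equals p.
    bucket = fingerprint(pickup_datetime) % every_n
    return bucket if bucket in (1, 2) else None

# Count how a day's worth of per-minute timestamps would be split.
keys = ["2015-01-01 %02d:%02d:00 UTC" % (h, m) for h in range(24) for m in range(60)]
splits = [assign_split(k) for k in keys]
print("train:", splits.count(1), "valid:", splits.count(2), "unused:", splits.count(None))
```

Because the split depends only on the hashed key, rerunning the query yields the same partition, and the training and validation sets never overlap.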

<h2>Preprocessing Dataflow job from BigQuery </h2>

While we could read from BigQuery directly in TensorFlow (see https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is convenient to export the data to CSV files and train from those. Let's use Dataflow to do the export at scale.

In [0]:
%%bash
gsutil -m rm -rf gs://$BUCKET/taxifare/taxi_preproc/

In [0]:
import os
import datetime

####
# Arguments:
#   -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
#     which each row is represented as a python dictionary
# Returns:
#   -rowstring: a comma separated string representation of the record with dayofweek
#     converted from int to string (e.g. 3 --> Tue)
####
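def to_csv(rowdict):
  days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
  CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
  # BigQuery's DAYOFWEEK returns 1 (Sunday) to 7 (Saturday); index 0 is unused.
  # Work on a copy: Beam transforms should not modify their input elements.
  rowdict = dict(rowdict)
  rowdict['dayofweek'] = days[rowdict['dayofweek']]
  rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])
  return rowstring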


####
# Arguments:
#   -EVERY_N: Integer. Sample one out of every N rows from the full dataset.
#     Larger values yield a smaller sample
#   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specify whether to run the pipeline
#     locally or on Google Cloud respectively. 
# Side-effects:
#   -Creates and executes dataflow pipeline. 
#     See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
####
def preprocess(EVERY_N, RUNNER):
  phase_name = ['', 'train', 'valid']
  job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  print('Launching Dataflow job {} ... hang on'.format(job_name))
  OUTPUT_DIR = 'gs://{0}/taxifare/taxi_preproc/'.format(BUCKET)

  # dictionary of pipeline options
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,  # reuse the name printed above
    'project': PROJECT,
    'region': REGION,  # region in which Dataflow runs the job
    'runner': RUNNER
  }
  # instantiate PipelineOptions object using options dictionary
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  # instantiate Pipeline object using PipelineOptions
  p = beam.Pipeline(options=opts)
  for phase in [1, 2]:
    query = create_query(phase, EVERY_N)
    outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase_name[phase]))
    (
      p | 'read_{}'.format(phase) >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
        | 'tocsv_{}'.format(phase) >> beam.Map(to_csv)
        | 'write_{}'.format(phase) >> beam.io.WriteToText(outfile)
    )

  p.run().wait_until_finish()
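`to_csv` relies on BigQuery's `EXTRACT(DAYOFWEEK ...)` numbering days from 1 (Sunday) to 7 (Saturday), which is why `days` starts with a `'null'` placeholder at index 0. A quick sanity check that needs neither Beam nor BigQuery: derive the same numbering from Python's `datetime.isoweekday()` (Monday=1, Sunday=7) and run a copy of the mapping on a made-up row. `bq_dayofweek`, `row_to_csv` and the sample values are hypothetical.

```python
import datetime

DAYS = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
CSV_COLUMNS = ['fare_amount', 'dayofweek', 'hourofday', 'pickuplon', 'pickuplat',
               'dropofflon', 'dropofflat', 'passengers', 'key']

def bq_dayofweek(d: datetime.date) -> int:
    # Hypothetical helper: convert ISO numbering (1 = Monday ... 7 = Sunday)
    # into BigQuery's DAYOFWEEK numbering (1 = Sunday ... 7 = Saturday).
    return d.isoweekday() % 7 + 1

def row_to_csv(rowdict: dict) -> str:
    # Same mapping as to_csv, applied to a copy so the input row is not mutated.
    row = dict(rowdict)
    row['dayofweek'] = DAYS[row['dayofweek']]
    return ','.join(str(row[k]) for k in CSV_COLUMNS)

# Made-up row; 2015-01-06 was a Tuesday.
sample = {
    'fare_amount': 12.5, 'dayofweek': bq_dayofweek(datetime.date(2015, 1, 6)),
    'hourofday': 8, 'pickuplon': -73.99, 'pickuplat': 40.75,
    'dropofflon': -73.97, 'dropofflat': 40.78, 'passengers': 1.0,
    'key': 'sample-key',
}
print(row_to_csv(sample))  # prints: 12.5,Tue,8,-73.99,40.75,-73.97,40.78,1.0,sample-key
```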

Run the pipeline locally using `DirectRunner`:

In [0]:
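# Approximate sample size (rows) and run time for each EVERY_N:
#   EVERY_N = 50*10000 -> ~2.2k rows,  ~20 s
#   EVERY_N = 50*1000  -> ~22k rows,   ~5 min
#   EVERY_N = 50*100   -> ~220k rows,  >60 min
preprocess(50*10000, 'DirectRunner')
# preprocess(50*10000, 'DataflowRunner')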

To run the pipeline in the cloud on a larger sample, change the arguments to `preprocess` to use `DataflowRunner` and a smaller `EVERY_N` (smaller values yield larger samples). When running on Cloud Dataflow, go to the GCP Console (https://console.cloud.google.com/dataflow) to monitor the status of the job. Note that the preprocessing job takes several minutes to launch.
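Each phase keeps only the rows whose hash remainder equals the phase number, so a phase receives roughly 1/`EVERY_N` of the filtered rows. Assuming the "2.2k" in the timing notes above is the per-phase row count at `EVERY_N = 50*10000`, the filtered table holds roughly 1.1 billion rows. The sketch below uses that back-computed estimate (not an official count) to pick `EVERY_N` for a target sample size; `rows_per_phase` and `every_n_for` are hypothetical helpers.

```python
# Rough size of the filtered trips table, back-computed from the notebook's
# timing notes (EVERY_N = 50*10000 -> ~2.2k rows). An estimate, not an official count.
APPROX_FILTERED_ROWS = 2_200 * 50 * 10_000  # ~1.1 billion

def rows_per_phase(every_n: int, total_rows: int = APPROX_FILTERED_ROWS) -> int:
    # Each phase keeps one hash bucket out of every_n, i.e. ~1/every_n of the rows.
    return total_rows // every_n

def every_n_for(target_rows: int, total_rows: int = APPROX_FILTERED_ROWS) -> int:
    # Smallest EVERY_N whose expected sample does not exceed target_rows
    # (integer ceiling division, avoiding floating-point rounding).
    return -(-total_rows // target_rows)

for n in (50 * 10_000, 50 * 1_000, 50 * 100):
    print(n, rows_per_phase(n))
print(every_n_for(1_000_000))  # EVERY_N for about one million rows per phase
```

Because local `DirectRunner` time grows with sample size, larger samples (smaller `EVERY_N`) are the ones worth sending to `DataflowRunner`.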

Once the job completes, inspect the files created in Google Cloud Storage:

In [0]:
%%bash
gsutil ls -l gs://$BUCKET/taxifare/taxi_preproc/

In [0]:
%%bash
#print first 10 lines of first shard of train.csv
gsutil cat "gs://$BUCKET/taxifare/taxi_preproc/train.csv-00000-of-*" | head
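Each output line follows the column order of `CSV_COLUMNS` in `to_csv`, and `WriteToText` writes no header row, so code that reads the shards must supply the column names itself. A minimal sketch using Python's `csv` module to turn such lines (for example, text copied from the `gsutil cat` output above) back into typed dicts; `parse_lines` and the sample line are hypothetical.

```python
import csv
import io

CSV_COLUMNS = ['fare_amount', 'dayofweek', 'hourofday', 'pickuplon', 'pickuplat',
               'dropofflon', 'dropofflat', 'passengers', 'key']
FLOAT_COLUMNS = {'fare_amount', 'pickuplon', 'pickuplat',
                 'dropofflon', 'dropofflat', 'passengers'}

def parse_lines(text: str):
    # The shards have no header row, so pass the column names explicitly.
    reader = csv.DictReader(io.StringIO(text), fieldnames=CSV_COLUMNS)
    for row in reader:
        for col in FLOAT_COLUMNS:
            row[col] = float(row[col])
        row['hourofday'] = int(row['hourofday'])
        yield row  # 'dayofweek' and 'key' stay strings

# Made-up line in the same layout as the preprocessed files.
sample_text = "12.5,Tue,8,-73.99,40.75,-73.97,40.78,1.0,sample-key\n"
rows = list(parse_lines(sample_text))
print(rows[0]['dayofweek'], rows[0]['fare_amount'])  # prints: Tue 12.5
```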

Copyright 2019 Counter Factual .AI LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License