In this notebook we stage some data in GCS buckets for cloud-based training. We could do all of this in the main notebook, but it isn't central to the demo itself, so we break it out into a separate notebook and run it only once.

# Setup GCP environment

* Your project ID is the *unique* string that identifies your project (not the project name). You can find it on the Home page of the GCP Console dashboard. My dashboard reads: <b>Project ID:</b> cloud-training-demos
* Cloud training often involves saving and restoring model files, so we should <b>create a single-region bucket</b>. If you don't have a bucket already, I suggest creating one from the GCP Console, because it dynamically checks whether the bucket name you want is available.
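Before creating anything, you can catch obvious typos offline. The sketch below encodes the naming rules as I understand them: project IDs are 6 to 30 lowercase letters, digits or hyphens, start with a letter and do not end with a hyphen; dot-free bucket names are 3 to 63 lowercase letters, digits, hyphens or underscores and start and end with a letter or digit. Treat these rules as assumptions and check the GCP documentation for the authoritative list. Both helper names are hypothetical, and neither function can tell you whether a bucket name is actually available.

```python
import re

# Assumed rules, not an official validator:
# project ID: starts with a letter, 6-30 chars of [a-z0-9-], no trailing hyphen
_PROJECT_ID_RE = re.compile(r'[a-z][a-z0-9-]{4,28}[a-z0-9]')
# dot-free bucket name: 3-63 chars of [a-z0-9_-], letter/digit at both ends
_BUCKET_RE = re.compile(r'[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]')


def looks_like_project_id(project_id):
    """Rough offline check of the project ID format."""
    return _PROJECT_ID_RE.fullmatch(project_id) is not None


def looks_like_bucket_name(bucket):
    """Rough offline check of a dot-free bucket name.

    Bucket names are global, so only the console or the Storage API
    can tell you whether a well-formed name is still free.
    """
    return _BUCKET_RE.fullmatch(bucket) is not None
```

For example, `looks_like_project_id('rostlab-181304')` and `looks_like_bucket_name('rostlab-181304-ml')` both return True, while `looks_like_bucket_name('My-Bucket')` returns False because of the uppercase letters.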

<b>Change the configuration cell below (the one defining PROJECT, BUCKET and REGION)</b> to reflect your Project ID and bucket name.

In [6]:
%%bash
source activate py2env
conda install -y pytz
pip uninstall -y google-cloud-dataflow
pip install --upgrade 'apache-beam[gcp]'

Solving environment: ...working... done

## Package Plan ##

  environment location: /usr/local/envs/py2env

  added / updated specs: 
    - pytz


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2018.10.15         |           py27_0         139 KB  defaults
    pytz-2018.5                |           py27_0         231 KB  defaults
    ------------------------------------------------------------
                                           Total:         369 KB

The following packages will be UPDATED:

    certifi: 2018.8.13-py27_0 defaults --> 2018.10.15-py27_0 defaults
    pytz:    2016.7-py27_0    defaults --> 2018.5-py27_0     defaults


Downloading and Extracting Packages
Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done
Uninstalling google-cloud-dataflow-2.0.0:
  Successfully uninstalled google-cloud-



  current version: 4.5.10
  latest version: 4.5.11

Please update conda by running

    $ conda update -n base -c defaults conda


pandas-gbq 0.3.0 has requirement google-cloud-bigquery>=0.28.0, but you'll have google-cloud-bigquery 0.25.0 which is incompatible.
google-cloud-monitoring 0.28.0 has requirement google-cloud-core<0.29dev,>=0.28.0, but you'll have google-cloud-core 0.25.0 which is incompatible.


In [7]:
PROJECT = 'rostlab-181304'    # CHANGE THIS
BUCKET = 'rostlab-181304-ml' # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.
REGION = 'us-central1' # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.

In [8]:
import datalab.bigquery as bq
import os
import shutil
import apache_beam as beam
import warnings
warnings.filterwarnings('ignore')
# for bash
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION

## ensure we're using python2 env
os.environ['CLOUDSDK_PYTHON'] = 'python2'

In [9]:
%%bash
## ensure gcloud is up to date
gcloud components update

gcloud config set project $PROJECT
gcloud config set compute/region $REGION

## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`



Your current Cloud SDK version is: 212.0.0
You will be upgraded to version: 221.0.0

+----------------------------------------------------------------------------+
|                     These components will be updated.                      |
+-----------------------------------------------------+------------+---------+
|                         Name                        |  Version   |   Size  |
+-----------------------------------------------------+------------+---------+
| BigQuery Command Line Tool                          |     2.0.35 | < 1 MiB |
| BigQuery Command Line Tool (Platform Specific)      |     2.0.34 | < 1 MiB |
| Cloud SDK Core Libraries                            | 2018.10.12 | 8.6 MiB |
| Cloud SDK Core Libraries (Platform Specific)        | 2018.09.24 | < 1 MiB |
| Cloud Storage Command Line Tool                     |       4.34 | 3.5 MiB |
| Cloud Storage Command Line Tool (Platform Specific) |       4.34 | < 1 MiB |
| gcloud Alpha Commands                     

## Specifying query to pull the data

Besides the raw coordinates and passenger count, let's pull a few extra columns out of the pickup timestamp: the day of week and the hour of day.

In [10]:
def create_query(phase, EVERY_N):
  if EVERY_N is None:
    EVERY_N = 4 # 4 hash buckets (train 0-1, valid 2, test 3) cover the full dataset
    
  #select and pre-process fields
  base_query = """
SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  DAYOFWEEK(pickup_datetime) AS dayofweek,
  HOUR(pickup_datetime) AS hourofday,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count*1.0 AS passengers,
  CONCAT(STRING(pickup_datetime), STRING(pickup_longitude), STRING(pickup_latitude), STRING(dropoff_latitude), STRING(dropoff_longitude)) AS key
FROM
  [nyc-tlc:yellow.trips]
WHERE
  trip_distance > 0
  AND fare_amount >= 2.5
  AND pickup_longitude > -78
  AND pickup_longitude < -70
  AND dropoff_longitude > -78
  AND dropoff_longitude < -70
  AND pickup_latitude > 37
  AND pickup_latitude < 45
  AND dropoff_latitude > 37
  AND dropoff_latitude < 45
  AND passenger_count > 0
  """
  
  #add subsampling criteria by modding with hashkey
  if phase == 'train': 
    query = "{} AND ABS(HASH(pickup_datetime)) % {} < 2".format(base_query,EVERY_N)
  elif phase == 'valid': 
    query = "{} AND ABS(HASH(pickup_datetime)) % {} == 2".format(base_query,EVERY_N)
  elif phase == 'test':
    query = "{} AND ABS(HASH(pickup_datetime)) % {} == 3".format(base_query,EVERY_N)
  else:
    raise ValueError('phase must be train, valid or test, got {!r}'.format(phase))
  return query
    
print(create_query('valid', 100)) #example query using 1% of data


SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  DAYOFWEEK(pickup_datetime) AS dayofweek,
  HOUR(pickup_datetime) AS hourofday,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count*1.0 AS passengers,
  CONCAT(STRING(pickup_datetime), STRING(pickup_longitude), STRING(pickup_latitude), STRING(dropoff_latitude), STRING(dropoff_longitude)) AS key
FROM
  [nyc-tlc:yellow.trips]
WHERE
  trip_distance > 0
  AND fare_amount >= 2.5
  AND pickup_longitude > -78
  AND pickup_longitude < -70
  AND dropoff_longitude > -78
  AND dropoff_longitude < -70
  AND pickup_latitude > 37
  AND pickup_latitude < 45
  AND dropoff_latitude > 37
  AND dropoff_latitude < 45
  AND passenger_count > 0
   AND ABS(HASH(pickup_datetime)) % 100 == 2
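The `ABS(HASH(pickup_datetime)) % EVERY_N` trick gives a repeatable split: a given pickup time always lands in the same bucket, so the train, valid and test sets stay disjoint and stable from run to run, and all trips that share a timestamp end up in the same split. Here is a minimal Python sketch of the same scheme. MD5 is a stand-in for BigQuery's legacy `HASH()` (an assumption made so the example runs locally), so the concrete assignments will not match BigQuery's; `split_bucket` is a hypothetical name.

```python
import hashlib


def split_bucket(pickup_datetime, every_n=None):
    """Return 'train', 'valid', 'test', or None if the row is not sampled.

    Same bucket rules as create_query: 0-1 -> train, 2 -> valid, 3 -> test.
    MD5 replaces BigQuery's HASH(), so only the scheme matches, not the buckets.
    """
    if every_n is None:
        every_n = 4  # four buckets, so every row is assigned somewhere
    digest = hashlib.md5(pickup_datetime.encode('utf-8')).hexdigest()
    bucket = int(digest, 16) % every_n  # already non-negative, so no ABS() needed
    if bucket < 2:
        return 'train'
    if bucket == 2:
        return 'valid'
    if bucket == 3:
        return 'test'
    return None  # buckets 4..every_n-1 are skipped when subsampling
```

Calling `split_bucket('2013-04-04 22:58:55')` twice always returns the same phase, which is what makes the split reproducible without storing any assignment table.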


## Preprocessing Dataflow job from BigQuery

This code reads from BigQuery and saves the data as-is on Google Cloud Storage. We could do additional preprocessing and cleanup inside Dataflow, but then we would have to remember to repeat that preprocessing during inference. It is better to use tf.transform, which does this bookkeeping for you, or to do the preprocessing within your TensorFlow model. We will look at this in future notebooks. For now, we are simply moving data from BigQuery to CSV using Dataflow.
While we could read from BigQuery directly in TensorFlow (see https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is quite convenient to export to CSV and train from the CSV files. Let's use Dataflow to do this at scale.
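If you do clean up data before training, the same rules have to be applied when predictions are requested. As a minimal sketch, here is the feature-side part of the `create_query` WHERE clause as a plain Python predicate that serving code could reuse. `in_training_region` is a hypothetical helper and the bounds are copied from the query; the `fare_amount` and `trip_distance` conditions are left out because neither is among the model's input features (`fare_amount` is the label).

```python
def in_training_region(pickuplon, pickuplat, dropofflon, dropofflat, passengers):
    """Hypothetical serving-side guard mirroring create_query's WHERE clause.

    Returns True only for inputs inside the region the training data covers.
    """
    def in_box(lon, lat):
        # Same loose bounding box as the query: -78 < lon < -70 and 37 < lat < 45
        return -78 < lon < -70 and 37 < lat < 45

    return (in_box(pickuplon, pickuplat)
            and in_box(dropofflon, dropofflat)
            and passengers > 0)
```

A request failing this check lies outside anything the model saw during training, so serving code might reject it or flag it rather than return a fare.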

When the pipeline runs on the cloud, go to the GCP Console (https://console.cloud.google.com/dataflow) to check the status of the job. It will take several minutes for the preprocessing job to launch.

In [8]:
%%bash
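# `gsutil ls` without a URL lists only buckets, so grepping its output for this
# folder would never match; list the folder itself and use the exit status instead
if gsutil ls gs://${BUCKET}/taxifare/taxi_preproc/ > /dev/null 2>&1; then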
  gsutil -m rm -rf gs://$BUCKET/taxifare/taxi_preproc/
fi

In [17]:
import datetime

####
# Arguments:
#   -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
#     which each row is represented as a python dictionary
# Returns:
#   -rowstring: a comma separated string representation of the record with dayofweek
#     converted from int to string (e.g. 3 --> Tue)
####
def to_csv(rowdict):
  days = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
  CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
  rowdict['dayofweek'] = days[rowdict['dayofweek']]
  rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])
  return rowstring


####
# Arguments:
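#   -EVERY_N: Integer or None. Passed to create_query, which keeps roughly 2 out of
#     every N rows for train and 1 out of every N for valid, so larger values yield
#     a smaller sample. None splits the full dataset 2:1:1 into train/valid/test.
#   -RUNNER: 'DirectRunner' or 'DataflowRunner'. Specify whether to run the pipeline
#     locally or on Google Cloud, respectively.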
# Side-effects:
#   -Creates and executes dataflow pipeline. 
#     See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
####
def preprocess(EVERY_N, RUNNER):
  job_name = 'preprocess-taxifeatures' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
  print('Launching Dataflow job {} ... hang on'.format(job_name))
  OUTPUT_DIR = 'gs://{0}/taxifare/taxi_preproc/'.format(BUCKET)

  #dictionary of pipeline options
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,  # reuse the name printed above instead of recomputing the timestamp
    'project': PROJECT,
    'runner': RUNNER
  }
  #instantiate PipelineOptions object using options dictionary
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  #instantiate Pipeline object using PipelineOptions; the pipeline runs when the with-block exits
  with beam.Pipeline(options=opts) as p:
    for phase in ['train', 'valid']:
      query = create_query(phase, EVERY_N)
      outfile = os.path.join(OUTPUT_DIR, '{}.csv'.format(phase))
      (
        p | 'read_{}'.format(phase) >> beam.io.Read(beam.io.BigQuerySource(query=query))
          | 'tocsv_{}'.format(phase) >> beam.Map(to_csv)
          | 'write_{}'.format(phase) >> beam.io.WriteToText(outfile)
      )
  print("Done")
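Because `to_csv` is an ordinary function, it can be checked without Beam or BigQuery. The sketch below is a copy that builds a new dict instead of overwriting `dayofweek` in the row it is given (not mutating input elements is the safer habit in Beam). The sample row reuses the values of the first line of `train.csv` printed later in this notebook, with `dayofweek=5` because BigQuery's `DAYOFWEEK` numbers Sunday as 1 and 2013-04-04 was a Thursday.

```python
# Day names indexed by BigQuery legacy SQL DAYOFWEEK (1 = Sunday ... 7 = Saturday)
DAYS = ['null', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')


def to_csv(rowdict):
    """Like to_csv above, but returns the CSV line without modifying rowdict."""
    row = dict(rowdict, dayofweek=DAYS[rowdict['dayofweek']])
    return ','.join(str(row[k]) for k in CSV_COLUMNS)


row = {
    'fare_amount': 8.5, 'dayofweek': 5, 'hourofday': 22,
    'pickuplon': -73.98067, 'pickuplat': 40.759296,
    'dropofflon': -73.95608, 'dropofflat': 40.773807,
    'passengers': 1.0,
    'key': '2013-04-04 22:58:55.000000-73.980740.759340.7738-73.9561',
}
print(to_csv(row))
# 8.5,Thu,22,-73.98067,40.759296,-73.95608,40.773807,1.0,2013-04-04 22:58:55.000000-73.980740.759340.7738-73.9561
```

After the call, `row['dayofweek']` is still the integer 5, so the same input can be reused safely.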

Run the pipeline locally. This takes up to <b>5 minutes</b>. You will see the message "Done" when it finishes.

In [10]:
preprocess(50*10000, 'DirectRunner')

Launching Dataflow job preprocess-taxifeatures-181021-041751 ... hang on




Done


In [11]:
%%bash
gsutil ls -l gs://$BUCKET/taxifare/taxi_preproc/

  48668142  2018-10-21T04:46:10Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00001
  38751674  2018-10-21T05:20:48Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00002
  83124859  2018-10-21T05:20:48Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00001-of-00002
  61397578  2018-10-21T05:19:49Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/valid.csv-00000-of-00001
                                 gs://rostlab-181304-ml/taxifare/taxi_preproc/tmp/
TOTAL: 4 objects, 231942253 bytes (221.2 MiB)


## Run Beam pipeline on Cloud Dataflow
Run the pipeline on the cloud on a larger sample (EVERY_N = 2000 instead of 500,000).
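How much larger? Assuming the hash spreads rows evenly over the buckets, `create_query` keeps 2/N of the filtered rows for train and 1/N each for valid and test. The sketch below, with a hypothetical `sample_fractions` helper, makes the arithmetic explicit: with N = 2000 the cloud run exports 250 times as many rows as the local run with N = 500,000.

```python
from fractions import Fraction


def sample_fractions(every_n=None):
    """Expected share of the filtered rows per phase, for a given EVERY_N.

    train keeps hash buckets 0 and 1, valid bucket 2, test bucket 3;
    None falls back to 4 buckets, as in create_query.
    """
    if every_n is None:
        every_n = 4
    return {'train': Fraction(2, every_n),
            'valid': Fraction(1, every_n),
            'test': Fraction(1, every_n)}


local = sample_fractions(50 * 10000)  # the DirectRunner call
cloud = sample_fractions(20 * 100)    # the DataflowRunner call
print(cloud['train'] / local['train'])  # 250
```

Using `Fraction` keeps the ratios exact; with `every_n=None` the three shares add up to exactly 1.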

In [2]:
%%bash
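# `gsutil ls` without a URL lists only buckets, so grepping its output for this
# folder would never match; list the folder itself and use the exit status instead
if gsutil ls gs://${BUCKET}/taxifare/taxi_preproc/ > /dev/null 2>&1; then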
  gsutil -m rm -rf gs://$BUCKET/taxifare/taxi_preproc/
fi

In [None]:
#change first arg to None to preprocess full dataset
preprocess(20*100, 'DataflowRunner')

Launching Dataflow job preprocess-taxifeatures-181021-051247 ... hang on


Once the job completes, list the files it created in Google Cloud Storage.

In [12]:
%%bash
gsutil ls -l gs://$BUCKET/taxifare/taxi_preproc/

  48668142  2018-10-21T04:46:10Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00001
  38751674  2018-10-21T05:20:48Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00002
  83124859  2018-10-21T05:20:48Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00001-of-00002
  61397578  2018-10-21T05:19:49Z  gs://rostlab-181304-ml/taxifare/taxi_preproc/valid.csv-00000-of-00001
                                 gs://rostlab-181304-ml/taxifare/taxi_preproc/tmp/
TOTAL: 4 objects, 231942253 bytes (221.2 MiB)
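Note that this listing mixes `train.csv-00000-of-00001` with a two-shard `train.csv` set written about half an hour later. A single `WriteToText` writes a consistent `-SSSSS-of-NNNNN` set, so mixed totals most likely mean files from an earlier run were not deleted before the next one. A small hypothetical helper, sketched below, can flag such prefixes from a listing; it assumes Beam's default shard name template.

```python
import re
from collections import defaultdict

# Assumes the default WriteToText shard suffix, e.g. train.csv-00000-of-00002
_SHARD_RE = re.compile(r'(?P<prefix>.+)-(?P<index>\d{5})-of-(?P<total>\d{5})')


def stale_shards(object_names):
    """Map each prefix whose shards claim different totals to those totals."""
    totals = defaultdict(set)
    for name in object_names:
        basename = name.rsplit('/', 1)[-1]  # '' for folder entries such as tmp/
        m = _SHARD_RE.fullmatch(basename)
        if m:
            totals[m.group('prefix')].add(int(m.group('total')))
    return {prefix: sorted(t) for prefix, t in totals.items() if len(t) > 1}


listing = [
    'gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00001',
    'gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00000-of-00002',
    'gs://rostlab-181304-ml/taxifare/taxi_preproc/train.csv-00001-of-00002',
    'gs://rostlab-181304-ml/taxifare/taxi_preproc/valid.csv-00000-of-00001',
    'gs://rostlab-181304-ml/taxifare/taxi_preproc/tmp/',
]
print(stale_shards(listing))  # {'train.csv': [1, 2]}
```

Reading `train.csv-*` with such leftovers in place would silently mix two different samples into one training set.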


In [13]:
%%bash
#print first 10 lines of first shard of train.csv
gsutil cat "gs://$BUCKET/taxifare/taxi_preproc/train.csv-00000-of-*" | head

8.5,Thu,22,-73.98067,40.759296,-73.95608,40.773807,1.0,2013-04-04 22:58:55.000000-73.980740.759340.7738-73.9561
6.5,Thu,22,-73.980478,40.754022,-73.961178,40.768916,2.0,2012-04-26 22:46:41.000000-73.980540.75440.7689-73.9612
16.5,Thu,22,-73.9824447632,40.7675895691,-73.9795074463,40.7259635925,3.0,2015-04-16 22:56:18.000000-73.982440.767640.726-73.9795
7.3,Thu,22,-73.954668,40.780712,-73.975593,40.778688,1.0,2010-04-15 22:11:00.000000-73.954740.780740.7787-73.9756
6.1,Thu,22,-73.991701,40.7512,-73.975416,40.746262,1.0,2010-11-25 22:05:46.000000-73.991740.751240.7463-73.9754
17.0,Thu,22,-73.991871,40.754238,-73.991871,40.754238,1.0,2013-05-30 22:58:09.000000-73.991940.754240.7542-73.9919
11.3,Thu,22,-74.002235,40.740053,-73.973771,40.789713,1.0,2011-03-24 22:22:09.000000-74.002240.740140.7897-73.9738
9.0,Thu,22,-73.99683,40.725467,-73.981288,40.74436,5.0,2013-06-06 22:19:00.000000-73.996840.725540.7444-73.9813
5.7,Thu,22,-73.954177,40.787283,-73.969479,40.785209,2.0,2010-06-17 22:06:16.
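The `key` column looks run together because the query joins the timestamp and coordinates with `CONCAT` and no separator. To read a shard back for a quick check, here is a minimal sketch using the standard `csv` module; `parse_rows` is a hypothetical helper, the column order is the `CSV_COLUMNS` list from `to_csv`, and the sample text is the first two lines printed above.

```python
import csv
import io

# Column order written by to_csv
CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
# dayofweek and key stay strings; hourofday becomes an int, these become floats
FLOAT_COLUMNS = ('fare_amount', 'pickuplon', 'pickuplat', 'dropofflon', 'dropofflat', 'passengers')


def parse_rows(text):
    """Yield one typed dict per CSV line (the exported shards have no header)."""
    for values in csv.reader(io.StringIO(text)):
        if len(values) != len(CSV_COLUMNS):
            raise ValueError('expected {} fields, got {}'.format(len(CSV_COLUMNS), len(values)))
        row = dict(zip(CSV_COLUMNS, values))
        for col in FLOAT_COLUMNS:
            row[col] = float(row[col])
        row['hourofday'] = int(row['hourofday'])
        yield row


sample = (
    '8.5,Thu,22,-73.98067,40.759296,-73.95608,40.773807,1.0,2013-04-04 22:58:55.000000-73.980740.759340.7738-73.9561\n'
    '6.5,Thu,22,-73.980478,40.754022,-73.961178,40.768916,2.0,2012-04-26 22:46:41.000000-73.980540.75440.7689-73.9612\n'
)
rows = list(parse_rows(sample))
print(rows[1]['passengers'], rows[1]['dayofweek'])  # 2.0 Thu
```

The field-count check would catch a truncated line such as the last one in the output above.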

Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License