# Data Preprocessing for Machine Learning

**Learning Objectives**
* Understand the different approaches for data preprocessing in developing ML models
* Use Dataflow to perform data preprocessing steps

## Introduction

In the previous notebook we achieved an RMSE of **3.85**. Let's see if we can improve upon that by creating a data preprocessing pipeline in Cloud Dataflow.

Preprocessing data for a machine learning model involves both data engineering and feature engineering. During data engineering, we convert raw data into prepared data which is necessary for the model. Feature engineering then takes that prepared data and creates the features expected by the model. We have already seen various ways we can engineer new features for a machine learning model and where those steps take place. We also have flexibility as to where data preprocessing steps can take place; for example, BigQuery, Cloud Dataflow and TensorFlow. In this lab, we'll explore different data preprocessing strategies and see how they can be accomplished with Cloud Dataflow.

One way to categorize data preprocessing operations is by the granularity of the operation. Here, we will consider the following three types of operations:
1. Instance-level transformations
2. Full-pass transformations
3. Time-windowed aggregations

Cloud Dataflow can perform each of these types of operations and is particularly useful when performing computationally expensive operations as it is an autoscaling service for batch and streaming data processing pipelines. We'll say a few words about each of these below. For more information, have a look at this article about [data preprocessing for machine learning from Google Cloud](https://cloud.google.com/solutions/machine-learning/data-preprocessing-for-ml-with-tf-transform-pt1).

**1. Instance-level transformations**
These are transformations that take place during both training and prediction and look only at values from a single data point. For example, they might include clipping the value of a feature, polynomially expanding a feature, multiplying two features, or comparing two features to create a Boolean flag.

It is necessary to apply the same transformations at training time and at prediction time. Failure to do this results in training/serving skew and will negatively affect the performance of the model.
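As a minimal plain-Python sketch (not code from this lab), the function below applies the four kinds of instance-level transformations just listed to a single taxi record. The output field names and the choice of which features to transform are hypothetical; the clipping bounds reuse the 37 to 45 latitude range from `create_query` later in this notebook. Because every output depends only on the input row, calling the same function at training and prediction time avoids skew.

```python
def instance_level_transform(row):
    """Return a copy of `row` with instance-level features added.

    Every new value is computed from this single record only, so the same
    function can be applied unchanged at training and at prediction time.
    Output field names are hypothetical.
    """
    out = dict(row)
    # Clipping: keep pickup latitude inside the 37..45 bounding box.
    out["pickuplat_clipped"] = min(max(row["pickuplat"], 37.0), 45.0)
    # Polynomial expansion: square of the hour of day.
    out["hourofday_sq"] = row["hourofday"] ** 2
    # Multiplying two features together.
    out["day_x_hour"] = row["dayofweek"] * row["hourofday"]
    # Comparing two features to create a Boolean flag.
    out["heads_north"] = row["dropofflat"] > row["pickuplat"]
    return out


example = {"dayofweek": 3, "hourofday": 17,
           "pickuplat": 40.773008, "dropofflat": 40.732403}
print(instance_level_transform(example))
```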

**2. Full-pass transformations**
These transformations require a full pass over the data during training but act as instance-level operations during prediction. That is, during training you must analyze the entirety of the training data to compute quantities such as maximum, minimum, mean or variance, while at prediction time you only need those stored values to rescale or normalize a single data point.

A good example to keep in mind is standard scaling (z-score normalization) of features for training. You need to compute the mean and standard deviation of that feature across the whole training data set, thus it is called a full-pass transformation. At prediction time you use those previously computed values to appropriately normalize the new data point. Failure to do so results in training/serving skew.
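A minimal plain-Python sketch of the two phases, assuming the training values fit in memory (Dataflow or tf.transform would compute the same statistics at scale). `fit_zscore` is the full pass over the training data and `apply_zscore` is the instance-level step reused at prediction time; both names and the sample distances are hypothetical.

```python
import statistics


def fit_zscore(train_values):
    """Full pass: compute mean and population standard deviation once."""
    mean = statistics.mean(train_values)
    std = statistics.pstdev(train_values)
    return mean, std


def apply_zscore(x, mean, std):
    """Instance-level step: normalize one value with the stored statistics."""
    # Guard against a constant feature, whose standard deviation is zero.
    return (x - mean) / std if std > 0 else 0.0


train_distances = [0.5, 1.2, 2.0, 3.5, 8.0]   # hypothetical training values
mean, std = fit_zscore(train_distances)

# At prediction time only the saved (mean, std) pair is needed.
print(apply_zscore(2.5, mean, std))
```

The key design point is that `mean` and `std` are computed once and stored; recomputing them from prediction-time data would reintroduce training/serving skew.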

**3. Time-windowed aggregations**
These transformations occur both during training and at prediction time. They create a feature by summarizing real-time values over some temporal window. For example, if we wanted our model to estimate taxi trip time based on traffic metrics for the route over the last 5, 10, or 30 minutes, we would create time windows to aggregate these values.

At prediction time these aggregations have to be computed in real-time from a data stream.
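In a Dataflow pipeline this would typically be expressed with Beam windowing (for example `beam.WindowInto` with `beam.window.SlidingWindows`). To show the idea without a pipeline, here is a minimal plain-Python sketch that keeps a running mean over the last `window_s` seconds of a stream. It assumes events arrive in timestamp order; the class name and the traffic-speed values are hypothetical.

```python
from collections import deque


class WindowedMean:
    """Running mean of values whose timestamps fall in (now - window_s, now]."""

    def __init__(self, window_s):
        self.window_s = window_s
        self.events = deque()  # (timestamp, value) pairs in arrival order
        self.total = 0.0

    def add(self, ts, value):
        """Record a new event; assumes timestamps are non-decreasing."""
        self.events.append((ts, value))
        self.total += value
        self._evict(ts)

    def _evict(self, now):
        # Drop events that are window_s seconds old or older.
        while self.events and self.events[0][0] <= now - self.window_s:
            _, old_value = self.events.popleft()
            self.total -= old_value

    def mean(self, now):
        """Mean of the events still inside the window, or None if empty."""
        self._evict(now)
        return self.total / len(self.events) if self.events else None


speeds = WindowedMean(window_s=300)   # last 5 minutes
for ts, mph in [(0, 10.0), (100, 20.0), (350, 30.0)]:
    speeds.add(ts, mph)
print(speeds.mean(now=350))  # 25.0: the event at t=0 has left the window
```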

### Set environment variables and load necessary libraries

Apache Beam only works in Python 2 at the moment, so switch to the Python 2 kernel using the kernel selector in the upper-right corner of the notebook. Then execute the following cells to install the necessary libraries if they have not already been installed.

In [None]:
# Ensure that we have TensorFlow 1.13 installed.
!pip3 freeze | grep tensorflow==1.13.1 || pip3 install tensorflow==1.13.1

In [None]:
# Ensure that we have Apache Beam 2.10 installed.
!pip3 freeze | grep apache-beam==2.10.0 || pip3 install 'apache-beam[gcp]==2.10.0'

In [1]:
import tensorflow as tf
import apache_beam as beam
import shutil
import os
print(tf.__version__)

1.14.0


Next, set the environment variables related to your GCP Project.

In [2]:
PROJECT = "qwiklabs-gcp-636667ae83e902b6"  # Replace with your PROJECT
BUCKET =  "qwiklabs-gcp-636667ae83e902b6_al"  # Replace with your BUCKET
REGION = "us-east1"            # Choose an available region for AI Platform  
TFVERSION = "1.13"                # TF version for AI Platform

In [3]:
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = TFVERSION 

In [4]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`

Updated property [core/project].
Updated property [compute/region].
Updated property [ml_engine/local_python].


## Create data preprocessing job with Cloud Dataflow

The following code reads from BigQuery and saves the data as-is on Google Cloud Storage. We could also do additional preprocessing and cleanup inside Dataflow. Note, however, that in this case we'd have to remember to repeat that preprocessing at prediction time to avoid training/serving skew. In general, it is better to use tf.transform, which does this bookkeeping for you, or to do preprocessing within your TensorFlow model. We will look at how tf.transform works in another notebook. For now, we are simply moving data from BigQuery to CSV using Dataflow.

It's worth noting that while we could read from [BQ directly from TensorFlow](https://www.tensorflow.org/api_docs/python/tf/contrib/cloud/BigQueryReader), it is quite convenient to export to CSV and do the training off CSV. We can do this at scale with Cloud Dataflow. Furthermore, because we are running this on the cloud, you should go to the [GCP Console](https://console.cloud.google.com/dataflow) to view the status of the job. It will take several minutes for the preprocessing job to launch.

### Define our query and pipeline functions

To start we'll copy over the `create_query` function we created in the `01_bigquery/c_extract_and_benchmark` notebook. 

In [5]:
def create_query(phase, sample_size):
    basequery = """
    SELECT
        (tolls_amount + fare_amount) AS fare_amount,
        EXTRACT(DAYOFWEEK from pickup_datetime) AS dayofweek,
        EXTRACT(HOUR from pickup_datetime) AS hourofday,
        pickup_longitude AS pickuplon,
        pickup_latitude AS pickuplat,
        dropoff_longitude AS dropofflon,
        dropoff_latitude AS dropofflat
    FROM
        `nyc-tlc.yellow.trips`
    WHERE
        trip_distance > 0
        AND fare_amount >= 2.5
        AND pickup_longitude > -78
        AND pickup_longitude < -70
        AND dropoff_longitude > -78
        AND dropoff_longitude < -70
        AND pickup_latitude > 37
        AND pickup_latitude < 45
        AND dropoff_latitude > 37
        AND dropoff_latitude < 45
        AND passenger_count > 0
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N) = 1
    """

    if phase == "TRAIN":
        subsample = """
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) >= (EVERY_N * 0)
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) <  (EVERY_N * 70)
        """
    elif phase == "VALID":
        subsample = """
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) >= (EVERY_N * 70)
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) <  (EVERY_N * 85)
        """
    elif phase == "TEST":
        subsample = """
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) >= (EVERY_N * 85)
        AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))), EVERY_N * 100) <  (EVERY_N * 100)
        """

    query = basequery + subsample
    return query.replace("EVERY_N", sample_size)

Next, we'll write the CSV files we create to a Cloud Storage bucket. Before doing so, we check whether the output location already contains files and, if it does, delete them so that we start from an empty directory.

In [6]:
%%bash
if gsutil ls -r gs://${BUCKET} | grep -q gs://${BUCKET}/taxifare/ch4/taxi_preproc/; then
    gsutil -m rm -rf gs://$BUCKET/taxifare/ch4/taxi_preproc/
fi

Next, we'll create a function and pipeline for preprocessing the data. First, we'll define a `to_csv` function, which takes a row dictionary (the BigQuery reader represents each row of the dataset as a dictionary) and returns a comma-separated string for that record.

In [7]:
def to_csv(rowdict):
    """
    Arguments:
        -rowdict: Dictionary. The beam bigquery reader returns a PCollection in
        which each row is represented as a python dictionary
    Returns:
        -rowstring: a comma separated string representation of the record
    """
    days = ["null", "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]
    CSV_COLUMNS = "fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat".split(',')
    rowstring = ','.join([str(rowdict[k]) for k in CSV_COLUMNS])
    return rowstring
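Before launching a pipeline, it can help to check `to_csv` on a single hand-built row. The sketch below copies the function's logic so it runs standalone, and pairs it with a hypothetical `parse_csv_line` helper that reverses the conversion. It shows that column order is fixed by `CSV_COLUMNS` and that extra keys in the row dictionary are ignored. The sample values come from the first line of the `TRAIN` output shown later in this notebook.

```python
CSV_COLUMNS = ["fare_amount", "dayofweek", "hourofday",
               "pickuplon", "pickuplat", "dropofflon", "dropofflat"]


def to_csv(rowdict):
    """Same logic as the notebook's to_csv: join the columns in a fixed order."""
    return ",".join(str(rowdict[k]) for k in CSV_COLUMNS)


def parse_csv_line(line):
    """Hypothetical inverse of to_csv: rebuild a typed dict from one CSV line."""
    row = dict(zip(CSV_COLUMNS, line.split(",")))
    for k in ("dayofweek", "hourofday"):
        row[k] = int(row[k])
    for k in ("fare_amount", "pickuplon", "pickuplat", "dropofflon", "dropofflat"):
        row[k] = float(row[k])
    return row


row = {"fare_amount": 2.5, "dayofweek": 3, "hourofday": 0,
       "pickuplon": -73.912556, "pickuplat": 40.847338,
       "dropofflon": -73.912556, "dropofflat": 40.847337,
       "passenger_count": 1}   # extra key, ignored by to_csv
line = to_csv(row)
print(line)
```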

Next, we define our primary preprocessing function. It creates a pipeline that reads data from BigQuery, uses our `to_csv` function to convert each row into a comma-separated string, and writes the results to files in Google Cloud Storage.

#### **Exercise 1**

In the code below, complete the pipeline to accomplish the tasks stated above. Have a look at [the Apache Beam documentation](https://beam.apache.org/documentation/) to remind yourself how to [read data from BigQuery](https://beam.apache.org/documentation/io/built-in/google-bigquery/) and apply map functions. Then write the comma separated string to a file in Cloud Storage. 

In [12]:
import datetime

def preprocess(EVERY_N, RUNNER):
    """
    Arguments:
        -EVERY_N: String or integer. Sample roughly one out of every N rows from
        the full dataset; the value is substituted into the SQL query, so an
        expression such as "50*100" works. Larger values yield a smaller sample.
        -RUNNER: "DirectRunner" or "DataflowRunner". Specify whether to run the
        pipeline locally or on Google Cloud, respectively.
    Side-effects:
        -Creates and executes dataflow pipeline. 
        See https://beam.apache.org/documentation/programming-guide/#creating-a-pipeline
    """
    job_name = "preprocess-taxifeatures" + "-" + datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    print("Launching Dataflow job {} ... hang on".format(job_name))
    OUTPUT_DIR = "gs://{0}/taxifare/ch4/taxi_preproc/".format(BUCKET)

    # Dictionary of pipeline options
    options = {
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "job_name": job_name,  # reuse the name printed above
        "project": PROJECT,
        "runner": RUNNER
    }

    # Instantiate a PipelineOptions object from the options dictionary
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    # Instantiate the Pipeline; the pipeline runs when the "with" block exits
    with beam.Pipeline(options=opts) as p:
        for phase in ["TRAIN", "VALID", "TEST"]:
            query = create_query(phase, EVERY_N)
            outfile = os.path.join(OUTPUT_DIR, "{}.csv".format(phase))
            (
                p | "read_{}".format(phase) >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
                  | "tocsv_{}".format(phase) >> beam.Map(to_csv)
                  | "write_{}".format(phase) >> beam.io.WriteToText(outfile)
            )
    print("Done")

Now that we have the preprocessing pipeline function, we can execute the pipeline locally or on the cloud. To run our pipeline locally, we specify the `RUNNER` variable as `DirectRunner`. To run our pipeline in the cloud, we set `RUNNER` to be `DataflowRunner`. In either case, this variable is passed to the options dictionary that we use to instantiate the pipeline. 

As with training a model, it is good practice to test your preprocessing pipeline locally with a subset of your data before running it against your entire dataset.
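Because `EVERY_N` is substituted into the SQL as text, strings such as `"50*10000"` are evaluated by BigQuery. The hypothetical helpers below do the same arithmetic in Python to show roughly how much data each run selects: the base filter keeps about 1 in `EVERY_N` rows (before the other `WHERE` conditions are applied), and the splits then take 70%, 15%, and 15% of those. By this arithmetic the local run below reads about 100 times less data than the Dataflow run later in the notebook.

```python
import operator
from functools import reduce


def parse_every_n(expr):
    """Evaluate a product string such as "50*10000" without using eval()."""
    return reduce(operator.mul, (int(part) for part in str(expr).split("*")), 1)


def split_fractions(every_n):
    """Approximate share of all (filtered) rows that lands in each split."""
    sampled = 1.0 / every_n
    return {"TRAIN": sampled * 0.70, "VALID": sampled * 0.15, "TEST": sampled * 0.15}


local_n = parse_every_n("50*10000")   # used for the DirectRunner test
cloud_n = parse_every_n("50*100")     # used for the DataflowRunner job
print(local_n, cloud_n, local_n // cloud_n)
print(split_fractions(cloud_n))
```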

### Run Beam pipeline locally

We'll start by testing our pipeline locally. This takes up to 5 minutes. You will see the message "Done" when it has finished.

In [13]:
preprocess("50*10000", "DirectRunner")

Launching Dataflow job preprocess-taxifeatures-190719-163930 ... hang on


W0719 16:39:31.587076 140470923552512 bigquery_tools.py:543] Dataset qwiklabs-gcp-636667ae83e902b6:temp_dataset_850c5973b8554a08b48c23e27a67aa32 does not exist so we will create it as temporary with location=US
W0719 16:39:42.909831 140470923552512 bigquery_tools.py:543] Dataset qwiklabs-gcp-636667ae83e902b6:temp_dataset_2a660e479a6a4a4c9a5361c5572b0ad4 does not exist so we will create it as temporary with location=US
W0719 16:39:53.460899 140470923552512 bigquery_tools.py:543] Dataset qwiklabs-gcp-636667ae83e902b6:temp_dataset_7a2961553e2d4a0285d08cb99da9eca4 does not exist so we will create it as temporary with location=US


Done


### Run Beam pipeline on Cloud Dataflow

Again, we'll clear out the output directory in our GCS bucket to ensure a fresh run.

In [14]:
%%bash
if gsutil ls -r gs://${BUCKET} | grep -q gs://${BUCKET}/taxifare/ch4/taxi_preproc/; then
    gsutil -m rm -rf gs://${BUCKET}/taxifare/ch4/taxi_preproc/
fi

Removing gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/TEST.csv-00000-of-00001#1563554410859084...
Removing gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/TRAIN.csv-00000-of-00002#1563554412172701...
Removing gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/TRAIN.csv-00001-of-00002#1563554412168483...
Removing gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/VALID.csv-00000-of-00001#1563554409796167...
/ [4/4 objects] 100% Done                                                       
Operation completed over 4 objects.                                              


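The following step will take **15-20 minutes**. Monitor job progress on the Dataflow section of [Cloud Console](https://console.cloud.google.com/dataflow). Note that `create_query` substitutes the first argument directly into the SQL, so passing `None` will not work as written; to process a larger share of the dataset, pass a smaller sampling value instead. The value must be greater than 1, since `MOD(..., 1)` is always 0 and the `= 1` filter would then select no rows.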

In [15]:
preprocess("50*100", "DataflowRunner")

Launching Dataflow job preprocess-taxifeatures-190719-164054 ... hang on
Done


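Once the job finishes, we can list the files that were created and inspect their contents. Beam's `WriteToText` shards its output, which is why each file name carries a suffix such as `-00000-of-00001`. In this run each split fits in a single shard, whereas the earlier local run wrote `TRAIN` as two shards.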

In [16]:
%%bash
gsutil ls -l gs://$BUCKET/taxifare/ch4/taxi_preproc/

   1758414  2019-07-19T16:46:10Z  gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/TEST.csv-00000-of-00001
   7813761  2019-07-19T16:46:50Z  gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/TRAIN.csv-00000-of-00001
   1638718  2019-07-19T16:46:20Z  gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/VALID.csv-00000-of-00001
                                 gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_preproc/tmp/
TOTAL: 3 objects, 11210893 bytes (10.69 MiB)
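Downstream code usually treats all shards of a split as one dataset, which is why the cloud training job later in this notebook passes wildcard paths such as `TRAIN*`. A minimal local sketch of the same idea using only the standard library, assuming the shards have been copied to a local directory (the function name is hypothetical):

```python
import glob
import os


def read_shards(directory, prefix):
    """Return all non-empty lines from files named PREFIX-NNNNN-of-NNNNN."""
    pattern = os.path.join(directory, prefix + "-*-of-*")
    lines = []
    for path in sorted(glob.glob(pattern)):   # sorted: stable shard order
        with open(path) as f:
            lines.extend(line.rstrip("\n") for line in f if line.strip())
    return lines
```

For example, `read_shards("local_copy", "TRAIN.csv")` would return the rows of every `TRAIN.csv-*-of-*` shard in a hypothetical `local_copy` directory, in shard order.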


In [17]:
%%bash
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/TRAIN.csv-00000-of-*" | head

2.5,3,0,-73.912556,40.847338,-73.912556,40.847337
2.5,1,0,-73.983477,40.741278,-73.984838,40.741248
2.5,2,0,-73.78817,40.641467,-73.794338,40.664787
2.5,2,0,-73.987925,40.756263,-73.985725,40.756558
2.5,5,0,-74.001277,40.741765,-73.99112,40.751328
2.5,6,0,-74.004138,40.743293,-74.003546,40.743132
2.5,6,0,-73.991488,40.728046,-73.99002,40.729696
2.5,5,0,-73.974525,40.783234,-73.972104,40.782116
2.5,7,0,-73.781832,40.644773,-73.78104,40.645017
2.5,1,0,-74.000582,40.72078,-74.001115,40.720127


### Develop a model with new inputs

We can now develop a model with these inputs. Download the first shard of the preprocessed data to a subfolder called `sample` so we can develop locally first. 

In [18]:
%%bash
if [ -d sample ]; then
    rm -rf sample
fi
mkdir sample
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/TRAIN.csv-00000-of-*" > sample/train.csv
gsutil cat "gs://$BUCKET/taxifare/ch4/taxi_preproc/VALID.csv-00000-of-*" > sample/valid.csv

To begin, let's copy the `model.py` and `task.py` we developed in the previous notebooks here.

In [21]:
!mkdir -p ./taxifaremodel
!cp -r ../../03_model_performance/taxifaremodel/* ./taxifaremodel

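Let's have a look at the files contained within the `taxifaremodel` folder. Within `model.py`, `feature_cols` includes three features engineered in the input functions (`latdiff`, `londiff`, and `euclidean_dist`) alongside the columns built with the `tf.feature_column` module.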

In [22]:
%%bash
grep -A 15 "feature_cols =" taxifaremodel/model.py

feature_cols = [
  #1. Engineered using tf.feature_column module
  tf.feature_column.indicator_column(categorical_column = fc_day_hr),
  fc_bucketized_plat,
  fc_bucketized_plon,
  fc_bucketized_dlat,
  fc_bucketized_dlon,
  #2. Engineered in input functions
  tf.feature_column.numeric_column(key = "latdiff"),
  tf.feature_column.numeric_column(key = "londiff"),
  tf.feature_column.numeric_column(key = "euclidean_dist") 
]

#3. Serving Input Receiver Function
def serving_input_receiver_fn():
    receiver_tensors = {


We can also see the engineered features that are created by the `add_engineered_features` function here.

In [23]:
%%bash
grep -A 5 "add_engineered_features(" taxifaremodel/model.py

        features = add_engineered_features(features)
        
        # Separate the label from the features
        label = features.pop("fare_amount") # remove label from features and store

        return features, label
--
def add_engineered_features(features):
    features["dayofweek"] = features["dayofweek"] - 1 # subtract one since our days of week are 1-7 instead of 0-6
    
    features["latdiff"] = features["pickuplat"] - features["dropofflat"] # East/West
    features["londiff"] = features["pickuplon"] - features["dropofflon"] # North/South
    features["euclidean_dist"] = tf.sqrt(features["latdiff"]**2 + features["londiff"]**2)
--
    features = add_engineered_features(receiver_tensors) # 'features' is what is passed on to the model
    
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
  
#4. Train and Evaluate
def train_and_evaluate(params):
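To check the engineered values by hand, here is a plain-Python mirror of `add_engineered_features` (a hypothetical helper for illustration; the model code itself operates on tensors). It uses the coordinates from the prediction test instance used later in this notebook, with `dayofweek` set to 1, since BigQuery's `DAYOFWEEK` returns values from 1 to 7. A latitude difference measures north/south displacement and a longitude difference east/west.

```python
import math


def add_engineered_features_py(features):
    """Plain-Python mirror of the model's add_engineered_features."""
    f = dict(features)
    f["dayofweek"] = f["dayofweek"] - 1                  # 1-7 -> 0-6
    f["latdiff"] = f["pickuplat"] - f["dropofflat"]      # north/south
    f["londiff"] = f["pickuplon"] - f["dropofflon"]      # east/west
    f["euclidean_dist"] = math.sqrt(f["latdiff"] ** 2 + f["londiff"] ** 2)
    return f


raw = {"dayofweek": 1, "hourofday": 17,
       "pickuplon": -73.885262, "pickuplat": 40.773008,
       "dropofflon": -73.987232, "dropofflat": 40.732403}
print(add_engineered_features_py(raw))
```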


We can try out this model on the local sample we've created to make sure everything works as expected. Note that this takes about **5 minutes** to complete.

In [24]:
%%bash
rm -rf taxifare.tar.gz taxi_trained
export PYTHONPATH=${PYTHONPATH}:${PWD}
python -m taxifaremodel.task \
    --train_data_path=${PWD}/sample/train.csv \
    --eval_data_path=${PWD}/sample/valid.csv  \
    --output_dir=${PWD}/taxi_trained \
    --train_steps=10 \
    --job-dir=/tmp

1.14.0


W0719 17:52:07.814587 139853308237248 lazy_loader.py:50] 
The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

W0719 17:52:07.815973 139853308237248 deprecation_wrapper.py:119] From taxifaremodel/model.py:143: The name tf.logging.set_verbosity is deprecated. Please use tf.compat.v1.logging.set_verbosity instead.

W0719 17:52:07.816164 139853308237248 deprecation_wrapper.py:119] From taxifaremodel/model.py:143: The name tf.logging.INFO is deprecated. Please use tf.compat.v1.logging.INFO instead.

I0719 17:52:07.816512 139853308237248 estimator_training.py:186] Not using Distribute Coordinator.
I0719 17:52:07.816709 139853308237248 training.py:612] Running training and evaluation lo

We've only done 10 training steps, so we don't expect the model to have good performance. Let's have a look at the exported files from our training job. 

In [25]:
%%bash
ls -R taxi_trained/export

taxi_trained/export:
exporter

taxi_trained/export/exporter:
1563558736

taxi_trained/export/exporter/1563558736:
saved_model.pb
variables

taxi_trained/export/exporter/1563558736/variables:
variables.data-00000-of-00002
variables.data-00001-of-00002
variables.index


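You can use `saved_model_cli` to look at the exported signature. Note that the model doesn't need any of the engineered features as inputs: `serving_input_receiver_fn` calls `add_engineered_features`, which computes `latdiff`, `londiff`, and `euclidean_dist` from the raw inputs. These features still appear in the signature below, but their tensor names (`sub_1:0`, `sub_2:0`, `Sqrt:0`) indicate computed values rather than placeholders; they show up because `add_engineered_features` adds them to the `receiver_tensors` dictionary in place.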

In [26]:
%%bash
model_dir=$(ls ${PWD}/taxi_trained/export/exporter | tail -1)
saved_model_cli show --dir ${PWD}/taxi_trained/export/exporter/${model_dir} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['dayofweek'] tensor_info:
        dtype: DT_INT32
        shape: (-1)
        name: sub:0
    inputs['dropofflat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_4:0
    inputs['dropofflon'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_5:0
    inputs['euclidean_dist'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Sqrt:0
    inputs['hourofday'] tensor_info:
        dtype: DT_INT32
        shape: (-1)
        name: Placeholder_1:0
    inputs['latdiff'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: sub_1:0
    inputs['londiff'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: sub_2:0
    inputs['pickuplat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
    

To test prediction with our model, we create a temporary JSON file containing the expected feature values.

In [27]:
%%writefile /tmp/test.json
{"dayofweek": 0, "hourofday": 17, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403}

Writing /tmp/test.json
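The file passed to `--json-instances` holds one JSON object per line, so several instances can be scored in one call. A minimal sketch that writes such a file from Python dictionaries; the helper name and both instances are hypothetical, using the same raw fields as the test file above.

```python
import json


def write_json_instances(path, instances):
    """Write newline-delimited JSON: one prediction instance per line."""
    with open(path, "w") as f:
        for instance in instances:
            f.write(json.dumps(instance) + "\n")


# Hypothetical instances containing only the raw (non-engineered) features.
instances = [
    {"dayofweek": 1, "hourofday": 17, "pickuplon": -73.885262,
     "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403},
    {"dayofweek": 4, "hourofday": 8, "pickuplon": -73.991488,
     "pickuplat": 40.728046, "dropofflon": -73.99002, "dropofflat": 40.729696},
]
```

Calling `write_json_instances("/tmp/test_multi.json", instances)` would produce a two-line file that can be passed to `--json-instances` in the same way.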


In [34]:
%%bash
model_dir=$(ls ${PWD}/taxi_trained/export/exporter | tail -1)
gcloud ai-platform local predict \
    --model-dir=${PWD}/taxi_trained/export/exporter/${model_dir} \
    --json-instances=/tmp/test.json

PREDICTIONS
[5.766441822052002]


To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2019-07-19 18:03:53.046729: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-07-19 18:03:53.047251: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x559e9ab47200 executing computations on platform Host. Devices:
2019-07-19 18:03:53.047294: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): <undefined>, <undefined>
2019-07-19 18:03:53.047761: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
W0719 18:03:53.048280 140693120812480 deprecation.py:323] From /usr/lib/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/frameworks/tf_prediction_lib.py:210: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for up

## Train on the Cloud

This will take 10-15 minutes even though the prompt returns immediately after the job is submitted. Monitor job progress on the [ML Engine section of Cloud Console](https://console.cloud.google.com/mlengine/jobs) and wait for the training job to complete.

In [29]:
%%bash
OUTDIR=gs://${BUCKET}/taxifare/ch4/taxi_trained
JOBNAME=lab4a_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
    --region=$REGION \
    --module-name=taxifaremodel.task \
    --package-path=${PWD}/taxifaremodel \
    --job-dir=$OUTDIR \
    --staging-bucket=gs://$BUCKET \
    --scale-tier=BASIC \
    --runtime-version=$TFVERSION \
    -- \
    --train_data_path="gs://${BUCKET}/taxifare/ch4/taxi_preproc/TRAIN*" \
    --eval_data_path="gs://${BUCKET}/taxifare/ch4/taxi_preproc/VALID*"  \
    --train_steps=5000 \
    --output_dir=$OUTDIR

gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_trained us-east1 lab4a_190719_175328
jobId: lab4a_190719_175328
state: QUEUED


CommandException: 1 files/objects could not be removed.
Job [lab4a_190719_175328] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe lab4a_190719_175328

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs lab4a_190719_175328


Once the model has finished training on the cloud, we can check the export folder to see that a model has been correctly saved. 

In [30]:
%%bash
gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1

gs://qwiklabs-gcp-636667ae83e902b6_al/taxifare/ch4/taxi_trained/export/exporter/1563559162/
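The `tail -1` idiom picks the most recent export because each export directory is named with a Unix timestamp and the listing comes back in sorted order. A minimal Python sketch of a more explicit alternative that compares the timestamps numerically; the function name is hypothetical, and it takes the paths printed by `gsutil ls` or `ls`.

```python
def latest_export(paths):
    """Return the path whose last component is the largest numeric timestamp."""
    candidates = []
    for path in paths:
        name = path.rstrip("/").rsplit("/", 1)[-1]
        if name.isdigit():                 # skip non-timestamp entries
            candidates.append((int(name), path))
    if not candidates:
        raise ValueError("no timestamped export directories found")
    return max(candidates)[1]
```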


As before, we can use the `saved_model_cli` to examine the exported signature.

In [31]:
%%bash
model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
saved_model_cli show --dir ${model_dir} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['dayofweek'] tensor_info:
        dtype: DT_INT32
        shape: (-1)
        name: sub:0
    inputs['dropofflat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_4:0
    inputs['dropofflon'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Placeholder_5:0
    inputs['euclidean_dist'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: Sqrt:0
    inputs['hourofday'] tensor_info:
        dtype: DT_INT32
        shape: (-1)
        name: Placeholder_1:0
    inputs['latdiff'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: sub_1:0
    inputs['londiff'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: sub_2:0
    inputs['pickuplat'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
    

Finally, check the model's prediction by running a local predict job on our test file.

In [33]:
%%bash
model_dir=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
gcloud ai-platform local predict \
    --model-dir=${model_dir} \
    --json-instances=/tmp/test.json

PREDICTIONS
[26.38582420349121]


To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2019-07-19 18:02:49.116193: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2019-07-19 18:02:49.116574: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x558e6bab4f80 executing computations on platform Host. Devices:
2019-07-19 18:02:49.116621: I tensorflow/compiler/xla/service/service.cc:175]   StreamExecutor device (0): <undefined>, <undefined>
2019-07-19 18:02:49.117014: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
W0719 18:02:49.117352 140454869448128 deprecation.py:323] From /usr/lib/google-cloud-sdk/lib/third_party/ml_sdk/cloud/ml/prediction/frameworks/tf_prediction_lib.py:210: load (from tensorflow.python.saved_model.loader_impl) is deprecated and will be removed in a future version.
Instructions for up

Copyright 2019 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.