<h1> Exploring tf.transform </h1>

- Pandas is fine for experimenting, but when you operationalize the workflow it is better to do the preprocessing in Apache Beam.
- Because Apache Beam also supports streaming, this helps if you need to preprocess data in flight.

- tf.transform supports only specific combinations of TensorFlow and Apache Beam, so make sure to install a supported combination:

    - TFT 0.5.0
    - TF 1.5 or higher
    - Apache Beam [GCP] 2.2.0 or higher
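
The version requirements above can be checked in code. The following is a minimal sketch, not part of tf.transform: `parse_version` and `meets_minimum` are hypothetical helper names, and the installed versions are hard-coded from the `pip freeze` output shown in this notebook rather than read from the environment.

```python
def parse_version(version):
    """Turn '2.4.0' into (2, 4, 0); stop at the first non-numeric piece."""
    parts = []
    for piece in version.split('.'):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def meets_minimum(installed, minimum):
    """Return True if the installed version is at least the minimum."""
    a, b = parse_version(installed), parse_version(minimum)
    # Pad the shorter tuple with zeros so that '1.5' equals '1.5.0'.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


# Minimums from the list above; installed versions are assumed values.
MINIMUMS = {'tensorflow': '1.5', 'apache-beam': '2.2.0'}
installed = {'tensorflow': '1.8.0', 'apache-beam': '2.4.0'}

for name in sorted(MINIMUMS):
    ok = meets_minimum(installed[name], MINIMUMS[name])
    print('{0}: {1}'.format(name, 'ok' if ok else 'too old'))
```

Comparing tuples of integers avoids the string-comparison trap where `'1.10'` sorts before `'1.5'`.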

In [1]:
# install tf.transform and Apache Beam
%bash
pip uninstall -y google-cloud-dataflow   # remove the older Dataflow SDK; Apache Beam is the programming model, and Dataflow is the managed service on GCP that runs it
pip install --upgrade --force tensorflow_transform==0.5.0 apache-beam[gcp]

Uninstalling google-cloud-dataflow-2.0.0:
  Successfully uninstalled google-cloud-dataflow-2.0.0
Collecting tensorflow_transform==0.5.0
  Downloading https://files.pythonhosted.org/packages/0a/10/c80d03416eb7b5a4c95c274a2dd01a93200d1fab958cab66b59060e80eee/tensorflow-transform-0.5.0.tar.gz (111kB)
Collecting apache-beam[gcp]
  Downloading https://files.pythonhosted.org/packages/28/9f/e4be9a3fda95b34aca8e37ab53d355d346d493e7ebeac0e7d26ebeb7625d/apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl (2.1MB)
Collecting protobuf<4,>=3.3 (from tensorflow_transform==0.5.0)
  Downloading https://files.pythonhosted.org/packages/9d/61/54c3a9cfde6ffe0ca6a1786ddb8874263f4ca32e7693ad383bd8cf935015/protobuf-3.5.2.post1-cp27-cp27mu-manylinux1_x86_64.whl (6.4MB)
Collecting six<1.11,>=1.9 (from tensorflow_transform==0.5.0)
  Downloading https://files.pythonhosted.org/packages/c8/0a/b6723e1bc4c516cb687841499455a8505b44607ab535be01091c0f24f079/six-1.10.0-py2.py3-none-any.whl
Collecting grpcio<2,>=1.0 (from

You are using pip version 9.0.3, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
    DEPRECATION: Uninstalling a distutils installed project (crcmod) has been deprecated and will be removed in a future version. This is due to the fact that uninstalling a distutils project will only partially uninstall the project.
    DEPRECATION: Uninstalling a distutils installed project (pyyaml) has been deprecated and will be removed in a future version. This is due to the fact that uninstalling a distutils project will only partially uninstall the project.
    DEPRECATION: Uninstalling a distutils installed project (certifi) has been deprecated and will be removed in a future version. This is due to the fact that uninstalling a distutils project will only partially uninstall the project.
    DEPRECATION: Uninstalling a distutils installed project (future) has been deprecated and will be removed in a future version. This is due to the 

In [1]:
# check the installed versions of TensorFlow, Apache Beam, and tf.transform
%bash
pip freeze | grep -e 'flow\|beam'   # list installed packages whose names contain 'flow' or 'beam'

apache-airflow==1.9.0
apache-beam==2.4.0
tensorflow==1.8.0
tensorflow-transform==0.5.0




In [9]:
# import tf
import tensorflow as tf
import tensorflow_transform as tft
import shutil
print(tf.__version__)

1.8.0


In [3]:
# change these to try this notebook out
BUCKET = 'qwiklabs-gcp-7b1b10e5e24c910c'
PROJECT = 'qwiklabs-gcp-7b1b10e5e24c910c'
REGION = 'us-west1'

In [4]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION

In [5]:
%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


## Input source: BigQuery

Get the data from BigQuery, but defer filtering and preprocessing to Apache Beam.
Note that the dayofweek column now contains strings.

- Because filtering and preprocessing are done in Apache Beam, the same logic can also handle the data fed in during serving.

In [6]:
import google.datalab.bigquery as bq
def create_query(phase, EVERY_N):
  """
  phase: 1=train 2=valid
  """
  base_query = """
WITH daynames AS
  (SELECT ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek)
SELECT
  (tolls_amount + fare_amount) AS fare_amount,
  daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,
  EXTRACT(HOUR FROM pickup_datetime) AS hourofday,
  pickup_longitude AS pickuplon,
  pickup_latitude AS pickuplat,
  dropoff_longitude AS dropofflon,
  dropoff_latitude AS dropofflat,
  passenger_count AS passengers,
  'notneeded' AS key
FROM
  `nyc-tlc.yellow.trips`, daynames
WHERE
  trip_distance > 0 AND fare_amount > 0
  """

  if EVERY_N is None:
    if phase < 2:
      # training: hash buckets 0 and 1 out of 4
      query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),4) < 2".format(base_query)
    else:
      # validation: the hash bucket (out of 4) equal to phase
      query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),4) = {1}".format(base_query, phase)
  else:
    # sample one hash bucket out of EVERY_N; the phase number selects the bucket
    query = "{0} AND MOD(ABS(FARM_FINGERPRINT(CAST(pickup_datetime AS STRING))),{1}) = {2}".format(base_query, EVERY_N, phase)

  return query

query = create_query(2, 100000)
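
The query above splits and samples rows by hashing `pickup_datetime` and taking the hash modulo a bucket count, so a given row always lands in the same split. The sketch below illustrates the idea in plain Python for the `EVERY_N` branch. It uses MD5 from `hashlib` as a stand-in for BigQuery's `FARM_FINGERPRINT`, so the bucket numbers will not match what BigQuery computes; `bucket` and `assign_phase` are hypothetical names.

```python
import hashlib


def bucket(key, num_buckets):
    """Map a string key to a stable bucket in [0, num_buckets)."""
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    return int(digest, 16) % num_buckets


def assign_phase(pickup_datetime, every_n):
    """Return 1 (train), 2 (valid), or None if the row is not sampled.

    Mirrors the EVERY_N branch of the query: a row is kept for a phase
    only when its bucket number equals the phase number.
    """
    b = bucket(pickup_datetime, every_n)
    return b if b in (1, 2) else None


# Illustrative timestamps, not taken from the dataset.
timestamps = ['2014-05-17 15:15:00 UTC', '2012-01-02 08:30:00 UTC']
for ts in timestamps:
    print('{0} -> {1}'.format(ts, assign_phase(ts, 4)))
```

Because the assignment depends only on the key, rerunning the query gives the same training and validation sets, and the two never overlap.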

In [7]:
# fetch the validation sample as a pandas DataFrame
df_valid = bq.Query(query).execute().result().to_dataframe()
df_valid.head()
df_valid.describe()

| | fare_amount | hourofday | pickuplon | pickuplat | dropofflon | dropofflat | passengers |
|---|---|---|---|---|---|---|---|
| count | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 | 11181.0 |
| mean | 11.242599 | 13.244075 | -72.576852 | 39.973146 | -72.748974 | 40.006091 | 1.722118 |
| std | 9.447462 | 6.548354 | 10.133452 | 5.777329 | 12.981577 | 5.664887 | 1.351062 |
| min | 2.5 | 0.0 | -78.133333 | -73.991278 | -751.4 | -73.97797 | 0.0 |
| 25% | 6.0 | 9.0 | -73.991849 | 40.734954 | -73.991236 | 40.734008 | 1.0 |
| 50% | 8.5 | 14.0 | -73.981824 | 40.75264 | -73.980164 | 40.753427 | 1.0 |
| 75% | 12.5 | 19.0 | -73.967418 | 40.7667 | -73.964153 | 40.767832 | 2.0 |
| max | 143.0 | 23.0 | 40.806487 | 41.366138 | 40.7854 | 41.366138 | 6.0 |


## Create ML dataset using tf.transform and Dataflow

Let's use Cloud Dataflow to read in the BigQuery data and write it out as compressed TFRecord files. Along the way, let's use tf.transform to scale and transform the features. Using tf.transform lets us save the transformation metadata, which ensures that the same transformations are carried out during prediction as well.
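
To see why saving the metadata matters, here is a minimal pure-Python sketch of the analyze-then-transform idea. It is not the tf.transform API: `analyze` and `transform` are hypothetical functions, and the handling of ties, constant columns, and unseen strings is an assumption made for illustration. The analyze step makes a full pass over the training data to collect minimums, maximums, and vocabularies; the transform step then applies those stored values to any row, including rows seen at prediction time.

```python
from collections import Counter


def analyze(rows, numeric_cols, string_cols):
    """Full pass over the training data: collect min/max and vocabularies."""
    stats = {'minmax': {}, 'vocab': {}}
    for col in numeric_cols:
        values = [row[col] for row in rows]
        stats['minmax'][col] = (min(values), max(values))
    for col in string_cols:
        counts = Counter(row[col] for row in rows)
        # Most frequent token gets index 0; ties are broken alphabetically here.
        ordered = sorted(counts, key=lambda tok: (-counts[tok], tok))
        stats['vocab'][col] = dict((tok, i) for i, tok in enumerate(ordered))
    return stats


def transform(row, stats):
    """Apply the stored statistics to one row (training, eval, or serving)."""
    out = dict(row)
    for col, (lo, hi) in stats['minmax'].items():
        # Scale to [0, 1] using the training min and max, not the row's own.
        out[col] = (row[col] - lo) / float(hi - lo) if hi > lo else 0.0
    for col, vocab in stats['vocab'].items():
        out[col] = vocab.get(row[col], -1)  # -1 marks an unseen string
    return out


# Toy training rows, made up for illustration.
train = [
    {'pickuplon': -74.0, 'dayofweek': 'Mon'},
    {'pickuplon': -73.9, 'dayofweek': 'Tues'},
    {'pickuplon': -73.8, 'dayofweek': 'Mon'},
]
stats = analyze(train, ['pickuplon'], ['dayofweek'])
serving_row = transform({'pickuplon': -73.9, 'dayofweek': 'Sun'}, stats)
print(serving_row)
```

The serving row is scaled with the training minimum and maximum, which is only possible because `stats` was kept after the analyze step.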

In [None]:
# packages that the Dataflow workers need to install
%writefile requirements.txt
tensorflow-transform==0.5.0

In [11]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl

# check that inputs from BigQuery and from serving JSON are valid
def is_valid(inputs):
  try:
    pickup_longitude = inputs['pickuplon']
    dropoff_longitude = inputs['dropofflon']
    pickup_latitude = inputs['pickuplat']
    dropoff_latitude = inputs['dropofflat']
    hourofday = inputs['hourofday']
    dayofweek = inputs['dayofweek']
    passenger_count = inputs['passengers']
    fare_amount = inputs['fare_amount']
    # keep only data points that satisfy all of these criteria
    return (fare_amount >= 2.5 and pickup_longitude > -78 and pickup_longitude < -70 \
      and dropoff_longitude > -78 and dropoff_longitude < -70 and pickup_latitude > 37 \
      and pickup_latitude < 45 and dropoff_latitude > 37 and dropoff_latitude < 45 \
      and passenger_count > 0)
  except Exception:
    # a missing or malformed field makes the row invalid
    return False

# preprocess and create engineered features
def preprocess_tft(inputs):
  print(inputs)  # show the input tensors
  result = {}
  result['fare_amount'] = tf.identity(inputs['fare_amount'])  # label, passed through
  result['dayofweek'] = tft.string_to_int(inputs['dayofweek'])  # builds a vocabulary and maps each string to an integer
  result['hourofday'] = tf.identity(inputs['hourofday'])  # pass through
  result['pickuplon'] = tft.scale_to_0_1(inputs['pickuplon'])  # scale numeric values to [0, 1]; needs the min and max
  result['pickuplat'] = tft.scale_to_0_1(inputs['pickuplat'])  # the min and max are computed in the analyze phase
  result['dropofflon'] = tft.scale_to_0_1(inputs['dropofflon'])
  result['dropofflat'] = tft.scale_to_0_1(inputs['dropofflat'])
  result['passengers'] = tf.cast(inputs['passengers'], tf.float32)  # a cast
  result['key'] = tf.as_string(tf.ones_like(inputs['passengers']))  # arbitrary TF func
  # engineered features
  latdiff = inputs['pickuplat'] - inputs['dropofflat']
  londiff = inputs['pickuplon'] - inputs['dropofflon']
  result['latdiff'] = tft.scale_to_0_1(latdiff)
  result['londiff'] = tft.scale_to_0_1(londiff)
  dist = tf.sqrt(latdiff * latdiff + londiff * londiff)
  result['euclidean'] = dist
  return result

def preprocess(in_test_mode):
  import os
  import os.path
  import tempfile
  from apache_beam.io import tfrecordio
  from tensorflow_transform.coders import example_proto_coder
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.beam import tft_beam_io
  from tensorflow_transform.beam.tft_beam_io import transform_fn_io

  job_name = 'preprocess-taxi-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    
  if in_test_mode:
    import shutil
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc_tft'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    EVERY_N = 100000
  else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://{0}/taxifare/preproc_tft/'.format(BUCKET)
    import subprocess
    subprocess.call('gsutil rm -r {}'.format(OUTPUT_DIR).split())
    EVERY_N = 10000
    
  options = {
    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
    'job_name': job_name,
    'project': PROJECT,
    'max_num_workers': 24,
    'teardown_policy': 'TEARDOWN_ALWAYS',
    'no_save_main_session': True,
    'requirements_file': 'requirements.txt'   # tell Dataflow to pip install the packages listed in the requirements file
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'

  # set up the metadata (schema) of the raw data coming from BigQuery
  raw_data_schema = {
    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'dayofweek,key'.split(',')
  }
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'fare_amount,pickuplon,pickuplat,dropofflon,dropofflat'.split(',')
    })
  raw_data_schema.update({
      colname : dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
                   for colname in 'hourofday,passengers'.split(',')
    })
  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))

  # run Beam  
  with beam.Pipeline(RUNNER, options=opts) as p:
    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):
      # save the raw data metadata
      _ = (raw_data_metadata
        | 'WriteInputMetadata' >> tft_beam_io.WriteMetadata(
            os.path.join(OUTPUT_DIR, 'metadata/rawdata_metadata'),
            pipeline=p))
      
      # analyze and transform training data    
      raw_data = (p 
        | 'train_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(1, EVERY_N), use_standard_sql=True))
        | 'train_filter' >> beam.Filter(is_valid))   # call is_valid function 

      raw_dataset = (raw_data, raw_data_metadata)
      transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))   # analyze and transform using the scaling function from tf.transform
      transformed_data, transformed_metadata = transformed_dataset
      # take transformed data and write it out as tfrecords and compress it
      _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'train'),
          file_name_suffix='.gz', 
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      
      # transform eval data
      raw_test_data = (p 
        | 'eval_read' >> beam.io.Read(beam.io.BigQuerySource(query=create_query(2, EVERY_N), use_standard_sql=True))
        | 'eval_filter' >> beam.Filter(is_valid))
      
      raw_test_dataset = (raw_test_data, raw_data_metadata)
      transformed_test_dataset = (
          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())
      transformed_test_data, _ = transformed_test_dataset
      _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(
          os.path.join(OUTPUT_DIR, 'eval'),
          file_name_suffix='.gz',
          coder=example_proto_coder.ExampleProtoCoder(
              transformed_metadata.schema))
      # write out the transform function: a graph of TF ops that is added to the model graph when the model is used for prediction
      _ = (transform_fn
           | 'WriteTransformFn' >>
           transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))

preprocess(in_test_mode=False)

Launching Dataflow job preprocess-taxi-features-180520-024657 ... hang on
{'dayofweek': <tf.Tensor 'inputs/dayofweek_copy:0' shape=(?,) dtype=string>, 'passengers': <tf.Tensor 'inputs/passengers_copy:0' shape=(?,) dtype=int64>, 'fare_amount': <tf.Tensor 'inputs/fare_amount_copy:0' shape=(?,) dtype=float32>, 'pickuplat': <tf.Tensor 'inputs/pickuplat_copy:0' shape=(?,) dtype=float32>, 'dropofflat': <tf.Tensor 'inputs/dropofflat_copy:0' shape=(?,) dtype=float32>, 'key': <tf.Tensor 'inputs/key_copy:0' shape=(?,) dtype=string>, 'hourofday': <tf.Tensor 'inputs/hourofday_copy:0' shape=(?,) dtype=int64>, 'pickuplon': <tf.Tensor 'inputs/pickuplon_copy:0' shape=(?,) dtype=float32>, 'dropofflon': <tf.Tensor 'inputs/dropofflon_copy:0' shape=(?,) dtype=float32>}
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-7b1b10e5e24c910c/taxifare/preproc_tft/tmp/tftransform_tmp/63c876342c0849cbad85f939ced66ab6/saved_model.pb
IN

T4: <class 'tensorflow_transform.beam.impl._ComputeAnalyzerOutputs'>
# T4
D2: <dict object at 0x7f5ce56b0b40>
T4: <class 'tensorflow_transform.analyzers.Analyzer'>
# T4
D2: <dict object at 0x7f5ce581e168>
T4: <class 'tensorflow_transform.analyzers._UniquesSpec'>
# T4
D2: <dict object at 0x7f5ce581e398>
# D2
T4: <class 'tensorflow.python.framework.ops.Tensor'>
# T4
D2: <dict object at 0x7f5ce57a6a28>
T4: <class 'tensorflow.python.framework.ops.Operation'>
# T4
D2: <dict object at 0x7f5ce57a6c58>
T4: <class 'tensorflow.python.framework.ops.Graph'>
# T4
D2: <dict object at 0x7f5ce581cc58>
D2: <dict object at 0x7f5ce581fd70>
# D2
D2: <dict object at 0x7f5ce581f910>
D2: <dict object at 0x7f5ce5799050>
T4: <class 'tensorflow_transform.analyzers._NumPyCombinerSpec'>
# T4
D2: <dict object at 0x7f5ce581e050>
F2: <function amax at 0x7f5d2af66320>
# F2
# D2
D2: <dict object at 0x7f5ce57b5a28>
D2: <dict object at 0x7f5ce57b5280>
B2: <built-in function getattr>
# B2
M2: <module 'runpy' from '/usr/l

TypeError: can't pickle SwigPyObject objects

  chunks = self.iterencode(o, _one_shot=True)


In [None]:
%bash
# ls preproc_tft
gsutil ls -l gs://${BUCKET}/taxifare/preproc_tft/
gsutil ls gs://${BUCKET}/taxifare/preproc_tft/metadata

<h2> Train on preprocessed data </h2>

In [None]:
%bash
rm -rf taxifare_tft.tar.gz taxi_trained
export PYTHONPATH=${PYTHONPATH}:$PWD/taxifare_tft
python -m trainer.task \
   --train_data_paths="gs://${BUCKET}/taxifare/preproc_tft/train*" \
   --eval_data_paths="gs://${BUCKET}/taxifare/preproc_tft/eval*"  \
   --output_dir=./taxi_trained \
   --train_steps=10 --job-dir=/tmp \
   --metadata_path=gs://${BUCKET}/taxifare/preproc_tft/metadata

In [None]:
!ls $PWD/taxi_trained/export/exporter

In [None]:
%writefile /tmp/test.json
{"dayofweek":"Thurs","hourofday":17,"pickuplon": -73.885262,"pickuplat": 40.773008,"dropofflon": -73.987232,"dropofflat": 40.732403,"passengers": 2}

In [None]:
%bash
model_dir=$(ls $PWD/taxi_trained/export/exporter/)
gcloud ml-engine local predict \
    --model-dir=./taxi_trained/export/exporter/${model_dir} \
    --json-instances=/tmp/test.json
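
The `--json-instances` file holds one JSON object per line. As a minimal sketch, such a file could be generated and sanity-checked from Python before calling `gcloud`. `write_instances` and `REQUIRED_KEYS` are hypothetical names; the key list is copied from the test instance above, and the output goes to a temporary directory so that `/tmp/test.json` is left alone.

```python
import json
import os
import tempfile

# Field names copied from the test instance in this notebook.
REQUIRED_KEYS = ['dayofweek', 'hourofday', 'pickuplon', 'pickuplat',
                 'dropofflon', 'dropofflat', 'passengers']


def write_instances(instances, path):
    """Write one JSON object per line, checking each has the expected keys."""
    with open(path, 'w') as f:
        for inst in instances:
            missing = [k for k in REQUIRED_KEYS if k not in inst]
            if missing:
                raise ValueError('missing keys: {0}'.format(missing))
            f.write(json.dumps(inst) + '\n')


instances = [{
    'dayofweek': 'Thurs',  # spelled as in the day names of the BigQuery query
    'hourofday': 17,
    'pickuplon': -73.885262,
    'pickuplat': 40.773008,
    'dropofflon': -73.987232,
    'dropofflat': 40.732403,
    'passengers': 2,
}]

out_path = os.path.join(tempfile.mkdtemp(), 'test.json')
write_instances(instances, out_path)
print('wrote {0} instance(s) to {1}'.format(len(instances), out_path))
```

Checking the keys up front gives a clearer error than a failed prediction call when a field name is misspelled.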

Copyright 2016 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License