# TF Transform

In [1]:
!pip install --user apache-beam[gcp]==2.16.0
!pip install --user tensorflow-transform==0.15.0

Collecting apache-beam[gcp]==2.16.0
  Downloading apache_beam-2.16.0-cp37-cp37m-manylinux1_x86_64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 4.9 MB/s eta 0:00:01
Collecting pyyaml<4.0.0,>=3.12
  Downloading PyYAML-3.13.tar.gz (270 kB)
[K     |████████████████████████████████| 270 kB 55.9 MB/s eta 0:00:01
Collecting mock<3.0.0,>=1.0.1
  Downloading mock-2.0.0-py2.py3-none-any.whl (56 kB)
[K     |████████████████████████████████| 56 kB 6.4 MB/s  eta 0:00:01
Collecting httplib2<=0.12.0,>=0.8
  Downloading httplib2-0.12.0.tar.gz (218 kB)
[K     |████████████████████████████████| 218 kB 63.8 MB/s eta 0:00:01
[?25hCollecting pyarrow<0.15.0,>=0.11.1; python_version >= "3.0" or platform_system != "Windows"
  Downloading pyarrow-0.14.1-cp37-cp37m-manylinux2010_x86_64.whl (58.1 MB)
[K     |████████████████████████████████| 58.1 MB 5.9 kB/s  eta 0:00:01     |██████████████████▉             | 34.1 MB 17.5 MB/s eta 0:00:02
Collecting google-cloud-datastore<1.8.0,>=1.7.1; ex

In [1]:
!pip download tensorflow-transform==0.15.0 --no-deps

Collecting tensorflow-transform==0.15.0
  Using cached tensorflow-transform-0.15.0.tar.gz (222 kB)
  Saved ./tensorflow-transform-0.15.0.tar.gz
Successfully downloaded tensorflow-transform


In [1]:
%%bash
pip freeze | grep -e 'flow\|beam'

apache-beam==2.16.0
tensorflow==2.1.0
tensorflow-data-validation==0.21.5
tensorflow-datasets==2.0.0
tensorflow-estimator==2.1.0
tensorflow-hub==0.7.0
tensorflow-io==0.11.0
tensorflow-metadata==0.15.2
tensorflow-model-analysis==0.21.6
tensorflow-probability==0.9.0
tensorflow-serving-api==2.1.0
tensorflow-transform==0.15.0


In [2]:
import tensorflow as tf
import tensorflow_transform as tft
import shutil
print(tf.__version__)

2.1.0-dlenv_tfe


In [3]:
bucket = 'qwiklabs-gcp-03-cc939e8b47a1'
project = 'qwiklabs-gcp-03-cc939e8b47a1'
region = 'us-west1'

In [4]:
import os
os.environ['BUCKET'] = bucket
os.environ['PROJECT'] = project
os.environ['REGION'] = region

In [5]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


In [6]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/; then
    gstuil mv -l ${REGION} gs://${BUCKET}
fi

In [7]:
from google.cloud import bigquery

def create_query(phase, EVERY_N):
    farm_fingerprint_value = 4 if EVERY_N is None else EVERY_N
    fingerprint_predicate = "< 2" if EVERY_N is None and phase < 2 else f"= {phase}" 
    return f"""
        with daynames as (
             select ['Sun', 'Mon', 'Tues', 'Wed', 'Thurs', 'Fri', 'Sat'] AS daysofweek
        )
        select
            (tolls_amount + fare_amount) AS fare_amount,
            daysofweek[ORDINAL(EXTRACT(DAYOFWEEK FROM pickup_datetime))] AS dayofweek,
            EXTRACT(HOUR FROM pickup_datetime) AS hourofday,
            pickup_longitude AS pickuplon,
            pickup_latitude AS pickuplat,
            dropoff_longitude AS dropofflon,
            dropoff_latitude AS dropofflat,
            passenger_count AS passengers,
            'notneeded' AS key
        from
            `nyc-tlc.yellow.trips`,
            daynames
        where trip_distance > 0
        and fare_amount > 0
        and abs(mod(farm_fingerprint(cast(pickup_datetime as string)), {farm_fingerprint_value})) {fingerprint_predicate}
    """

query = create_query(2, 100000)

In [8]:
df_valid = bigquery.Client().query(query).to_dataframe()
display(df_valid.head())
df_valid.describe()

Unnamed: 0,fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key
0,5.0,Sun,0,-73.97427,40.751047,-73.981585,40.748087,6,notneeded
1,15.5,Mon,0,-73.97873,40.724122,-73.982335,40.762377,5,notneeded
2,16.5,Sun,0,-73.971868,40.797323,-73.933123,40.85632,1,notneeded
3,5.0,Sun,0,-73.98416,40.72915,-73.98435,40.73632,1,notneeded
4,5.0,Sun,0,-74.00383,40.722317,-74.002335,40.727867,1,notneeded


Unnamed: 0,fare_amount,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers
count,11181.0,11181.0,11181.0,11181.0,11181.0,11181.0,11181.0
mean,11.242599,13.244075,-72.576852,39.973146,-72.748974,40.006091,1.722118
std,9.447462,6.548354,10.133452,5.777329,12.981577,5.664887,1.351062
min,2.5,0.0,-78.133333,-73.991278,-751.4,-73.97797,0.0
25%,6.0,9.0,-73.991849,40.734954,-73.991236,40.734008,1.0
50%,8.5,14.0,-73.981824,40.75264,-73.980164,40.753427,1.0
75%,12.5,19.0,-73.967418,40.7667,-73.964153,40.767832,2.0
max,143.0,23.0,40.806487,41.366138,40.7854,41.366138,6.0


## Create ML dataset using tf.transform and dataflow

In [9]:
import datetime
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
import tensorflow_metadata as tfmd
from tensorflow_transform.beam import impl as beam_impl

def is_valid(inputs):
    """Args: inputs, dict, dictionary of TableRow data from BQ"""
    try:
        pickup_longitude = inputs["pickuplon"]
        pickup_latitude = inputs["pickuplat"]
        dropoff_longitude = inputs["dropofflon"]
        dropoff_latitude = inputs["dropofflat"]
        passengers = inputs["passengers"]
        hour = inputs["hourofday"]
        day = inputs["dayofweek"]
        fare = inputs["fare_amount"]
        return (fare >= 2.5
                and pickup_longitude > -78 and pickup_longitude < -70
                and pickup_latitude > 37 and pickup_longitude < 45
                and dropoff_longitude > -78 and dropoff_longitude < -70
                and dropoff_latitude > 37 and dropoff_longitude < 45
                and passengers > 0
               )
    except:
        return False


def preprocess_tft(inputs):
    """add engineered features with tf transform.
    Args: dict, dictionary of TableRow data from BQ
    returns dict of preprocessed data after scaling and feature engineering
    """
    import datetime
    print(inputs)
    pickup_longitude = inputs["pickuplon"]
    pickup_latitude = inputs["pickuplat"]
    dropoff_longitude = inputs["dropofflon"]
    dropoff_latitude = inputs["dropofflat"]
    passengers = inputs["passengers"]
    hour = inputs["hourofday"]
    day = inputs["dayofweek"]
    fare = inputs["fare_amount"]
    
    result = {}
    result['fare'] = tf.identity(fare)
    result['day'] = tft.string_to_int(day)
    result['hour'] = tf.identity(hour)
    result['pickup_latitude'] = tft.scale_to_0_1(pickup_latitude)
    result['pickup_longitude'] = tft.scale_to_0_1(pickup_longitude)
    result['dropoff_latitude'] = tft.scale_to_0_1(dropoff_latitude)
    result['dropoff_longitude'] = tft.scale_to_0_1(dropoff_longitude)
    result['passengers'] = tf.cast(passengers, tf.float32)
    # arbitrary TF func
    result['key'] = tf.as_string(tf.ones_like(passengers))
    ## engineered features
    lat_diff = pickup_latitude - dropoff_latitude
    long_diff = pickup_longitude - dropoff_longitude
    result['lat_diff'] = tft.scale_to_0_1(lat_diff)
    result['long_diff'] = tft.scale_to_0_1(long_diff)
    dist = tf.sqrt(lat_diff ** 2 + long_diff ** 2)
    result['euclidean'] = tft.scale_to_0_1(dist)
    return result


def preprocess(in_test_mode):
    """ setup preprocessing pipeline. Arg boolean to run locally or in Dataflow"""
    import os
    import os.path
    import tempfile
    from apache_beam.io import tfrecordio
    from tensorflow_transform.coders import example_proto_coder
    from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema
    from tensorflow_transform.beam import tft_beam_io
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io
    
    dt = datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    job_name = f'preprocess-taxi-features-{dt}'
    OUTPUT_DIR = "./preproc_tft" if in_test_mode else f"gs://{bucket}/taxifare/preproc_tft/"
    EVERY_N = 10000 if in_test_mode else 100000
    if in_test_mode:
        import shutil
        print("Launching local job...")
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    else:
        print(f"Launching dataflow job {job_name}")
        import subprocess
#         subprocess.call(f"gsutil rm -r {OUTPUT_DIR}")
    
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        'job_name': job_name,
        'project': project,
        'num_workers': 1,
        'max_num_workers': 1,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'direct_num_workers': 1,
        'extra_packages': ['tensorflow-transform-0.15.0.tar.gz']
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    runner = "DirectRunner" if in_test_mode else "DataflowRunner"
    
    raw_data_schema = {
        colname: dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())
            for colname in ['dayofweek','key']
    }
    raw_data_schema.update({
        colname: dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())
            for colname in ['fare_amount','pickuplon','pickuplat','dropofflon','dropofflat']
    })
    raw_data_schema.update({
        colname: dataset_schema.ColumnSchema(tf.int64, [], dataset_schema.FixedColumnRepresentation())
            for colname in ['hourofday', 'passengers']
    })
    
    raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))
    
    with beam.Pipeline(runner, options=opts) as p:
        dir_tmp = os.path.join(OUTPUT_DIR, 'tmp')
        dir_rawdata_md = os.path.join(OUTPUT_DIR, 'metadata', 'rawdata_metadata')
        dir_train = os.path.join(OUTPUT_DIR, 'train')
        dir_eval = os.path.join(OUTPUT_DIR, 'eval')
        dir_md = os.path.join(OUTPUT_DIR, 'metadata')
        
        with beam_impl.Context(temp_dir=dir_tmp):
            raw_data_metadata | 'Write Input Metadata' >> tft_beam_io.WriteMetadata(dir_rawdata_md, pipeline=p)
            
            raw_data = (p
                | "Read Training from BQ" >> beam.io.Read(beam.io.BigQuerySource(query=create_query(1, EVERY_N), use_standard_sql=True))
                | "Train Filter" >> beam.Filter(is_valid)
            )
            raw_dataset = (raw_data, raw_data_metadata)
            
            raw_test_data = (p
                 | "Read Eval" >> beam.io.Read(beam.io.BigQuerySource(query=create_query(2, EVERY_N), use_standard_sql=True))
                 | "Eval Filter" >> beam.Filter(is_valid)
            )
            raw_test_dataset = (raw_test_data, raw_data_metadata)
            
            transformed_dataset, transform_fn = (raw_dataset | "Analyse and Transform Train" >> beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
            transformed_data, transformed_metadata = transformed_dataset
            
            transformed_test_dataset = (raw_test_dataset, transform_fn) | "Transform Test" >> beam_impl.TransformDataset()
            transformed_test_data, _ = transformed_test_dataset
            
            transformed_data | 'Write Train Data' >> tfrecordio.WriteToTFRecord(dir_train,
                                                                                file_name_suffix='.gz',
                                                                                coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            transformed_test_data | "Write Test Data" >> tfrecordio.WriteToTFRecord(dir_eval,
                                                                               file_name_suffix='.gz',
                                                                               coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
            
            # save transformation function to disk for use at serving time
            transform_fn | "Write Transform Function" >> transform_fn_io.WriteTransformFn(dir_md)


preprocess(in_test_mode=False)

Launching dataflow job preprocess-taxi-features-200503-110527
Instructions for updating:
ColumnSchema is a deprecated, use from_feature_spec to create a `Schema`
Instructions for updating:
Schema is a deprecated, use schema_utils.schema_from_feature_spec to create a `Schema`








{'dayofweek': <tf.Tensor 'inputs/inputs/dayofweek_copy:0' shape=(None,) dtype=string>, 'dropofflat': <tf.Tensor 'inputs/inputs/dropofflat_copy:0' shape=(None,) dtype=float32>, 'dropofflon': <tf.Tensor 'inputs/inputs/dropofflon_copy:0' shape=(None,) dtype=float32>, 'fare_amount': <tf.Tensor 'inputs/inputs/F_fare_amount_copy:0' shape=(None,) dtype=float32>, 'hourofday': <tf.Tensor 'inputs/inputs/hourofday_copy:0' shape=(None,) dtype=int64>, 'key': <tf.Tensor 'inputs/inputs/key_copy:0' shape=(None,) dtype=string>, 'passengers': <tf.Tensor 'inputs/inputs/passengers_copy:0' shape=(None,) dtype=int64>, 'pickuplat': <tf.Tensor 'inputs/inputs/pickuplat_copy:0' shape=(None,) dtype=float32>, 'pickuplon': <tf.Tensor 'inputs/inputs/pickuplon_copy:0' shape=(None,) dtype=float32>}
Instructions for updating:
Use `tft.compute_and_apply_vocabulary()` instead.


Instructions for updating:
Use `tft.compute_and_apply_vocabulary()` instead.


Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.


Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/tmp/tftransform_tmp/1397e5573e8e4a208c88c354697db6ae/saved_model.pb


INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/tmp/tftransform_tmp/1397e5573e8e4a208c88c354697db6ae/saved_model.pb


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:Assets added to graph.


INFO:tensorflow:No assets to write.


INFO:tensorflow:No assets to write.


INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/tmp/tftransform_tmp/c9320c2720524ca8885c9b0d48d1dfab/saved_model.pb


INFO:tensorflow:SavedModel written to: gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/tmp/tftransform_tmp/c9320c2720524ca8885c9b0d48d1dfab/saved_model.pb










In [10]:
%%bash
gsutil ls gs://${BUCKET}/taxifare/preproc_tft/

gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/eval-00000-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/eval-00001-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/eval-00002-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/train-00000-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/train-00001-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/train-00002-of-00003.gz
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/metadata/
gs://qwiklabs-gcp-03-cc939e8b47a1/taxifare/preproc_tft/tmp/


In [18]:
%%bash
python3 -m tft_trainer.task \
    --train_data_path="gs://${BUCKET}/taxifare/preproc_tft/train*" \
    --eval_data_path="gs://${BUCKET}/taxifare/preproc_tft/eval*" \
    --output_dir="./taxi_trained"

{'day': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:0' shape=() dtype=int64>, 'dropoff_latitude': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:1' shape=() dtype=float32>, 'dropoff_longitude': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:2' shape=() dtype=float32>, 'fare': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:3' shape=() dtype=float32>, 'hour': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:4' shape=() dtype=int64>, 'passengers': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:5' shape=() dtype=float32>, 'pickup_latitude': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:6' shape=() dtype=float32>, 'pickup_longitude': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:7' shape=() dtype=float32>}
{'day': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:0' shape=() dtype=int64>, 'dropoff_latitude': <tf.Tensor 'ParseSingleExample/ParseExample/ParseExampleV2:1' shape=() dtype=

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': './taxi_trained', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distribu

In [19]:
!ls $PWD/taxi_trained/export/exporter

1588508465


In [20]:
%%writefile /tmp/test.json
{"day":0, "hour":17, "pickup_longitude": -73.885262, "pickup_latitude": 40.773008, "dropoff_longitude": -73.987232, "dropoff_latitude": 40.732403, "passengers": 2.0}

Writing /tmp/test.json


In [21]:
%%bash
sudo find "/usr/lib/google-cloud-sdk/lib/googlecloudsdk/command_lib/ml_engine" -name '*.pyc' -delete

In [22]:
%%bash
model_dir=$(ls $PWD/taxi_trained/export/exporter/)
gcloud ai-platform local predict \
    --model-dir="./taxi_trained/export/exporter/${model_dir}" \
    --json-instances="/tmp/test.json"

PREDICTIONS
[69.15485382080078]


If the signature defined in the model is not serving_default then you must specify it via --signature-name flag, otherwise the command may fail.
Instructions for updating:
non-resource variables are not supported in the long term
2020-05-03 12:26:14.034921: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2200000000 Hz
2020-05-03 12:26:14.035297: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x555ffed7d080 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-05-03 12:26:14.035333: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-05-03 12:26:14.035541: I tensorflow/core/common_runtime/process_util.cc:147] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.l

In [None]:
%%bash
MODEL_NAME="feateng"
MODEL_VERSION="v1"
MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/taxifare/ch4/taxi_trained/export/exporter | tail -1)
echo "Run these commands one-by-one (the very first time, you'll create a model and then create a version)"
#gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
#gcloud ai-platform delete ${MODEL_NAME}
gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version $TFVERSION

In [None]:
gcloud ai-platform predict --model=feateng --version=v1 --json-instances=/tmp/test.json
