In [1]:
import datetime
import itertools

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow_io.bigquery import BigQueryClient

In [2]:
tf_version = tf.__version__  # the cells below rely on TF 1.x-era Estimator / compat.v1 APIs
print(tf_version)

1.15.3


### Input function

In [3]:
# BigQuery source. The read session is opened under PROJECT_ID and reads the
# tables named "{TABLE_ID}_{suffix}" from DATASET_GCP_PROJECT_ID.DATASET_ID.
PROJECT_ID = "event-driven-ml"
DATASET_GCP_PROJECT_ID = "event-driven-ml"
DATASET_ID = "edml_nyc_yellow_taxi_us"
TABLE_ID = "viz_gis_feat_eng"  # base name; read_dataset appends "_<suffix>"

In [4]:
# Columns selected from BigQuery, in the same order as the output_types list
# passed to the read session in read_dataset.
INPUT_COLUMNS = [
    "uuid",
    "dayofweek",
    "hourofday",
    "weekofyear",
    "pickup_zone_name",
    "dropoff_zone_name",
    "centroide_distance",
    "flag_airport",
    "flag_5_seat_car",
    "trip_duration",
]
LABEL_COLUMN = "trip_duration"  # regression target; popped out of the feature dict
KEY_COLUMN = "uuid"             # per-row key; the serving receiver defaults it to 'no_key'

# Per-column defaults, positionally aligned with INPUT_COLUMNS (the label has none).
# NOTE(review): not referenced by any other cell visible here -- the BigQuery
# reader takes output_types, not defaults. The 7 / 25 / 54 entries also lie
# outside the identity-bucket ranges (7 / 24 / 53) used for those features, so
# they would be rejected if ever fed to the model. TODO confirm intent or drop.
DEFAULTS = [['no_key'], [7], [25], [54], [""], [""], [0.0], [2], [2], []]

NEMBEDS = 15  # dimension of every embedding column on the deep side

In [5]:
# Build an Estimator input_fn that streams rows from a BigQuery table through
# the tensorflow_io BigQuery reader and returns them as a tf.data.Dataset.
def read_dataset(suffix, mode, batch_size):
    """Create an ``input_fn`` that reads ``"{TABLE_ID}_{suffix}"`` from BigQuery.

    Args:
        suffix: Table-name suffix (e.g. ``"train"``, ``"test"``, ``"2017"``). The
            table read is ``"{TABLE_ID}_{suffix}"`` in dataset ``DATASET_ID`` of
            project ``DATASET_GCP_PROJECT_ID``.
        mode: A ``tf.estimator.ModeKeys`` value. ``TRAIN`` shuffles and repeats
            indefinitely; any other mode makes a single, unshuffled pass.
        batch_size: Number of rows per batch.

    Returns:
        A zero-argument ``input_fn`` whose dataset yields ``(features, label)``
        batches. ``features`` is a dict keyed by the ``INPUT_COLUMNS`` other than
        ``LABEL_COLUMN``. ``label`` is ``LABEL_COLUMN`` cast to float32.
    """

    def _input_fn():
        client = BigQueryClient()
        read_session = client.read_session(
            parent="projects/" + PROJECT_ID,
            project_id=DATASET_GCP_PROJECT_ID,
            table_id="{}_{}".format(TABLE_ID, suffix),
            dataset_id=DATASET_ID,
            selected_fields=INPUT_COLUMNS,
            # One dtype per entry of INPUT_COLUMNS, in the same order.
            output_types=[
                tf.string, tf.int64, tf.int64, tf.int64, tf.string, tf.string, tf.float64, tf.int64, tf.int64, tf.int64
            ],
            requested_streams=10
        )

        def decode_row(records):
            # Split the label off the per-row feature dict.
            features = records
            label = tf.cast(features.pop(LABEL_COLUMN), tf.float32)
            return features, label

        # sloppy=True allows rows from the parallel streams to arrive in a
        # nondeterministic order (so the shuffle seed alone does not fix the order).
        dataset = read_session.parallel_read_rows(sloppy=True).map(decode_row)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # repeat indefinitely
            dataset = dataset.shuffle(buffer_size=1000 * batch_size, seed=1234)
        else:
            num_epochs = 1  # end-of-input after one pass

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        # Prefetch so BigQuery reading / decoding overlaps with model steps.
        return dataset.prefetch(tf.data.experimental.AUTOTUNE)

    return _input_fn

### Feature columns

In [12]:
# NYC taxi zone names used as the vocabulary_list for both the pickup_zone_name
# and dropoff_zone_name categorical columns (see _CATEGORICAL_STR_VOCAB).
# No num_oov_buckets / default_value is passed where this list is used, so a
# zone name missing from this list gets no vocabulary id.
# NOTE(review): data literal -- keep entries byte-exact with the zone names
# produced by the feature-engineering table.
VOCABULARY = [
    'Allerton/Pelham Gardens', 'Alphabet City', 'Arden Heights', 'Arrochar/Fort Wadsworth', 
    'Astoria', 'Astoria Park', 'Auburndale', 'Baisley Park', 'Bath Beach', 'Battery Park',
    'Battery Park City', 'Bay Ridge', 'Bay Terrace/Fort Totten', 'Bayside', 'Bedford', 
    'Bedford Park', 'Bellerose', 'Belmont', 'Bensonhurst East', 'Bensonhurst West',
    'Bloomfield/Emerson Hill', 'Bloomingdale', 'Boerum Hill', 'Borough Park', 
    'Breezy Point/Fort Tilden/Riis Beach', 'Briarwood/Jamaica Hills', 'Brighton Beach',
    'Broad Channel', 'Bronx Park', 'Bronxdale', 'Brooklyn Heights', 'Brooklyn Navy Yard', 
    'Brownsville', 'Bushwick North', 'Bushwick South', 'Cambria Heights', 'Canarsie', 'Carroll Gardens',
    'Central Harlem', 'Central Harlem North', 'Central Park', 'Charleston/Tottenville', 'Chinatown',
    'City Island', 'Claremont/Bathgate', 'Clinton East', 'Clinton Hill', 'Clinton West', 'Co-Op City',
    'Cobble Hill', 'College Point', 'Columbia Street', 'Coney Island', 'Corona', 'Country Club', 'Crotona Park',
    'Crotona Park East', 'Crown Heights North', 'Crown Heights South', 'Cypress Hills', 'DUMBO/Vinegar Hill',
    'Douglaston', 'Downtown Brooklyn/MetroTech', 'Dyker Heights', 'East Chelsea', 'East Concourse/Concourse Village',
    'East Elmhurst', 'East Flatbush/Farragut', 'East Flatbush/Remsen Village', 'East Flushing', 'East Harlem North',
    'East Harlem South', 'East New York', 'East New York/Pennsylvania Avenue', 'East Tremont', 'East Village',
    'East Williamsburg', 'Eastchester', 'Elmhurst', 'Elmhurst/Maspeth', "Eltingville/Annadale/Prince's Bay", 'Erasmus',
    'Far Rockaway', 'Financial District North', 'Financial District South', 'Flatbush/Ditmas Park', 'Flatiron',
    'Flatlands', 'Flushing', 'Flushing Meadows-Corona Park', 'Fordham South', 'Forest Hills',
    'Forest Park/Highland Park', 'Fort Greene', 'Fresh Meadows', 'Freshkills Park', 'Garment District', 'Glen Oaks',
    'Glendale', 'Gowanus', 'Gramercy', 'Gravesend', 'Great Kills', 'Great Kills Park', 'Green-Wood Cemetery',
    'Greenpoint', 'Greenwich Village North', 'Greenwich Village South', 'Grymes Hill/Clifton', 'Hamilton Heights',
    'Hammels/Arverne', 'Heartland Village/Todt Hill', 'Highbridge', 'Highbridge Park', 'Hillcrest/Pomonok', 'Hollis',
    'Homecrest', 'Howard Beach', 'Hudson Sq', 'Hunts Point', 'Inwood', 'Inwood Hill Park', 'JFK Airport',
    'Jackson Heights', 'Jamaica', 'Jamaica Bay', 'Jamaica Estates', 'Kensington', 'Kew Gardens', 'Kew Gardens Hills',
    'Kingsbridge Heights', 'Kips Bay', 'LaGuardia Airport', 'Laurelton', 'Lenox Hill East', 'Lenox Hill West',
    'Lincoln Square East', 'Lincoln Square West', 'Little Italy/NoLiTa', 'Long Island City/Hunters Point',
    'Long Island City/Queens Plaza', 'Longwood', 'Lower East Side', 'Madison', 'Manhattan Beach', 'Manhattan Valley',
    'Manhattanville', 'Marble Hill', 'Marine Park/Floyd Bennett Field', 'Marine Park/Mill Basin', 'Mariners Harbor',
    'Maspeth', 'Meatpacking/West Village West', 'Melrose South', 'Middle Village', 'Midtown Center', 'Midtown East',
    'Midtown North', 'Midtown South', 'Midwood', 'Morningside Heights', 'Morrisania/Melrose', 'Mott Haven/Port Morris',
    'Mount Hope', 'Murray Hill', 'Murray Hill-Queens', 'New Dorp/Midland Beach', 'Newark Airport', 'North Corona',
    'Norwood', 'Oakland Gardens', 'Oakwood', 'Ocean Hill', 'Ocean Parkway South', 'Old Astoria', 'Ozone Park',
    'Park Slope', 'Parkchester', 'Pelham Bay', 'Pelham Bay Park', 'Pelham Parkway', 'Penn Station/Madison Sq West',
    'Port Richmond', 'Prospect Heights', 'Prospect Park', 'Prospect-Lefferts Gardens', 'Queens Village',
    'Queensboro Hill', 'Queensbridge/Ravenswood', 'Randalls Island', 'Red Hook', 'Rego Park', 'Richmond Hill',
    'Ridgewood', 'Rikers Island', 'Riverdale/North Riverdale/Fieldston', 'Rockaway Park', 'Roosevelt Island', 'Rosedale',
    'Rossville/Woodrow', 'Saint Albans', 'Saint George/New Brighton', 'Saint Michaels Cemetery/Woodside',
    'Schuylerville/Edgewater Park', 'Seaport', 'Sheepshead Bay', 'SoHo', 'Soundview/Bruckner', 'Soundview/Castle Hill',
    'South Beach/Dongan Hills', 'South Jamaica', 'South Ozone Park', 'South Williamsburg', 'Springfield Gardens North',
    'Springfield Gardens South', 'Spuyten Duyvil/Kingsbridge', 'Stapleton', 'Starrett City', 'Steinway',
    'Stuy Town/Peter Cooper Village', 'Stuyvesant Heights', 'Sunnyside', 'Sunset Park East', 'Sunset Park West',
    'Sutton Place/Turtle Bay North', 'Times Sq/Theatre District', 'TriBeCa/Civic Center', 'Two Bridges/Seward Park',
    'UN/Turtle Bay South', 'Union Sq', 'University Heights/Morris Heights', 'Upper East Side North',
    'Upper East Side South', 'Upper West Side North', 'Upper West Side South', 'Van Cortlandt Park',
    'Van Cortlandt Village', 'Van Nest/Morris Park', 'Washington Heights North', 'Washington Heights South',
    'West Brighton', 'West Chelsea/Hudson Yards', 'West Concourse', 'West Farms/Bronx River', 'West Village',
    'Westchester Village/Unionport', 'Westerleigh', 'Whitestone', 'Willets Point', 'Williamsbridge/Olinville',
    'Williamsburg (North Side)', 'Williamsburg (South Side)', 'Windsor Terrace', 'Woodhaven', 'Woodlawn/Wakefield',
    'Woodside', 'World Trade Center', 'Yorkville East', 'Yorkville West'
]

# Vocabulary per string-valued categorical feature; both zone columns share the
# very same VOCABULARY list object.
_CATEGORICAL_STR_VOCAB = dict.fromkeys(("pickup_zone_name", "dropoff_zone_name"), VOCABULARY)

# Identity-bucket count per integer-valued categorical feature.
# NOTE(review): identity columns accept ids 0..num_buckets-1, so this assumes
# 0-based encodings (dayofweek 0-6, hourofday 0-23, weekofyear 0-52). If the
# upstream query emits 1-based day-of-week or week 53, those rows fall out of
# range -- TODO confirm against the feature-engineering table.
_CATEGORICAL_NUM_BUCKETS = dict(dayofweek=7, hourofday=24, weekofyear=53)

In [6]:
def get_wide_deep():
    """Build the feature columns for the wide (linear) and deep (DNN) model parts.

    Uses the module-level ``_CATEGORICAL_NUM_BUCKETS``, ``_CATEGORICAL_STR_VOCAB``
    and ``NEMBEDS`` settings.

    Returns:
        ``(wide, deep)``:
          * ``wide`` -- the week-of-year identity column; one-hot (indicator)
            encodings of four feature crosses (day x hour, pickup x dropoff,
            hour x pickup, hour x dropoff); the airport and 5-seat-car flags; and
            the bucketized centroid distance.
          * ``deep`` -- ``NEMBEDS``-dimensional embeddings of week-of-year and of
            the same four feature crosses.
    """
    # Integer-coded categorical features, one identity bucket per id.
    fc_dayofweek = tf.feature_column.categorical_column_with_identity(
        key="dayofweek", num_buckets=_CATEGORICAL_NUM_BUCKETS["dayofweek"])
    fc_hourofday = tf.feature_column.categorical_column_with_identity(
        key="hourofday", num_buckets=_CATEGORICAL_NUM_BUCKETS["hourofday"])
    fc_weekofyear = tf.feature_column.categorical_column_with_identity(
        key="weekofyear", num_buckets=_CATEGORICAL_NUM_BUCKETS["weekofyear"])

    # Zone names looked up in their vocabularies; each column reads its OWN key.
    fc_pickuploc = tf.feature_column.categorical_column_with_vocabulary_list(
        key="pickup_zone_name", vocabulary_list=_CATEGORICAL_STR_VOCAB["pickup_zone_name"])
    fc_dropoffloc = tf.feature_column.categorical_column_with_vocabulary_list(
        key="dropoff_zone_name", vocabulary_list=_CATEGORICAL_STR_VOCAB["dropoff_zone_name"])

    # Feature crosses: day x hour, pickup x dropoff, and each zone by hour.
    # Crosses are hashed, so distinct combinations can share a bucket (e.g. the
    # day x hour cross has up to 7 * 24 = 168 combinations for 100 buckets).
    fc_crossed_day_hr = tf.feature_column.crossed_column(keys=[fc_dayofweek, fc_hourofday], hash_bucket_size=100)
    fc_crossed_pd_pair = tf.feature_column.crossed_column(keys=[fc_pickuploc, fc_dropoffloc], hash_bucket_size=10000)
    fc_crossed_hour_pu = tf.feature_column.crossed_column(keys=[fc_hourofday, fc_pickuploc], hash_bucket_size=1000)
    fc_crossed_hour_df = tf.feature_column.crossed_column(keys=[fc_hourofday, fc_dropoffloc], hash_bucket_size=1000)
    crosses = [fc_crossed_day_hr, fc_crossed_pd_pair, fc_crossed_hour_pu, fc_crossed_hour_df]

    # Two-bucket flags, one-hot encoded.
    fc_airport = tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_identity(key="flag_airport", num_buckets=2))
    fc_big_car = tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_identity(key="flag_5_seat_car", num_buckets=2))

    # Centroid distance split into 4 ranges by 3 boundaries, then one-hot encoded.
    # NOTE(review): the boundaries look data-derived -- TODO document their source and units.
    fc_distance_ooe = tf.feature_column.indicator_column(
        tf.feature_column.bucketized_column(
            source_column=tf.feature_column.numeric_column("centroide_distance"),
            boundaries=[1248.0, 2077.0, 3778.0]))

    # Linear side: sparse / one-hot columns.
    wide = (
        [fc_weekofyear]
        + [tf.feature_column.indicator_column(cross) for cross in crosses]
        + [fc_airport, fc_big_car, fc_distance_ooe]
    )

    # DNN side: dense embeddings of week-of-year and of every cross.
    deep = [
        tf.feature_column.embedding_column(categorical_column=column, dimension=NEMBEDS)
        for column in [fc_weekofyear] + crosses
    ]

    return wide, deep

### Serving input receiver function

In [7]:
def serving_input_receiver_fn():
    """Define the request inputs of the exported (served) model.

    Declares one rank-1 placeholder per model input, plus an optional key
    placeholder that falls back to ``['no_key']``. The model receives each
    tensor with a trailing axis added (``tf.expand_dims(..., -1)``).

    Returns:
        A ``tf.estimator.export.ServingInputReceiver``.
    """
    placeholder_specs = [
        ("dayofweek", tf.int64),
        ("hourofday", tf.int64),
        ("weekofyear", tf.int64),
        ("pickup_zone_name", tf.string),
        ("dropoff_zone_name", tf.string),
        ("centroide_distance", tf.float64),
        ("flag_airport", tf.int64),
        ("flag_5_seat_car", tf.int64),
    ]

    receiver_tensors = {}
    for feature_name, feature_dtype in placeholder_specs:
        receiver_tensors[feature_name] = tf.compat.v1.placeholder(
            dtype=feature_dtype, shape=[None], name=feature_name)

    # The key is optional in a request; without it every row gets 'no_key'.
    receiver_tensors[KEY_COLUMN] = tf.compat.v1.placeholder_with_default(
        tf.constant(['no_key']), [None], name="uuid")

    features = dict(
        (feature_name, tf.expand_dims(raw_tensor, -1))
        for feature_name, raw_tensor in receiver_tensors.items()
    )

    return tf.estimator.export.ServingInputReceiver(features=features, receiver_tensors=receiver_tensors)

### Estimator

In [8]:
def my_estimator(output_dir, throttle_secs, nnsize, batch_size, train_steps, eval_steps, eval_delay_secs,
                 eval_batch_size=2**15):
    """Build the wide-and-deep regressor and its train / eval specs.

    Args:
        output_dir: Model directory for checkpoints, summaries and exports.
        throttle_secs: Seconds between checkpoints; also the minimum number of
            seconds between evaluations.
        nnsize: Hidden-layer sizes of the DNN part.
        batch_size: Training batch size.
        train_steps: Global step at which training stops (``TrainSpec.max_steps``).
        eval_steps: Number of eval batches per evaluation.
        eval_delay_secs: Seconds to wait before the first evaluation.
        eval_batch_size: Evaluation batch size. Defaults to ``2**15``, the value
            that was previously hard-coded.

    Returns:
        ``(estimator, train_spec, eval_spec)``, ready for
        ``tf.estimator.train_and_evaluate``.
    """
    run_config = tf.estimator.RunConfig(save_checkpoints_secs=throttle_secs,
                                        tf_random_seed=2810,
                                        keep_checkpoint_max=3)

    def my_rmse(labels, predictions):
        """Extra eval metric: RMSE between the labels and the regressor output."""
        # Drop the trailing size-1 axis so predictions line up with the rank-1 labels.
        pred_values = tf.squeeze(input=predictions["predictions"], axis=-1)
        return {"rmse": tf.compat.v1.metrics.root_mean_squared_error(labels=labels, predictions=pred_values)}

    wide, deep = get_wide_deep()

    estimator = tf.estimator.DNNLinearCombinedRegressor(
        model_dir=output_dir,
        linear_feature_columns=wide,
        dnn_feature_columns=deep,
        dnn_hidden_units=nnsize,
        dnn_activation_fn=tf.nn.leaky_relu,
        batch_norm=True,
        dnn_dropout=0.2,
        config=run_config)

    estimator = tf.estimator.add_metrics(estimator=estimator, metric_fn=my_rmse)

    train_spec = tf.estimator.TrainSpec(
        input_fn=read_dataset('train', tf.estimator.ModeKeys.TRAIN, batch_size),
        max_steps=train_steps)

    # Exports a SavedModel whenever an evaluation beats the best result so far
    # (BestExporter compares eval loss by default).
    exporter = tf.estimator.BestExporter('exporter', serving_input_receiver_fn=serving_input_receiver_fn)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=read_dataset('test', tf.estimator.ModeKeys.EVAL, eval_batch_size),
        steps=eval_steps,
        start_delay_secs=eval_delay_secs,  # start evaluating after N seconds
        throttle_secs=throttle_secs,  # evaluate at most every N seconds
        exporters=exporter)

    return estimator, train_spec, eval_spec

### Instantiate model

In [9]:
# Parameters
BUCKET = "edml"
OUTDIR = "gs://{}/ai-platform/models/edml_trainer_{}".format(BUCKET, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
THROTTLE_SECS = 300
NNSIZE = [1024, 512, 256]
BATCH_SIZE = 64
TRAIN_STEPS = 100000 #5M per fare 3 apochs con batch_size 128 (input size 217M)
EVAL_STEPS = 1
EVAL_DECAY_SECS = 10

In [10]:
# Ensure the FileWriter cache is clear for the TensorBoard events file.
tf.compat.v1.summary.FileWriterCache.clear()
# INFO verbosity so the loss is printed during training; use the compat.v1
# constant to match the compat.v1 setter (tf.logging is a deprecated alias).
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

In [13]:
# Build the estimator and its train/eval specs from the run parameters.
estimator, train_spec, eval_spec = my_estimator(
    output_dir=OUTDIR,
    throttle_secs=THROTTLE_SECS,
    nnsize=NNSIZE,
    batch_size=BATCH_SIZE,
    train_steps=TRAIN_STEPS,
    eval_steps=EVAL_STEPS,
    eval_delay_secs=EVAL_DECAY_SECS,
)

INFO:tensorflow:Using config: {'_model_dir': 'gs://edml/ai-platform/models/edml_trainer_20201008_132337', '_tf_random_seed': 2810, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 300, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 3, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fa7293c0c10>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Using config: {'_model_dir': 'gs://ed

### Train, evaluate, predict

In [None]:
# Ad-hoc training on the 2017 table. max_steps is a global-step ceiling, so this
# does nothing if the model directory already holds a checkpoint past 10000 steps.
train_2017_input_fn = read_dataset("2017", tf.estimator.ModeKeys.TRAIN, BATCH_SIZE)
estimator.train(input_fn=train_2017_input_fn, max_steps=10000)

In [None]:
# Evaluate on the 2018 table; EVAL mode reads one pass and steps=None runs until it ends.
eval_2018_input_fn = read_dataset("2018", tf.estimator.ModeKeys.EVAL, 2**15)
estimator.evaluate(input_fn=eval_2018_input_fn, steps=None)

In [None]:
# Lazily yields one prediction per example from the 2019 table.
predict_2019_input_fn = read_dataset("2019", tf.estimator.ModeKeys.PREDICT, 2**15)
predictions = estimator.predict(input_fn=predict_2019_input_fn, yield_single_examples=True)

In [None]:
# Preview the first 11 predictions. islice stops cleanly if the generator yields
# fewer items, whereas repeated next() calls would raise StopIteration.
N_PREVIEW = 11
for prediction in itertools.islice(predictions, N_PREVIEW):
    print(prediction)


### train_and_evaluate

In [14]:
# Alternate training and evaluation as configured by the specs; the result is
# (final eval metrics, list of export paths).
tf.estimator.train_and_evaluate(estimator=estimator, train_spec=train_spec, eval_spec=eval_spec)

INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 300.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
Instructions for updating:
Use `tf.data.Dataset.interleave(map_func, cycle_length, block_length, num_parallel_calls=tf.data.experimental.AUTOTUNE)` instead. If sloppy execution is desired, use `tf.data.Options.experimental_determinstic`.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureCo

({'average_loss': 187.93692,
  'label/mean': 24.622375,
  'loss': 6158317.0,
  'prediction/mean': 19.885351,
  'rmse': 13.709009,
  'global_step': 100000},
 [b'gs://edml/ai-platform/models/edml_trainer_20201008_132337/export/exporter/1602166109'])