In [1]:
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = 'covid19_geotab_mobility_impact'
TABLE_ID = 'us_border_volumes'

In [2]:
import tensorflow as tf
from tensorflow import feature_column
from tensorflow_io.bigquery import BigQueryClient
import numpy as np

client = BigQueryClient()

PROJECT_ID = "project1-190517" # A project ID in your GCP subscription.
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = 'covid19_geotab_mobility_impact'
TABLE_ID = 'us_border_volumes'


In [3]:
read_session3 = client.read_session(
   "projects/" + PROJECT_ID,
   DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
   ["trip_direction",
    "day_type",
    "day_of_week",
    "avg_crossing_duration",
    "percent_of_normal_volume",
    "avg_crossing_duration_truck",
    "percent_of_normal_volume_truck"
   
    ],
   [tf.string,
    tf.string,
    tf.int64,
    tf.double,
    tf.int64,
    tf.double,
    tf.int64
    ],
     requested_streams=10
)
dataset3 = read_session3.parallel_read_rows()


In [4]:
def transfrom_row(row_dict):
 	# Identify column names for features.
 	feature_dict = { column:
                 	(tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
                 	for (column,tensor) in row_dict.items()
                 	}
 	# Remove target column from data
 	target = feature_dict.pop('avg_crossing_duration_truck')
 	# Return a tuple of features and target
 	return (feature_dict, target)


In [5]:
transformed_ds = dataset3.map(transfrom_row)

In [6]:
BATCH_SIZE = 32
SHUFFLE_BUFFER = 1024
training_dataset3 = transformed_ds.shuffle(SHUFFLE_BUFFER).batch(BATCH_SIZE)  


In [7]:
def get_categorical_feature_values(column):
    query = 'SELECT DISTINCT TRIM({}) FROM `{}`.{}.{}'.format(column, DATASET_GCP_PROJECT_ID, DATASET_ID, TABLE_ID)
    client = bigquery.Client(project=PROJECT_ID)
    dataset_ref = client.dataset(DATASET_ID)
    job_config = bigquery.QueryJobConfig()
    query_job = client.query(query, job_config=job_config)
    result = query_job.to_dataframe()
    return result.values[:,0]


In [8]:
feature_columns = []
from google.cloud import bigquery
# Numeric columns
for header in ['day_of_week',     
             'avg_crossing_duration',
             'percent_of_normal_volume',
             'percent_of_normal_volume_truck']:
 feature_columns.append(feature_column.numeric_column(header))

# Categorical columns
for header in ['trip_direction', 'day_type']:
 categorical_feature = feature_column.categorical_column_with_vocabulary_list(
       header, get_categorical_feature_values(header))
 categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
 feature_columns.append(categorical_feature_one_hot)


In [9]:
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
Dense = tf.keras.layers.Dense

model = tf.keras.Sequential(
 [
   feature_layer,
   Dense(100, activation=tf.nn.relu, kernel_initializer='uniform'),
   Dense(75, activation=tf.nn.relu),
   Dense(50, activation=tf.nn.relu),
   Dense(25, activation=tf.nn.relu),
   Dense(1)
 ])  


In [10]:
model.compile(
   loss='mse',
   metrics=['mae', 'mse'])


In [11]:
model.fit(training_dataset3, epochs=5)

Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x7f3b9ff9a0d0>

In [13]:
test_samples = {
   'trip_direction' : np.array(['Mexico to US', 'US to Canada']),
   'day_type' : np.array(['Weekdays', 'Weekends']),
   'day_of_week' : np.array([4, 7]),
   'avg_crossing_duration' : np.array([32.8, 10.4]),
   'percent_of_normal_volume' : np.array([102, 89]),
   'percent_of_normal_volume_truck' : np.array([106, 84])
}

In [14]:
model.predict(test_samples)

array([[33.476192],
       [11.808698]], dtype=float32)

## TensorFlow Estimators

In [24]:
import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient
from tensorflow import feature_column
from google.cloud import bigquery
import pandas as pd
import numpy as np
import datetime, os
import itertools


In [25]:
PROJECT_ID = "project1-190517"
DATASET_GCP_PROJECT_ID = "bigquery-public-data"
DATASET_ID = 'covid19_geotab_mobility_impact'
TABLE_ID = 'us_border_volumes'

In [26]:
def input_fn():
 PROJECT_ID = "project1-190517" # This is from what you created in your Google Cloud Account.
 DATASET_GCP_PROJECT_ID = "bigquery-public-data"
 TABLE_ID = 'us_border_volumes'
 DATASET_ID = 'covid19_geotab_mobility_impact'  
 client = BigQueryClient()
 read_session = client.read_session(
   "projects/" + PROJECT_ID,
   DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
   ["trip_direction",
    "day_type",
    "day_of_week",
    "avg_crossing_duration",
    "percent_of_normal_volume",
    "avg_crossing_duration_truck",
    "percent_of_normal_volume_truck"
   
    ],
   [tf.string,
    tf.string,
    tf.int64,
    tf.double,
    tf.int64,
    tf.double,
    tf.int64
    ],
     requested_streams=10
   )
 dataset = read_session.parallel_read_rows()

 def transfrom_row(row_dict):
   # Trim all string tensors
   feature_dict = { column:
                   (tf.strings.strip(tensor) if tensor.dtype == 'string' else tensor)
                   for (column,tensor) in row_dict.items()
                   }
   # Extract target from features
   target = feature_dict.pop('avg_crossing_duration_truck')
   # return a tuple of features and target
   return (feature_dict, target)

 transformed_ds = dataset.map(transfrom_row)
 transformed_ds = transformed_ds.batch(32)

 return transformed_ds


In [27]:
feature_columns = []
# Numeric columns
for header in ['day_of_week',     
             'avg_crossing_duration',
             'percent_of_normal_volume',
             'percent_of_normal_volume_truck']:
 feature_columns.append(feature_column.numeric_column(header))

# Categorical columns
for header in ['trip_direction', 'day_type']:
 categorical_feature = feature_column.categorical_column_with_vocabulary_list(
       header, get_categorical_feature_values(header))
 categorical_feature_one_hot = feature_column.indicator_column(categorical_feature)
 feature_columns.append(categorical_feature_one_hot)


In [28]:
feature_columns

[NumericColumn(key='day_of_week', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 NumericColumn(key='avg_crossing_duration', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 NumericColumn(key='percent_of_normal_volume', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 NumericColumn(key='percent_of_normal_volume_truck', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None),
 IndicatorColumn(categorical_column=VocabularyListCategoricalColumn(key='trip_direction', vocabulary_list=('US to Canada', 'US to Mexico', 'Canada to US', 'Mexico to US'), dtype=tf.string, default_value=-1, num_oov_buckets=0)),
 IndicatorColumn(categorical_column=VocabularyListCategoricalColumn(key='day_type', vocabulary_list=('Weekends', 'Weekdays'), dtype=tf.string, default_value=-1, num_oov_buckets=0))]

In [29]:
MODEL_DIR = os.path.join("models", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [30]:
%mkdir models
%mkdir {MODEL_DIR}


mkdir: cannot create directory ‘models’: File exists


In [31]:
linear_est = tf.estimator.LinearRegressor(feature_columns=feature_columns, model_dir=MODEL_DIR)
linear_est.train(input_fn)


INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': 'models/20201029-033738', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Instructions for updating:
Use

<tensorflow_estimator.python.estimator.canned.linear.LinearRegressorV2 at 0x7f3b9402ddd0>

In [32]:
test_samples = {
   'trip_direction' : np.array(['Mexico to US', 'US to Canada']),
   'day_type' : np.array(['Weekdays', 'Weekends']),
   'day_of_week' : np.array([4, 7]),
   'avg_crossing_duration' : np.array([32.8, 10.4]),
   'percent_of_normal_volume' : np.array([102, 89]),
   'percent_of_normal_volume_truck' : np.array([106, 84])
}


In [33]:
def scoring_input_fn():
 return tf.data.Dataset.from_tensor_slices(test_samples).batch(2)


In [34]:
y = linear_est.predict(   
        input_fn=scoring_input_fn)


In [35]:
predictions = list(p["predictions"] for p in itertools.islice(y, 2))
print("Predictions: {}".format(str(predictions)))


INFO:tensorflow:Calling model_fn.


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from models/20201029-033738/model.ckpt-29
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
Predictions: [array([26.892534], dtype=float32), array([12.188314], dtype=float32)]
