# Advanced Model Training Workflow with TensorFlow 2.0

## Project Setup

In [1]:
pip install tensorflow-gpu==2.0.0-rc0



In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import time

In [3]:
# Confirm the imported TensorFlow version matches the pinned install.
tf.__version__

'2.0.0-rc0'

In [4]:
# Authenticate the Colab user with Google Cloud; presumably these credentials are
# what the %%bigquery cells below use to run their queries.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

Authenticated


## Staging Data

In [5]:
%%bigquery flights_df --project tensorflow-ml-course --verbose

-- 2009 flights that left late, with model features and a binary "delayed" label.
WITH flights_2009 AS (
  SELECT
    departure_delay,
    -- ST_DISTANCE returns meters; divide by 1000 and round to whole kilometers.
    ROUND(ST_DISTANCE(ST_GEOGPOINT(departure_lon, departure_lat),
                      ST_GEOGPOINT(arrival_lon, arrival_lat)) / 1000) AS distance,
    airline,
    arrival_airport,
    departure_airport,
    PARSE_DATE("%Y-%m-%d", date) AS departure_date,
    arrival_delay
  FROM `bigquery-samples.airline_ontime_data.flights`
  WHERE date BETWEEN '2009-01-01' AND '2009-12-31'
    AND departure_delay > 0
)

SELECT
  departure_delay,
  distance,
  airline,
  departure_airport,
  arrival_airport,
  -- Weekday / month are emitted as strings so they can be used as categorical vocabularies.
  CAST(EXTRACT(DAYOFWEEK FROM departure_date) AS STRING) AS departure_weekday,
  CAST(EXTRACT(MONTH FROM departure_date) AS STRING) AS departure_month,
  -- Target: 1 when the flight arrived at least 15 units (as stored in arrival_delay) late.
  IF(arrival_delay >= 15, 1, 0) AS delayed
FROM flights_2009


W0904 19:11:24.507439 140223078004608 _default.py:280] No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable


Executing query with job ID: d58a6747-a3d7-465d-8794-4cbf46e8afa8
Query executing: 1.33s
Query complete after 2.15s


In [6]:
%%bigquery high_traffic_airports --project tensorflow-ml-course --verbose

-- Departure airports with more than 10,000 departures in 2009.
-- The ORDER BY must be on the outermost query. BigQuery does not guarantee that
-- ordering inside a subquery is preserved in the final result. This result is
-- later used as an ordered vocabulary, so its order must be deterministic.
SELECT
  departure_airport AS airport_code,
  COUNT(*) AS flights
FROM `bigquery-samples.airline_ontime_data.flights`
WHERE date >= '2009-01-01'
  AND date <= '2009-12-31'
GROUP BY departure_airport
HAVING COUNT(*) > 10000
ORDER BY airport_code

Executing query with job ID: 8956d804-b455-478b-a8bc-739f20b3a433
Query executing: 1.20s
Query complete after 1.78s


In [7]:
%%bigquery airline_codes --project tensorflow-ml-course --verbose

-- Every airline code appearing in the 2009 flights, sorted alphabetically
-- (DISTINCT is a SELECT modifier, not a function, so no parentheses are needed).
SELECT DISTINCT airline
FROM `bigquery-samples.airline_ontime_data.flights`
WHERE date BETWEEN '2009-01-01' AND '2009-12-31'
ORDER BY airline


Executing query with job ID: fb7f62fb-e795-42af-b1c2-f9da8a36bcf9
Query executing: 0.51s
Query complete after 1.07s


In [8]:
# (rows, columns) of the queried flights DataFrame.
flights_df.shape

(2302332, 8)

In [9]:
# Peek at a few random rows; the fixed random_state keeps the displayed sample
# identical across Restart & Run All.
flights_df.sample(n=5, random_state=123)

Unnamed: 0,departure_delay,distance,airline,departure_airport,arrival_airport,departure_weekday,departure_month,delayed
2178920,3.0,247.0,OO,ATL,GSP,5,12,0
869721,85.0,335.0,EV,ATL,VLD,3,9,1
2084917,4.0,462.0,YV,IAD,CLE,5,5,0
864734,1.0,679.0,WN,OMA,MDW,2,10,0
915963,3.0,2432.0,AA,ORD,LAS,4,10,0


In [10]:
# Column dtypes: the numeric features arrive as float64, the categorical ones as object.
flights_df.dtypes

departure_delay      float64
distance             float64
airline               object
departure_airport     object
arrival_airport       object
departure_weekday     object
departure_month       object
delayed                int64
dtype: object

## Data Preprocessing

### Train-Test Split

In [None]:
# 80/20 random split; the fixed random_state makes it reproducible.
train_df = flights_df.sample(frac=0.8, random_state=123)
# Test set = every row that was not drawn into train_df, matched by index label.
test_df = flights_df.drop(train_df.index)
# drop() works by label, so a non-unique index would silently remove extra rows.
assert len(train_df) + len(test_df) == len(flights_df), "train/test split lost rows (non-unique index?)"

In [12]:
# Number of rows in each split.
print(f"{len(train_df)} train examples")
print(f"{len(test_df)} test examples")

1841866 train examples
460466 test examples


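#### Check label distribution

Compare the share of delayed flights in each split to confirm the random split preserved the class balance.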

In [13]:
# Share of delayed flights (label == 1) per split. Similar values indicate the
# random split preserved the class balance. Each line is labelled with its own
# split (previously all three said "total"). The `.1%` format also avoids
# float artifacts such as 45.099999999999994 from round(x, 3) * 100.
for split_name, split_df in (('total', flights_df), ('train', train_df), ('test', test_df)):
    print(f"{split_df.delayed.mean():.1%} delay in {split_name} dataset")

45.1 % delay in total dataset
45.1 % delay in total dataset
45.0 % delay in total dataset


### Create input pipeline using tf.data

#### Build a tf.data.Dataset 

Create a batched `tf.data.Dataset` from a pandas DataFrame.

In [None]:
def dataframe_to_dataset(dataframe, labels='delayed', shuffle=True, batch_size=32, seed=None):
    """Create a batched tf.data.Dataset of (features, label) pairs from a pandas DataFrame.

    Args:
        dataframe: Source DataFrame. It is copied, so the caller's frame is not modified.
        labels: Name of the label column; it is removed from the feature dictionary.
        shuffle: If True, shuffle with a buffer as large as the DataFrame (a full shuffle).
        batch_size: Number of rows per batch.
        seed: Optional shuffle seed for a reproducible example order. None keeps
            TensorFlow's default, unseeded behaviour.

    Returns:
        A tf.data.Dataset whose elements are
        (dict mapping column name -> batch of values, batch of labels).
    """
    features = dataframe.copy()
    # Use a separate name so the `labels` argument (a column name) is not shadowed by the Series.
    label_values = features.pop(labels)
    dataset = tf.data.Dataset.from_tensor_slices((dict(features), label_values))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(features), seed=seed)
    return dataset.batch(batch_size)

In [None]:
# Rows per batch for both the training and the test datasets.
batch_size = 256

In [None]:
# Make Keras layers default to float64. This only affects layers created afterwards.
# The DenseFeatures layer still emits float32, so the Dense layers log an autocast
# warning during fit().
tf.keras.backend.set_floatx('float64')

# Training batches are shuffled; test batches keep the original row order.
train_ds, test_ds = (
    dataframe_to_dataset(frame, shuffle=is_training, batch_size=batch_size)
    for frame, is_training in ((train_df, True), (test_df, False))
)

In [17]:
# Inspect the element structure and dtypes: each batch is a (feature dict, labels) pair.
train_ds

<BatchDataset shapes: ({departure_delay: (None,), distance: (None,), airline: (None,), departure_airport: (None,), arrival_airport: (None,), departure_weekday: (None,), departure_month: (None,)}, (None,)), types: ({departure_delay: tf.float64, distance: tf.float64, airline: tf.string, departure_airport: tf.string, arrival_airport: tf.string, departure_weekday: tf.string, departure_month: tf.string}, tf.int32)>

Each element of the dataset is a `(features, label)` pair. `features` is a dictionary that maps each DataFrame column name to a batch of that column's values, and `label` is the matching batch of `delayed` values.

#### Build Features using tf.feature_column

Demo for numeric variables:

In [18]:
# Pull one batch from the training pipeline; only the feature dict is used by the demos.
example_batch, example_labels = next(iter(train_ds))

# A numeric column without a normalizer passes the raw departure_delay values through.
departure_delay = tf.feature_column.numeric_column("departure_delay")
feature_layer_demo = tf.keras.layers.DenseFeatures(departure_delay)

# First five transformed rows.
demo_output = feature_layer_demo(example_batch)
demo_output.numpy()[:5]

array([[ 3.],
       [11.],
       [ 5.],
       [10.],
       [15.]], dtype=float32)

Demo for bucketized variables:

In [19]:
# Bucketize departure_delay: 9 boundaries produce 10 one-hot buckets per row.
delay_demo_boundaries = [2, 3, 6, 9, 13, 19, 28, 44, 76]
departure_delay_bucketized = tf.feature_column.bucketized_column(
    departure_delay,
    boundaries=delay_demo_boundaries,
)

feature_layer_demo = tf.keras.layers.DenseFeatures(departure_delay_bucketized)
feature_layer_demo(example_batch).numpy()[:5]

array([[0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.]], dtype=float32)

#### Set bin boundaries for numeric variables and vocabularies for categorical variables

In [None]:
# Bucket boundaries for the numeric features.
departure_delay_bins = [2, 3, 6, 9, 13, 19, 28, 44, 76]
distance_bins = [600, 1200]

# Vocabularies for the categorical features; airports and airlines come from the BigQuery lookups.
airports_voc = high_traffic_airports['airport_code']
airlines_voc = airline_codes['airline']
# Weekday and month are strings because the flights query CASTs them AS STRING.
weekdays_voc = [str(day) for day in range(1, 8)]
months_voc = [str(month) for month in range(1, 13)]

#### Build the input pipeline

In [None]:
# --- Bucketized numeric features -------------------------------------------
distance = tf.feature_column.numeric_column("distance")
distance_buckets = tf.feature_column.bucketized_column(distance, boundaries=distance_bins)

departure_delay = tf.feature_column.numeric_column("departure_delay")
departure_delay_buckets = tf.feature_column.bucketized_column(
    departure_delay, boundaries=departure_delay_bins)

# --- Categorical features (vocabulary lookups) -------------------------------
# NOTE(review): values outside a vocabulary use the lookup's default id; confirm the
# resulting all-zero one-hot is acceptable for arrival airports outside the high-traffic list.
arrival_airports = tf.feature_column.categorical_column_with_vocabulary_list('arrival_airport', airports_voc)
departure_airports = tf.feature_column.categorical_column_with_vocabulary_list('departure_airport', airports_voc)
airlines = tf.feature_column.categorical_column_with_vocabulary_list('airline', airlines_voc)
weekdays = tf.feature_column.categorical_column_with_vocabulary_list('departure_weekday', weekdays_voc)
months = tf.feature_column.categorical_column_with_vocabulary_list('departure_month', months_voc)

# One-hot (indicator) encodings of the categorical columns.
arrival_airports_dummy = tf.feature_column.indicator_column(arrival_airports)
departure_airports_dummy = tf.feature_column.indicator_column(departure_airports)
airlines_dummy = tf.feature_column.indicator_column(airlines)
weekdays_dummy = tf.feature_column.indicator_column(weekdays)
months_dummy = tf.feature_column.indicator_column(months)

# Everything the model consumes, in a fixed order.
feature_columns = [
    distance_buckets,
    departure_delay_buckets,
    arrival_airports_dummy,
    departure_airports_dummy,
    airlines_dummy,
    weekdays_dummy,
    months_dummy,
]

In [None]:
# Apply every feature column at once: shape is (rows in batch, concatenated width of all columns).
feature_layer_demo = tf.keras.layers.DenseFeatures(feature_columns)
feature_layer_demo(example_batch).shape

In [23]:
# First transformed row: the concatenated bucket / one-hot encodings of every feature column.
feature_layer_demo(example_batch).numpy()[:1]

array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 

## Defining our model

### Define the feature layer

In [None]:
# Layer that turns the raw feature dict into the bucketized / one-hot encodings defined by feature_columns.
feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

### Build the model

#### Non-distributed model

In [None]:
# Baseline (non-distributed) model: the feature layer feeding one sigmoid unit with
# a small L2 penalty on its weights, i.e. L2-regularized logistic regression.
model_normal = tf.keras.models.Sequential()
model_normal.add(feature_layer)
model_normal.add(
    tf.keras.layers.Dense(
        1,
        activation='sigmoid',
        kernel_regularizer=tf.keras.regularizers.l2(0.0001),
    )
)

model_normal.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

## Defining the Distribution Strategy

### Mirrored Strategy

![](Mirrored_Strategy.jpg)

### Multi-Worker Mirrored Strategy

![](Multi_workers_Mirrored_Strategy.jpg)

### Creating the Mirrored Strategy instance

In [None]:
# Synchronous data-parallel strategy. With no arguments it uses every GPU visible to
# TensorFlow, or the CPU if there are none.
distribute = tf.distribute.MirroredStrategy()

## Distributed Training

### Defining a distributed model

In [None]:
with distribute.scope():
    # Build a fresh feature layer inside the strategy scope instead of reusing `feature_layer`.
    # That layer already belongs to model_normal and was created outside the scope, so any
    # variables it ever held (e.g. if embedding columns were added) would not be mirrored.
    distributed_feature_layer = tf.keras.layers.DenseFeatures(feature_columns)

    # Same architecture as model_normal, so the timing comparison is like-for-like.
    model_distributed = tf.keras.models.Sequential([
        distributed_feature_layer,
        tf.keras.layers.Dense(1, activation='sigmoid', kernel_regularizer=tf.keras.regularizers.l2(0.0001))
        ])

    model_distributed.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy']
                 )

### Training the model: Normal vs. distributed training

#### Normal Training

In [28]:
# Time 5 epochs of non-distributed training. perf_counter is a monotonic clock, so
# system clock adjustments cannot skew the measured duration (unlike time.time()).
start_time = time.perf_counter()
# `history` is kept for compatibility; `history_normal` survives the distributed run below.
history = history_normal = model_normal.fit(train_ds,
                    epochs = 5,
                    callbacks = [tf.keras.callbacks.TensorBoard("logs/normal_training")])
print("Normal training took: {}".format(time.perf_counter() - start_time))

W0904 19:15:35.735445 140223078004608 base_layer.py:1772] Layer dense is casting an input tensor from dtype float32 to the layer's dtype of float64, which is new behavior in TensorFlow 2.  The layer has dtype float64 because it's dtype defaults to floatx.


To change all layers to have dtype float32 by default, call `tf.keras.backend.set_floatx('float32')`. To change just this layer, pass dtype='float32' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



Epoch 1/5


W0904 19:15:36.126768 140223078004608 deprecation.py:323] From /usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/nn_impl.py:183: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Normal training took: 368.0146293640137


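#### Distributed Training

Note: `MirroredStrategy` only speeds up training when it has more than one replica (device). With a single device it adds coordination overhead without parallelism, so a similar or slightly longer run time than normal training is expected.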

In [29]:
# Time 5 epochs under MirroredStrategy with the same data and epoch count as the
# normal run. perf_counter is monotonic, so the measurement is not skewed by clock changes.
start_time = time.perf_counter()
# `history` is kept for compatibility; `history_distributed` names this run explicitly.
history = history_distributed = model_distributed.fit(train_ds,
                    epochs = 5,
                    callbacks = [tf.keras.callbacks.TensorBoard("logs/distributed_training")])
print("Distributed training took: {}".format(time.perf_counter() - start_time))

W0904 19:21:32.483937 140223078004608 base_layer.py:1772] Layer dense_1 is casting an input tensor from dtype float32 to the layer's dtype of float64, which is new behavior in TensorFlow 2.  The layer has dtype float64 because it's dtype defaults to floatx.


To change all layers to have dtype float32 by default, call `tf.keras.backend.set_floatx('float32')`. To change just this layer, pass dtype='float32' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Distributed training took: 386.49786019325256
