# Predicting Housing Prices using Tensorflow + Cloud ML Engine

This notebook shows how to create a TensorFlow model, train it in the cloud in a distributed fashion across multiple CPUs or GPUs, explore the results using TensorBoard, and finally deploy the model for online prediction. We demonstrate this by building a model to predict housing prices.


In [None]:
# Ensure the right version of Tensorflow is installed.
!pip freeze | grep tensorflow==2.2 || pip install tensorflow==2.2

In [None]:
import pandas as pd
import tensorflow as tf

from tensorflow import keras

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, DenseFeatures

In [None]:
print(tf.__version__)

In [None]:
# Download data from GCS and store it as pandas DataFrames
data_train = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing_train.csv',
  names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","MEDV"])

data_test = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing_test.csv',
  names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","MEDV"])

In [None]:
data_train.head()

#### Column Descriptions:

1. CRIM: per capita crime rate by town 
2. ZN: proportion of residential land zoned for lots over 25,000 sq.ft. 
3. INDUS: proportion of non-retail business acres per town 
4. CHAS: Charles River dummy variable (= 1 if tract bounds river; 0 otherwise) 
5. NOX: nitric oxides concentration (parts per 10 million) 
6. RM: average number of rooms per dwelling 
7. AGE: proportion of owner-occupied units built prior to 1940 
8. DIS: weighted distances to five Boston employment centres 
9. RAD: index of accessibility to radial highways 
10. TAX: full-value property-tax rate per $10,000 
11. PTRATIO: pupil-teacher ratio by town 
12. MEDV: Median value of owner-occupied homes
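
As a quick illustration of how these columns are used later, the sketch below splits one row into the feature values, the label (MEDV), and the two columns that `trainer/task.py` drops. The row values are made up for illustration and are not read from the actual CSV files; the standard-library `csv` module is used only to keep the example self-contained.

```python
import csv
import io

# Column order used when reading the CSV files in this notebook.
COLUMNS = ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM",
           "AGE", "DIS", "RAD", "TAX", "PTRATIO", "MEDV"]
LABEL_COLUMN = "MEDV"
UNUSED_COLUMNS = ["CHAS", "RAD"]  # dropped later in trainer/task.py

# One illustrative row (made-up values, not taken from the real file).
sample_csv = "0.00632,18.0,2.31,0,0.538,6.575,65.2,4.09,1,296.0,15.3,24000.0\n"

reader = csv.DictReader(io.StringIO(sample_csv), fieldnames=COLUMNS)
row = {name: float(value) for name, value in next(reader).items()}

# Separate the label from the features and discard the unused columns.
label = row[LABEL_COLUMN]
features = {name: value for name, value in row.items()
            if name != LABEL_COLUMN and name not in UNUSED_COLUMNS}

print(sorted(features))
print(label)
```

Nine feature columns remain after the split, which matches the `FEATURE_COLUMNS` list in the training code.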

### 2) Write Tensorflow Code

### 3) Package Code

You've now written all the TensorFlow code you need!

To make it compatible with Cloud ML Engine, we'll combine the above TensorFlow code into a single Python file with two simple changes:

1. Add some boilerplate code to parse the command line arguments required for gcloud.
2. Call `train_and_evaluate()` from an `if __name__ == '__main__':` block so the module starts training when it is invoked.

We also add an empty \__init__\.py file to the folder. This is just the python convention for identifying modules.
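
The argument-parsing boilerplate from step 1 can be sketched in isolation as follows. This is a minimal illustration, not the full parser: the helper name `parse_args` and the flag `--some-other-flag` are hypothetical. It shows why `parse_known_args` is convenient here: flags the trainer does not declare are collected rather than causing an error.

```python
import argparse


def parse_args(argv):
    """Parse trainer flags; undeclared flags are collected, not rejected."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--job-dir", type=str, required=True)
    parser.add_argument("--num-epochs", type=int, default=20)
    known, unknown = parser.parse_known_args(argv)
    return known, unknown


# "--some-other-flag" is a hypothetical flag the parser does not declare.
args, extra = parse_args(["--job-dir", "./output", "--some-other-flag", "1"])
print(args.job_dir, args.num_epochs, extra)
```

Note that argparse converts the dash in `--job-dir` to an underscore, so the value is read as `args.job_dir`.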

In [None]:
%%bash
mkdir trainer
touch trainer/__init__.py

In [84]:
%%writefile trainer/task.py

import os
import argparse
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, DenseFeatures

print(tf.__version__)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)



FEATURE_COLUMNS = ["CRIM", "ZN", "INDUS", "NOX", "RM",
                  "AGE", "DIS", "TAX", "PTRATIO"]
LABEL_COLUMN = "MEDV"
UNUSED_COLUMNS= ["CHAS", "RAD"]
DEFAULTS = [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]

TRAIN_BATCH_SIZE = 32

def input_fn(features, labels, shuffle, num_epochs, batch_size):
    """Generates an input function to be used for model training.
    Args:
      features: numpy array of features used for training or inference
      labels: numpy array of labels for each example
      shuffle: boolean for whether to shuffle the data or not (set True for
        training, False for evaluation)
      num_epochs: number of epochs to provide the data for
      batch_size: batch size for training
    Returns:
      A tf.data.Dataset that can provide data to the Keras model for training or
        evaluation
    """
    if labels is None:
        inputs = features
    else:
        inputs = (features, labels)
    dataset = tf.data.Dataset.from_tensor_slices(inputs)

    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(features))

    # We call repeat after shuffling, rather than before, to prevent separate
    # epochs from blending together.
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    return dataset

#TODO optimize model for this problem
def create_keras_model(input_dim, learning_rate):
    """Creates a Keras model for regression.
    The single linear output node (no activation) predicts a continuous
    value, the median home price.
    Args:
      input_dim: How many features the input has
      learning_rate: Learning rate for training
    Returns:
      The compiled Keras model (still needs to be trained)
    """
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(100, activation=tf.nn.relu, kernel_initializer='uniform',
                  input_shape=(input_dim,)),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            Dense(1),
        ])

    # Custom optimizer:
    # https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/RMSprop
    # Use the learning_rate argument rather than the global args object.
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    # Compile Keras model
    model.compile(
        optimizer=optimizer,
        loss="mean_squared_error",
        metrics=[tf.keras.metrics.RootMeanSquaredError()])
    return model

def preprocess(dataframe):
    """Converts categorical features to numeric. Removes unused columns.
    Args:
      dataframe: Pandas dataframe with raw data
    Returns:
      Dataframe with preprocessed data
    """
    dataframe = dataframe.drop(columns=UNUSED_COLUMNS)
    
    # Convert integer valued (numeric) columns to floating point
    numeric_columns = dataframe.select_dtypes(['int64']).columns
    dataframe[numeric_columns] = dataframe[numeric_columns].astype('float32')
    return dataframe


def standardize(dataframe):
    """Scales numerical columns using their means and standard deviation to get
    z-scores: the mean of each numerical column becomes 0, and the standard
    deviation becomes 1. This can help the model converge during training.
    Args:
      dataframe: Pandas dataframe
    Returns:
      Input dataframe with the numerical columns scaled to z-scores
    """
    dtypes = list(zip(dataframe.dtypes.index, map(str, dataframe.dtypes)))
    # Normalize numeric columns.
    for column, dtype in dtypes:
        if dtype == 'float32':
            dataframe[column] -= dataframe[column].mean()
            dataframe[column] /= dataframe[column].std()
    return dataframe


def load_data():
    """Loads data into preprocessed (train_x, train_y, eval_x, eval_y)
    dataframes.
    Returns:
      A tuple (train_x, train_y, eval_x, eval_y), where train_x and eval_x are
      Pandas dataframes with features for training and train_y and eval_y are
      numpy arrays with the corresponding labels.
    """
    train_df = data_train
    eval_df = data_test

    train_df = preprocess(train_df)
    eval_df = preprocess(eval_df)

    # Split train and eval data with labels. The pop method copies and removes
    # the label column from the dataframe.
    train_x, train_y = train_df, train_df.pop(LABEL_COLUMN)
    eval_x, eval_y = eval_df, eval_df.pop(LABEL_COLUMN)

    # Join train_x and eval_x to normalize on overall means and standard
    # deviations. Then separate them again.
    all_x = pd.concat([train_x, eval_x], keys=['train', 'eval'])
    all_x = standardize(all_x)
    train_x, eval_x = all_x.xs('train'), all_x.xs('eval')

    # Reshape label columns for use with tf.data.Dataset
    train_y = np.asarray(train_y).astype('float32').reshape((-1, 1))
    eval_y = np.asarray(eval_y).astype('float32').reshape((-1, 1))

    return train_x, train_y, eval_x, eval_y

def train_and_evaluate(args):
    """Trains and evaluates the Keras model.
    Builds the model with create_keras_model() and trains it on the CSV data
    loaded and preprocessed below. Saves the trained model in TensorFlow
    SavedModel format to a path derived from the --job-dir argument.
    Args:
      args: argparse.Namespace of arguments - see get_args() for details
    """
    
    data_train = pd.read_csv(
      filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing_train.csv',
      names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","MEDV"])

    data_test = pd.read_csv(
      filepath_or_buffer='https://storage.googleapis.com/vijay-public/boston_housing/housing_test.csv',
      names=["CRIM","ZN","INDUS","CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","MEDV"])

    data_train = preprocess(data_train)
    data_test = preprocess(data_test)
    
    train_x, train_y = data_train, data_train.pop(LABEL_COLUMN)
    eval_x, eval_y = data_test, data_test.pop(LABEL_COLUMN)

    #train_x, train_y, eval_x, eval_y = load_data()

    # dimensions
    num_train_examples, input_dim = train_x.shape
    num_eval_examples = eval_x.shape[0]

    # Create the Keras Model
    keras_model = create_keras_model(
        input_dim=input_dim, learning_rate=args.learning_rate)

    # Pass a numpy array by passing DataFrame.values
    training_dataset = input_fn(
        features=train_x.values,
        labels=train_y,
        shuffle=True,
        num_epochs=args.num_epochs,
        batch_size=args.batch_size)

    # Pass a numpy array by passing DataFrame.values
    validation_dataset = input_fn(
        features=eval_x.values,
        labels=eval_y,
        shuffle=False,
        num_epochs=args.num_epochs,
        batch_size=num_eval_examples)

    # Setup Learning Rate decay.
    #lr_decay_cb = tf.keras.callbacks.LearningRateScheduler(
     #   lambda epoch: args.learning_rate + 0.02 * (0.5 ** (1 + epoch)),
      #  verbose=True)

    # Setup TensorBoard callback.
    tensorboard_cb = tf.keras.callbacks.TensorBoard(
        os.path.join(args.job_dir, 'keras_tensorboard'),
        histogram_freq=1)

    # Train model
    keras_model.fit(
        training_dataset,
        steps_per_epoch=int(num_train_examples / args.batch_size),
        epochs=args.num_epochs,
        validation_data=validation_dataset,
        validation_steps=1,
        verbose=1,
        callbacks=[tensorboard_cb])

    export_path = os.path.join(args.job_dir, 'keras_export')
    tf.keras.models.save_model(keras_model, export_path)
    print('Model exported to: {}'.format(export_path))

def get_args():
    parser = argparse.ArgumentParser()
    # Input Arguments
    parser.add_argument(
        '--job-dir',
        type=str,
        required=True,
        help='local or GCS location for writing checkpoints and exporting '
             'models')
    parser.add_argument(
        '--num-epochs',
        type=int,
        default=20,
        help='number of times to go through the data, default=20')
    parser.add_argument(
        '--batch-size',
        default=128,
        type=int,
        help='number of records to read during each training step, default=128')
    parser.add_argument(
        '--learning-rate',
        default=.01,
        type=float,
        help='learning rate for gradient descent, default=.01')
    parser.add_argument(
        '--verbosity',
        choices=['DEBUG', 'ERROR', 'FATAL', 'INFO', 'WARN'],
        default='INFO')
    args, _ = parser.parse_known_args()
    return args

#initiate training job
if __name__ == '__main__':
    args = get_args()
    tf.compat.v1.logging.set_verbosity(args.verbosity)
    train_and_evaluate(args)

Overwriting trainer/task.py
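
The `standardize` helper in `trainer/task.py` converts each numeric column to z-scores. The sketch below shows the same arithmetic on a plain Python list, using made-up room counts. It uses the sample standard deviation (dividing by n - 1), which is also the default of pandas `Series.std()`. This is a stand-alone illustration of the formula, not a replacement for the helper.

```python
import statistics


def standardize(values):
    """Return z-scores: subtract the mean, divide by the sample std dev."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    return [(v - mean) / std for v in values]


rooms = [5.0, 6.0, 7.0]  # made-up RM values
z = standardize(rooms)
print(z)
```

After scaling, the values have mean 0 and standard deviation 1, which puts features measured in very different units (for example TAX and NOX) on a comparable scale.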


### 4) Train
Now that our code is packaged we can invoke it using the gcloud command line tool to run the training. 

Note: Because our dataset is so small and our model so simple, provisioning the cluster takes longer than the actual training. Accordingly, you'll notice that single-VM cloud training takes longer than local training, and distributed cloud training takes longer than single-VM cloud training. For larger datasets and more complex models this will reverse.

#### Set Environment Vars
We'll create environment variables for our project ID, GCS bucket, and region, and reference them in future commands.

If you do not have a GCS bucket, you can create one using [these](https://cloud.google.com/storage/docs/creating-buckets) instructions.

In [85]:
GCS_BUCKET = 'gs://BUCKET_NAME' #CHANGE THIS TO YOUR BUCKET
PROJECT = 'PROJECT_ID' #CHANGE THIS TO YOUR PROJECT ID
REGION = 'us-central1' #OPTIONALLY CHANGE THIS

In [86]:
import os
os.environ['GCS_BUCKET'] = GCS_BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
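
Before submitting any jobs, it can help to confirm that the placeholder values were actually replaced. The helper below is a hypothetical convenience, not part of gcloud or of this notebook's training code. It only checks for the placeholder strings used above and for the `gs://` prefix.

```python
def check_settings(settings):
    """Return the names of settings that still look unconfigured."""
    placeholders = {"gs://BUCKET_NAME", "PROJECT_ID"}
    problems = [name for name, value in settings.items()
                if value in placeholders]
    if not settings["GCS_BUCKET"].startswith("gs://"):
        problems.append("GCS_BUCKET (missing gs:// prefix)")
    return problems


# With the untouched placeholders, both settings are reported.
print(check_settings({"GCS_BUCKET": "gs://BUCKET_NAME",
                      "PROJECT": "PROJECT_ID",
                      "REGION": "us-central1"}))
```

In the notebook you could pass the real values, for example `check_settings({"GCS_BUCKET": GCS_BUCKET, "PROJECT": PROJECT, "REGION": REGION})`, and proceed only if the returned list is empty.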

#### Run local
It's a best practice to first run locally on a small dataset to check for errors. Note you can ignore the warnings in this case, as long as there are no errors.

In [87]:
%%bash
gcloud ai-platform local train \
   --module-name=trainer.task \
   --package-path=trainer \
   -- \
   --job-dir='./output'

1.5.0


  from ._conv import register_converters as _register_converters
2018-03-05 18:56:25.561527: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA


#### Run on cloud (1 cloud ML unit)

First we specify which GCP project to use.

In [88]:
%%bash
gcloud config set project $PROJECT

Updated property [core/project].


Then we specify which GCS bucket to write to and a job name.
Job names submitted to the ml engine must be project unique, so we append the system date/time. Update the cell below to point to a GCS bucket you own.

In [89]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME/ \
   --runtime-version 2.1 \
   --python-version 3.7 \
   -- \
   --job-dir=$GCS_BUCKET/$JOBNAME/output


jobId: housing_180305_185634
state: QUEUED


Job [housing_180305_185634] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe housing_180305_185634

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs housing_180305_185634
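
The job name above is built in bash with `date -u +%y%m%d_%H%M%S`. If you prefer to build it in Python, a roughly equivalent sketch is shown below. The function name `make_job_name` is hypothetical, and the fixed timestamp is only there to make the example reproducible.

```python
from datetime import datetime, timezone


def make_job_name(prefix="housing", now=None):
    """Build a job name from a prefix plus a UTC timestamp."""
    now = now or datetime.now(timezone.utc)
    return "{}_{}".format(prefix, now.strftime("%y%m%d_%H%M%S"))


# Fixed timestamp for illustration; omit `now` to use the current time.
example = make_job_name(now=datetime(2018, 3, 5, 18, 56, 34,
                                     tzinfo=timezone.utc))
print(example)  # housing_180305_185634
```

Because the timestamp has one-second resolution, two jobs submitted within the same second would collide, so leave a moment between submissions.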


#### Run on cloud (10 cloud ML units)
To request a larger cluster, the only change to the command is the added [--scale-tier](https://cloud.google.com/ml/pricing#ml_training_units_by_scale_tier) argument. Cloud ML Engine then takes care of provisioning the machines for you. Note that `trainer/task.py` as written calls Keras `fit()` without a `tf.distribute` strategy, so the machines will not automatically share the training work; adding a distribution strategy to the training code would be needed for that.


In [90]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --runtime-version 2.1 \
   --python-version 3.7 \
   --scale-tier=STANDARD_1 \
   -- \
   --job-dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_180305_185638
state: QUEUED


Job [housing_180305_185638] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe housing_180305_185638

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs housing_180305_185638


#### Run on cloud GPU (3 cloud ML units)

Also works with GPUs!

"BASIC_GPU" corresponds to one Tesla K80 at the time of this writing, hardware subject to change. 1 GPU is charged as 3 cloud ML units.

In [78]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --runtime-version 2.1 \
   --python-version 3.7 \
   --scale-tier=BASIC_GPU \
   -- \
   --job-dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_180305_183840
state: QUEUED


Job [housing_180305_183840] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe housing_180305_183840

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs housing_180305_183840


#### Run on 8 cloud GPUs (24 cloud ML units)
To train across multiple GPUs you use a [custom scale tier](https://cloud.google.com/ml/docs/concepts/training-overview#job_configuration_parameters).

You specify the number and types of machines you want to run on in a config.yaml, then reference that config.yaml via the --config config.yaml command line argument.

Here I am specifying a master node with machine type complex_model_m_gpu and one worker node of the same type. Each complex_model_m_gpu has 4 GPUs so this job will run on 2x4=8 GPUs total. 

WARNING: The default project quota is 10 cloud ML units, so unless you have requested a quota increase you will get a quota exceeded error. This command is just for illustrative purposes.
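
The arithmetic behind this warning can be sketched as follows. The figures come from the text above. The rate of 3 ML units per GPU is the one quoted for BASIC_GPU and is assumed here to apply to `complex_model_m_gpu` as well, which is consistent with the 24 units in the section heading.

```python
# Figures taken from the surrounding text; the per-GPU rate is the one quoted
# for BASIC_GPU and is assumed here to apply to complex_model_m_gpu as well.
GPUS_PER_MACHINE = 4
ML_UNITS_PER_GPU = 3
DEFAULT_QUOTA_ML_UNITS = 10


def cluster_ml_units(worker_count, master_count=1):
    """Return (total GPUs, total ML units) for a master plus workers."""
    machines = master_count + worker_count
    gpus = machines * GPUS_PER_MACHINE
    return gpus, gpus * ML_UNITS_PER_GPU


gpus, units = cluster_ml_units(worker_count=1)
print("GPUs:", gpus)
print("ML units:", units)
print("Exceeds default quota:", units > DEFAULT_QUOTA_ML_UNITS)
```

With one master and one worker the job needs 8 GPUs and 24 ML units, well above the default quota of 10.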

In [79]:
%%writefile config.yaml
trainingInput:
  scaleTier: CUSTOM
  masterType: complex_model_m_gpu
  workerType: complex_model_m_gpu
  workerCount: 1

Overwriting config.yaml


In [80]:
%%bash
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --runtime-version 2.1 \
   --python-version 3.7 \
   --config config.yaml \
   -- \
   --job-dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_180305_183843
state: QUEUED


Job [housing_180305_183843] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe housing_180305_183843

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs housing_180305_183843


### 5) Inspect Results Using Tensorboard

Tensorboard is a utility that allows you to visualize your results.

Expand the 'loss' graph. What is your evaluation loss? The loss is mean squared error, so take its square root to get the root-mean-squared error in dollars, a rough measure of the typical prediction error. Does this seem like a reasonable margin of error for predicting a housing price?
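
The conversion from squared-error loss to an error in dollars is a single square root. The sketch below uses a made-up loss value purely for illustration; substitute the evaluation loss you read from TensorBoard.

```python
import math


def rmse_from_mse(mse):
    """Convert a mean squared error into a root-mean-squared error."""
    return math.sqrt(mse)


example_loss = 9_000_000.0  # made-up MSE in dollars squared
rmse = rmse_from_mse(example_loss)
print("RMSE in dollars:", rmse)
```

The model in `trainer/task.py` also reports `RootMeanSquaredError` as a metric, so the same quantity should be available directly among the logged metrics.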

To activate TensorBoard within the JupyterLab UI navigate to **File** - **New Launcher**. Then double-click the 'Tensorboard' icon on the bottom row.

TensorBoard 1 will appear in the new tab. Navigate through the three tabs to see the active TensorBoard. The 'Graphs' and 'Projector' tabs offer very interesting information including the ability to replay the tests.

You may close the TensorBoard tab when you are finished exploring.

### 6) Deploy Model For Predictions

Cloud ML Engine has a prediction service that will wrap our tensorflow model with a REST API and allow remote clients to get predictions.

You can deploy the model from the Google Cloud Console GUI, or you can use the gcloud command line tool. We will use the latter method. Note this will take up to 5 minutes.

In [96]:
%%bash
MODEL_NAME="housing_prices"
MODEL_VERSION="v1"
MODEL_LOCATION=output/keras_export # SavedModel written by trainer/task.py during the local run

#gcloud ai-platform versions delete ${MODEL_VERSION} --model ${MODEL_NAME} #Uncomment to overwrite existing version
#gcloud ai-platform models delete ${MODEL_NAME} #Uncomment to overwrite existing model
gcloud ai-platform models create ${MODEL_NAME} --regions $REGION
gcloud ai-platform versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --staging-bucket=$GCS_BUCKET --runtime-version=2.1 --python-version=3.7

Creating version (this might take a few minutes)......
............................................................................................done.


### 7) Get Predictions

There are two flavors of the ML Engine Prediction Service: Batch and online.

Online prediction is more appropriate for latency sensitive requests as results are returned quickly and synchronously. 

Batch prediction is more appropriate for large prediction requests that you only need to run a few times a day.

The prediction service expects prediction requests as newline-delimited JSON, one instance per line, so first we will create a JSON file with a couple of housing records.


In [68]:
%%writefile records.json
{"CRIM": 0.00632,"ZN": 18.0,"INDUS": 2.31,"NOX": 0.538, "RM": 6.575, "AGE": 65.2, "DIS": 4.0900, "TAX": 296.0, "PTRATIO": 15.3}
{"CRIM": 0.00332,"ZN": 0.0,"INDUS": 2.31,"NOX": 0.437, "RM": 7.7, "AGE": 40.0, "DIS": 5.0900, "TAX": 250.0, "PTRATIO": 17.3}

Writing records.json
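
Whether the deployed model accepts named features or a flat vector depends on how it was exported. Assuming the Keras `Sequential` model from `trainer/task.py` is the one being served, it takes a single 9-element vector per instance, in the order of `FEATURE_COLUMNS`. The sketch below converts the named records into that form and wraps them in the `{"instances": [...]}` body used for online prediction requests over REST. The helper name `to_instance` is hypothetical.

```python
import json

# Feature order used by trainer/task.py.
FEATURE_COLUMNS = ["CRIM", "ZN", "INDUS", "NOX", "RM",
                   "AGE", "DIS", "TAX", "PTRATIO"]

# Same two records as in records.json above.
records_text = """\
{"CRIM": 0.00632,"ZN": 18.0,"INDUS": 2.31,"NOX": 0.538, "RM": 6.575, "AGE": 65.2, "DIS": 4.0900, "TAX": 296.0, "PTRATIO": 15.3}
{"CRIM": 0.00332,"ZN": 0.0,"INDUS": 2.31,"NOX": 0.437, "RM": 7.7, "AGE": 40.0, "DIS": 5.0900, "TAX": 250.0, "PTRATIO": 17.3}
"""


def to_instance(record):
    """Order a named record into a flat feature vector."""
    return [record[name] for name in FEATURE_COLUMNS]


instances = [to_instance(json.loads(line))
             for line in records_text.splitlines() if line.strip()]

# Request body for an online prediction call over REST.
body = json.dumps({"instances": instances})
print(body)
```

If you need the vector form with the gcloud tool instead, write one JSON list per line to the instances file. Keep in mind that any scaling applied to the features during training must also be applied to the values sent for prediction.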


Now we will pass this file to the prediction service using the gcloud command line tool. Results are returned immediately!

In [69]:
!gcloud ai-platform predict --model housing_prices --json-instances records.json

PREDICTIONS
[26098.3671875]
[20871.384765625]


Updates are available for some Cloud SDK components.  To install them,
please run:
  $ gcloud components update



### Conclusion

#### What we covered
1. How to use TensorFlow's high-level Keras API
2. How to deploy tensorflow code for distributed training in the cloud
3. How to evaluate results using TensorBoard
4. How to deploy the resulting model to the cloud for online prediction

#### What we didn't cover
1. How to leverage larger than memory datasets using Tensorflow's queueing system
2. How to create synthetic features from our raw data to aid learning (Feature Engineering)
3. How to improve model performance by finding the ideal hyperparameters using Cloud ML Engine's [HyperTune](https://cloud.google.com/ml-engine/docs/how-tos/using-hyperparameter-tuning) feature

This lab is a great start, but adding in the above concepts is critical in getting your models to production ready quality. These concepts are covered in Google's 1-week on-demand Tensorflow + Cloud ML course: https://www.coursera.org/learn/serverless-machine-learning-gcp