# Predicting House Sale Prices
### Distributed Machine Learning using Tensorflow + Cloud ML Engine

A barebones example showing the simplest path from raw data to distributed cloud training with the fewest lines of code

**This notebook is intended to be run on Google Cloud Datalab**: https://cloud.google.com/datalab/docs/quickstarts

Datalab will have the required libraries installed by default for this code to work. If you choose to run this code outside of Datalab you may run in to version and dependency issues which you will need to resolve.

In [2]:
import pandas as pd
import tensorflow as tf

In [3]:
print(tf.__version__)

1.0.0


## Tensorflow APIs
<img src="assets/TFHierarchy.png">

Tensorflow is a heirarchical framework. The further down the heirarchy you go, the more flexibility you have, but that more code you have to write. A best practice is start at the highest level of abstraction. Then if you need additional flexibility for some reason drop down one layer. 

For this tutorial we will be operating at the highest levels of Tensorflow abstraction, using Estimator and Experiments APIs.

## Steps

1. Load raw data

2. Define feature columns

3. Define Estimator

4. Define input function

5. Define Experiment

6. Package Code

7. Run!

### 1) Load Raw Data

We will be using a dataset containing house sales prices for Kings County in Washington, which includes Seattle. It includes homes sold between May 2014 and May 2015. You can read more about the dataset on [kaggle](https://www.kaggle.com/harlfoxem/housesalesprediction). It is a comma separated file hosted in a public GCS bucket.


In [15]:
#downlad data from GCS and store as pandas dataframe 

df = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/ml-taw/housing/kc_house_data.csv',
  header=0
  )

In [17]:
df.head()

Unnamed: 0,id,date,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view,...,grade,sqft_above,sqft_basement,yr_built,yr_renovated,zipcode,lat,long,sqft_living15,sqft_lot15
0,7129300520,20141013T000000,221900.0,3,1.0,1180,5650,1.0,0,0,...,7,1180,0,1955,0,98178,47.5112,-122.257,1340,5650
1,6414100192,20141209T000000,538000.0,3,2.25,2570,7242,2.0,0,0,...,7,2170,400,1951,1991,98125,47.721,-122.319,1690,7639
2,5631500400,20150225T000000,180000.0,2,1.0,770,10000,1.0,0,0,...,6,770,0,1933,0,98028,47.7379,-122.233,2720,8062
3,2487200875,20141209T000000,604000.0,4,3.0,1960,5000,1.0,0,0,...,7,1050,910,1965,0,98136,47.5208,-122.393,1360,5000
4,1954400510,20150218T000000,510000.0,3,2.0,1680,8080,1.0,0,0,...,8,1680,0,1987,0,98074,47.6168,-122.045,1800,7503


### 2) Define feature columns

Feature columns are your Estimator's data "interface." They tell the estimator in what format they should expect data and how to interpret it (is it one-hot? sparse? dense? categorical? continuous?).  https://www.tensorflow.org/api_docs/python/tf/contrib/layers/feature_column

The id column is a number that does not have predictive value, and we are not regressing on the selling date, so we exclude those as features from our data set.  Since we are trying to predict price, price is not a feature but rather the label.


In [26]:
REAL_VALUED_FEATURES = [u'bedrooms', u'bathrooms', u'sqft_living',
       u'sqft_lot', u'floors', u'condition', 
       u'sqft_above', u'sqft_basement', u'yr_built', u'yr_renovated',
       u'lat', u'long', u'sqft_living15', u'sqft_lot15']

CATEGORICAL_FEATURES = [u'waterfront', u'sqft_above',
                u'sqft_basement', u'zipcode']   

LABEL = u'price'

feature_cols = [tf.contrib.layers.real_valued_column(k) for k in REAL_VALUED_FEATURES] + [tf.contrib.layers.sparse_column_with_hash_bucket(k,hash_bucket_size=100) for k in CATEGORICAL_FEATURES]
    
'''
REAL_FEATURES = [u'bedrooms', u'bathrooms', u'sqft_living',
                 u'sqft_lot', u'floors', u'sqft_above', u'sqft_basement', 
                 u'sqft_living15', u'sqft_lot15']

real_features = [tf.contrib.layers.real_valued_column(k) for k in REAL_FEATURES]
cat_features = [tf.contrib.layers.sparse_column_with_hash_bucket(k,hash_bucket_size=100) for k in CAT_FEATURES]
feature_cols = real_features + cat_features

LABEL = u'price'

FEATURES = REAL_FEATURES + CAT_FEATURES
'''



"\nREAL_FEATURES = [u'bedrooms', u'bathrooms', u'sqft_living',\n                 u'sqft_lot', u'floors', u'sqft_above', u'sqft_basement', \n                 u'sqft_living15', u'sqft_lot15']\n\nreal_features = [tf.contrib.layers.real_valued_column(k) for k in REAL_FEATURES]\ncat_features = [tf.contrib.layers.sparse_column_with_hash_bucket(k,hash_bucket_size=100) for k in CAT_FEATURES]\nfeature_cols = real_features + cat_features\n\nLABEL = u'price'\n\nFEATURES = REAL_FEATURES + CAT_FEATURES\n"

### 3) An Estimator is what actually implements your training, eval and prediction loops. Every estimator has the following methods:

- fit() for training
- eval() for evaluation
- predict() for prediction
- export_savedmodel() for writing model state to disk

Tensorflow has several canned estimator that already implement these methods (DNNClassifier, LogisticClassifier etc..) or you can implement a custom estimator. For an example of implementing a custom estimator see [here](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/blogs/timeseries/rnn_cloudmle.ipynb).

For simplicity we will use a canned estimator. To instantiate an estimator simply pass it what Feature Columns to expect.

In [20]:
def generate_estimator(output_dir):
  return tf.contrib.learn.DNNRegressor(feature_columns=feature_cols,
                                            hidden_units=[10, 10],
                                            model_dir=output_dir)

### 4) Define input function

Now that you have an estimator and it konws what type of data to expect and how to intepret, you need to actually pass the data to it! This is the job of the input function. 

The input function return a (features, label) tuple
- features: A dictionary. Each key is a feature column name and its value is the Tensor containing the data for that Feature
- label: A Tensor containing the label column

In [21]:
def generate_input_fn(data_set):
    def input_fn():
      features = {k: tf.constant(data_set[k].values) for k in FEATURES}
      labels = tf.constant(data_set[LABEL].values)
      return features, labels
    return input_fn

### 5) Define Experiment

An experiment is ultimatley what you pass to learn_runner.run() for Cloud ML training. It encapsulated the Estimator (which in turn encapsulates the feature column definitions) and the Input Function.

Notice that to instantiate an Experiment you don't pass the Estimator or Input Functions directly. Instead you pass a function that *returns* an Estimator or Input Function. This is why we wrapped the Estimator and Input Function instantiations in generator functions during steps 3 and 4 respectively. It is also why we wrap the Experiment itself in a generator function, because a downstream function will expect it in this format.

In [22]:
def generate_experiment_fn(output_dir):
  return tf.contrib.learn.Experiment(
    generate_estimator(output_dir),
    train_input_fn=generate_input_fn(df), 
    eval_input_fn=generate_input_fn(df), #Normally you would pass a different eval set
    train_steps=1000,
    eval_steps=1000
  )
  

### 6) Package Code

You've now written all the tensoflow code you need!

To make it compatible with Cloud ML Engine we'll combine the above tensorflow code into a single python file with two simple changes

1. Add some boilerplate code to parse the command line arguments required for gcloud.
2. Use the learn_runner.run() function to run the experiment

We also add an \__init__\.py file to the folder. This is just python convention for identifying modules

In [27]:
%%bash
rm -rf trainer
mkdir trainer
touch trainer/__init__.py

In [28]:
%%writefile trainer/task.py

import argparse
import pandas as pd
import tensorflow as tf
from tensorflow.contrib.learn.python.learn import learn_runner
tf.logging.set_verbosity(tf.logging.ERROR)

df = pd.read_csv(
  filepath_or_buffer='https://storage.googleapis.com/ml-taw/housing/kc_house_data.csv',
  header=0,
  )
  

REAL_VALUED_FEATURES = [u'bedrooms', u'bathrooms', u'sqft_living',
       u'sqft_lot', u'floors', u'condition', 
       u'sqft_above', u'sqft_basement', u'yr_built', u'yr_renovated',
       u'lat', u'long', u'sqft_living15', u'sqft_lot15']

CATEGORICAL_FEATURES = [u'waterfront', u'sqft_above',
                u'sqft_basement', u'zipcode']   

LABEL = u'price'

feature_cols = [tf.contrib.layers.real_valued_column(k) for k in REAL_VALUED_FEATURES] + [tf.contrib.layers.sparse_column_with_hash_bucket(k,hash_bucket_size=100) for k in CATEGORICAL_FEATURES]
'''
REAL_FEATURES = [u'bedrooms', u'bathrooms', u'sqft_living',
                 u'sqft_lot', u'floors', u'sqft_above', u'sqft_basement', 
                 u'sqft_living15', u'sqft_lot15']

CAT_FEATURES = [u'waterfront', u'view', u'condition', u'grade', u'sqft_above',
                u'sqft_basement', u'yr_built', u'yr_renovated', u'zipcode']

real_features = [tf.contrib.layers.real_valued_column(k) for k in REAL_FEATURES]
cat_features = [tf.contrib.layers.sparse_column_with_hash_bucket(k,hash_bucket_size=100) for k in CAT_FEATURES]
feature_cols = real_features + cat_features

LABEL = u'price'

FEATURES = REAL_FEATURES + CAT_FEATURES

'''

def generate_estimator(output_dir):
  return tf.contrib.learn.DNNRegressor(feature_columns=feature_cols,
                                            hidden_units=[10, 10],
                                            model_dir=output_dir)

def generate_input_fn(data_set):
    def input_fn():
      features = {k: tf.constant(data_set[k].values) for k in FEATURES}
      labels = tf.constant(data_set[LABEL].values)
      return features, labels
    return input_fn

def generate_experiment_fn(output_dir):
  return tf.contrib.learn.Experiment(
    generate_estimator(output_dir),
    train_input_fn=generate_input_fn(df),
    eval_input_fn=generate_input_fn(df),
    train_steps=1000,
    eval_steps=1000
  )

######CLOUD ML ENGINE BOILERPLATE CODE BELOW######
if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  # Input Arguments
  parser.add_argument(
      '--output_dir',
      help='GCS location to write checkpoints and export models',
      required=True
  )
  parser.add_argument(
        '--job-dir',
        help='this model ignores this field, but it is required by gcloud',
        default='junk'
    )
  args = parser.parse_args()
  arguments = args.__dict__
  output_dir = arguments.pop('output_dir')

  learn_runner.run(generate_experiment_fn, output_dir)

Writing trainer/task.py


### 7) Run using gcloud command line tool
Now that our code is packaged we can invoke it using gcloud to run the training. 

Note: Since our dataset is so small and our model is simple the overhead of provisioning the cluster is longer than the actual training time. Accordingly you'll notice the single VM cloud training takes longer than the local training, and the distributed cloud training takes longer than single VM cloud. For larger datasets and more complex models this will reverse

#### Run local
It's a best practice to first run locally on a small dataset to check for errors. Note you can ignore the warnings in this case, as long as there are no errors.

In [29]:
%%bash
gcloud ml-engine local train \
   --module-name=trainer.task \
   --package-path=trainer \
   -- \
   --output_dir='./output'

Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/Users/reddyv/Code/tensorflow_teaching_examples/trainer/task.py", line 10, in <module>
    header=0,
  File "/usr/local/lib/python2.7/site-packages/pandas/io/parsers.py", line 645, in parser_f
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python2.7/site-packages/pandas/io/parsers.py", line 374, in _read
    compression=kwds.get('compression', None))
  File "/usr/local/lib/python2.7/site-packages/pandas/io/common.py", line 238, in get_filepath_or_buffer
    req = _urlopen(str(filepath_or_buffer))
  File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urll

#### Run on cloud (1 cloud ML unit)

First we specify which GCP project to use. Update the cell below to use your project name.

In [13]:
%%bash
gcloud config set project vijays-sandbox #CHANGE THIS TO YOUR PROJECT

Updated property [core/project].


Then we specify which GCS bucket to write to and a job name.
Job names submitted to the ml engine must be project unique, so we append the system date/time. Update the cell below to point to a GCS bucket you own.

In [30]:
%%bash
GCS_BUCKET=gs://vijays-sandbox-ml/housing #CHANGE THIS TO YOUR GCS BUCKET
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=us-central1 \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME/ \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170512_193905
state: QUEUED


Job [housing_170512_193905] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170512_193905

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170512_193905


#### Run on cloud (10 cloud ML units)
Because we are using the TF Experiments interface, distributed computing just works! The only change we need to make to run in a distributed fashion is to add the [--scale-tier](https://cloud.google.com/ml/pricing#ml_training_units_by_scale_tier) argument. Cloud ML Engine then takes care of distributing the training across devices for you!


In [95]:
%%bash
GCS_BUCKET=gs://ml-taw/housing  #CHANGE THIS TO YOUR GCS BUCKET
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=us-central1 \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --scale-tier=STANDARD_1 \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170511_054611
state: QUEUED


Job [housing_170511_054611] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170511_054611

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170511_054611


#### Run on cloud GPU (3 cloud ML units)

Also works with GPUs!

"BASIC_GPU" corresponds to one Tesla K80 at the time of this writing, hardware subject to change. 1 GPU is charged as 3 cloud ML units.

In [97]:
%%bash
GCS_BUCKET=gs://ml-taw/housing  #CHANGE THIS TO YOUR GCS BUCKET
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=us-east1 \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --scale-tier=BASIC_GPU \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170511_054706
state: QUEUED


Job [housing_170511_054706] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170511_054706

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170511_054706


#### Run on 8 cloud GPUs (24 cloud ML units)
To train across multiple GPUs you use a [custom scale tier](https://cloud.google.com/ml/docs/concepts/training-overview#job_configuration_parameters).

You specify the number and types of machines you want to run on in a config.yaml, then reference that config.yaml via the --config config.yaml command line argument.

Here I am specifying a master node with machine type complex_model_m_gpu and one worker node of the same type. Each complex_model_m_gpu has 4 GPUs so this job will run on 2x4=8 GPUs total. 

Note your GCP project needs to have approval to run on that many GPUs or else you will get a quota exceeded error.

In [99]:
%%writefile config.yaml
trainingInput:
  scaleTier: CUSTOM
  masterType: complex_model_m_gpu
  workerType: complex_model_m_gpu
  workerCount: 1

Writing config.yaml


In [100]:
%%bash
GCS_BUCKET=gs://ml-taw/housing #CHANGE THIS TO YOUR GCS BUCKET
JOBNAME=housing_$(date -u +%y%m%d_%H%M%S)

gcloud ml-engine jobs submit training $JOBNAME \
   --region=us-east1 \
   --module-name=trainer.task \
   --package-path=./trainer \
   --job-dir=$GCS_BUCKET/$JOBNAME \
   --config config.yaml \
   -- \
   --output_dir=$GCS_BUCKET/$JOBNAME/output

jobId: housing_170511_054853
state: QUEUED


Job [housing_170511_054853] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe housing_170511_054853

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs housing_170511_054853


In [120]:
###  TESTING   #####
output_dir = './output' #https://storage.googleapis.com/ml-taw/housing/housing_170511_054853/'

est = tf.contrib.learn.DNNRegressor(feature_columns=feature_cols,
                                            hidden_units=[10, 10],
                                            model_dir=output_dir)

#predict_scores(input_fn=generate_input_fn(df.loc[0,FEATURES]))


est.predict_scores(input_fn=generate_input_fn(df.loc[0,FEATURES]))


INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_num_ps_replicas': 0, '_keep_checkpoint_max': 5, '_tf_random_seed': None, '_task_type': None, '_environment': 'local', '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f6dfb14aa90>, '_tf_config': gpu_options {
  per_process_gpu_memory_fraction: 1
}
, '_task_id': 0, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_evaluation_master': '', '_keep_checkpoint_every_n_hours': 10000, '_master': ''}


AttributeError: 'numpy.int64' object has no attribute 'values'

In [117]:
df.loc[1,FEATURES].as_matrix()

array([3, 2.25, 2570, 7242, 2.0, 0, 0, 3, 7, 2170, 400, 1951, 1991, 98125,
       47.721000000000004, -122.319, 1690, 7639], dtype=object)