Structure of an Estimator API ML model


```python

# Feature columns: a single numeric input named "sq_footage"
featcols = [tf.feature_column.numeric_column("sq_footage")]

# Linear regression estimator using ./model_trained as its model directory
model = tf.estimator.LinearRegressor(
  feature_columns=featcols,
  model_dir='./model_trained',
)


# Train
def train_input_fn():
  """Input function for training; returns a (features, labels) pair."""
  ...
  return features, labels

# Train for 100 steps, pulling data from train_input_fn
model.train(train_input_fn, steps=100)


# Predict
def pred_input_fn():
  """Input function for prediction; returns features only (no labels)."""
  ...
  return features

# Pass the function itself (not its result) so the estimator can call it
out = model.predict(pred_input_fn)

```



# Encoding categorical data to supply to a DNN

```python

# 1a. If you know the complete vocabulary beforehand:

tf.feature_column.categorical_column_with_vocabulary_list(
  'zipcode',
  vocabulary_list=['83452', '72345', '87654', '98723', '23451'])

# 1b. If your data is already indexed; i.e., has integers in [0-N):

tf.feature_column.categorical_column_with_identity('stateId', num_buckets=50)

# 2. To pass in a categorical column into a DNN, one option is to one-hot encode it:

tf.feature_column.indicator_column(my_categorical_column)

```

# To read CSV files, create a TextLineDataset giving it a function to decode the CSV into features, labels

```python
# Column names, in file order; zipped with the decoded CSV fields in read_dataset
CSV_COLUMNS = ['sqfootage', 'city', 'amount']
# Column that read_dataset pops out of the features dict and returns as the label
LABEL_COLUMN  = 'amount'
# Per-column defaults passed to the CSV decoder as record_defaults
DEFAULTS = [[0.0], ['na'], [0.0]]

def read_dataset(filename, mode, batch_size=512):
  """Read CSV lines from `filename` and return the next (features, label) batch.

  Args:
    filename: path handed to tf.data.TextLineDataset.
    mode: compared against tf.estimator.ModeKeys.TRAIN; in training mode the
      data is shuffled and repeated indefinitely, otherwise it is read once.
    batch_size: number of rows per batch (also scales the shuffle buffer).

  Returns:
    The result of get_next() on a one-shot iterator over the batched dataset.
  """
  def decode_csv(value_column):
    # Parse one CSV line and split the label off from the named features
    parsed = tf.decode_csv(value_column, record_defaults=DEFAULTS)
    features = dict(zip(CSV_COLUMNS, parsed))
    label = features.pop(LABEL_COLUMN)
    return features, label

  dataset = tf.data.TextLineDataset(filename)
  dataset = dataset.map(decode_csv)

  training = mode == tf.estimator.ModeKeys.TRAIN
  if training:
    dataset = dataset.shuffle(buffer_size=10*batch_size)

  # None repeats indefinitely; 1 gives end-of-input after a single pass
  num_epochs = None if training else 1
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)

  iterator = dataset.make_one_shot_iterator()
  return iterator.get_next()
```

# Estimator API comes with a method that handles distributed training and evaluation

```python
import tensorflow as tf

# Linear regressor with output_dir as its model directory
estimator = tf.estimator.LinearRegressor(
  model_dir=output_dir,
  feature_columns=feature_cols
)

# Train and evaluate together, driven by the two spec objects
tf.estimator.train_and_evaluate(
  estimator,
  train_spec,
  eval_spec
)
```
# Benefits of using `tf.estimator.train_and_evaluate`
* Distribute the graph
* Share variables
* Evaluate occasionally
* Handle machine failures
* Create checkpoint files
* Recover from failures
* Save summaries for TensorBoard

# TrainSpec consists of the things that used to be passed into the train() method

```python
import tensorflow as tf

train_spec = tf.estimator.TrainSpec(
  # input_fn must be a callable, so defer the read_dataset call with a lambda
  # instead of passing its result. The mode uses the same ModeKeys that
  # read_dataset compares against.
  input_fn=lambda: read_dataset('gs://.../train*',
                                mode=tf.estimator.ModeKeys.TRAIN),
  # max_steps belongs to TrainSpec, not to read_dataset
  max_steps=num_train_steps
)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

```

# Think `steps` not `epochs`, with production-ready, distributed models.

1. Gradient updates from slow workers could get ignored
2. When retraining a model with fresh data, we'll resume from the earlier number of steps (and the corresponding hyper-parameters)
 

# EvalSpec controls the evaluation and the checkpointing of the model because they happen at the same time


```python

exporter = ...

eval_spec = tf.estimator.EvalSpec(
  # input_fn must be a callable, so defer the read_dataset call with a lambda.
  # The mode uses the same ModeKeys that read_dataset compares against.
  input_fn=lambda: read_dataset('gs://.../valid*',
                                mode=tf.estimator.ModeKeys.EVAL),
  steps=None,  # no step limit: evaluate until the input is exhausted
  start_delay_secs=60,  # start evaluating after N seconds
  throttle_secs=600,  # wait at least N seconds between evaluations
  exporters=exporter
)

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

```

# Wide-and-deep network in Estimator API


```python

import tensorflow as tf

# Wide-and-deep classifier: wide_columns feed the linear part, deep_columns
# feed a DNN with hidden layers of 100 and 50 units.
model = tf.estimator.DNNLinearCombinedClassifier(
  model_dir=...,
  dnn_hidden_units=[100, 50],
  dnn_feature_columns=deep_columns,
  linear_feature_columns=wide_columns,
)

```


# Monitor and experiment with training

```python
from google.datalab.ml import TensorBoard

# Start TensorBoard on ./babyweight_trained
# (presumably the model's output directory — confirm)
TensorBoard().start('./babyweight_trained')
```

# Operationalize the model

```python

# Build the pipeline: read text -> Transform -> group by key -> Filter -> write text
p = beam.Pipeline()

(p
  | beam.io.ReadFromText('gs://..')
  | beam.Map(Transform)
  | beam.GroupByKey()
  | beam.FlatMap(Filter)
  | beam.io.WriteToText('gs://...')
)

# Nothing executes until the pipeline is run
p.run()

```

# An example Beam pipeline for BigQuery->CSV on cloud

```python

import apache_beam as beam

def transform(rowdict):
  """Yield one CSV line 'a,b,c' for a row whose 'a' is positive.

  Works on a deep copy, so the input row is never modified. 'c' is computed
  as a * b. Rows with a <= 0 yield nothing.

  Args:
    rowdict: mapping with at least the keys 'a' and 'b'.

  Yields:
    A comma-joined string of the values for 'a', 'b' and 'c'; a missing key
    is rendered as 'None'.
  """
  import copy
  result = copy.deepcopy(rowdict)
  if rowdict['a'] > 0:
    result['c'] = result['a'] * result['b']
    # dict.get returns None for a missing key, which str() renders as 'None'
    yield ','.join(str(result.get(k)) for k in ['a', 'b', 'c'])

  
if __name__ == "__main__":
  # sys is needed for sys.argv and is not imported at the top of this snippet
  import sys

  # Command-line flags (project, runner, ...) are forwarded to the pipeline
  p = beam.Pipeline(argv=sys.argv)

  selquery = 'SELECT a,b FROM someds.sometable'

  (p
    | beam.io.Read(beam.io.BigQuerySource(query=selquery,
                                          use_standard_sql=True))
    # transform is a generator function, so FlatMap is required to emit the
    # strings it yields; Map would emit the generator object itself
    | beam.FlatMap(transform)  # do some processing
    | beam.io.WriteToText('gs://...')  # write output
  )

  p.run()  # run the pipeline


```
```text

Simply running main() runs the pipeline locally


python ./etl.py

To run on cloud, specify cloud parameters

python ./etl.py \
      --project=$PROJECT\
      --job_name=myjob\
      --staging_location=gs://$BUCKET/staging/\
      --temp_location=gs://$BUCKET/staging/\
      --runner=DataflowRunner # DirectRunner would be local

```

# Use the gcloud command to submit the training job either locally or to the cloud

```bash

# Run the trainer.task module locally from the given package path.
# NOTE(review): "etc." looks like a placeholder for the remaining trainer
# arguments — fill in before running.
gcloud ml-engine local train \
    --module-name=trainer.task\
    --package-path=/somedir/babyweight/trainer\
    --train_data_paths etc.


# Submit the trainer.task module as a training job named $JOBNAME.
# The last flag has no trailing backslash, so the command ends there.
gcloud ml-engine jobs submit training $JOBNAME \
    --region=$REGION \
    --module-name=trainer.task \
    --job-dir=$OUTDIR \
    --staging-bucket=gs://$BUCKET \
    --scale-tier=BASIC
```


In [None]:
import tensorflow as tf