# Tensor Flow Basics -  Distributed Data Read

In [1]:
import shutil
import numpy as np
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.INFO)
print(tf.__version__)

1.13.0-rc1


In [2]:
# Determine CSV, label, and key columns
CSV_COLUMNS = 'X,Y'.split(',')
LABEL_COLUMN = 'Y'
KEY_COLUMN = 'key'

# Set default values for each CSV column
DEFAULTS = [[0.0], [0.0]]
TRAIN_STEPS = 1000

In [3]:
# Create an input function reading a file using the Dataset API
# Then provide the results to the Estimator API
def read_dataset(filename, mode, batch_size = 512):
  def _input_fn():
    def decode_csv(value_column):
      columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
      features = dict(zip(CSV_COLUMNS, columns))
      label = features.pop(LABEL_COLUMN)
      return features, label
    
    # Create list of files that match pattern
    file_list = tf.gfile.Glob(filename)

    # Create dataset from file list
    # if no header, remove .skip()
   
    dataset = (tf.data.TextLineDataset(file_list).skip(1)  # Read text file
                 .map(decode_csv))  # Transform each elem by applying decode_csv fn
      
    if mode == tf.estimator.ModeKeys.TRAIN:
        num_epochs = None # indefinitely
        dataset = dataset.shuffle(buffer_size=10*batch_size)
    else:
        num_epochs = 1 # end-of-input after this
 
    dataset = dataset.repeat(num_epochs).batch(batch_size)
    return dataset
  return _input_fn

Next, define the feature columns

In [4]:
# Define feature columns
def get_categorical(name, values):
  return tf.feature_column.indicator_column(
    tf.feature_column.categorical_column_with_vocabulary_list(name, values))

def get_cols():
  # Define column types
  return [\
          #get_categorical('is_male', ['True', 'False', 'Unknown']),
          tf.feature_column.numeric_column('X'),
          #get_categorical('plurality',
          #            ['Single(1)', 'Twins(2)', 'Triplets(3)',
          #             'Quadruplets(4)', 'Quintuplets(5)','Multiple(2+)']),
          #tf.feature_column.numeric_column('gestation_weeks')
      ]

To predict with the TensorFlow model, we also need a serving input function. We will want all the inputs from our user.

In [5]:
# Create serving input function to be able to serve predictions later using provided inputs
def serving_input_fn():
    feature_placeholders = {
        #'is_male': tf.placeholder(tf.string, [None]),
        'X': tf.placeholder(tf.float32, [None]),
        #'plurality': tf.placeholder(tf.string, [None]),
        #'gestation_weeks': tf.placeholder(tf.float32, [None])
    }
    features = {
        key: tf.expand_dims(tensor, -1)
        for key, tensor in feature_placeholders.items()
    }
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

In [6]:
# Create estimator to train and evaluate
def train_and_evaluate(output_dir):
  EVAL_INTERVAL = 300
  run_config = tf.estimator.RunConfig(save_checkpoints_secs = EVAL_INTERVAL,
                                      keep_checkpoint_max = 3)
  estimator = tf.estimator.DNNRegressor(
                       model_dir = output_dir,
                       feature_columns = get_cols(),
                       hidden_units = [64, 32],
                       config = run_config)
  train_spec = tf.estimator.TrainSpec(
                       input_fn = read_dataset('../input/LinearRegressionDS/LinearRegressionTrainingData.csv',
                                               mode = tf.estimator.ModeKeys.TRAIN),
                       max_steps = TRAIN_STEPS)
  exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
  eval_spec = tf.estimator.EvalSpec(
                       input_fn = read_dataset('../input/LinearRegressionDS/LinearRegressionValidData.csv',
                                               mode = tf.estimator.ModeKeys.EVAL),
                       steps = None,
                       start_delay_secs = 60, # start evaluating after N seconds
                       throttle_secs = EVAL_INTERVAL,  # evaluate every N seconds
                       exporters = exporter)
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

Finally, train!

In [7]:
# Run the model
shutil.rmtree('../Output/TensorFlow/trained_model', ignore_errors = True) # start fresh each time
train_and_evaluate('../Output/TensorFlow/trained_model')

INFO:tensorflow:Using config: {'_model_dir': '../Output/TensorFlow/trained_model', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 300, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 3, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x000001851C3C5C18>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and 

NotFoundError: Failed to create a directory: ../Output/TensorFlow/trained_model\export\exporter\temp-b'1557168829'; No such file or directory

In [None]:
%ls trained_model