# <div class="alert alert-block alert-info" style="border-width:4px">SBrain Learning Experiment Management Tutorial </div>

# Introduction

In this tutorial, we will explore some of the **SBrain** features related to model learning. **SBrain** takes the deep learning model code written in your notebook and runs it in a cluster as a distributed job, which accelerates learning and lets data scientists run more experiments in the same period of time.
**SBrain** provides the data scientist with high-level abstractions that hide engineering details such as cluster allocation, TensorFlow distributed synchronization and job management.


 In this notebook, we will train a ResNet model from scratch using CIFAR-10 data. Here are the tutorial goals:

 1. Familiarize you with the main **SBrain** features and abstractions related to learning
 2. Walk you through defining estimators, which encapsulate your model architecture and can be shared and reused by others
 3. Showcase how you can manage and monitor distributed learning jobs running in a cluster, from the notebook
 4. Showcase the benefit of submitting parallel jobs to a cluster configured in a declarative manner, as opposed to running the learning on your own computer
 5. Configure a hyperparameter search space, run multiple jobs in parallel with different configurations, and evaluate the metrics for each
 

### Let's try it out

Before we begin, copy this notebook and rename the copy with your name at the end. When multiple people edit the same notebook at the same time, it causes reloading issues.

#### Imports
After that, let's start by importing the necessary packages, mainly learning and dataset. The learning package contains everything related to defining a model learning experiment.
The dataset package contains everything related to defining dataset transformations.

In [None]:
from sbrain.learning.experiment import *
from sbrain.dataset.dataset import *

#### Unique Names

Most of the **SBrain** artifacts you create in this notebook, such as estimators, models and jobs, need a human-readable unique name that others can look up and possibly reuse or inspect. The helper function below makes names unique by appending the username and a timestamp, so that you don't run into a DuplicateName error. You can turn this off by changing the flag should_uniquify to False. Please put your username as the value of user_name.

In [None]:
user_name = "albin"

def uniquify(name):
    import time
    should_uniquify = True
    if should_uniquify:
        return name + user_name + str(time.time()).replace(".","")
    else:
        return name

#### SBrain Estimators


**SBrain** exposes **Estimators** as the main abstraction for model definition, evaluation and deployment. They are higher-level APIs which hide a lot of low-level details. **SBrain** follows the same style as
TensorFlow Estimators.
More details on the TensorFlow Estimator APIs are given in the following links.
- https://www.tensorflow.org/programmers_guide/estimators
- https://www.tensorflow.org/get_started/custom_estimators

The advantage is that anybody who uses TensorFlow can plug their code directly into **SBrain** and immediately leverage all the capabilities that **SBrain** provides.

#### The model function

One important function that the user has to implement when using estimators is the model_function. In the model_function, we can define the architecture of the neural network, the loss, the training operation etc, and we return a tf.estimator.EstimatorSpec() object with everything that we have defined.
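
The control flow of a model function can be sketched without TensorFlow. The snippet below is a schematic only: `toy_model_function`, the mode constants and the returned dictionaries are stand-ins invented for illustration, whereas a real model function returns a `tf.estimator.EstimatorSpec`. It shows the order that the ResNet function later in this notebook follows: return early for prediction (no labels are available), then compute the loss, then return either evaluation metrics or a training step.

```python
import math

# Stand-ins for tf.estimator.ModeKeys (hypothetical constants for this sketch).
TRAIN, EVAL, PREDICT = "train", "eval", "infer"


def toy_model_function(features, labels, mode, params):
    """Schematic model function: one linear score per class, no TensorFlow."""
    # "Network": scale the single input feature by a per-class weight.
    logits = [w * features["x"] for w in params["class_weights"]]
    # Softmax probabilities (shifted by the max logit for numerical stability).
    top = max(logits)
    exps = [math.exp(z - top) for z in logits]
    total = sum(exps)
    probabilities = [e / total for e in exps]
    class_id = probabilities.index(max(probabilities))

    if mode == PREDICT:
        # Labels are not available at prediction time, so return before using them.
        return {"mode": mode,
                "predictions": {"class_ids": class_id, "probabilities": probabilities}}

    # Cross-entropy loss for the true class.
    loss = -math.log(probabilities[labels])

    if mode == EVAL:
        return {"mode": mode, "loss": loss,
                "eval_metric_ops": {"accuracy": float(class_id == labels)}}

    # TRAIN: a real model function would return an optimizer step as train_op.
    return {"mode": mode, "loss": loss, "train_op": "optimizer.minimize(loss)"}


toy_params = {"class_weights": [0.1, 0.5, -0.3]}
spec = toy_model_function({"x": 2.0}, None, PREDICT, toy_params)
print(spec["predictions"]["class_ids"])
```

Calling the same function with `mode=EVAL` or `mode=TRAIN` requires a label, which is why the prediction branch has to come first.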

#### Our model function : ResNet and CIFAR-10

 In the code below, we define a ResNet architecture and adapt it to the CIFAR-10 dataset. Towards the very end of this big function, we define the network, loss, optimizer and training operation. This neural network has 44 layers and is a fairly complex neural network to train.
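
The figure of 44 layers can be checked with a little arithmetic. The sketch below mirrors the expression `(num_layers - 2) // 6` used by the `ResNetCifar10` class in the code that follows; the helper names `blocks_per_stage` and `weighted_layers` are introduced here for illustration only.

```python
def blocks_per_stage(num_layers):
    """Residual blocks in each of the 3 stages, as computed by ResNetCifar10."""
    return (num_layers - 2) // 6


def weighted_layers(num_layers, stages=3, convs_per_block=2):
    """Initial convolution + residual-block convolutions + final dense layer."""
    n = blocks_per_stage(num_layers)
    return 1 + stages * n * convs_per_block + 1


print(blocks_per_stage(44))  # 7
print(weighted_layers(44))   # 44
```

Each of the 3 stages stacks 7 residual blocks with 2 convolutions each, which gives 42 layers; the initial convolution and the final fully connected layer bring the total to 44.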

 Since **SBrain** captures this function and executes it in a cluster, any libraries that you use inside the function must be imported at the top of the function body. This helps you isolate your notebook environment and variables from your cluster code context.
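
How **SBrain** captures and ships the function is not described in this notebook. The sketch below only illustrates the general problem, under the assumption that the function is executed in a namespace that does not share the notebook's globals. `run_isolated` and the two source strings are hypothetical names used for this illustration.

```python
import math  # available in the "notebook" scope only

WITHOUT_INNER_IMPORT = """
def circle_area(radius):
    return math.pi * radius ** 2
"""

WITH_INNER_IMPORT = """
def circle_area(radius):
    import math
    return math.pi * radius ** 2
"""


def run_isolated(source, function_name, *args):
    """Define the function in an empty namespace, then call it."""
    namespace = {}
    exec(source, namespace)
    return namespace[function_name](*args)


# The version that imports inside the function body works in isolation.
print(run_isolated(WITH_INNER_IMPORT, "circle_area", 1.0))

# The version that relies on a notebook-level import does not.
try:
    run_isolated(WITHOUT_INNER_IMPORT, "circle_area", 1.0)
except NameError as error:
    print("failed in isolation:", error)
```

This is why `cifar_model_function` below starts with `import tensorflow as tf` and `import numpy as np` inside the function.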

 More comments are provided throughout the code below. For more details on the TensorFlow model_function abstraction, please read the article below.

 - https://www.tensorflow.org/get_started/custom_estimators#write_a_model_function

In [None]:
def cifar_model_function(features, labels, mode, params):
    ## Importing relevant packages
    import tensorflow as tf
    import numpy as np
    ########## Defining ResNet structure as a class. #############

    class ResNet(object):
        """ResNet model."""

        def __init__(self, is_training, data_format, batch_norm_decay, batch_norm_epsilon):
            """ResNet constructor.

            Args:
              is_training: if build training or inference model.
              data_format: the data_format used during computation.
                           one of 'channels_first' or 'channels_last'.
            """
            self._batch_norm_decay = batch_norm_decay
            self._batch_norm_epsilon = batch_norm_epsilon
            self._is_training = is_training
            assert data_format in ('channels_first', 'channels_last')
            self._data_format = data_format

        def forward_pass(self, x):
            raise NotImplementedError(
                'forward_pass() is implemented in ResNet sub classes')

        def _residual_v1(self,
                         x,
                         kernel_size,
                         in_filter,
                         out_filter,
                         stride,
                         activate_before_residual=False):
            """Residual unit with 2 sub layers, using Plan A for shortcut connection."""

            del activate_before_residual
            with tf.name_scope('residual_v1') as name_scope:
                orig_x = x

                x = self._conv(x, kernel_size, out_filter, stride)
                x = self._batch_norm(x)
                x = self._relu(x)

                x = self._conv(x, kernel_size, out_filter, 1)
                x = self._batch_norm(x)

                if in_filter != out_filter:
                    orig_x = self._avg_pool(orig_x, stride, stride)
                    pad = (out_filter - in_filter) // 2
                    if self._data_format == 'channels_first':
                        orig_x = tf.pad(orig_x, [[0, 0], [pad, pad], [0, 0], [0, 0]])
                    else:
                        orig_x = tf.pad(orig_x, [[0, 0], [0, 0], [0, 0], [pad, pad]])

                x = self._relu(tf.add(x, orig_x))

                tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
                return x

        def _conv(self, x, kernel_size, filters, strides, is_atrous=False):
            """Convolution."""

            padding = 'SAME'
            if not is_atrous and strides > 1:
                pad = kernel_size - 1
                pad_beg = pad // 2
                pad_end = pad - pad_beg
                if self._data_format == 'channels_first':
                    x = tf.pad(x, [[0, 0], [0, 0], [pad_beg, pad_end], [pad_beg, pad_end]])
                else:
                    x = tf.pad(x, [[0, 0], [pad_beg, pad_end], [pad_beg, pad_end], [0, 0]])
                padding = 'VALID'
            return tf.layers.conv2d(
                inputs=x,
                kernel_size=kernel_size,
                filters=filters,
                strides=strides,
                padding=padding,
                use_bias=False,
                data_format=self._data_format)

        def _batch_norm(self, x):
            if self._data_format == 'channels_first':
                data_format = 'NCHW'
            else:
                data_format = 'NHWC'
            return tf.contrib.layers.batch_norm(
                x,
                decay=self._batch_norm_decay,
                center=True,
                scale=True,
                epsilon=self._batch_norm_epsilon,
                is_training=self._is_training,
                fused=True,
                data_format=data_format)

        def _relu(self, x):
            return tf.nn.relu(x)

        def _fully_connected(self, x, out_dim):
            with tf.name_scope('fully_connected') as name_scope:
                x = tf.layers.dense(x, out_dim)

            tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
            return x

        def _avg_pool(self, x, pool_size, stride):
            with tf.name_scope('avg_pool') as name_scope:
                x = tf.layers.average_pooling2d(
                    x, pool_size, stride, 'SAME', data_format=self._data_format)

            tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
            return x

        def _global_avg_pool(self, x):
            with tf.name_scope('global_avg_pool') as name_scope:
                assert x.get_shape().ndims == 4
                if self._data_format == 'channels_first':
                    x = tf.reduce_mean(x, [2, 3])
                else:
                    x = tf.reduce_mean(x, [1, 2])
            tf.logging.info('image after unit %s: %s', name_scope, x.get_shape())
            return x

    ########## End ResNet class #############

    ####### Subclassing ResNet specific to CIFAR-10 ###########

    class ResNetCifar10(ResNet):
        """Cifar10 model with ResNetV1 and basic residual block."""

        def __init__(self,
                     num_layers,
                     is_training,
                     batch_norm_decay,
                     batch_norm_epsilon,
                     data_format='channels_first'):
            super(ResNetCifar10, self).__init__(
                is_training,
                data_format,
                batch_norm_decay,
                batch_norm_epsilon
            )
            self.n = (num_layers - 2) // 6
            # CIFAR-10 has 10 classes; labels are expected to start at 0.
            self.num_classes = 10
            self.filters = [16, 16, 32, 64]
            self.strides = [1, 2, 2]

        def forward_pass(self, x, input_data_format='channels_last'):
            """Build the core model within the graph."""
            if self._data_format != input_data_format:
                if input_data_format == 'channels_last':
                    # Computation requires channels_first.
                    x = tf.transpose(x, [0, 3, 1, 2])
                else:
                    # Computation requires channels_last.
                    x = tf.transpose(x, [0, 2, 3, 1])

            # Image standardization.
            x = x / 128 - 1

            x = self._conv(x, 3, 16, 1)
            x = self._batch_norm(x)
            x = self._relu(x)

            # Use basic (non-bottleneck) block and ResNet V1 (post-activation).
            res_func = self._residual_v1

            # 3 stages of block stacking.
            for i in range(3):
                with tf.name_scope('stage'):
                    for j in range(self.n):
                        if j == 0:
                            # First block in a stage, filters and strides may change.
                            x = res_func(x, 3, self.filters[i], self.filters[i + 1],
                                         self.strides[i])
                        else:
                            # Following blocks in a stage, constant filters and unit stride.
                            x = res_func(x, 3, self.filters[i + 1], self.filters[i + 1], 1)

            x = self._global_avg_pool(x)
            x = self._fully_connected(x, self.num_classes)

            return x
    ####### End ResNetCifar10 class ###########

    ######### Here we define all the hyperparameters, network, loss, optimizer and training operations ##################

    ## Hyperparams
    num_layers = 44

    # batch_norm_decay = 0.997
    batch_norm_decay = params["batch_norm_decay"]
    batch_norm_epsilon = 1e-5
    # weight_decay = 2e-4
    weight_decay = params["weight_decay"]
    learning_rate = 0.1

    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    ## Neural network architecture
    model = ResNetCifar10(
        num_layers,
        batch_norm_decay=batch_norm_decay,
        batch_norm_epsilon=batch_norm_epsilon,
        is_training=is_training,
        data_format="channels_last")

    data = tf.feature_column.input_layer(features, [tf.feature_column.numeric_column("data", shape=(32,32,3))])
    data = tf.reshape(data, (-1,32,32,3))
    logits = model.forward_pass(data, input_data_format='channels_last')

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {
            'class_ids': tf.argmax(input=logits, axis=1),
            'probabilities': tf.nn.softmax(logits),
            'logits': logits,
        }
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)
    ## Defining Loss
    labels = tf.string_to_number(labels,out_type=tf.int32)
    loss = tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels)
    loss = tf.reduce_mean(loss)
    model_params = tf.trainable_variables()
    loss += weight_decay * tf.add_n([tf.nn.l2_loss(v) for v in model_params])
    predictions = tf.argmax(logits, axis=1)

    ## Compute evaluation metrics.
    accuracy = tf.metrics.accuracy(labels=labels, predictions=tf.argmax(logits, axis=1),
                                   name='acc_op')
    metrics = {'accuracy': accuracy}
    tf.summary.scalar('accuracy', accuracy[1])

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec( mode, loss=loss, eval_metric_ops=metrics)

    assert mode == tf.estimator.ModeKeys.TRAIN

    ## Create optimizer
    num_batches_per_epoch = 45000 // 64
    boundaries = [ num_batches_per_epoch * x for x in np.array([82, 123, 300], dtype=np.int64)]
    staged_lr = [learning_rate * x for x in [1, 0.1, 0.01, 0.002]]
    learning_rate = tf.train.piecewise_constant(tf.train.get_global_step(), boundaries, staged_lr)
    optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9)

    ## Create global step and training operation
    global_step = tf.train.get_global_step()
    train_op = optimizer.minimize(loss, global_step=global_step)

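    ## Print ops for logging. They are created under control dependencies on
    ## UPDATE_OPS (batch norm statistics) and on train_op, so running the final
    ## train_op also runs both. Note that the first tf.Print below is
    ## overwritten by the second and therefore never executes.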
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        with tf.control_dependencies([train_op]):
            train_op = tf.Print(predictions, [predictions, tf.shape(predictions), "predictions"], summarize=32)
            train_op = tf.Print(global_step, [global_step])

    ## Return Estimator Spec with loss and training operation
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op, training_chief_hooks=None)

### More SBrain Abstractions

The model function we defined above captures the structure of the network, loss and training operation. The next step is to tie this up to other **SBrain** abstractions.
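
The learning-rate schedule inside the model function above can be reproduced in plain Python to see which rate applies at a given step. This is a sketch that assumes the documented behaviour of `tf.train.piecewise_constant` (the rate changes once the step exceeds a boundary); `learning_rate_at` is a helper introduced here for illustration.

```python
import bisect

BASE_LEARNING_RATE = 0.1
STEPS_PER_EPOCH = 45000 // 64  # same constants as in the model function
BOUNDARIES = [STEPS_PER_EPOCH * epoch for epoch in (82, 123, 300)]
STAGED_LR = [BASE_LEARNING_RATE * factor for factor in (1, 0.1, 0.01, 0.002)]


def learning_rate_at(step):
    """Piecewise-constant lookup: the rate changes once step exceeds a boundary."""
    return STAGED_LR[bisect.bisect_left(BOUNDARIES, step)]


print(BOUNDARIES)
for step in (0, 1000, BOUNDARIES[0], BOUNDARIES[0] + 1, BOUNDARIES[2] + 1):
    print(step, learning_rate_at(step))
```

The first boundary is at step 57,646, so with the 500 and 1000 iterations used later in this tutorial the rate stays at 0.1 for the whole run. Note also that the 64 in `45000 // 64` is hardcoded in the model function and does not follow the batch size hyperparameter.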

Here, we define a new **SBrain** classification estimator, passing in the same model_function that we defined earlier. This gives us an **SBrain** object which packages your model function.

In [None]:
classification_estimator = Estimator.NewClassificationEstimator(model_fn=cifar_model_function)

Now we save this estimator as an asset in the **SBrain** environment, with a name we choose.

In [None]:
name = uniquify("MyFirstResnetCifar10Estimator")
saved_estimator = Estimator.create(estimator_name=name,
                                   description="ResNet Cifar10 estimator trial",
                                   estimator_obj=classification_estimator)

At this point, we have created an estimator in the **SBrain** environment. Anybody can look it up and reuse the same estimator with different hyper parameters and run configurations to try out variants of the same experiment. We will walk you through this later in the notebook.

#### Listing estimators
If you would like to see the estimators that you or others have created, use the code below. It lists all the estimators.

In [None]:
all_estimators = Estimator.list_all()

#### Searching Estimators
You can also search for specific estimators by name or description, as shown in the code below.

In [None]:
all_resnet_estimators = Estimator.search(description="ResNet")

#### Hyper Parameters

Now, let us define the hyper parameters.
The first two parameters, 'iterations' and 'batch_size', are used by **SBrain** to initiate the training. The rest are passed to the model function that you defined earlier as params, so that the model can adapt to the hyper parameters provided at runtime.

Hyperparameters can be specified in two ways:

1. Specify constant values for all the parameters using the HParams abstraction, as shown below:
 
```python
hyper_parameters = HParams(iterations=5000,
                           batch_size=32,
                           batch_norm_decay=0.9,
                           batch_norm_epsilon=1e-5,
                           weight_decay=2e-4,
                           learning_rate=0.1)
```
                           
2. Specify a list of values to try for each parameter, using the HyperParamsSpace abstraction.
The experiment will generate all combinations of the parameter values and run a job for each combination.

```python
space = HyperParamsSpace([
        HParamValues.discrete_list(HParamValues.ITERATIONS, [500, 1000]),
        HParamValues.constant_value(HParamValues.BATCH_SIZE, 128),
        HParamValues.constant_value("weight_decay", 2e-4),
        HParamValues.discrete_list("batch_norm_decay", [0.997, 0.99])
    ])
grid = space.grid_search()
```

In this tutorial, we are going to try the hyperparameter search using the HyperParamsSpace abstraction. We specify only a few values to keep the number of combinations small, and we run for a very small number of iterations (500 and 1000).
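
To see how many jobs such a search space produces, the combinations can be enumerated in plain Python. This sketch assumes that the grid search is the Cartesian product of the listed values, as the description above states; the dictionary keys are illustrative labels and not necessarily the names **SBrain** uses internally.

```python
from itertools import product

# The same candidate values as in the search space of this tutorial.
search_space = {
    "iterations": [500, 1000],
    "batch_size": [128],
    "weight_decay": [2e-4],
    "batch_norm_decay": [0.997, 0.99],
}

names = list(search_space)
combinations = [dict(zip(names, values))
                for values in product(*search_space.values())]

print(len(combinations))
for combination in combinations:
    print(combination)
```

Two values for the iterations times two values for batch_norm_decay gives four combinations, so four jobs. Every additional list of k values multiplies the job count by k.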

In [None]:
space = HyperParamsSpace([
        HParamValues.discrete_list(HParamValues.ITERATIONS, [500, 1000]),
        HParamValues.constant_value(HParamValues.BATCH_SIZE, 128),
        HParamValues.constant_value("weight_decay", 2e-4),
        HParamValues.discrete_list("batch_norm_decay", [0.997, 0.99])
    ])

grid = space.grid_search()

#### Run Config

Run Config lets you define how this job should be run on the cluster. You can specify the following parameters:
- **no_of_ps** : Number of parameter servers to run. In the current release this should be at least 1.
- **no_of_workers** : Number of workers, which determines the parallelism during training. Try 1 for now; later we will try 2. The current cluster limits this to 2, but a larger cluster allows more.
- **summary_save_frequency** : How often a summary of your metrics is written for display in the tensorboard.
- **run_eval** : If True, runs evaluation in parallel with training and reports an evaluation graph alongside the training one in the tensorboard.
- **use_gpu** : If True, runs your job on a GPU machine in the cluster.

All the relevant execution settings (training parallelism, CPU/GPU, evaluation and summary saving) are defined declaratively in this single call.

All learning jobs under this experiment will use the same run configuration.


In [None]:
run_config = RunConfig(no_of_ps=1, no_of_workers=1, summary_save_frequency=10, run_eval=False, use_gpu=True)

#### DataSetSplit

Now we should tie this to the data. DataSet is an **SBrain** abstraction for operations on large sets of data. For training purposes, we pre-created a DataSetSplit, which is a division of a DataSet into train, validation and eval sets. Refer to the DataSet notebook [DataSetManagement-Basic.ipynb](../dataset-management/DataSetManagement-Basic.ipynb) for details on how they are created.

 Now we look up a split that we have already created for the CIFAR-10 data. The split ratio is **68,16,16**, to be consistent with the CIFAR-10 **train,validation,eval** split.
 
 **NOTE**: If running the cell below returns error "Split not found", please run the [Cifar10 DataPreparation For Learning Tutorials.ipynb](./Cifar10%20DataPreparation%20For%20Learning%20Tutorials.ipynb)

In [None]:
cifar_10_split = DataSetSplit.lookup(dataset_name="cifar10-demo", dataset_version_name="v1", split_name="cifar-10-split")
print(cifar_10_split.name)

#### Run the Experiment

Now it is time to run this experiment. Here we tie together everything that we have created and define an experiment.
Give it a meaningful name, so that you can refer to it later. The attributes are given below.
 - **experiment_name** : Name of the experiment. The names of its learning jobs are derived from it, so that you can refer to them later for deployment or inspection.
 - **description** : Description of the model/job.
 - **estimator** : A saved estimator that we already created.
 - **hparams_search_settings** : Hyper parameter search settings.
 - **run_config** : The run configuration.
 - **dataset_version_split** : The split used for feeding in data.

This code returns an Experiment object. Go ahead and execute it.

In [None]:
experiment_name = uniquify("Resnet_CIFAR10_model")

In [None]:
experiment = Experiment.run(experiment_name=experiment_name,
                                description="ResNet Model trained on Cifar10 data",
                                estimator=saved_estimator,
                                hparams_search_settings=grid,
                                run_config=run_config,
                                dataset_version_split=cifar_10_split,
                                input_function=None)

At this point, you have started an experiment run. It starts multiple training jobs in **SBrain**, which are now executing in the cluster. The Experiment object is your handle to the training jobs that belong to this experiment.


In [None]:
experiment.has_finished()

The methods below report on the jobs under this experiment: report_status summarizes the job statuses, and list_jobs lists all jobs under this experiment.

In [None]:
experiment.report_status()

experiment.list_jobs()

This experiment spawns jobs in parallel (currently two at a time). Once one job finishes, it starts the next one if there are more to run.
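
The scheduling behaviour described above resembles a worker pool with two slots. The sketch below is an analogy built on Python's standard library and says nothing about how **SBrain** schedules jobs internally; `run_job` is a hypothetical stand-in for a training job.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_PARALLEL_JOBS = 2  # the text above says two jobs run at a time


def run_job(job_id):
    """Stand-in for a training job; sleeping simulates the work."""
    time.sleep(0.05)
    return "job-{} finished".format(job_id)


# Four queued jobs, at most two running; a waiting job starts when a slot frees up.
with ThreadPoolExecutor(max_workers=MAX_PARALLEL_JOBS) as pool:
    futures = [pool.submit(run_job, job_id) for job_id in range(1, 5)]
    results = [future.result() for future in as_completed(futures)]

print(sorted(results))
```

With four hyperparameter combinations and two slots, the jobs run in roughly two waves, so the whole experiment takes about twice as long as a single job.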

<font color=red>**IMPORTANT**</font> : Currently, the cluster resources are limited. If multiple people are running experiments at the same time, the cluster waits for earlier submitted experiments to finish
before yours gets started. This may mean the following:
- The tensorboard link may take a long time to respond, because tensorboard is waiting for resources to execute.
- Even if the tensorboard has come up, other details like graph/scalars may take a long time to appear, because training is waiting for resources.

Eventually, the experiment will be scheduled and the training should proceed.

<font color=blue>**Coming Soon**</font> : In the future, a web UI will be provided for monitoring experiments and jobs and for inspecting various logs (e.g. print statements in the model function).
Further experiment management features, such as cancelling, will also be exposed.

#### LearningJob Handle and Tensorboard Link

Let's look up a specific job under this experiment for more details.

In [None]:
job = LearningJob.lookup(name="{}_Trial_Job-1".format(experiment_name))
print("tensorboard url")
print(job.get_tensorboard_url())


Try the above tensorboard link in a new tab. This tensorboard gives you access to the metrics/graph of the currently running training.

In the tensorboard, look for the 'Scalars' tab, where all the metrics appear. The 'Graph' tab shows the ResNet graph that you have built. It may take a while (perhaps 5 minutes) for the 'Scalars' tab to appear, depending on how quickly metrics are written. Just keep refreshing the tensorboard until it appears.

For any issues you might encounter, read below and watch out for sections marked <font color=red>**IMPORTANT**</font> for troubleshooting.

Keep watching/refreshing the scalars tab to see how the loss and accuracy are proceeding in both training and validation. This way we can compare accuracy/loss between training and
validation so that we can look for overfitting. Any other metrics that you define in the model function will show up on the tensorboard.

Let's wait for the jobs to finish and then query their status.

Below, we wait for all the jobs under this experiment to finish. This is a blocking call: it notifies you whenever there is a change in progress, and it returns once every job under this experiment has finished.
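
A blocking wait of this kind is commonly built as a polling loop around a status check such as the `has_finished()` call used earlier. How **SBrain** implements its own wait is not shown in this notebook; the sketch below is a generic pattern, and `FakeExperiment` and `wait_until` are hypothetical names introduced for illustration.

```python
import time


class FakeExperiment:
    """Hypothetical stand-in that reports completion after a few status checks."""

    def __init__(self, checks_until_done):
        self._remaining = checks_until_done

    def has_finished(self):
        self._remaining -= 1
        return self._remaining <= 0


def wait_until(is_done, poll_seconds=0.01, timeout_seconds=5.0):
    """Block until is_done() returns True; return the number of checks made."""
    deadline = time.monotonic() + timeout_seconds
    checks = 0
    while True:
        checks += 1
        if is_done():
            return checks
        if time.monotonic() >= deadline:
            raise TimeoutError("gave up waiting after {} checks".format(checks))
        time.sleep(poll_seconds)


checks_made = wait_until(FakeExperiment(checks_until_done=3).has_finished)
print(checks_made)
```

The timeout is a design choice of this sketch: it keeps a notebook cell from blocking forever if the cluster never schedules the jobs.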

In [None]:
experiment.wait_until_finish()
experiment.list_jobs()

After the jobs complete, you can verify whether each one failed or succeeded.

In [None]:
experiment.report_status()

We are done!!