# Ray Tune Install for Google Colab
**Please run the code below to set up Ray in a Colab environment.**

Since Ray is not installed by default, and there's an issue with the pre-installed version of pyarrow, the code below handles the installation process so that run-all works cleanly. **Note that the first time it is run, the system will report a crash (this is what the `os._exit` step is for). This is intentional: it forces the runtime to reload.**



In [None]:
try:
  import ray
except:
  ## needed due to an incompatibility with collab
  !pip uninstall -y -q pyarrow
  !pip install -q https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.8.0.dev5-cp36-cp36m-manylinux1_x86_64.whl
  !pip install -q ray[debug]

  import os
  os._exit(0)

[31mERROR: ray-0.8.0.dev5-cp36-cp36m-manylinux1_x86_64.whl is not a supported wheel on this platform.[0m[31m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.6/58.6 MB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.7/8.7 MB[0m [31m27.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m158.8/158.8 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m468.5/468.5 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
  # This is so we can run ngrok tunnels if desired so we can view tensorboard in a new tab
  !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
  !unzip ngrok-stable-linux-amd64.zip

  LOG_DIR = './logs'
  get_ipython().system_raw(
      'tensorboard --logdir {} --host 0.0.0.0 --port 6006 &'
      .format(LOG_DIR)
  )

  ! echo "### If you want to view tensorboard in a new tab, use this link:"
  ! curl -s http://localhost:4040/api/tunnels | python3 -c \
        "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

In [None]:
# !pkill tensorboard

## Tensorboard Setup/Run
TensorBoard is a tool for visualizing and examining the parameters of the data, and it can be very useful for organizing and comparing results. In addition to providing a web interface for viewing data, it also automatically generates csv files, which can be very useful for viewing the data later.

In [None]:
import datetime, os, shutil
# Directory that TensorBoard is pointed at below.
LOG_DIR = "./logs"
# Uncomment to wipe the logs of earlier runs before starting:
# shutil.rmtree(LOG_DIR, ignore_errors=True, onerror=None)
os.makedirs(LOG_DIR, exist_ok=True)
# Load the TensorBoard notebook extension and show it inline for LOG_DIR.
%load_ext tensorboard
%tensorboard --logdir {LOG_DIR}

# Import Data

Here we load the CIFAR-10 dataset and convert its class labels into binary class matrices (one-hot encoding).

In [None]:
import tensorflow as tf
import numpy as np

# Image geometry and label count shared by the cells below.
imgW = 32
imgH = 32
channel_depth = 3       # channels per image
numOfCategories = 10    # number of output classes


# Load data
def load_data(nb_classes=10):
    """Load CIFAR-10, rescale the images and one-hot encode the labels.

    Args:
        nb_classes: Number of classes used for the one-hot label encoding.

    Returns:
        A tuple ``(X_train, Y_train, X_test, Y_test, (w, h))``. The X arrays
        are float32 images divided by 255, the Y arrays are one-hot label
        matrices, and ``(w, h)`` are the first two dimensions of a single
        training image.
    """
    from keras.datasets import cifar10
    # `keras.utils.np_utils` is not available in newer Keras releases;
    # `to_categorical` is exported directly from `keras.utils`.
    from keras.utils import to_categorical
    from keras import backend
    backend.set_image_data_format('channels_last')
    # the data, shuffled and split between train and test sets
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()

    def prepare_X(X):
        # Rescale pixel values by 1/255 (assumes 0-255 input -- TODO confirm).
        return X.astype('float32') / 255

    X_train, X_test = [prepare_X(X) for X in (X_train, X_test)]
    print(X_train.shape[0], 'train image samples of shape: ', X_train.shape[1:])
    print(X_test.shape[0], 'test samples')

    # convert class vectors to binary class matrices (basically one hot)
    Y_train = to_categorical(y_train, nb_classes)
    Y_test = to_categorical(y_test, nb_classes)

    w = X_train[0].shape[0]
    h = X_train[0].shape[1]

    # Datasets whose samples are 2-D (example: MNIST is 28x28 rather than
    # 28x28x1) carry no channel axis. Such images hold exactly one channel, so
    # the axis added here must have size 1; reshaping with the module-level
    # colour depth would fail because the element counts would not match.
    if X_train[0].ndim == 2:
        X_train = X_train.reshape(-1, w, h, 1)
        X_test = X_test.reshape(-1, w, h, 1)

    return X_train, Y_train, X_test, Y_test, (w, h)

# Load the dataset once at module level; the training function and the
# callbacks further down read trX/trY/teX/teY as globals.
trX, trY, teX, teY, shape = load_data()

## Ray Tune Setup
Ray's Tune library is a great way to run multiple training/testing sessions with various parameters adjusted. By naming each training session after the parameters we are comparing, we can visualize the various setups in an organized way through the TensorBoard integration above.

In [None]:
import ray
from ray import tune

import math
import io


############ Matplot Helper Functions ############
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
plt.switch_backend('agg') ### Lets us store the matplotlib images inside of tensorboard (useful for organizing data)

def plot_to_tf_image(figure):
    """Convert a matplotlib figure into a batched image tensor.

    Args:
        figure: The matplotlib figure to render. It is closed by this call and
            is inaccessible afterwards.

    Returns:
        The PNG rendering of ``figure`` decoded with 4 channels, with a
        leading batch dimension of size 1 added.
    """
    # Save the plot to a PNG in memory. Use the figure that was passed in:
    # plt.savefig would save whichever figure is current, which need not be
    # this one.
    buf = io.BytesIO()
    figure.savefig(buf, format='png')
    # Closing the figure prevents it from being displayed directly inside
    # the notebook.
    plt.close(figure)
    # Convert PNG buffer to TF image
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    # Add the batch dimension
    image = tf.expand_dims(image, 0)
    return image


#########################################################################################
######################## Generalized Helper Function ####################################
######## Creates classes dynamically using just a class string: dynamicClassCreater("name.of.class.type")(param, param2=test, ...)
### This is useful in cases where you can only pass a string inside another deeply nested function in which you want variable class instances.
#  Ex: passing an optimizer with variable parameters through a config file into ray tune (which note is a multi-threaded process)
#  Using this convenience function, you can have classes as a parameter within a ray tune config, since you can supply an array of strings.
def dynamicClassCreater(class_str):
    """Resolve a dotted path such as ``"package.module.ClassName"`` to the object it names.

    Args:
        class_str: Dotted path whose last component is the attribute to fetch
            and whose leading part is the module to import.

    Returns:
        The attribute found on the imported module (typically a class).

    Raises:
        ImportError: If ``class_str`` is not a dotted path, the module cannot
            be imported, or the module has no such attribute. The message is
            ``class_str`` and the underlying error is chained as the cause.
    """
    from importlib import import_module
    try:
        # Unpacking raises ValueError when there is no '.' to split on.
        module_path, class_name = class_str.rsplit('.', 1)
        module = import_module(module_path)
        return getattr(module, class_name)
    except (ImportError, AttributeError, ValueError) as e:
        raise ImportError(class_str) from e

## Callback Class Definitions
TensorFlow 2.0 has a nice callback feature that lets us inject a list of functions which run at the end of an epoch (or batch, depending on settings), so here we define a few we may want to add later. Note that the accuracy logger callback below is a CUSTOMIZED callback which expects an extra parameter in order to be created.



In [None]:
#########################################################################################
######################## MODEL CALLBACKS ################################################
######## Tensorflow 2.0 has a nice option of supplying callbacks that can be run at the end of each epoch
### in-order to speed things along, we will be creating useful callback here we can use
# for everything from logging into tensorboard, stopping a training session automatically and
# more.

# Metric watched by StopperMseCallback and the value it must drop below.
STOPPING_PARAMETER = 'mse'
STOPPING_VALUE = 0.02
class StopperMseCallback(tf.keras.callbacks.Callback):
    """Stop training once the monitored metric falls below a threshold.

    On every sixth epoch the value of ``STOPPING_PARAMETER`` in the epoch logs
    is compared against ``STOPPING_VALUE``; training is stopped when it is
    lower.
    """

    def __init__(self):
        super().__init__()
        # Epochs seen since the last check.
        self.skipcount = 0

    def on_epoch_end(self, epoch, logs=None):
        """Check the monitored metric on every sixth epoch.

        Args:
            epoch: Index of the epoch that just finished (unused).
            logs: Metric dict supplied by Keras for this epoch; may be None.
        """
        self.skipcount += 1
        if self.skipcount <= 5:
            return
        self.skipcount = 0

        # The earlier implementation called self.model.evaluate() with no data,
        # which cannot succeed; the metric is read from the epoch logs instead.
        value = (logs or {}).get(STOPPING_PARAMETER)
        if value is None:
            print(STOPPING_PARAMETER + " not found\n")
            return

        print("MODEL BEING EVALUATED")
        print(value)
        if value < STOPPING_VALUE:
            self.model.stop_training = True


class AccuracyLoggerCallback(tf.keras.callbacks.Callback):
    """Write the final test-set accuracy to a TensorBoard summary when training ends.

    Args:
        directory: Directory the summary file writer is created in.
    """

    def __init__(self, directory):
        super().__init__()
        self.directory = directory

    def on_train_end(self, epoch=None, logs=None):
        """Evaluate on the module-level test set and log the accuracy.

        Keras calls this hook with a single ``logs`` argument, so both
        parameters are optional here; neither is used.
        """
        # evaluate() returns the loss followed by the compiled metrics in
        # order. With the metrics compiled in this notebook
        # (["categorical_accuracy", "mse"]) that is [loss, accuracy, mse];
        # the previous unpacking logged the loss as the accuracy.
        _loss, accuracy, _mse = self.model.evaluate(teX, teY)
        # Add the scalar to the tb summary
        writer = tf.summary.create_file_writer(self.directory)
        with writer.as_default():
            tf.summary.scalar(
                "sanity_accuracy",
                accuracy,
                step=1,
                description="Accuracy from testing at end of training"
            )
        # Make sure the event reaches disk before the trial process goes away.
        writer.flush()

### example usage of AccuracyLoggerCallback being passed the correct directory for proper tensorboard integration
# AccuracyLoggerCallback(reporter._logdir)
            
from ray.tune import track
class TuneReporterCallback(tf.keras.callbacks.Callback):
    """Tune Callback for Keras.

    Forwards the Keras logs to a Tune reporter after every batch or every
    epoch, together with a ``mean_accuracy`` value taken from those logs.
    """

    # Log keys tried, in order, when looking for the accuracy to report.
    # "categorical_accuracy" is the metric name the models in this notebook
    # are compiled with.
    _ACCURACY_KEYS = ("acc", "accuracy", "categorical_accuracy")

    def __init__(self, reporter=None, freq="batch", logs=None):
        """Initializer.
        Args:
            reporter (StatusReporter|tune.track.log|None): Tune object for
                returning results.
            freq (str): Sets the frequency of reporting intermediate results.
                One of ["batch", "epoch"].
            logs: Unused; accepted for backward compatibility.
        Raises:
            ValueError: If ``freq`` is not "batch" or "epoch".
        """
        if freq not in ["batch", "epoch"]:
            raise ValueError("{} not supported as a frequency.".format(freq))
        super(TuneReporterCallback, self).__init__()
        self.reporter = reporter or track.log
        self.iteration = 0
        self.freq = freq

    def _report(self, logs):
        """Add negated-loss entries to ``logs`` and pass them to the reporter."""
        logs = {} if logs is None else logs
        self.iteration += 1
        # Iterate over a snapshot of the keys because entries are added below.
        for metric in list(logs):
            if "loss" in metric and "neg_" not in metric:
                logs["neg_" + metric] = -logs[metric]
        accuracy = next(
            (logs[key] for key in self._ACCURACY_KEYS if key in logs), None
        )
        self.reporter(keras_info=logs, mean_accuracy=accuracy)

    def on_batch_end(self, batch, logs=None):
        """Report after each batch when ``freq == "batch"``."""
        if self.freq == "batch":
            self._report(logs)

    def on_epoch_end(self, batch, logs=None):
        """Report after each epoch when ``freq == "epoch"``."""
        if self.freq == "epoch":
            self._report(logs)

# Model Configuration
Here we set up what we are actually going to test/run inside Ray Tune. Note that Tune allows us to define multiple runs of model training with different parameters.

In [None]:

### Fallback number of training epochs, used when the config defines none
EPOCH = 15

def tuneModelInitialize(config, reporter):
    """Build, compile and train the CNN selected by ``config`` (Ray Tune trainable).

    Args:
        config: Trial configuration dict. Keys read here: 'name' (required),
            'mode' (for "Q3.2"), 'num_conv_layers' (for "Q3.1"),
            'cnn_features' (for "Q3.2") and 'epoch' (optional, defaults to
            the module-level EPOCH).
        reporter: Tune reporter handed to TuneReporterCallback.

    Raises:
        ValueError: If 'name'/'mode' do not select one of the architectures
            defined below.
    """
    import tensorflow as tf
    from tensorflow.keras.callbacks import EarlyStopping

    ## Model initialization
    model = tf.keras.models.Sequential()
    optimizer = tf.keras.optimizers.Adam()
    loss = tf.keras.losses.CategoricalCrossentropy()
    metrics = ["categorical_accuracy", "mse"]
    # NOTE(review): validation only runs on every 20th epoch (validation_freq
    # below), so the monitored value is only present in the logs on those
    # epochs -- confirm the patience value is what is intended.
    earlyStopCB = EarlyStopping(monitor='val_categorical_accuracy', mode='max', verbose=1, patience=80)
    callbacks = [TuneReporterCallback(reporter, freq="epoch"), earlyStopCB]

    ## Example of using dynamicClassCreater to generate an optimizer with parameters to feed into the model inside the thread.
    ## optimizer = dynamicClassCreater("tensorflow.keras.optimizers." + config["optimizer"])(lr=config["lr"],momentum=config["momentum"],)

    # Per-trial epoch count, falling back to the module-level default.
    epochs = config.get('epoch', EPOCH)

    # convenience function for compiling/running the model based on the above parameters.
    def compile_and_fit(model):
        model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
        model.summary()  ## convenience function to print out the model shape

        model.fit(
            trX,
            trY,
            # batch_size=batch_size,
            epochs=epochs,
            verbose=2,
            validation_data=(teX, teY),
            validation_freq=20,
            callbacks=callbacks
        )

    ###########################################################################################
    #################################### Model Structuring ####################################
    ###########################################################################################
    # This section is where we define all the structure of our NN, using the info gathered from
    # the configs to actually determine the shape of things

    # The image size fixes the input shape and is the same for every question, so this layer
    # is added here just to give tf the sizing it needs to shape the next layer.
    # Keras expects (height, width, channels) for channels_last data.
    model.add(tf.keras.layers.Input((imgH, imgW, channel_depth)))

    #### Since ray tune can't pass multiple varied models down inside the callback, the variations
    #### of the questions are hardcoded here and the config decides which model is built.
    name = config['name']
    mode = config.get('mode')

    # NOTE(review): every pooling layer below uses pool_size=1 with strides=2, which keeps every
    # second activation rather than taking a max over a window -- confirm this is intended.
    if name == 'Q3.1':
        ## Tester for Q1. With no real starting point, each conv layer uses 5x5 kernels and is
        ## followed by a halving step; the filter count grows with depth (8, 64, 512, ...) to
        ## pick up more sub-features once pooling has made the later layers cheaper.
        for index in range(config['num_conv_layers']):
            # A plain Python int is passed as the filter count (no tensor cast needed).
            model.add(tf.keras.layers.Conv2D(8 ** (index + 1), kernel_size=(5, 5), strides=(1, 1), activation='relu', padding='same', name='L{}_Conv2D_10x5x5'.format(index)))
            model.add(tf.keras.layers.MaxPooling2D(pool_size=1, strides=2, padding='valid', name='L{}_MaxPool2D_halved'.format(index)))

        currentLayer = config['num_conv_layers'] + 1
        ### here we flatten our convolved outputs to fully connect them to our final step.
        model.add(tf.keras.layers.Flatten())
        model.add(tf.keras.layers.Dense(100, activation='relu', name='L{}_Relu_{}'.format(currentLayer, 100)))
        model.add(tf.keras.layers.Dense(numOfCategories, activation='softmax', name='Output'))

    elif name == "Q3.2" and mode == 'testing':
        ## one conv + pooling pair per entry of cnn_features, to compare layer-size combinations
        for conv_layer_depth in config['cnn_features']:
            model.add(tf.keras.layers.Conv2D(conv_layer_depth, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding='same'))
            model.add(tf.keras.layers.MaxPool2D(pool_size=1, strides=2, padding='valid'))

        ### here we flatten our convolved outputs to fully connect them to our final step.
        model.add(tf.keras.layers.Flatten())
        model.add(tf.keras.layers.Dense(100, activation='relu', name='L4_Relu'))
        model.add(tf.keras.layers.Dense(numOfCategories, activation='softmax', name='Output'))

    elif name == "Q3.2" and mode == 'final':
        ## same stack as the testing mode, plus dropout that grows by 0.25 per layer
        ## (0.0, 0.25, 0.5 for three layers)
        for i, conv_layer_depth in enumerate(config['cnn_features']):
            model.add(tf.keras.layers.Conv2D(conv_layer_depth, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding='same'))
            model.add(tf.keras.layers.MaxPool2D(pool_size=1, strides=2, padding='valid'))
            model.add(tf.keras.layers.Dropout(rate=i * 0.25))

        ### here we flatten our convolved outputs to fully connect them to our final step.
        model.add(tf.keras.layers.Flatten())
        model.add(tf.keras.layers.Dense(120, activation='relu', name='L4_Relu'))
        model.add(tf.keras.layers.Dense(numOfCategories, activation='softmax', name='Output'))

    else:
        # Fail loudly instead of compiling and fitting a model that has no layers.
        raise ValueError(
            "Unsupported model configuration: name={!r}, mode={!r}".format(name, mode)
        )

    compile_and_fit(model)

In [None]:
## TensorBoard lists runs by trial name, so the name carries the config name,
## the experiment tag and the trial id to make filtering by parameters easy.
def trial_str_creator(trial):
    """Return the display name Tune should use for ``trial``."""
    template = "{}_config={}_trial_id={}"
    config_name = trial.config['name']
    return template.format(config_name, trial.experiment_tag, trial.trial_id)

def testRunner(config):
    """Run ``tuneModelInitialize`` under Ray Tune for one experiment config.

    Args:
        config: Tune search-space dict passed through to ``tune.run``.
    """
    tune.run(
        tuneModelInitialize,
        trial_name_creator=trial_str_creator,
        # Write results under the directory TensorBoard is watching. The name
        # used here previously (logs_base_dir) is never defined in this notebook.
        local_dir=LOG_DIR,
        # NOTE(review): each trial requests one GPU -- confirm the runtime has one.
        resources_per_trial={
            "cpu": 2,
            "gpu": 1
        },
        verbose=1,
        config=config,
        num_samples=1,
    )

In [None]:
# Set TensorFlow's logger level to INFO; a failure here is only printed
# because it does not affect training.
try:
    tf.get_logger().setLevel('INFO')
except Exception as exc:
    print(exc)
import warnings
# Silence Python warnings for the rest of the session.
warnings.simplefilter("ignore")


## Final Magic Step. Spins up the various configs
ray.shutdown()  # Restart Ray defensively in case the ray connection is lost. 
# Memory limits are given in bytes (MiB * 1024 * 1024).
# NOTE(review): these keyword arguments match the Ray release pinned in the
# install cell; confirm they still exist before upgrading Ray.
ray.init(
    local_mode=True,
    memory=9000 * 1024 * 1024,
    object_store_memory=200 * 1024 * 1024,
    driver_object_store_memory=100 * 1024 * 1024,
    log_to_driver=False
)

In [None]:
#################################### MODEL Parameters / Configuration ####################################
## Search spaces for the exploratory runs: one dict per experiment, each meant
## to be handed to testRunner.
testing_configs = [
    # Q3.1 -- compare networks built with 1, 2 and 3 convolution layers.
    {
        "name": "Q3.1",
        "threads": 1,
        "num_conv_layers": tune.grid_search([1, 2, 3]),
    },
    # Q3.2 -- these runs only compare approximate values, but many of them are
    # needed to see how the layer sizes interact, so the epoch count is kept
    # low to limit the amount of computation.
    {
        "name": "Q3.2",
        "threads": 1,
        "mode": "testing",
        "epoch": 10,
        # One randomly drawn filter count per convolution layer.
        "cnn_features": [
            tune.sample_from(lambda spec: np.random.randint(21, 23)),
            tune.sample_from(lambda spec: np.random.randint(44, 50)),
            tune.sample_from(lambda spec: np.random.randint(115, 125)),
        ],
    },
]

# To launch the exploratory runs:
# for config in testing_configs:
#     testRunner(config)

In [None]:
## With the layer sizes settled, run the final long training with those sizes.
final_config = {
    "name": "Q3.2",
    "threads": 1,
    "mode": "final",
    "epoch": 1000,
    # Single-value grid searches: one fixed filter count per convolution layer.
    "cnn_features": [
        tune.grid_search([22]),
        tune.grid_search([32]),
        tune.grid_search([128]),
    ],
}

testRunner(final_config)


In [None]:
import shutil
# Bundle everything under LOG_DIR into Q2.zip in the working directory.
shutil.make_archive(base_name='Q2', format='zip', root_dir=LOG_DIR)

In [None]:
import os
# Terminate the kernel process immediately with exit status 0.
# NOTE(review): presumably used to stop the background processes started by
# this notebook and free the runtime -- confirm; cells after this one only run
# once the runtime has come back up.
os._exit(0)

In [None]:
# Download the Q2.zip archive (created by the make_archive cell) through Colab's files helper.
from google.colab import files
files.download("Q2.zip")