# RISE one. Presentation Document, presented by group 7.

### youtube link: https://www.youtube.com/watch?v=MhteN-1sYwg


# Topic 1.Ray
  
Currently, Ray is only available for Linux and macOS.
  
How to install Ray (macOS): `pip install -U ray`

In [None]:
import ray
import time
import numpy as np
import pickle

In [None]:
# Start Ray for this notebook, declaring 4 CPUs, 4 GPUs and two custom
# resources ('CustomResource1' x1, 'CustomResource2' x4). The GPU and
# custom-resource cells further down request from these amounts.
# NOTE(review): these look like declared capacities for the demo rather than
# detected hardware — confirm against the machine the notebook runs on.
# ignore_reinit_error=True presumably lets this cell be re-executed safely.
ray.init(num_cpus=4,num_gpus=4,resources={'CustomResource1': 1, 'CustomResource2': 4},
         include_webui=False, ignore_reinit_error=True)

## Task
    1) Ray remote functions cannot be called directly; invoke them with .remote()
    2) Use ray.get() to retrieve the returned value

In [None]:
def slow(i):
    """Sleep for one second, then return ``i`` unchanged (plain, serial version)."""
    time.sleep(1.0)
    return i

@ray.remote
def fast(i):
    """Same body as ``slow``, but declared as a Ray remote function.

    It is invoked as ``fast.remote(i)`` and the value is fetched with
    ``ray.get``.
    """
    time.sleep(1.0)
    return i

In [None]:
# Pause before starting the clock (purpose not stated; presumably lets earlier
# work settle so it does not pollute the measurement).
time.sleep(2.0)
start_time = time.time()

# Ten plain function calls, executed one after another in this process.
results = [slow(i) for i in range(10)]

end_time = time.time()
duration = end_time - start_time
# Elapsed wall-clock seconds for the serial run.
print(duration)

In [None]:
# Pause before starting the clock (purpose not stated; presumably lets earlier
# work settle so it does not pollute the measurement).
time.sleep(2.0)
start_time = time.time()

# Submit all ten remote calls first; .remote() hands back a handle per call.
results = [fast.remote(i) for i in range(10)]
# Fetch the values for all handles; `results` is rebound from handles to values.
results = ray.get(results)

end_time = time.time()
duration = end_time - start_time
# Elapsed wall-clock seconds for the remote run.
print(duration)

In [None]:
@ray.remote
def first(a):
    """Remote task: sleep two seconds, then return ``a + 1``."""
    time.sleep(2.0)
    a += 1
    return a

@ray.remote
def second(b):
    """Remote task: sleep two seconds, then return ``b + 1``."""
    time.sleep(2.0)
    b += 1
    return b

@ray.remote
def thrid(c):
    """Remote task: sleep two seconds, then return ``c + 1``.

    NOTE(review): the name looks like a typo for ``third``; renaming it
    requires updating the call site in the cell that chains these tasks.
    """
    time.sleep(2.0)
    c += 1
    return c

@ray.remote
def forth(d):
    """Remote task: sleep two seconds, then return ``d + 1``.

    NOTE(review): the name looks like a typo for ``fourth``; renaming it
    requires updating the call site in the cell that chains these tasks.
    """
    time.sleep(2.0)
    d += 1
    return d

In [None]:
a = 0

# Chain the four remote calls: whatever each .remote() call returns is passed
# straight into the next call, with no ray.get in between.
a = first.remote(a)
b = second.remote(a)
c = thrid.remote(b)
d = forth.remote(c)

# Only the last handle is fetched; `d` is rebound from handle to value here.
d = ray.get(d)

print(d)

## Actor

In [None]:
class Foo(object):
    """Plain in-process counter, used as the serial baseline for the actor demo."""

    def __init__(self):
        # Every new instance starts counting from zero.
        self.counter = 0

    def reset(self):
        """Set the counter back to zero."""
        self.counter = 0

    def increment(self):
        """Sleep half a second, then add one to the counter and return it."""
        time.sleep(0.5)
        bumped = self.counter + 1
        self.counter = bumped
        return bumped

In [None]:
# Three independent plain counters.
f1 = Foo()
f2 = Foo()
f3 = Foo()

# Pause before starting the clock (purpose not stated).
time.sleep(2.0)
start_time = time.time()

f1.reset()
f2.reset()
f3.reset()

# 5 rounds x 3 counters = 15 increment() calls, each sleeping 0.5 s, executed
# one after another in this process.
results = []
for _ in range(5):
    results.append(f1.increment())
    results.append(f2.increment())
    results.append(f3.increment())

end_time = time.time()
duration = end_time - start_time

print(results)
print(duration)

In [None]:
@ray.remote
class Fooboo(object):
    """Counter with the same methods as ``Foo``, declared as a Ray actor class."""

    def __init__(self):
        # Every new actor starts counting from zero.
        self.counter = 0

    def reset(self):
        """Set the counter back to zero."""
        self.counter = 0

    def increment(self):
        """Sleep half a second, then add one to the counter and return it."""
        time.sleep(0.5)
        bumped = self.counter + 1
        self.counter = bumped
        return bumped

In [None]:
# Three actors, created through .remote() instead of a direct constructor call.
fb1 = Fooboo.remote()
fb2 = Fooboo.remote()
fb3 = Fooboo.remote()

# Pause before starting the clock (purpose not stated).
time.sleep(2.0)
start_time = time.time()

# Method calls on actors also go through .remote(); the handles returned by
# reset are not fetched.
fb1.reset.remote()
fb2.reset.remote()
fb3.reset.remote()

# Submit 5 rounds x 3 actors = 15 increment calls, collecting one handle each.
results = []
for _ in range(5):
    results.append(fb1.increment.remote())
    results.append(fb2.increment.remote())
    results.append(fb3.increment.remote())
# Fetch all fifteen values; `results` is rebound from handles to values.
results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

print(results)
print(duration)

## ray.wait()

In [None]:
@ray.remote
def f():
    """Remote task: sleep a uniformly random 0-5 seconds, then return ``time.time()``."""
    time.sleep(np.random.uniform(0, 5))
    return time.time()

In [None]:
# Pause before starting the clock (purpose not stated).
time.sleep(2.0)
start_time = time.time()

# Launch all ten tasks up front; each .remote() call returns a handle.
result_ids = [f.remote() for _ in range(10)]

results = []
# Drain the pending handles one at a time until none are left.
while result_ids:
    # Keyword arguments spell out what the positional ``1, None`` meant: wait,
    # with no timeout, for one of the pending handles. ray.wait returns the
    # ready handles and the still-pending remainder.
    ready_ids, result_ids = ray.wait(result_ids, num_returns=1, timeout=None)
    result = ray.get(ready_ids[0])
    results.append(result)
    print('Processing result which finished after {} seconds.'
          .format(result - start_time))

end_time = time.time()
duration = end_time - start_time
# Report the total elapsed time, as the other timing cells do.
print(duration)

## ray.put()

In [None]:
# Large payload for the ray.put demo: a dict of 50 entries ('variable0' ...
# 'variable49'), each a NumPy array of 1,000,000 normally distributed samples.
neural_net_weights = {'variable{}'.format(i): np.random.normal(size=1000000)
                      for i in range(50)}

@ray.remote
def use_weights(weights, i):
    """Remote task that accepts ``weights`` but ignores it and returns ``i``.

    It exists only so the cost of passing the large ``weights`` argument can
    be timed.
    """
    return i

In [None]:
# Pause before starting the clock (purpose not stated).
time.sleep(2.0)
start_time = time.time()
# ray.put is called once; the handle it returns is passed to all 20 tasks
# instead of passing the dict itself to every call.
para = ray.put(neural_net_weights)
# NOTE: despite the name, results_ids holds the fetched values (ray.get is
# applied to the list of handles), not handles.
results_ids = ray.get([use_weights.remote(para, i) for i in range(20)])

end_time = time.time()
duration = end_time - start_time
print(duration)

## GPU API and User-defined Resources

In [None]:
import os
# Export the GPU ids reported by Ray as a comma-separated CUDA_VISIBLE_DEVICES.
# NOTE(review): ray.get_gpu_ids() is evaluated here in the notebook process,
# not inside a remote task. If it returns an empty list this sets the variable
# to '' — confirm that is the intended value.
os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()])

In [None]:
@ray.remote(num_gpus=1)
def f_gpu():
    """Remote task requesting one GPU: sleep one second, then return ``ray.get_gpu_ids()``."""
    time.sleep(1)
    return ray.get_gpu_ids()

In [None]:
# Pause before starting the clock (purpose not stated).
time.sleep(2)
start_time = time.time()

# Eight tasks that each request one GPU, against the 4 GPUs declared in the
# ray.init call at the top of the notebook.
# Presumably at most four run at a time — verify against the printed duration.
runs = [f_gpu.remote() for _ in range(8)]
runs = ray.get(runs)

end_time = time.time()
duration = end_time - start_time
print(duration)

In [None]:
@ray.remote(resources={'CustomResource2':2})
def f_custom():
    """Remote task requesting 2 units of 'CustomResource2': sleep one second, return None."""
    time.sleep(1)

In [None]:
# Pause before starting the clock (purpose not stated).
time.sleep(2)
start_time = time.time()

# Five tasks that each request 2 units of 'CustomResource2', of which 4 were
# declared in the ray.init call at the top of the notebook.
# Presumably at most two run at a time — verify against the printed duration.
runs = [f_custom.remote() for _ in range(5)]
runs = ray.get(runs)

end_time = time.time()
duration = end_time - start_time
print(duration)

## pandas vs modin.pandas

In [None]:
import pandas as pd
import modin.pandas as mpd

In [None]:
%%time
# Baseline: load the CSV with plain pandas and take the first rows.
csv_pd = pd.read_csv('log20161201.csv',low_memory=False)
top = csv_pd.head()

# Timing output recorded from an earlier run of this cell:
# CPU times: user 1min 55s, sys: 4min 8s, total: 6min 4s
# Wall time: 10min 41s

In [None]:
%%time
# Same load through modin.pandas (imported as mpd) for comparison.
# Note that `top` from the pandas cell is overwritten here.
csv_mpd = mpd.read_csv('log20161201.csv',low_memory=False)
top = csv_mpd.head()

# Timing output recorded from an earlier run of this cell:
# CPU times: user 152 ms, sys: 316 ms, total: 468 ms
# Wall time: 5min 48s

# Topic 2. RLlib Exercise  - Proximal Policy Optimization

**GOAL:** The goal of this exercise is to demonstrate how to use the proximal policy optimization (PPO) algorithm.

To understand how to use **RLlib**, see the documentation at http://rllib.io.

PPO is described in detail in https://arxiv.org/abs/1707.06347. It is a variant of Trust Region Policy Optimization (TRPO) described in https://arxiv.org/abs/1502.05477

PPO works in two phases. In one phase, a large number of rollouts are performed (in parallel). The rollouts are then aggregated on the driver and a surrogate optimization objective is defined based on those rollouts. We then use SGD to find the policy that maximizes that objective with a penalty term for diverging too much from the current policy.

![ppo](https://raw.githubusercontent.com/ucbrise/risecamp/risecamp2018/ray/tutorial/rllib_exercises/ppo.png)

**NOTE:** The SGD optimization step is best performed in a data-parallel manner over multiple GPUs. This is exposed through the `num_gpus` field of the `config` dictionary (for this to work, you must be using a machine that has GPUs).

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import gym
import ray
from ray.rllib.agents.ppo import PPOAgent, DEFAULT_CONFIG
from ray.tune.logger import pretty_print

Start up Ray. This must be done before we instantiate any RL agents.

In [None]:
# Start Ray with default resources; ignore_reinit_error=True presumably makes
# this a no-op instead of an error when Ray was already started above.
ray.init(ignore_reinit_error=True)

Instantiate a PPOAgent object. We pass in a config object that specifies how the network and training procedure should be configured. Some of the parameters are the following.

- `num_workers` is the number of actors that the agent will create. This determines the degree of parallelism that will be used.
- `num_sgd_iter` is the number of epochs of SGD (passes through the data) that will be used to optimize the PPO surrogate objective at each iteration of PPO.
- `sgd_minibatch_size` is the SGD batch size that will be used to optimize the PPO surrogate objective.
- `model` contains a dictionary of parameters describing the neural net used to parameterize the policy. The `fcnet_hiddens` parameter is a list of the sizes of the hidden layers.

In [None]:
config = DEFAULT_CONFIG.copy()
config['num_workers'] = 3
config['num_sgd_iter'] = 30
config['sgd_minibatch_size'] = 128
# DEFAULT_CONFIG.copy() is a shallow copy, so config['model'] is still the same
# dict object that DEFAULT_CONFIG holds. Build a fresh model dict instead of
# assigning into the shared one, so the library defaults are left untouched.
config['model'] = dict(config['model'], fcnet_hiddens=[100, 100])
config['num_cpus_per_worker'] = 0  # This avoids running out of resources in the notebook environment when this cell is re-executed

# Create the PPO agent for the CartPole-v0 environment with this configuration.
agent = PPOAgent(config, 'CartPole-v0')

Train the policy on the `CartPole-v0` environment for 2 training iterations. The CartPole problem is described at https://gym.openai.com/envs/CartPole-v0.

**EXERCISE:** Inspect how well the policy is doing by looking for the lines that say something like

```
total reward is  22.3215974777
trajectory length mean is  21.3215974777
```

This indicates how much reward the policy is receiving and how many time steps of the environment the policy ran. The maximum possible reward for this problem is 200. The reward and trajectory length are very close because the agent receives a reward of one for every time step that it survives (however, that is specific to this environment).

In [None]:
# Run two training iterations and print the result of each one via pretty_print.
for i in range(2):
    result = agent.train()
    print(pretty_print(result))

**EXERCISE:** The current network and training configuration are too large and heavy-duty for a simple problem like CartPole. Modify the configuration to use a smaller network and to speed up the optimization of the surrogate objective (fewer SGD iterations and a larger batch size should help).

In [None]:
# Exercise cell: the values below are the starting point to be tuned.
config = DEFAULT_CONFIG.copy()
config['num_workers'] = 3
config['num_sgd_iter'] = 30
config['sgd_minibatch_size'] = 128
# DEFAULT_CONFIG.copy() is a shallow copy, so config['model'] is still the same
# dict object that DEFAULT_CONFIG holds. Build a fresh model dict instead of
# assigning into the shared one, so the library defaults are left untouched.
config['model'] = dict(config['model'], fcnet_hiddens=[100, 100])
config['num_cpus_per_worker'] = 0

# Replace the earlier agent with one built from this configuration.
agent = PPOAgent(config, 'CartPole-v0')

**EXERCISE:** Train the agent and try to get a reward of 200. If it's training too slowly you may need to modify the config above to use fewer hidden units, a larger `sgd_minibatch_size`, a smaller `num_sgd_iter`, or a larger `num_workers`.

This should take around 20 or 30 training iterations.

In [None]:
# Run two training iterations and print the result of each one via pretty_print.
# NOTE(review): the text above expects roughly 20-30 iterations to reach a
# reward of 200; raise the range to actually get there.
for i in range(2):
    result = agent.train()
    print(pretty_print(result))

Checkpoint the current model. The call to `agent.save()` returns the path to the checkpointed model and can be used later to restore the model.

In [None]:
# Save the agent; the returned path is reused below to restore it.
checkpoint_path = agent.save()
print(checkpoint_path)

Now let's use the trained policy to make predictions.

**NOTE:** Here we are loading the trained policy in the same process, but in practice, this would often be done in a different process (probably on a different machine).

In [None]:
# Build a second agent from a copy of the training configuration ...
trained_config = config.copy()

test_agent = PPOAgent(trained_config, 'CartPole-v0')
# ... and load the checkpoint saved above into it.
test_agent.restore(checkpoint_path)

Now use the trained policy to act in an environment. The key line is the call to `test_agent.compute_action(state)` which uses the trained policy to choose an action.

**EXERCISE:** Verify that the reward received roughly matches up with the reward printed in the training logs.

In [None]:
# Roll out one episode with the restored policy and total up the reward.
env = gym.make('CartPole-v0')
state = env.reset()
done = False
cumulative_reward = 0

while not done:
    # Ask the policy for an action given the current observation ...
    action = test_agent.compute_action(state)
    # ... apply it; the fourth value returned by step() is not used.
    state, reward, done, _ = env.step(action)
    cumulative_reward += reward

print(cumulative_reward)

# Topic 3. Tune

Tune is a scalable framework for model training and hyperparameter search with a focus on deep learning and deep reinforcement learning.

**Code**: https://github.com/ray-project/ray/tree/master/python/ray/tune

**Examples**: https://github.com/ray-project/ray/tree/master/python/ray/tune/examples

**Documentation**: http://ray.readthedocs.io/en/latest/tune.html

**Mailing List** https://groups.google.com/forum/#!forum/ray-dev

# Overview

Tuning hyperparameters is often the most expensive part of the machine learning workflow. Tune is built to address this, demonstrating an efficient and scalable solution for this pain point.


## Before getting started!

Be sure to use Jupyter Notebook instead of Jupyter Lab for this tutorial! The URL should look something like:

    http://[[current hostname]]/camp/ray/jupyter/notebooks/tune_exercises/Tune.ipynb

## Outline
This tutorial will walk you through the following process:

1. Creating and training a model on a toy dataset (MNIST)
2. Integrating Tune into your workflow
3. Trying out advanced features - plugging in an efficient scheduler
4. Validating your trained model
5. (Optional) Try out a search algorithm


In [None]:
from helper import *
import numpy as np
from IPython.display import HTML
import matplotlib.pyplot as plt

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.preprocessing.image import ImageDataGenerator

# limit_threads is not defined in this notebook (presumably provided by the
# star import from helper); called here with 4 — TODO confirm what it caps.
limit_threads(4)
# IPython autoreload extension, mode 2.
%load_ext autoreload
%autoreload 2

## PART 1: Creating a model to be trained.

Let's create a Convolutional Neural Network model. Convolutional Neural Networks (ConvNets or CNNs) are a category of Neural Networks that have proven very effective in areas such as image recognition and classification. The details of how a Convolutional Neural Network works are unimportant here, but you're welcome to read more about them here: http://cs231n.github.io/convolutional-networks/

<img src="cnn.png" alt="MNIST Visualization" width="800"/>


This convolutional neural network model will be used to classify digits (from the MNIST dataset).

<img src="mnist.png" alt="MNIST Visualization" width="400"/>

This is a fairly simple dataset, but it enables us to explore Tune's functionality in depth.
We will use 60,000 images to train the network. The images are 28x28 NumPy arrays, with pixel values ranging between 0 and 255. The labels are an array of integers, ranging from 0 to 9. These correspond to the digit the image represents.

Here, we'll specify some arguments and some reasonable defaults for this model. These are the hyperparameters settings that we will later use to further optimize this model.

In [None]:
import argparse

# Hyperparameters of the Keras MNIST model, with their default values.
parser = argparse.ArgumentParser(description='Keras MNIST Example')
parser.add_argument('--lr', type=float, default=0.1, help='learning rate')
parser.add_argument('--momentum', type=float, default=0.0, help='SGD momentum')
parser.add_argument('--kernel1', type=int, default=3, help='Size of first kernel')
parser.add_argument('--kernel2', type=int, default=3, help='Size of second kernel')
parser.add_argument('--poolsize', type=int, default=2, help='Size of pooling window')
parser.add_argument('--dropout1', type=float, default=0.25, help='First dropout rate')
parser.add_argument('--hidden', type=int, default=4, help='Size of Hidden Layer')
parser.add_argument('--dropout2', type=float, default=0.5, help='Second dropout rate')

# parse_known_args (rather than parse_args) tolerates argv entries that are not
# declared above; [0] is the parsed namespace, converted to a plain dict.
DEFAULT_ARGS = vars(parser.parse_known_args()[0])

The function below creates and returns a Convolutional Neural Network. You don't need to modify this function.

In [None]:
def make_model(parameters):
    """Build and compile the MNIST convolutional network.

    Entries of ``parameters`` override the module-level ``DEFAULT_ARGS``; keys
    it omits keep their default. Returns the compiled ``Sequential`` model.
    """
    # Effective hyperparameters: the defaults overlaid with the caller's values.
    hparams = dict(DEFAULT_ARGS)
    hparams.update(parameters)
    num_classes = 10

    first_kernel = (hparams["kernel1"], hparams["kernel1"])
    second_kernel = (hparams["kernel2"], hparams["kernel2"])
    pool_window = (hparams["poolsize"], hparams["poolsize"])

    # Layers are created in the same order in which they are stacked.
    layers = (
        Conv2D(32, kernel_size=first_kernel,
               activation='relu', input_shape=(28, 28, 1)),
        Conv2D(64, second_kernel, activation='relu'),
        MaxPooling2D(pool_size=pool_window),
        Dropout(hparams["dropout1"]),
        Flatten(),
        Dense(hparams["hidden"], activation='relu'),
        Dropout(hparams["dropout2"]),
        Dense(num_classes, activation='softmax'),
    )
    model = Sequential()
    for layer in layers:
        model.add(layer)

    sgd = keras.optimizers.SGD(lr=hparams["lr"], momentum=hparams["momentum"])
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=sgd,
                  metrics=['accuracy'])
    return model

### Exercise: Setup a basic model training script.

The process of training the neural network model occurs as follows:

1. Feed the training data to the model—in this example, the `batch_of_data` and `batch_of_labels` arrays.
2. The model learns to associate images and labels.

**Exercise**: Fill out the TODO in the code below. Here are a few hints:

1) `data_generator` is an iterator that returns (`batch_of_data`, `batch_of_labels`), as follows:

```python
for batch_of_data, batch_of_labels in data_generator:
    do_something_interesting()
```
2) You can use `model.fit(batch_of_data, batch_of_labels)` to repeatedly improve the model.

In [None]:
def train_mnist(args):
    """Train a fresh model with one pass over the data and save its weights.

    The model is built from ``args`` by ``make_model``, fitted on every batch
    yielded by ``load_data()``, written to ``./weights.h5`` and returned.
    """
    batches = load_data()
    net = make_model(args)
    for images, labels in batches:
        net.fit(images, labels)
    net.save_weights("./weights.h5")
    return net

Let's run the training script above to make sure things work.

In [None]:
import ssl
# SECURITY NOTE(review): this replaces the default HTTPS context factory with
# the unverified one, i.e. certificate verification is switched off for the
# whole process from here on. Presumably a workaround so a dataset download
# inside load_data() succeeds — confirm, and prefer fixing the certificate
# store instead of keeping this outside a demo.
ssl._create_default_https_context = ssl._create_unverified_context
# Train once with the default hyperparameters.
first_model = train_mnist(DEFAULT_ARGS)

In [None]:
# evaluate is not defined in this notebook (presumably provided by the star
# import from helper) — TODO confirm what it reports for the trained model.
evaluate(first_model)

Let's now quickly try out this model to see if it works as expected. We'll load the model with our trained weights.

**Exercise**: Try to write a digit into the box below. This will automatically save your input in a variable `data` behind the scenes.

In [None]:
# Placeholder value; per the exercise text, drawing a digit in the widget
# below stores the input in `data` behind the scenes.
data = 78
# Read the widget markup inside a `with` block so the file handle is closed
# right away instead of being left open.
with open("input.html") as html_file:
    input_widget_html = html_file.read()
# Last expression of the cell, so the notebook renders it.
HTML(input_widget_html)

(tip: don't expect it to work)

In [None]:
# prepare_data is not defined in this notebook (presumably from helper); its
# result is fed straight to the model trained above.
prepared_data = prepare_data(data)
# argmax() over the model output picks the index with the highest score.
print("This model predicted your input as", first_model.predict(prepared_data).argmax())

**You've now set up a model that we can use Tune to optimize!**

## Part 2: Setting up Tune

One thing we might want to do now is find better hyperparameters so that our model trains more quickly and possibly to a higher accuracy. Let's make some minor modifications to utilize Tune. 

Tune uses Ray as a backend, so we will first import and initialize Ray.

In [None]:
import ray
from ray import tune

# Start Ray for Tune; ignore_reinit_error=True presumably makes this a no-op
# instead of an error when Ray is already running in this kernel.
ray.init(ignore_reinit_error=True)

Tune will automate and distribute your hyperparameter search by scheduling a number of **trials** on a machine. Each trial runs a user-defined Python function with a sampled set of hyperparameters. 

### Exercise: Two steps to use Tune:

Step 1) For the function you wish to tune, we need to change the signature to a specific format as shown below. Specifically: **pass in a ``reporter`` object to the `train_mnist_tune` function below**.

```python
def trainable(config, reporter):
    """
    Args:
        config (dict): Parameters provided from the search algorithm
            or variant generation.
        reporter (Reporter): Handle to report intermediate metrics to Tune.
    """
```

Step 2) We want to keep track of performance as the model is training. Specifically: **get the `mean_accuracy` from Keras, and call the ``reporter`` to report the `mean_accuracy` for every batch**. 

You can get model accuracy from Keras with the following code:

```python
mean_accuracy = model.evaluate(x_batch, y_batch)[1]
```


Example of using the reporter:

```python
def train_func(config, reporter):  # add a reporter arg
    # ...
    for data, target in dataset:
        model.fit(data, target)
        save_model()
        accuracy = model.evaluate(x_batch, y_batch)[1]
        reporter(mean_accuracy=accuracy, metric2=1, metric3=0.3, ...) # report metrics
```


In [None]:
def train_mnist_tune(config, reporter):
    """Tune-compatible training function for the MNIST model.

    Args:
        config (dict): Hyperparameters for this trial; forwarded to
            ``make_model``.
        reporter: Callable handed in by Tune; invoked once per batch with the
            metrics of that batch.
    """
    data_generator = load_data()
    model = make_model(config)
    for i, (x_batch, y_batch) in enumerate(data_generator):
        model.fit(x_batch, y_batch, verbose=0)
        # Save weights on every third batch. i == 0 satisfies the condition,
        # so last_checkpoint is always bound when it is reported below.
        if i % 3 == 0:
            last_checkpoint = "weights_tune_{}.h5".format(i)
            model.save_weights(last_checkpoint)

        # The model is compiled with metrics=['accuracy'], so index 1 of the
        # evaluate() result is the accuracy on this batch.
        mean_accuracy = model.evaluate(x_batch, y_batch)[1]
        reporter(mean_accuracy=mean_accuracy,
                 timesteps_total=i,
                 checkpoint=last_checkpoint)
        

In [None]:
# This may take 30 seconds or so to run if incorrectly written
# test_reporter is not defined in this notebook (presumably from helper); the
# assert fails when it returns a falsy value for the training function.
assert test_reporter(train_mnist_tune)

**If you've done this correctly, you now have properly converted your function to be Tune-compatible!**

### Exercise: Let's now try to search over some parameters. 

*NOTE: You can find the documentation for this section here: https://ray.readthedocs.io/en/latest/tune-usage.html#specifying-experiments*

In this section, we'll use some basic Tune features for training - namely specifying a stopping criteria and a search space. 

Let's first create a Tune Experiment specification. The relevant documentation for the Experiment class is here:

```python
class ray.tune.Experiment(name, run, stop=None, config=None, ... ):
    """Tracks experiment specifications.

    Parameters:
        name (str): Name of experiment.
        run (function|class|str): The algorithm or model to train.
            This may refer to the name of a built-on algorithm
            (e.g. RLLib's DQN or PPO), a user-defined trainable
            function or class, or the string identifier of a
            trainable function or class registered in the tune registry.
        stop (dict): The stopping criteria. The keys may be any field in
            the return result of 'train()', whichever is reached first.
            Defaults to empty dict.
        config (dict): Algorithm-specific configuration for Tune variant
            generation (e.g. env, hyperparams). Defaults to empty dict.
            Custom search algorithms may ignore this.
        trial_resources (dict): Machine resources to allocate per trial,
            e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be
            assigned unless you specify them here. Defaults to 1 CPU and 0
            GPUs in ``Trainable.default_resource_request()``.
        ...
```

**Part 1**: First, **set the stopping criteria to when `mean_accuracy` passes `0.95`**. For example, to specify that trials will be stopped whenever they report `arbitrary_metric` that is `>= 500`, do:

```python
stop={"arbitrary_metric": 500}
```


**Part 2**: We also want to designate a search space. We'll search over *learning rate*, which sets the step size of our model update, and *momentum*, which helps accelerate gradient vectors in the right direction, thus leading to faster convergence.

You can use `tune.grid_search` to specify an axis of a grid search. By default, Tune also supports sampling parameters from user-specified lambda functions, which can be used independently or in combination with grid search. The following example shows a grid search over a set of values combined with random sampling from a lambda function, generating 3 different trials.

```python
configuration = tune.Experiment(
    # ...
    config={
        "arbitrary_parameter1": lambda spec: np.random.uniform(0.1, 100),
        "arbitrary_parameter2": tune.grid_search([16, 64, 256]),
        # ...
    }
)
```

Specifically, 
1. randomly search for learning rate `"lr"` between 0.001 and 0.1,
2. do a grid search over `"momentum"` for `[0.2, 0.4, 0.6]` 

In [None]:
configuration = tune.Experiment(
    "experiment_name",
    run=train_mnist_tune,
    trial_resources={"cpu": 4},
    # Part 1: stop a trial once its reported mean_accuracy reaches 0.95.
    stop={"mean_accuracy": 0.95},
    # Part 2: draw the learning rate uniformly between 0.001 and 0.1 and
    # grid-search momentum over three values.
    config={
        "lr": lambda spec: np.random.uniform(0.001, 0.1),
        "momentum": tune.grid_search([0.2, 0.4, 0.6]),
    }
)

# Sanity checks on the experiment spec built above.
assert configuration.spec.get("stop", {}).get("mean_accuracy") == 0.95
assert "grid_search" in configuration.spec.get("config", {}).get("momentum", {})
assert "lr" in configuration.spec.get("config", {})
print("Success!")

Now, we can run our experiment with a single line of code. 

*Note*: Be sure to pay attention to the `acc` metric next to each running trial. That indicates the most recently reported mean accuracy for that trial. This should evaluate in less than a minute. The output will look similar to:

```
== Status ==
Using FIFO scheduling algorithm.
Resources requested: 8/8 CPUs, 0/1 GPUs
Result logdir: .../ray_results/experiment_name
RUNNING trials:
 - train_mnist_tune_0_lr=0.085836,momentum=0.2:	RUNNING [pid=44320], 4 s, 3 iter, 0.406 acc
 - train_mnist_tune_1_lr=0.062562,momentum=0.4:	RUNNING [pid=44321], 3 s, 2 iter, 0.219 acc
 - train_mnist_tune_2_lr=0.099461,momentum=0.6:	RUNNING [pid=44317], 3 s, 2 iter, 0.281 acc
 ```

In [None]:
# Run the experiment defined above and keep the resulting trials for inspection.
trials = tune.run_experiments(configuration, verbose=False)

You can expect the result below to be about `0.6`, although your mileage may vary (and it's OK).

In [None]:
# get_best_result is not defined in this notebook (presumably from helper);
# it is asked for the best value of the mean_accuracy metric across trials.
print("The best result is", get_best_result(trials, metric="mean_accuracy"))

**You've run your first Tune experiment!**

# Topic 4. Clipper
First, we start Clipper.

In [None]:
import logging, xgboost as xgb, numpy as np
from clipper_admin import ClipperConnection, DockerContainerManager
# Connection object used by all Clipper calls below, backed by a Docker
# container manager.
cl = ClipperConnection(DockerContainerManager())

In [None]:
# Start Clipper through the connection instance; this is the same call as
# ClipperConnection.start_clipper(cl), written as a normal method call.
cl.start_clipper()

Register our application. The call signature is:
`register_application(name, input_type, default_output, slo_micros)`

In [None]:
# Arguments in order: name='xgboost-test', input_type='integers',
# default_output='default_pred', slo_micros=100000.
cl.register_application('xgboost-test', 'integers', 'default_pred', 100000)

Get some training data.

In [None]:
def get_test_point():
    """Return a list of 784 random integers, each drawn via ``np.random.randint(255)``."""
    point = []
    for _ in range(784):
        point.append(np.random.randint(255))
    return point

Create an XGBoost model. 

In [None]:
# Create a training matrix from one random point with the single label 0.
# NOTE(review): get_test_point() returns a flat list of 784 values — confirm
# DMatrix interprets it as one 784-feature row rather than 784 rows.
dtrain = xgb.DMatrix(get_test_point(), label=[0])
# We then create parameters, watchlist, and specify the number of rounds
# This is code that we use to build our XGBoost Model, and your code may differ.
param = {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic'}
watchlist = [(dtrain, 'train')]
num_round = 2
# Train the booster that predict() uses below.
bst = xgb.train(param, dtrain, num_round, watchlist)

Define our predict function for our model. 

In [None]:
def predict(xs):
    """Score ``xs`` with the trained booster ``bst``.

    ``xs`` is wrapped in an ``xgb.DMatrix`` and the booster's ``predict``
    result is returned unchanged.
    """
    batch = xgb.DMatrix(xs)
    return bst.predict(batch)

Now that we have a model, as well as a predict function, we must deploy our model container. 

In [None]:
from clipper_admin.deployers import python as python_deployer
# Deploy predict() as version 1 of the model named 'xgboost-model', accepting
# 'integers' input (the same input type the application was registered with).
# We specify which packages to install in the pkgs_to_install arg.
# For example, if we wanted to install xgboost and psycopg2, we would use
# pkgs_to_install = ['xgboost', 'psycopg2']
python_deployer.deploy_python_closure(cl, name='xgboost-model', version=1,
     input_type="integers", func=predict, pkgs_to_install=['xgboost'])

The next step is to link the model container to our app, 
so that Clipper can route requests for the “xgboost-test” application to the “xgboost-model” container.

In [None]:
# Link the registered application (first argument) to the deployed model (second).
cl.link_model_to_app('xgboost-test', 'xgboost-model')

In [None]:
import requests, json
# Address of Clipper's query endpoint, as reported by the connection.
addr = cl.get_query_addr()
# POST one random test point as JSON to the application's /predict route.
response = requests.post(
     "http://%s/%s/predict" % (addr, 'xgboost-test'),
     headers={"Content-type": "application/json"},
     data=json.dumps({
         'input': get_test_point()
     }))
# Decode the JSON body; the next cell inspects it together with the status code.
result = response.json()

In [None]:
# Handle the failure case first, then distinguish a default prediction from a
# real one.
if response.status_code != requests.codes.ok:
    print(result)
    # Raise a built-in exception: no project-specific exception class is
    # defined or imported in this notebook.
    raise RuntimeError(response.text)
elif result["default"]:
    print('A default prediction was returned.')
else:
    print('Prediction Returned:', result)

In [None]:
# Clean up: stop everything started through this Clipper connection.
cl.stop_all()