### Basic Ray Tutorial and Deep Q Learning

This tutorial focuses on the cart-pole problem. A pole is attached by a hinge to the middle of a cart, and the cart slides along a frictionless surface. The goal is to keep the pole upright at all times. The only control is moving the cart back and forth, so the test is whether the cart can move in a way that prevents the pole from falling. The tutorial has been modified heavily so that it (i) runs in a Jupyter notebook, (ii) demonstrates the full capabilities of Ray and Ray Tune, and (iii) breaks down the components of an RL project, with expanded explanations of the code. We may modify this tutorial further to solve a different problem.

In the second part of the tutorial, we demonstrate how to create a custom reinforcement learning environment with the problem space of a robot walking down a corridor.

#### References:

Barto, A. G., Sutton, R. S. and Anderson, C. W. (1983), 'Neuronlike adaptive elements that can solve difficult learning control problems', IEEE Transactions on Systems, Man, and Cybernetics SMC-13(5), 834–846

Liaw, R., Liang, E., Nishihara, R., Moritz, P., Gonzalez, J. E. and Stoica, I. (2018), 'Tune: A Research Platform for Distributed Model Selection and Training', arXiv preprint arXiv:1807.05118

[Ray RLlib Documentation](https://docs.ray.io/en/master/rllib.html)

[Ray Tune Documentation](https://docs.ray.io/en/master/tune/index.html)

Mastering Reinforcement Learning with Python, Enes Bilgin, Packt Publishing, 2020 [Buy MRL with Python](https://www.amazon.com/Mastering-Reinforcement-Learning-Python-next-generation/dp/1838644148/?tag=meastus-200)

In [None]:
#! ray --version

In [None]:
## if using Domino's ray, start ray this way

import ray
import os

if not ray.is_initialized():
   service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
   service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
   ray.util.connect(f"{service_host}:{service_port}")

In [None]:
ray.nodes()

### What is Ray and what can it do?

In [None]:
import time

y = 1

def add(x, a=1):
    # Return a + 1 when x == 'add', otherwise a, after a simulated 5-second job
    if x == 'add':
        answer = a + 1
    else:
        answer = a
    time.sleep(5)  # simulate slow work
    print(answer)
    return answer  # without this, every call would return None

number_add = add('add', y)
number_none = add('hello')

results = []
st = time.time()
for _ in range(2):
    result = add('add')  # each call blocks for 5 seconds
    results.append(result)

## the results are now available to pass to another function
end = time.time()
print(str(end-st))  # roughly 10 seconds: the two calls run one after the other

## Creating remote objects

Put an object in Ray's object store, get it back out, and run the function on it.

Suppose we want to add up 10 million values, and the job pauses for 5 seconds after every million. Run sequentially, the pauses alone add up to 50 seconds. With Ray and 10 workers, each worker adds 1 million values and then sleeps 5 seconds, so the whole job takes less than six seconds.

This matters for learning because machine learning is already iterative: work that used to run sequentially, one partition after another, can now run in parallel with one partition per worker.

To see the difference, call the function first without Ray and then with Ray. Start with a small amount of data, then run the same code on a larger data set, both locally and in the cloud. For a quick test, use 10 workers that each sleep 2 seconds and compare the timings.
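The timing argument above can be checked on a small scale without a cluster. The sketch below is only a stand-in: it uses Python's standard `concurrent.futures.ThreadPoolExecutor` in place of Ray workers, and the task count, the shortened 0.1-second pause, and the names `slow_chunk`, `run_sequential` and `run_parallel` are all assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

NUM_TASKS = 10        # stands in for the 10 Ray workers
SLEEP_SECONDS = 0.1   # shortened from the multi-second pause in the text

def slow_chunk(chunk_id):
    """Pretend to process one chunk of data, then pause."""
    time.sleep(SLEEP_SECONDS)
    return chunk_id

def run_sequential():
    """Process the chunks one after another and time the whole run."""
    start = time.perf_counter()
    results = [slow_chunk(i) for i in range(NUM_TASKS)]
    return results, time.perf_counter() - start

def run_parallel():
    """Process the chunks concurrently, one thread per chunk."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=NUM_TASKS) as pool:
        results = list(pool.map(slow_chunk, range(NUM_TASKS)))
    return results, time.perf_counter() - start

seq_results, seq_elapsed = run_sequential()
par_results, par_elapsed = run_parallel()
print(f"sequential: {seq_elapsed:.2f}s, parallel: {par_elapsed:.2f}s")
```

The sequential run pays for every pause in turn, while the parallel run pays for roughly one pause in total. Ray applies the same idea across processes and machines rather than threads.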

In [None]:
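import ray
import time

#ray.init()  # uncomment when not already connected to a Ray cluster

y = 1
object_ref = ray.put(y)  # store y in Ray's object store

@ray.remote
def add(x, a=1):
    if x == 'add':
        answer = a + 1
    else:
        answer = a
    time.sleep(5)  # simulate slow work
    print(answer)
    return answer  # without this, ray.get() would return None

# Ray resolves object_ref to the stored value before calling the function
number_add = ray.get(add.remote('add', object_ref))
number_none = ray.get(add.remote('hello'))

object_ids = []
st = time.time()
for _ in range(2):
    y_id = add.remote('add')  # returns an object reference immediately
    object_ids.append(y_id)

## getting the results to pass to another function
objects = ray.get(object_ids)  # blocks until both tasks have finished
end = time.time()
print(str(end-st))  # roughly 5 seconds when the two tasks run in parallel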

In [None]:
def return_multiple():
    time.sleep(5)
    return 1, 2, 3

st = time.time()
a, b, c = return_multiple()
print(a,b,c)
end = time.time()
print(str(end-st))

In [None]:
@ray.remote(num_returns=3)
def return_multiple():
    time.sleep(5)
    return 1, 2, 3


st = time.time()  # start timing before the task is submitted
a, b, c = return_multiple.remote()
print(ray.get(a), ray.get(b), ray.get(c))
end = time.time()
print(str(end-st))

In [None]:
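### estimating pi by Monte Carlo sampling
import random

NUM_SAMPLES = 15

#@ray.remote
def inside():
    # True when a random point in the unit square lands inside the quarter circle
    x, y = random.random(), random.random()
    return x*x + y*y < 1

st = time.time()
count = sum(inside() for _ in range(NUM_SAMPLES))
number = 4 * count / NUM_SAMPLES  # the fraction of hits approximates pi / 4
end = time.time()
print('The answer is: ', number)
print(str(end-st))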

In [None]:
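import random

NUM_SAMPLES = 15

@ray.remote
def inside():
    # True when a random point in the unit square lands inside the quarter circle
    x, y = random.random(), random.random()
    return x*x + y*y < 1

st = time.time()
hits = ray.get([inside.remote() for _ in range(NUM_SAMPLES)])  # one task per sample
number = 4 * sum(hits) / NUM_SAMPLES  # the fraction of hits approximates pi / 4
end = time.time()
print('The answer is: ', number)
print(str(end-st))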

In [None]:
ray.shutdown()

### The Cart Pole Problem

Hyperparameter tuning has traditionally demanded a lot of hands-on human time. Ray Tune automates the search over hyperparameter values.
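As a rough picture of what an automated grid search does, the sketch below enumerates every combination of candidate values, which is the set of trials a tuner would then run. It is a pure-Python illustration of the idea, not Tune's implementation; the search space values and the helper name `expand_grid` are assumptions made for this example.

```python
from itertools import product

# Hypothetical search space: every list of candidate values is an assumption.
search_space = {
    "lr": [1e-2, 1e-4, 1e-6],
    "gamma": [0.9, 0.99],
}

def expand_grid(space):
    """Return one config dict per combination of candidate values."""
    keys = list(space)
    combos = product(*(space[key] for key in keys))
    return [dict(zip(keys, values)) for values in combos]

trial_configs = expand_grid(search_space)
for i, cfg in enumerate(trial_configs):
    print(f"trial {i}: {cfg}")
```

Three learning rates and two discount factors give six trials. The number of trials is the product of the list lengths, so it grows quickly as more hyperparameters are added to the grid.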

### Training an RL Model Using RLlib

RLlib is an open-source library for reinforcement learning that offers both high scalability and a unified API for a variety of applications. RLlib natively supports TensorFlow, TensorFlow Eager, and PyTorch, but most of its internals are framework agnostic. See the docs [here](https://docs.ray.io/en/latest/rllib.html) for more information. 

In [None]:
import ray
from ray.rllib import agents
import pprint as pp
import gym

ray.init()

config = {'gamma': 0.9,       
          'lr': 1e-2,
          'num_workers': 3,
          'train_batch_size': 1000,
          'model': {
              'fcnet_hiddens': [128, 128]
          }}

trainer = agents.dqn.DQNTrainer(config=config, env='CartPole-v0') #test vanilla deep q network; pass the config so the settings above take effect
#trainer = agents.dqn.ApexTrainer(env='CartPole-v0') #test APEX optimized deep q network if using GPUs
results = trainer.train()
pp.pprint(results)

ray.shutdown()
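The `gamma` entry in the configuration is the discount factor: a reward received k steps in the future is weighted by `gamma ** k`. The sketch below works through that arithmetic by hand. The function name `discounted_return` is hypothetical and the code is independent of RLlib.

```python
def discounted_return(rewards, gamma):
    """Sum rewards, weighting the reward k steps ahead by gamma ** k."""
    total = 0.0
    for k, reward in enumerate(rewards):
        total += (gamma ** k) * reward
    return total

# CartPole pays a reward of 1 for every step the pole stays up.
three_steps = discounted_return([1, 1, 1], gamma=0.9)  # 1 + 0.9 + 0.81
print(three_steps)

# With gamma < 1, even an endless run of +1 rewards is worth at most 1 / (1 - gamma).
upper_bound = 1 / (1 - 0.9)
long_run = discounted_return([1] * 200, gamma=0.9)
print(long_run, upper_bound)
```

A smaller `gamma` makes the agent care mostly about the next few steps, while a value close to 1 makes distant rewards count almost as much as immediate ones.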

In [None]:
# Load the TensorBoard notebook extension
#%load_ext tensorboard

In [None]:
#print path to logs
!ls ~/ray_results/

In [None]:
import pandas as pd

# choose the path to your output logs
logs_path = 'DQN_CartPole-v0_2021-11-16_10-29-197o_27f73'
data_path = '~/ray_results/{}/progress.csv'.format(logs_path)

df = pd.read_csv(data_path)
df.head()

In [None]:
# if running on your local machine use the below to access tensorboard
#!tensorboard --logdir ~/ray_results

In [None]:
## Collect the reward statistics and plot them against the cumulative number of episodes

df_episodes = df[['episodes_total', 'episode_reward_mean',
                  'episode_reward_min', 'episode_reward_max']]

# use the cumulative episode count as the x axis instead of plotting it as a fourth line
df_episodes.plot.line(x='episodes_total')

## Here the total number of episodes keeps increasing, but each iteration needs fewer episodes
## to achieve a higher reward. The algorithm learns more quickly towards the end of training,
## when it reaches its maximum number of iterations and its best rewards.

In [None]:
policy = trainer.get_policy()
model = policy.model
print(model.base_model.summary())

### Creating a Custom Environment and Optimized Hyperparameters

In [None]:
### Creating a custom environment; adapted from Ray documentation

"""Example of a custom gym environment and model. Run this for a demo.

This example shows:
  - using a custom environment
  - using a custom model
  - using Tune for grid search

You can visualize experiment results in ~/ray_results using TensorBoard.
"""
import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray import tune
from ray.tune import grid_search
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved

torch, nn = try_import_torch()
run = 'DQN'
framework = 'torch'
as_test = True  # was an argparse flag in the original script; when True, the learning check runs after tuning
stop_iters = 50
stop_timesteps = 100000
stop_reward = 0.1

class SimpleCorridor(gym.Env):
    """Example of a custom env in which you have to walk down a corridor.

    You can configure the length of the corridor via the env config."""

    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(
            0.0, self.end_pos, shape=(1, ), dtype=np.float32)
        # Set the seed. This is only used for the final (reach goal) reward.
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        # Produce a random reward when we reach the goal.
        return [self.cur_pos], \
            random.random() * 2 if done else -0.1, done, {}

    def seed(self, seed=None):
        random.seed(seed)


class CustomModel(TFModelV2):
    """Example of a keras custom model that just delegates to an fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(CustomModel, self).__init__(obs_space, action_space, num_outputs,
                                          model_config, name)
        self.model = FullyConnectedNetwork(obs_space, action_space,
                                           num_outputs, model_config, name)

    def forward(self, input_dict, state, seq_lens):
        return self.model.forward(input_dict, state, seq_lens)

    def value_function(self):
        return self.model.value_function()


class TorchCustomModel(TorchModelV2, nn.Module):
    """Example of a PyTorch custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                       model_config, name)

    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])
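To see the reward structure of `SimpleCorridor` in isolation, the sketch below copies just its step logic into a class with no gym or Ray dependency and runs one episode with a policy that always moves right. The names `CorridorSketch` and `rollout_always_right`, and the use of a private `random.Random` instance, are assumptions made for this illustration.

```python
import random

class CorridorSketch:
    """Dependency-free copy of the SimpleCorridor step logic (hypothetical name)."""

    def __init__(self, corridor_length=5, seed=0):
        self.end_pos = corridor_length
        self.cur_pos = 0
        self.rng = random.Random(seed)  # only used for the final reward

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        assert action in (0, 1), action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1  # step back toward the start
        elif action == 1:
            self.cur_pos += 1  # step toward the goal
        done = self.cur_pos >= self.end_pos
        # Random reward in [0, 2) at the goal, a small penalty on every other step.
        reward = self.rng.random() * 2 if done else -0.1
        return [self.cur_pos], reward, done, {}

def rollout_always_right(env):
    """Run one episode with the policy that always moves right."""
    env.reset()
    rewards, done = [], False
    while not done:
        _, reward, done, _ = env.step(1)
        rewards.append(reward)
    return rewards

rewards = rollout_always_right(CorridorSketch(corridor_length=5))
print(len(rewards), "steps, return =", round(sum(rewards), 3))
```

With a corridor of length 5, the always-right policy pays -0.1 on each of the first four steps and then receives a goal reward drawn uniformly from [0, 2), so its expected return is 4 × (-0.1) + 1.0 = 0.6. This is why a `stop_reward` of 0.1 is reachable for a trained agent.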




The code below is adapted from the Ray documentation.  The original can be found here:

In [None]:
## run and create the environment; change to work in a notebook

import ray
from ray.tune import grid_search

import os
local_dir = '/domino/datasets/local/{}'.format(os.environ['DOMINO_PROJECT_NAME'])

ray.init(ignore_reinit_error=True)

# Can also register the env creator function explicitly with:
# register_env("corridor", lambda config: SimpleCorridor(config))

ModelCatalog.register_custom_model(
    "my_model", TorchCustomModel
    if framework == "torch" else CustomModel)

config = {
    "env": SimpleCorridor,  # or "corridor" if registered above
    "env_config": {
        "corridor_length": 5,
    },
    # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
    "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
    "model": {
        "custom_model": "my_model",
        "vf_share_layers": True,
    },
    "lr": grid_search([1e-2, 1e-4, 1e-6]),  # try different lrs
    "num_workers": 3,  # parallelism
    "framework": framework,
}

stop = {
    "training_iteration": stop_iters,
    "timesteps_total": stop_timesteps,
    "episode_reward_mean": stop_reward,
}

results = tune.run(run, config=config, stop=stop, local_dir = local_dir)

if as_test:
    check_learning_achieved(results, stop_reward)
    
print("best config: ", results.get_best_config(metric = 'episode_reward_mean', mode = 'max'))

ray.shutdown()

### Finance Example: Predicting Investing Success

### What's Next?

We have two notebooks remaining:

*Using Pytorch and Ray for a simple finance example using DQN*