

<p align="center">
<img src="https://softwareengineeringdaily.com/wp-content/uploads/2020/02/ray-logo.png" alt="drawing" width="400" />
</p>

# Why Ray?
As you probably know, Python is an interpreted language: an interpreter executes the code line by line. 
As a result, a pure Python program usually takes much longer to run than an equivalent binary compiled from C or C++. 
But for many reasons we don't want to switch to C++...
So what options are left?
- **Optimize your code**: 

It seems obvious, yet we often forget it. Of course, it has its limits...

- **Multithreading**: 

Split your code into several threads that all have access to the same memory. However, if you know Python well, you know the limits imposed by the GIL (Global Interpreter Lock): only one thread executes Python bytecode at a time, so CPU-bound code does not get faster:
<p align="center">
<img src="https://pbs.twimg.com/media/EZzAw78WAAE7d_D.jpg" alt="drawing" width="400" />
</p> 


- **Multiprocessing**: 

We're left with multiprocessing, but there are several ways to implement it. You can use Python's multiprocessing library, but you then have to manage how memory is shared between processes yourself. In addition, to scale efficiently on an HPC cluster you have to switch to a different framework such as MPI. 
You will also probably have to rethink your machine learning application from the ground up.

**The answer to your problems is Ray**: 

Ray offers an extremely simple API to distribute your code with minimal changes. In addition, Ray comes with a set of libraries for building machine learning applications that make good use of the available resources, so that you get good models as quickly as possible. 
Finally, one of the most interesting capabilities (to me) is that Ray scales very well on clusters.



Today we will see the different libraries that Ray offers, but the focus will be on RLlib. 
**Part 1** of this tutorial presents, very simply, how Ray Core works. **Part 2** gives an overview of the machine learning libraries that Ray offers and points to introductory tutorials, the idea being just to draw your attention to the fact that these libraries exist.
**Part 3** is the heart of the tutorial: we will see how to use RLlib, the Ray library for distributed RL.

1. Introduction to Ray Core (25 min):
  * init 
  * remote 
  * serialization
  * ray on HPC (links)
2. Ray libraries overview (10 min):
  * Ray Data (links)
  * Ray Train (links)
  * Ray Tune (links)
3. RLlib hands-on (2h40): 
  * RLlib overview 
  * First training: **CartPole-v1 with PPO** (1h)
    * Hyper-parameters setting
    * Ray Tune API
    * Tensorboard or Weights and Biases : Framework to monitor your training
    * Customize your training
<p align="center">
<img src="https://bytepawn.com/images/cartpole.gif" alt="drawing" width="400" />
</p>
  * Second training : **Pong with Rainbow** (1h40)
    * Hyper-parameters setting
    * Custom Environment
    * Change your Model (Neural Network)
    * Change the Loss (implementing Contrastive Loss)
    * Quiz

<p align="center">
<img src="https://blog.floydhub.com/content/images/2018/12/gif1.gif" alt="drawing" width="400" />
</p>

  * Optional (I'm also here to assist): 
    * Test other algorithms
    * Create your own environment 





# Ray core
Ray Core provides a small number of core primitives (i.e., tasks, actors, objects) for building and scaling distributed applications. Below we’ll walk through simple examples that show you how to turn your functions and classes easily into Ray tasks and actors, and how to work with Ray objects.

See more : [Ray Core](https://docs.ray.io/en/latest/ray-core/walkthrough.html)

First, let's install the library on your Colab server:

In [None]:
!pip install ray > /dev/null 2>&1

In [None]:
! ray --version 

In [None]:
import pickle
print(pickle.format_version)

## Init
Ray provides a really simple API that allows you to initialize the module and use all its parallelization capabilities :

In [None]:
import ray 
ray.init()

Now that Ray has been initialized, you can check your available resources: 

In [None]:
ressources = ray.available_resources()
print(ressources)

* Unless you have already changed your runtime type, the default one has no GPU. If you want a GPU, change the runtime type in the settings (you will then need to rerun everything).
* You can also pass arguments to ray.init, such as **num_cpus** or **num_gpus**, to state explicitly which resources Ray may use. This is important when working on shared machines. 
See all the [parameters](https://docs.ray.io/en/latest/ray-core/package-ref.html#ray-init) of ray.init.
* If you call ray.init twice in the notebook, the second call raises an error (unless you pass **ignore_reinit_error=True**). Call **ray.shutdown** first if you want to ... shut down the Ray server! :)

## Remote


Let's start with a simple example that makes sense in an industrial context: a function that counts up to n. We will measure how long it takes to run it twice:

In [None]:
import time 

def usefull(n):
  count=0
  for _ in range(n):
    count+=1
  return count

n=int(4e8)
t0=time.time()
[usefull(n) for k in range(2)]
print("It takes {}s to count 2 times up to {} without ray".format(time.time()-t0,n))


Now with ray :

In [None]:
@ray.remote
def usefull(n):
  count=0
  for _ in range(n):
    count+=1
  return count

n=int(4e8)
t0=time.time()
ray.get([usefull.remote(n) for k in range(2)])
print("It takes {}s to count 2 times up to {} with ray".format(time.time()-t0,n))

OK, it's faster. Unfortunately, we can't see the full advantage of Ray Core here because Colab gives us only 2 cores, but you get the idea.
Note that using Ray only pays off when the work done by each task outweighs the overhead of scheduling it and moving its data.

## Serialization

To better understand how Ray works you need to understand its [key concepts](https://docs.ray.io/en/latest/ray-core/key-concepts.html). 
Basically, there are three concepts: 

* Tasks (which we just saw)
* Actors (which distribute classes and their methods)
* Objects (which make any object available to every node)

When Ray runs in cluster mode, the data needed to compute a task on a node is stored in the RAM of that node.
To get there, the data has to be serialized, and some objects are not serializable, which makes it impossible for Ray to store them.
Let's see how to store an object in the [Object store](https://docs.ray.io/en/releases-1.11.0/ray-core/memory-management.html):

In [None]:
y=1
y_obj=ray.put(y)
print('obj ref : ', y_obj)
print('obj value : ', ray.get(y_obj))

Ray uses [cloudpickle](https://github.com/cloudpipe/cloudpickle) with which you can serialize **almost** anything. 
Now let's say you want to use Ray to distribute the training of your agent on a custom environment that relies on a specific framework holding non-serializable state (a database connection, for example). 
In that case Ray won't be able to serialize the environment:

In [None]:
import sqlite3
class Env:
    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path) #this can't be serialized

original = Env("/tmp/db")
ray_obj = ray.get(ray.put(original))

To enable serialization, you need to give Ray the serializable data needed to rebuild the object on another node. This is done with `__reduce__`:

In [None]:
import sqlite3
class Env:
    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path) #this can't be serialized

    def __reduce__(self):
        deserializer = Env
        serialized_data = (self.path,)
        return deserializer, serialized_data

original = Env("/tmp/db")
ray_obj = ray.get(ray.put(original))
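
What happens behind the scenes can be checked with plain Python. `__reduce__` returns a callable and its arguments; the pickling machinery stores that pair and calls `callable(*args)` to rebuild the object on the other side. The sketch below performs this step by hand, using an in-memory database (`":memory:"`) instead of `/tmp/db` purely for the illustration:

```python
import pickle
import sqlite3

class Env:
    def __init__(self, path):
        self.path = path
        self.conn = sqlite3.connect(path)  # not serializable

    def __reduce__(self):
        # (callable, args): enough information to rebuild the object elsewhere
        return Env, (self.path,)

# The raw connection cannot be pickled...
try:
    pickle.dumps(sqlite3.connect(":memory:"))
    connection_is_picklable = True
except TypeError:
    connection_is_picklable = False

# ...but the recipe returned by __reduce__ only contains the path.
original = Env(":memory:")
factory, args = original.__reduce__()
rebuilt = factory(*args)  # what unpickling does on the receiving node

print(connection_is_picklable, rebuilt.path, rebuilt.conn is not original.conn)
```

Note that the rebuilt object opens its own new connection: only the path travels between nodes.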

Useful tips: 

* **Ray actor definition**: you only need to decorate the class with ray.remote.
* **ray.remote args**: you can specify the resources used by each actor with the arguments of the decorator.
* **Object memory management**: in ray.init or ray.remote you can specify the capacity of the object store memory (or the share of it that is used) to limit the RAM usage. 

## Ray on HPC

As you can see, Ray is really simple and lets you distribute your code efficiently with no major changes. 
The most outstanding thing is that once your code is designed around Ray, there is almost nothing left to do to run it on an HPC cluster. So you can forget about abominations like MPI.
Unfortunately we won't have time to run the example (this part is just to let you know that the feature exists).
If you want to run a basic example on Pando, here is all you need: 

Doc for running on [AWS clusters](https://docs.ray.io/en/latest/cluster/vms/user-guides/launching-clusters/index.html).

Doc for running on [Slurm](https://docs.ray.io/en/latest/cluster/vms/user-guides/community/slurm.html) (Pando)

# Ray libraries overview

Unfortunately we won't have time to explore all the libraries in depth.
Here is a brief summary of what they can do.

## Ray Data

Ray Data is a library for building distributed data pipelines with Ray. It provides a high-level interface for defining and executing data processing tasks, as well as tools for managing the lifecycle of those tasks. 
Ray Datasets also simplify general purpose parallel GPU and CPU compute in Ray; for instance, for GPU batch inference. They provide a higher-level API for Ray tasks and actors for such embarrassingly parallel compute, internally handling operations like batching, pipelining, and memory management.

Useful link: [Processing NYC taxi data using Ray Datasets](https://docs.ray.io/en/latest/data/examples/nyc_taxi_basic_processing.html)

## Ray Train 
Ray Train scales model training for popular ML frameworks such as Torch, XGBoost, TensorFlow, and more. It seamlessly integrates with other Ray libraries such as Tune:
<p align="center">
<img src="https://docs.ray.io/en/latest/_images/train-specific.svg" alt="drawing" width="600" />
</p> 

Here is the [Quick Start](https://docs.ray.io/en/latest/train/train.html#quick-start)

## Ray Tune
Ray Tune is a library for hyperparameter search. Not only does it run the evaluations in a distributed way, it also improves the search with state-of-the-art methods such as Bayesian optimization.
Here is the [Quick start](https://docs.ray.io/en/latest/tune/getting-started.html#tune-tutorial)

# RLlib
<p align="center">
<img src="https://docs.ray.io/en/latest/_images/rllib-logo.png" alt="drawing" width="300" />
</p> 

## RLlib overview

RLlib Algorithm classes coordinate the distributed workflow of running rollouts and optimizing policies. Algorithm classes leverage parallel iterators to implement the desired computation pattern. The following figure shows synchronous sampling, the simplest of these patterns:

<p align="center">
<img src="https://docs.ray.io/en/latest/_images/a2c-arch.svg" alt="drawing" width="600" />
</p> 

As can be seen in the figure, RLlib uses multiple RolloutWorkers, which are actually Ray Core actors, in order to maximize the number of samples collected. 
Once the ReplayBuffer is filled, the trainer samples batches and trains the model (learner). Once the model is updated, the new weights are sent to the RolloutWorkers. And so on...

That's what is going on underneath. There are also multiple API levels that allow you to customize the workflow. We will start with the high-level APIs and finish with the low-level ones.

## CartPole with PPO
Let's install everything so that we can use [gym](https://www.gymlibrary.dev/) properly.

In [None]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install gym > /dev/null 2>&1
!pip install gym[classic_control] > /dev/null 2>&1
!git clone https://github.com/Paul-antoineLeTolguenec/ray_course.git

### Environment 
We can't render the environment in Colab, so for each rollout we will record a video and watch it afterwards.

In [None]:
from pyvirtualdisplay import Display
import pygame
import gym
from gym import logger as gymlogger
from ray_course.gym_recorder import *
gymlogger.set_level(40) #error only
display = Display(visible=0, size=(1400, 900)) #display
display.start()

env=wrap_env(gym.make('CartPole-v1')) #env

# rollout
s = env.reset()
d=False
while not d:
    env.render('rgb_array')
    a = env.action_space.sample() 
    s, r, d, i = env.step(a) 
env.close()
show_video()

### Hyper-parameter setting

Now we are going to use PPO (Proximal Policy Optimization).
Useful links:     
* [PPO article](https://arxiv.org/abs/1707.06347)
* [PPO summary](https://paperswithcode.com/method/ppo#:~:text=Proximal%20Policy%20Optimization%2C%20or%20PPO,using%20only%20first%2Dorder%20optimization.) 
* [PPO definition in RLlib](https://docs.ray.io/en/latest/rllib/rllib-algorithms.html#ppo)

Before moving on, it might be useful to refresh your memory using the PPO summary.

Now we are going to setup the configuration of the PPO algorithm using a simple dictionary.

In [None]:
CONFIG = {
    # Common config
		"env": 'CartPole-v1',
		# "env_config": ENV_CONFIG, #the env config is the dictionary that's pass to the environment when built
		"num_gpus": 0,
		"num_workers": 1, # int(ressources['CPU'])
		"explore": True,
		"exploration_config": {
			"type": "StochasticSampling",
		},
    "framework": "tf2", #I prefer tensorflow but feel free to use pytorch
    # PPO config
    "gamma": 0.95,
    "use_critic": True,
    "use_gae": True, #Generalized Advantage Estimate
    "lambda": 1,
    "kl_coeff": 0.2,
    "rollout_fragment_length":1024, #number of steps in the environment for each Rollout Worker
    "train_batch_size": 1024, 
    "sgd_minibatch_size": 64,
    "shuffle_sequences": True, #Kind of experience replay for PPO
    "num_sgd_iter": 16,
    "lr": 1e-3,
    "lr_schedule": None,
    "vf_loss_coeff": 1.0,
    "model": {
        "vf_share_layers": False, 
    },
    "entropy_coeff": 0.0,
    "entropy_coeff_schedule": None,
    "clip_param": 0.4,
    "vf_clip_param": 10.0,
    "grad_clip": None,
    "observation_filter": "NoFilter"
	}
 


Questions : 

* What's the clip_param ? 
* How does it affect the training ?
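
To explore these questions numerically, the clipped surrogate objective from the PPO paper, `min(r * A, clip(r, 1 - eps, 1 + eps) * A)`, can be written in a few lines of NumPy. This is a standalone sketch, not RLlib's implementation; the probability ratios and the advantage values are arbitrary.

```python
import numpy as np

def clipped_surrogate(ratio, advantage, clip_param):
    # ratio = pi_new(a|s) / pi_old(a|s)
    unclipped = ratio * advantage
    clipped = np.clip(ratio, 1.0 - clip_param, 1.0 + clip_param) * advantage
    # Taking the minimum removes the incentive to move the ratio
    # further than clip_param away from 1.
    return np.minimum(unclipped, clipped)

ratios = np.array([0.5, 1.0, 1.3, 2.0])
positive = clipped_surrogate(ratios, advantage=1.0, clip_param=0.4)
negative = clipped_surrogate(ratios, advantage=-1.0, clip_param=0.4)
print(positive)  # [0.5 1.  1.3 1.4]
print(negative)  # [-0.6 -1.  -1.3 -2. ]
```

Try other values of `clip_param` and watch when the objective stops growing with the ratio.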

**Useful tips**: There used to be a common config dictionary where you could look up all the variables of the configuration (but since the new version, I'm not able to access it anymore): 
* [Here](https://chuacheowhuan.github.io/RLlib_trainer_config/) is the file I'm talking about.
* Alternatively, you can import the algorithm config from RLlib and print it as a dict, as in the example below:


In [None]:
from ray.rllib.algorithms.ppo import PPOConfig
algo = PPOConfig()
algo.to_dict()

Now that the config is defined, we are ready to train : 

In [None]:
from ray.rllib.algorithms.ppo import PPO
ray.shutdown() #shutdown before re-init
ray.init() #re-init
algo = PPO(config=CONFIG)
for epoch in range(30):
	result=algo.train()
	print('epoch : ',epoch)

Once you consider the training over, you can save the model so that it can be reused later: 

In [None]:
checkpoint_dir = algo.save() #save the model 
print(f"Checkpoint saved in directory {checkpoint_dir}") 
ray.shutdown()

Let's see how to re-instantiate the model you trained:

In [None]:
from ray.rllib.algorithms.algorithm import Algorithm
algo = Algorithm.from_checkpoint(checkpoint_dir) #load the state of the algorithm where it was : Optimizer state, weights, ...
policy=algo.get_policy() #get the policy 

Now, let's evaluate the model in the environment: 

In [None]:
import numpy as np
from pyvirtualdisplay import Display
import gym
# Eval
display = Display(visible=0, size=(1400, 900)) #display
display.start()
env = wrap_env(gym.make('CartPole-v1')) #wrap the env (as in the first rollout) so that the video is recorded
episode_reward = 0
d = False
s = env.reset()
while not d:
    env.render('rgb_array')
    logits,_= policy.model({'obs': np.expand_dims(s,axis=0)})
    a=int(np.argmax(logits)) #greedy action
    s,r,d,i= env.step(a)
    episode_reward += r
env.close()
print('episode reward : ', episode_reward)
show_video()

Note: As you can see, I applied my own greedy policy (argmax) to the logits. Alternatively, the policy has the method **compute_single_action**. By default this method samples an action from the distribution instead of returning the most likely one, because exploration is enabled in the config. Passing **explore=False** to it should make the choice deterministic (if you find a better way, please be kind and tell me).
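
The difference between the greedy choice and sampling from the distribution can be seen on a toy example. The logits below are arbitrary numbers chosen for the illustration, not the output of a trained policy:

```python
import numpy as np

logits = np.array([0.2, 1.5])  # arbitrary logits for the 2 CartPole actions

# Softmax turns the logits into action probabilities.
probs = np.exp(logits - logits.max())
probs /= probs.sum()

greedy_action = int(np.argmax(logits))  # always the same action

rng = np.random.default_rng(seed=0)
sampled_actions = rng.choice(len(probs), size=1000, p=probs)  # varies from draw to draw

print("greedy action:", greedy_action)
print("probabilities:", probs)
print("empirical frequencies:", np.bincount(sampled_actions, minlength=2) / 1000)
```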

### Ray Tune (RLlib API)
When using an algorithm to train your model, it can be useful to test different combinations of hyper-parameters. The Ray Tune API gives you this feature. The following example shows how to launch multiple trainings to test various learning rates with Tune.

In [None]:
from ray.rllib.algorithms.ppo import PPOConfig
from ray import air, tune
ray.shutdown()
ray.init()

config = PPOConfig().training(lr= tune.grid_search([0.01, 0.001, 0.0001])).rollouts(num_rollout_workers=1).resources(num_gpus=0).environment(env="CartPole-v1")

tuner = tune.Tuner(
    "PPO",
    run_config=air.RunConfig(
        stop={"episode_reward_mean": 200},
        local_dir="./results", 
        name="PPO"
    ),
    param_space=config,
)

results = tuner.fit()

# Get the best result based on a particular metric.
best_result = results.get_best_result(metric="episode_reward_mean", mode="max")

# Get the best checkpoint corresponding to the best result.
best_checkpoint = best_result.checkpoint
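
Conceptually, `tune.grid_search` asks Tune to run one trial per listed value (and one per combination when several parameters use it), and `get_best_result` then picks the trial with the best metric. The plain-Python sketch below mimics that logic on a toy objective; `toy_objective` and the candidate values are invented for the illustration and no Ray call is involved.

```python
from itertools import product

def toy_objective(lr, gamma):
    # Stand-in for "train and report episode_reward_mean" (purely illustrative):
    # the score is highest for lr=0.001 and gamma=0.99.
    return -abs(lr - 0.001) * 1000 - abs(gamma - 0.99)

search_space = {
    "lr": [0.01, 0.001, 0.0001],
    "gamma": [0.95, 0.99],
}

# One "trial" per combination of values, as a grid search does.
trials = [dict(zip(search_space, values)) for values in product(*search_space.values())]
results = [(toy_objective(**trial), trial) for trial in trials]

# Equivalent of get_best_result(metric=..., mode="max")
best_score, best_trial = max(results, key=lambda item: item[0])
print(len(trials), best_trial)
```

The number of trials grows multiplicatively with each gridded parameter, which is why Tune also offers sampling-based search spaces.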

### TensorBoard or Weights and Biases (Monitoring your algorithm)

We just saw how to use the basic API of RLlib. However, we didn't get much data during the training (data that is valuable for monitoring). 
This information is essential because it allows you, for example, to stop a training run when the learning curve has clearly plateaued.
You have several solutions:

* **Tensorboard** :
<p align="center">
<img src="https://www.tensorflow.org/static/site-assets/images/project-logos/tensorboard-logo-social.png" alt="drawing" width="400" />
</p> 

TensorBoard stores the data locally in log files, and you can access the interface in your browser on the port you specified.
It is well suited to RLlib because the library generates its own log files.




In [None]:
!pip install -U tensorboardx > /dev/null 2>&1

In [None]:
%load_ext tensorboard
%tensorboard --logdir /content/results/PPO/NAME_OF_YOUR_EXPERIMENT_FOLDER

* **Weights and Biases** :

<p align="center">
<img src="https://assets.website-files.com/5ac6b7f2924c656f2b13a88c/6077a58f02c7ef0e37fde627_weights%20and%20biases%20workspace.jpg" alt="drawing" width="400" />
</p>

Weights and Biases is a great tool to visualize data in real time (or near real time). 
The advantage over TensorBoard is that the data is stored in the cloud.
Steps to use Weights and Biases: 
1. Create an account: [here](https://wandb.ai/site)
2. Copy your API key: [here](https://wandb.ai/home)

In [None]:
!pip install wandb > /dev/null 2>&1

In [None]:
! wandb login

Basic usage of wandb: 

In [None]:
import wandb
wandb.init(project='Test')
wandb.run.name='run'
for k in range(50):
    wandb.log({'data': k})


To use wandb with Ray, you need to implement a specific callback class (callbacks are functions called at given points of the training, here to produce the logs) that calls wandb.
[Here](https://docs.ray.io/en/latest/_modules/ray/rllib/algorithms/callbacks.html#DefaultCallbacks) is the DefaultCallbacks class. You can override its methods to output your own metrics. I propose an implementation that uses wandb in ray_course.custom_callbacks. Feel free to modify it. The example below shows how to specify the CustomCallbacks: 

In [None]:
from ray_course.custom_callbacks import CustomCallbacks
from ray.tune.logger import pretty_print

ray.shutdown()
ray.init() #re-init
CONFIG = {
    # Common config
		"env": 'CartPole-v1',
		# "env_config": ENV_CONFIG,
		"num_gpus": 0,
		"num_workers": 1, # int(ressources['CPU'])
    "gamma": 0.95,
		"explore": True,
		"exploration_config": {
			"type": "StochasticSampling",
		},
    'reuse_actors':True,
    "framework": "tf2", #I prefer tensorflow but feel free to use pytorch
    "callbacks": CustomCallbacks,
    # PPO config
    "use_critic": True,
    "use_gae": True,
    "lambda": 1,
    "kl_coeff": 0.2,
    "rollout_fragment_length":1024,
    "train_batch_size": 1024,
    "sgd_minibatch_size": 64,
    "shuffle_sequences": True,
    "num_sgd_iter": 16,
    "lr": 1e-3,
    "lr_schedule": None,
    "vf_loss_coeff": 1.0,
    "model": {
        "vf_share_layers": False,
    },
    "entropy_coeff": 0.0,
    "entropy_coeff_schedule": None,
    "clip_param": 0.4,
    "vf_clip_param": 10.0,
    "grad_clip": None,
    "kl_target": 0.01,
    "batch_mode": "truncate_episodes",
    "observation_filter": "NoFilter"
	}
algo = PPO(config=CONFIG)
for k in range(10):
	result=algo.train()
	print(pretty_print(result))
ray.shutdown()

**Task for you**


Now that you have seen these tools, try to optimize the training: play with the hyper-parameters, customize your own metrics, ... 
Make the model learn to control the CartPole.

# Pong with **Rainbow**

Now we're going to see another algorithm on another environment
 
First, let's install the Atari suite for gym:

In [None]:
!pip install "gym[atari]" "gym[accept-rom-license]" > /dev/null 2>&1

Normally you need to : **RESTART RUNTIME**

Let's check our new environment 

In [None]:
import gym
import ray
from pyvirtualdisplay import Display
import pygame
from gym import logger as gymlogger
from ray_course.gym_recorder import *
gymlogger.set_level(40) #error only
display = Display(visible=0, size=(1400, 900)) #display
display.start()
env=wrap_env(gym.make('PongDeterministic-v0')) #env
# rollout
s = env.reset()
print(s.shape)
d=False
while not d:
    env.render('rgb_array')
    a = env.action_space.sample() 
    s, r, d, i = env.step(a) 
env.close()
show_video()

Useful links for Rainbow algorithm: 

* [Rainbow article](https://arxiv.org/abs/1710.02298)
* [Rainbow summary](https://yhyu13.github.io/2017/12/16/DeepMind-Rainbow-Review/)
* [Rainbow on RLlib](https://docs.ray.io/en/latest/rllib/rllib-algorithms.html#dqn)


Now let's see if we can learn a good policy with rainbow. Here is the new CONFIG and the training.

In [None]:
import ray
from ray_course.custom_callbacks import CustomCallbacks
from datetime import datetime
import wandb
from ray.rllib.algorithms.dqn import DQN
ray.shutdown()
ray.init() #re-init
CONFIG = {
    # Common config
    "env": 'PongDeterministic-v0',
    "num_workers": 1, # int(ressources['CPU'])
    "explore": True,
    "framework": "tf2", #I prefer tensorflow but feel free to use pytorch
    "callbacks": CustomCallbacks,
    # Rainbow config
    # (duplicated keys removed: in a Python dict only the last value of a repeated key is kept)
    'env_config': {},  # deterministic
    'num_gpus': 1, # set to 0 if your runtime has no GPU
    'gamma': 0.99,
    'lr': .0001,
    'replay_buffer_config': {
        'type': 'MultiAgentPrioritizedReplayBuffer',
        'capacity': 50000,
    },
    'num_steps_sampled_before_learning_starts': 10000,
    'rollout_fragment_length': 4,
    'train_batch_size': 32,
    # the default exploration of DQN is epsilon-greedy; we only tune its schedule
    'exploration_config': {
        'epsilon_timesteps': 200000,
        'final_epsilon': .01,
    },
    'model': {
        'grayscale': True,
        'zero_mean': False,
        'dim': 42,
    },
    # we should set compress_observations to True because few machines
    # would be able to contain the replay buffers in memory otherwise
    'compress_observations': True,
}
algo = DQN(config=CONFIG)
now = datetime.now()
name = now.strftime("_%m_%d_%Y_%H_%M_%S")
wandb.run.name='rainbow'+name
for k in range(100):
	result=algo.train()
	print('epoch : ',k)
    # print(pretty_print(result))
 
checkpoint_dir = algo.save() #save the model 
print(f"Checkpoint saved in directory {checkpoint_dir}") 
ray.shutdown()
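
Note that the dictionary above mostly configures a DQN with a prioritized replay buffer. According to the DQN section of the RLlib algorithm documentation linked above, a complete Rainbow setup additionally turns on multi-step returns, noisy layers and a distributional head. The fragment below is a sketch of these extra keys; the particular values (`n_step=3`, `num_atoms=51`) are assumptions chosen for illustration, not tuned settings, and the key names should be checked against your RLlib version.

```python
# Extra keys to merge into CONFIG for a fuller Rainbow setup (illustrative values).
RAINBOW_EXTRAS = {
    "n_step": 3,        # multi-step returns
    "noisy": True,      # noisy layers for exploration
    "num_atoms": 51,    # distributional Q-learning needs more than 1 atom
    "v_min": -10.0,     # support of the return distribution
    "v_max": 10.0,
    "double_q": True,   # double Q-learning
    "dueling": True,    # dueling architecture
}

# Stand-in for the CONFIG dictionary defined above, to keep the fragment self-contained.
base_config = {"env": "PongDeterministic-v0", "lr": 1e-4}
rainbow_config = {**base_config, **RAINBOW_EXTRAS}
print(sorted(rainbow_config))
```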

## Custom Environment

Cool! It works, but as you can see the state is defined as the image at the current time step (just like when you play!). However, when learning on Atari it is more efficient to define the state of the agent as a stack of frames, so that it is aware of the dynamics (such as the velocity of the ball). Otherwise it won't learn much.

When you call RLlib with an Atari environment, RLlib recognizes it and preprocesses it with the [DeepMind Preprocessor](https://github.com/ray-project/ray/blob/master/rllib/env/wrappers/atari_wrappers.py) so that the state becomes a stack of frames.

Now we are going to see how to wrap an Atari environment ourselves so that the state is defined as a stack of num_frames=3 frames. Also, instead of RGB images, the frames will be gray scale images resized to 84x84. 
Summary: we had observation_space.shape=(height, width, C) with C=3 for the RGB channels, and now we have a space of the same rank where the three channels are the 3 most recent gray scale frames.

Let's see how to do that:

In [None]:
import cv2
import gym
import numpy as np
import ray
from gym.spaces import Box
import matplotlib.pyplot as plt 
class CustomEnv(gym.Env):
    def __init__(self,dict_env=None) -> None:
        super(CustomEnv, self).__init__()
        self.dict_env=dict_env if dict_env is not None else {}
        self.old_env= gym.make('PongDeterministic-v0')
        self.num_frames=3
        self.shape_new_image=(84, 84, 1)
        # the frames are normalized in [0, 1] by image_filter, so the space is a float Box
        self.observation_space=Box(0.0, 1.0, (self.shape_new_image[0], self.shape_new_image[1], self.num_frames), dtype=np.float32)
        self.action_space=self.old_env.action_space
        self.seq_s=self.empty_sequence()

    def empty_sequence(self):
        return [np.zeros(shape=self.shape_new_image, dtype=np.float32) for _ in range(self.num_frames)]

    def reset(self):
        self.seq_s=self.empty_sequence() #reset the sequence so that frames of the previous episode do not leak
        old_env_s= self.old_env.reset() #state from the old environment
        old_env_s=self.image_filter(np.array(old_env_s))
        self.seq_s.pop(0) #delete the oldest frame
        self.seq_s.append(old_env_s)
        s = np.concatenate(self.seq_s,axis=-1) 
        return s

    def step(self, a):
        old_env_s, r, d, i = self.old_env.step(a) 
        old_env_s=self.image_filter(np.array(old_env_s))
        self.seq_s.pop(0)
        self.seq_s.append(old_env_s)
        s = np.concatenate(self.seq_s,axis=-1)
        return s, r, d, i

    def render(self,mode='human'):
        return self.old_env.render(mode)
    
    def image_filter(self,img):
        img_gs= cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) #gym returns RGB frames
        resized = cv2.resize(img_gs, self.shape_new_image[:-1], interpolation = cv2.INTER_AREA)/255.0 #normalize the data
        return np.expand_dims(resized,axis=-1).astype(np.float32)

    def __reduce__(self):
        deserializer = CustomEnv
        serialized_data = (self.dict_env, )
        return deserializer, serialized_data



env=CustomEnv()
env_ref=ray.put(env) #check if it can be serialized
print(env.observation_space)
s=env.reset()
print(s.shape)
# plt.imshow(s)
s, r, d, i = env.step(env.action_space.sample())
print(s.shape)
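
The frame-stacking logic of `CustomEnv` can be checked in isolation, without Atari. The sketch below uses a `collections.deque` with `maxlen` as an alternative to the pop/append pair, and constant images whose pixel value is the step number as hypothetical stand-ins for real frames:

```python
from collections import deque
import numpy as np

num_frames = 3
frame_shape = (84, 84, 1)

# Start from black frames; maxlen makes the deque drop the oldest frame itself.
frames = deque(
    [np.zeros(frame_shape, dtype=np.float32) for _ in range(num_frames)],
    maxlen=num_frames,
)

def push(frame):
    frames.append(frame)
    # Stack the frames along the channel axis: (84, 84, num_frames)
    return np.concatenate(list(frames), axis=-1)

for step in range(1, 5):
    fake_frame = np.full(frame_shape, step, dtype=np.float32)  # stand-in for a game frame
    state = push(fake_frame)

print(state.shape)   # (84, 84, 3)
print(state[0, 0])   # [2. 3. 4.]: the 3 most recent frames, oldest first
```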

## Change your model

Now that we have a new state, let's also implement our own model.
As we saw, Ray has several API levels: the very high level with Tune, and lower levels.
When implementing an RL algorithm, you often need to change the structure of the model.
To do this we will subclass the TFModelV2 class (feel free to use the [PyTorch class](https://docs.ray.io/en/latest/rllib/rllib-models.html#custom-pytorch-models)).

In [None]:
import argparse
import os
import ray
from ray import air, tune
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.algorithms.dqn.distributional_q_tf_model import DistributionalQTFModel
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY
from ray.tune.registry import get_trainable_cls
tf1, tf, tfv = try_import_tf()
class AtariModel(TFModelV2):
    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name="atari_model"):
        super(AtariModel, self).__init__(obs_space, action_space, num_outputs, model_config,
                         name)
        inputs = tf.keras.layers.Input(shape=obs_space.shape, name='observations')
        # Convolutions on the frames on the screen
        layer1 = tf.keras.layers.Conv2D(
                32,
                [8, 8],
                strides=(4, 4),
                activation="relu",
                data_format='channels_last')(inputs)
        layer2 = tf.keras.layers.Conv2D(
                32,
                [4, 4],
                strides=(2, 2),
                activation="relu",
                data_format='channels_last')(layer1)
        layer3 = tf.keras.layers.Conv2D(
                32,
                [3, 3],
                strides=(1, 1),
                activation="relu",
                data_format='channels_last')(layer2)
        layer4 = tf.keras.layers.Flatten()(layer3)
        layer5 = tf.keras.layers.Dense(
                512,
                activation="relu",
                kernel_initializer=normc_initializer(1.0))(layer4)
        action = tf.keras.layers.Dense(
                num_outputs,
                activation="linear",
                name="actions",
                kernel_initializer=normc_initializer(0.01))(layer5)
        self.base_model = tf.keras.Model(inputs, action)
        # self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        model_out = self.base_model(input_dict["obs"])
        return model_out, state
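
It is worth checking by hand what the convolution stack does to the 84x84x3 state. With the default `valid` padding of Keras, each convolution maps a spatial size `n` to `floor((n - kernel) / stride) + 1`. The helper below is a small sketch written for this tutorial (it is not part of RLlib or Keras):

```python
def conv_output_size(size, kernel, stride):
    # Output size of a 'valid' (no padding) convolution along one dimension.
    return (size - kernel) // stride + 1

size = 84  # height and width of the stacked frames
for kernel, stride in [(8, 4), (4, 2), (3, 1)]:  # the three Conv2D layers above
    size = conv_output_size(size, kernel, stride)
    print(size)  # 20, then 9, then 7

flattened_units = size * size * 32  # 32 filters in the last convolution
print(flattened_units)  # 1568
```

So the Flatten layer feeds 1568 values into the dense layer of 512 units.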


Now you can register the model.

In [None]:
ray.init(ignore_reinit_error=True) #ray may already be running (e.g. started by ray.put above)
ModelCatalog.register_custom_model("AtariModel", AtariModel)
ray.shutdown()

Now let's use your own model with your CustomEnv in the CONFIG dict: 

In [None]:
from ray_course.custom_callbacks import CustomCallbacks
from ray.rllib.algorithms.dqn import DQN
from datetime import datetime
import wandb
import ray 
ray.shutdown()
ray.init() #re-init
CONFIG = {
    # Common config
    "env": CustomEnv,
    # "env_config": ENV_CONFIG,
    "num_workers": 1, # int(ressources['CPU'])
    "explore": True,
    "framework": "tf2", #I prefer tensorflow but feel free to use pytorch
    "callbacks": CustomCallbacks,
    # Rainbow config
    # (duplicated keys removed: in a Python dict only the last value of a repeated key is kept)
    'env_config': {},  # deterministic
    'num_gpus': 1, # set to 0 if your runtime has no GPU
    'gamma': 0.99,
    'lr': .0001,
    'replay_buffer_config': {
        'type': 'MultiAgentPrioritizedReplayBuffer',
        'capacity': 50000,
    },
    'num_steps_sampled_before_learning_starts': 1000,
    'rollout_fragment_length': 4,
    'train_batch_size': 32,
    # the default exploration of DQN is epsilon-greedy; we only tune its schedule
    'exploration_config': {
        'epsilon_timesteps': 200000,
        'final_epsilon': .01,
    },
    'model': {
        'custom_model': AtariModel,
        'grayscale': True,
        'zero_mean': False,
        'dim': 42,
    },
    # we should set compress_observations to True because few machines
    # would be able to contain the replay buffers in memory otherwise
    'compress_observations': True,
}
algo = DQN(config=CONFIG)
now = datetime.now()
name = now.strftime("_%m_%d_%Y_%H_%M_%S")
wandb.run.name='rainbow_custom_model'+name
for k in range(10):
	result=algo.train()
	print('epoch : ',k)
    # print(pretty_print(result))
 
checkpoint_dir = algo.save() #save the model 
print(f"Checkpoint saved in directory {checkpoint_dir}") 
ray.shutdown()

## Change the Loss (Implementing CURL)

OK, so we've seen how to customize the hyper-parameters, how to pass a specific environment to the algorithm, and how to customize your own estimator. 
One important thing left is how to change the loss. Once you've seen this last feature, you will be able to implement almost any DRL algorithm with RLlib.

**CURL (Contrastive Unsupervised Representations for Reinforcement Learning)**

Rainbow is great, but we can be more sample efficient, that is, learn more from each interaction with the environment. To get there, we're going to use contrastive learning.

**Contrastive Learning** :
In contrastive learning, the main idea is to train a neural network to learn representations of data that are "close" for similar data points, and "far" for different data points. This is achieved by defining a distance metric between the representations of the input data in a feature space, and then training the network to minimize the distance between the representations of similar data points and maximize the distance between the representations of dissimilar data points. The distance between the representations is commonly measured by a contrastive loss function. This approach aims to learn useful and informative features from the input data.
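
To make the idea concrete, here is a minimal sketch in plain Python. It is not the loss used later in this tutorial: it assumes cosine similarity as the closeness measure, and the names `cosine_similarity` and `toy_contrastive_loss` are made up for illustration.

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def toy_contrastive_loss(anchor, positive, negatives):
    """Lower when the anchor is close to `positive` and far from `negatives`."""
    # Similar pair: reward high similarity (hence the minus sign).
    pos_term = -cosine_similarity(anchor, positive)
    # Dissimilar pairs: penalize high similarity, averaged over the negatives.
    neg_term = sum(cosine_similarity(anchor, n) for n in negatives) / len(negatives)
    return pos_term + neg_term


anchor = [1.0, 0.0]
# Representations that respect the similar/dissimilar structure ...
good = toy_contrastive_loss(anchor, positive=[0.9, 0.1],
                            negatives=[[-1.0, 0.2], [0.0, -1.0]])
# ... and representations that get it backwards.
bad = toy_contrastive_loss(anchor, positive=[-1.0, 0.2],
                           negatives=[[0.9, 0.1], [1.0, 0.0]])
print(good < bad)
```

Minimizing such a loss pushes the encoder toward the first situation, where similar inputs end up close in feature space and dissimilar ones end up far apart.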

Useful links :

* [Blog on Contrastive Learning](https://lilianweng.github.io/posts/2021-05-31-contrastive/)
* [Curl paper](https://arxiv.org/abs/2004.04136)
* [Video summarizing Curl](https://www.youtube.com/watch?v=-Drowt9r4zY)

For this we are going to use the Policy API, which lets you change the loss function, the action computation, and more (see [Policy API](https://docs.ray.io/en/latest/rllib/package_ref/policy/policy.html)).

We will implement the algorithm in 4 steps : 

* Build the model for Curl (encoder q,k,...)
* Implement the custom loss
* Build the policy with the new loss 
* Modify the training step method of the algorithm to make the EMA (exponential moving average) update
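
The EMA update from the last step can be sketched in a few lines of plain Python. This is only an illustration on flat lists of numbers: the helper name `ema_update_sketch` is hypothetical, and `beta=0.9` is assumed here to match the value used in the model below.

```python
def ema_update_sketch(key_weights, query_weights, beta=0.9):
    """Return beta * key + (1 - beta) * query, element by element."""
    return [beta * k + (1.0 - beta) * q
            for k, q in zip(key_weights, query_weights)]


key = [0.0, 0.0]      # weights of the key encoder (never trained by gradient)
query = [1.0, -2.0]   # weights of the query encoder (trained by gradient)

# After each training step, the key weights drift slowly toward the query weights.
for step in range(3):
    key = ema_update_sketch(key, query)
    print(step, key)
```

The closer `beta` is to 1, the more slowly the key encoder follows the query encoder.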

Again, I will be using TensorFlow ([here is the PyTorch alternative](https://docs.ray.io/en/latest/rllib/rllib-concepts.html#building-policies-in-tensorflow-eager)): 

1. First we need to change the model so that it has both a key encoder and a query encoder. We can also define the function that performs the EMA update of the k encoder toward the q encoder.




In [None]:
import argparse
import os
import ray
from ray import air, tune
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.algorithms.dqn.distributional_q_tf_model import DistributionalQTFModel
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.visionnet import VisionNetwork as MyVisionNetwork
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.metrics.learner_info import LEARNER_INFO, LEARNER_STATS_KEY
from ray.tune.registry import get_trainable_cls
tf1, tf, tfv = try_import_tf()

class AtariModel(TFModelV2):

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name="atari_model"):
        super(AtariModel, self).__init__(obs_space, action_space, num_outputs, model_config,
                         name)
        inputs = tf.keras.layers.Input(shape=obs_space.shape, name='observations')
        self.beta=0.9
        # Convolutions on the frames on the screen
        # data aug layer k
        # layer_aug_k_flip=tf.keras.layers.RandomFlip("horizontal_and_vertical")(inputs)
        # layer_aug_k=tf.keras.layers.RandomRotation(0.2)(layer_aug_k_flip)
        #encoder keys
        layer_k_1 = tf.keras.layers.Conv2D(32,[8, 8],strides=(4, 4),activation="relu",data_format='channels_last',name='layer_k_1',trainable=False)(inputs)
        layer_k_2 = tf.keras.layers.Conv2D(64,[4, 4],strides=(2, 2),activation="relu",data_format='channels_last',name='layer_k_2',trainable=False)(layer_k_1)
        layer_k_3 = tf.keras.layers.Conv2D(64,[3, 3],strides=(1, 1),activation="relu",data_format='channels_last',name='layer_k_3',trainable=False)(layer_k_2)
        flatten_k = tf.keras.layers.Flatten(name='layer_k_4',trainable=False)(layer_k_3)
        encoder_k= tf.keras.layers.Dense(1024,activation="relu",kernel_initializer=normc_initializer(1.0), name='layer_k_5')(flatten_k)
        # data aug layer q
        # layer_aug_q_flip=tf.keras.layers.RandomFlip("horizontal_and_vertical")(inputs)
        # layer_aug_q=tf.keras.layers.RandomRotation(0.2)(layer_aug_q_flip)
        #encoder queries
        layer_q_1 = tf.keras.layers.Conv2D(32,[8, 8],strides=(4, 4),activation="relu",data_format='channels_last',name='layer_q_1')(inputs)
        layer_q_2 = tf.keras.layers.Conv2D(64,[4, 4],strides=(2, 2),activation="relu",data_format='channels_last',name='layer_q_2')(layer_q_1)
        layer_q_3 = tf.keras.layers.Conv2D(64,[3, 3],strides=(1, 1),activation="relu",data_format='channels_last',name='layer_q_3')(layer_q_2)
        flatten_q = tf.keras.layers.Flatten(name='layer_q_4')(layer_q_3)
        encoder_q = tf.keras.layers.Dense(1024,activation="relu",kernel_initializer=normc_initializer(1.0), name='layer_q_5')(flatten_q)
        # Project Matrix W
        # W=tf.keras.layers.Dense(64, activation=None, use_bias=False)(encoder_k)
        # Head layer
        layer5 = tf.keras.layers.Dense(512,activation="relu",kernel_initializer=normc_initializer(1.0))(encoder_q)
        action = tf.keras.layers.Dense(num_outputs,activation="linear",name="actions",kernel_initializer=normc_initializer(0.01))(layer5)
        self.base_model = tf.keras.Model(inputs, [action, encoder_q,encoder_k])

        # self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        model_out, self._encoder_q_out, self._encoder_k_out= self.base_model(input_dict["obs"])
        return model_out, state
    
    # def projection_function(self):
    #     return self._W
    
    def encoder_q_function(self):
        return self._encoder_q_out

    def encoder_k_function(self):
        return self._encoder_k_out


    def ema_update(self):
        # Move every key-encoder layer toward its query-encoder counterpart:
        # w_k <- beta * w_k + (1 - beta) * w_q
        # Layers are named layer_k_1 ... layer_k_5, so the range must go up to 5
        # (layer 4 is the Flatten layer and has no weights).
        for k in range(1,6):
            layer_k=self.base_model.get_layer('layer_k_'+str(k))
            layer_q=self.base_model.get_layer('layer_q_'+str(k))
            weights_k_l=layer_k.get_weights()
            weights_q_l=layer_q.get_weights()
            new_weights_k=[]
            for i in range(len(weights_k_l)):
                new_weights_k.append(weights_k_l[i]*float(self.beta)+(1-float(self.beta))*weights_q_l[i])
            layer_k.set_weights(new_weights_k)




2. Now let's extend the original DQN loss with a contrastive term inside the loss_fn function. The image augmentation follows the random crop used in the paper.


In [None]:
import numpy as np
from ray.rllib.algorithms.dqn.dqn_tf_policy import build_q_losses
from ray.rllib.policy.sample_batch import SampleBatch
import tensorflow as tf
from tensorflow.keras.losses import CosineSimilarity


def custom_loss(policy, model, _, train_batch):
    # RAINBOW LOSS
    dqn_loss= build_q_losses(policy, model, _, train_batch)

    # OUR CONTRASTIVE LOSS
    cosine_loss = tf.keras.losses.CosineSimilarity(axis=-1)
    size_crop=20
    input_batch=train_batch[SampleBatch.CUR_OBS]
    # augmented view for query 
    i_x=np.random.randint(0,size_crop)
    i_y=np.random.randint(0,size_crop)
    cropped_s= tf.keras.layers.Cropping2D(cropping=((i_x, size_crop-i_x), (i_y, size_crop-i_y)))(input_batch)
    augmented_q = tf.image.resize(cropped_s, (input_batch.shape[1], input_batch.shape[2]), method = tf.image.ResizeMethod.GAUSSIAN) # Gaussian interpolation
    model_output, _ = model({'obs' : augmented_q})
    # latent q
    z_q=model.encoder_q_function()
    # augmented view for key 
    i_x=np.random.randint(0,size_crop)
    i_y=np.random.randint(0,size_crop)
    cropped_s= tf.keras.layers.Cropping2D(cropping=((i_x, size_crop-i_x), (i_y, size_crop-i_y)))(input_batch)
    augmented_k = tf.image.resize(cropped_s, (input_batch.shape[1], input_batch.shape[2]), method = tf.image.ResizeMethod.GAUSSIAN)
    model_output, _ = model({'obs' : augmented_k})
    # latent k
    z_k=model.encoder_k_function()
    # contrastive loss
    pos_pairs_loss=cosine_loss(z_k,z_q)
    neg_pairs_loss=0
    for i in range(input_batch.shape[0]):
        neg=tf.concat((z_k[:i],z_k[i+1:]),axis=0) #neg pairs
        anchor=tf.repeat(tf.expand_dims(z_q[i],axis=0), repeats=neg.shape[0], axis=0)
        neg_pairs_loss+=-cosine_loss(neg,anchor)
    contrastive_loss=pos_pairs_loss+neg_pairs_loss
    print('contrastive_loss : ',contrastive_loss)
    print('dqn_loss : ',dqn_loss)
    return dqn_loss + contrastive_loss
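
The cropping arithmetic above is easier to follow on a toy example. The sketch below uses plain nested lists instead of tensors; `random_crop` is a hypothetical helper, and the resize back to the input size is left out. As in the loss, a total of `size_crop` rows and columns is removed, split at a random offset, so the output size is always the input size minus `size_crop`.

```python
import random


def random_crop(image, size_crop, rng=random):
    """Remove `size_crop` rows and columns in total, split at a random offset."""
    top = rng.randint(0, size_crop - 1)    # rows removed from the top
    left = rng.randint(0, size_crop - 1)   # columns removed from the left
    bottom = size_crop - top               # rows removed from the bottom
    right = size_crop - left               # columns removed from the right
    height, width = len(image), len(image[0])
    return [row[left:width - right] for row in image[top:height - bottom]]


# A 42x42 "frame" (the `dim` used in the model config), cropped with size_crop=20.
frame = [[r * 42 + c for c in range(42)] for r in range(42)]
cropped = random_crop(frame, size_crop=20, rng=random.Random(0))
print(len(cropped), len(cropped[0]))
```

Two independent calls give two different views of the same frame, which is how the query and key inputs are produced.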


3. Build the policy. As you can see, we can modify the original policy with the **with_updates** function (a really cool feature):

In [None]:
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.algorithms.dqn.dqn_tf_policy import DQNTFPolicy

CustomPolicy = DQNTFPolicy.with_updates(
    name="CustomDQNPolicy",
    loss_fn=custom_loss)

4. As described in the paper, each gradient update that minimizes the loss is followed by an EMA update of k toward q.
We already implemented the **ema_update** function. Now we need to call it in the algorithm flow. 
We are going to inherit from the DQN algorithm so that you can see the structure of the **training_step** function that we are going to modify. 
Alternatively, we could have modified the **learn_on_batch** function of the policy.
I just copy/pasted the **training_step** method of the DQN algorithm available [here](https://github.com/ray-project/ray/blob/ec3243d78726a2840f1323f997a210d1f33e5656/rllib/algorithms/dqn/dqn.py) and overrode it.
The modification is marked by the **custom update** comment.

In [None]:

import logging
from typing import List, Optional, Type, Callable
import numpy as np

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig, NotProvided
from ray.rllib.algorithms.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.algorithms.dqn.dqn_torch_policy import DQNTorchPolicy
from ray.rllib.algorithms.simple_q.simple_q import (
    SimpleQ,
    SimpleQConfig,
)
from ray.rllib.execution.rollout_ops import (
    synchronous_parallel_sample,
)
from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.rllib.execution.train_ops import (
    train_one_step,
    multi_gpu_train_one_step,
)
from ray.rllib.policy.policy import Policy
from ray.rllib.utils.annotations import override
from ray.rllib.utils.replay_buffers.utils import update_priorities_in_replay_buffer
from ray.rllib.utils.typing import ResultDict
from ray.rllib.utils.metrics import (
    NUM_ENV_STEPS_SAMPLED,
    NUM_AGENT_STEPS_SAMPLED,
)
from ray.rllib.utils.deprecation import (
    Deprecated,
)
from ray.rllib.utils.metrics import SYNCH_WORKER_WEIGHTS_TIMER
from ray.rllib.execution.common import (
    LAST_TARGET_UPDATE_TS,
    NUM_TARGET_UPDATES,
)
from ray.rllib.utils.deprecation import DEPRECATED_VALUE
from ray.rllib.utils.replay_buffers.utils import sample_min_n_steps_from_buffer
from ray.rllib.algorithms.dqn import DQN

def calculate_rr_weights(config: AlgorithmConfig) -> List[float]:
    """Calculate the round robin weights for the rollout and train steps"""
    if not config["training_intensity"]:
        return [1, 1]

    # Calculate the "native ratio" as:
    # [train-batch-size] / [size of env-rolled-out sampled data]
    # This is to set freshly rollout-collected data in relation to
    # the data we pull from the replay buffer (which also contains old
    # samples).
    native_ratio = config["train_batch_size"] / (
        config.get_rollout_fragment_length()
        * config["num_envs_per_worker"]
        # Add one to workers because the local
        # worker usually collects experiences as well, and we avoid division by zero.
        * max(config["num_workers"] + 1, 1)
    )

    # Training intensity is specified in terms of
    # (steps_replayed / steps_sampled), so adjust for the native ratio.
    sample_and_train_weight = config["training_intensity"] / native_ratio
    if sample_and_train_weight < 1:
        return [int(np.round(1 / sample_and_train_weight)), 1]
    else:
        return [1, int(np.round(sample_and_train_weight))]

class Curl(DQN):
    def __init__(self, config):
        # super() with no arguments starts the lookup right after Curl in the MRO
        super().__init__(config)
    @override(DQN)
    def training_step(self) -> ResultDict:
        """DQN training iteration function.
        Each training iteration, we:
        - Sample (MultiAgentBatch) from workers.
        - Store new samples in replay buffer.
        - Sample training batch (MultiAgentBatch) from replay buffer.
        - Learn on training batch.
        - Update remote workers' new policy weights.
        - Update target network every `target_network_update_freq` sample steps.
        - Return all collected metrics for the iteration.
        Returns:
            The results dict from executing the training iteration.
        """
        train_results = {}

        # We alternate between storing new samples and sampling and training
        store_weight, sample_and_train_weight = calculate_rr_weights(self.config)

        for _ in range(store_weight):
            # Sample (MultiAgentBatch) from workers.
            new_sample_batch = synchronous_parallel_sample(
                worker_set=self.workers, concat=True
            )

            # Update counters
            self._counters[NUM_AGENT_STEPS_SAMPLED] += new_sample_batch.agent_steps()
            self._counters[NUM_ENV_STEPS_SAMPLED] += new_sample_batch.env_steps()

            # Store new samples in replay buffer.
            self.local_replay_buffer.add(new_sample_batch)

        global_vars = {
            "timestep": self._counters[NUM_ENV_STEPS_SAMPLED],
        }

        # Update target network every `target_network_update_freq` sample steps.
        cur_ts = self._counters[
            NUM_AGENT_STEPS_SAMPLED
            if self.config.count_steps_by == "agent_steps"
            else NUM_ENV_STEPS_SAMPLED
        ]
        if cur_ts > self.config.num_steps_sampled_before_learning_starts:
            for _ in range(sample_and_train_weight):
                # Sample training batch (MultiAgentBatch) from replay buffer.
                train_batch = sample_min_n_steps_from_buffer(
                    self.local_replay_buffer,
                    self.config.train_batch_size,
                    count_by_agent_steps=self.config.count_steps_by == "agent_steps",
                )

                # Postprocess batch before we learn on it
                post_fn = self.config.get("before_learn_on_batch") or (lambda b, *a: b)
                train_batch = post_fn(train_batch, self.workers, self.config)

                # for policy_id, sample_batch in train_batch.policy_batches.items():
                #     print(len(sample_batch["obs"]))
                #     print(sample_batch.count)

                # Learn on training batch.
                # Use simple optimizer (only for multi-agent or tf-eager; all other
                # cases should use the multi-GPU optimizer, even if only using 1 GPU)
                if self.config.get("simple_optimizer") is True:
                    train_results = train_one_step(self, train_batch)
                else:
                    train_results = multi_gpu_train_one_step(self, train_batch)

                # Update replay buffer priorities.
                update_priorities_in_replay_buffer(
                    self.local_replay_buffer,
                    self.config,
                    train_batch,
                    train_results,
                )
                #*****************************************custom update*****************************************
                for pid in self.workers.local_worker().policy_map.keys():
                    self.workers.local_worker().policy_map[pid].model.ema_update()
                #*****************************************end of the custom update*****************************************
                last_update = self._counters[LAST_TARGET_UPDATE_TS]
                if cur_ts - last_update >= self.config.target_network_update_freq:
                    to_update = self.workers.local_worker().get_policies_to_train()
                    self.workers.local_worker().foreach_policy_to_train(
                        lambda p, pid: pid in to_update and p.update_target()
                    )
                    self._counters[NUM_TARGET_UPDATES] += 1
                    self._counters[LAST_TARGET_UPDATE_TS] = cur_ts
                # print("Worker dict : ",self.workers.local_worker().policy_dict())
                # Update weights and global_vars - after learning on the local worker -
                # on all remote workers.
                with self._timers[SYNCH_WORKER_WEIGHTS_TIMER]:
                    self.workers.sync_weights(global_vars=global_vars)

        # Return all collected metrics for the iteration.
        return train_results

    def get_default_policy_class(self, config):
        return CustomPolicy
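
To see what `calculate_rr_weights` returns in practice, here is a standalone re-implementation in plain Python with a few worked values. The helper name `rr_weights` is hypothetical. The defaults mirror the config used in this tutorial (`train_batch_size=32`, `rollout_fragment_length=4`, `num_workers=1`), and `num_envs_per_worker=1` is an assumption.

```python
def rr_weights(training_intensity, train_batch_size=32, rollout_fragment_length=4,
               num_envs_per_worker=1, num_workers=1):
    """Return [store_weight, sample_and_train_weight] as in calculate_rr_weights."""
    if not training_intensity:
        return [1, 1]
    # Steps trained on per step sampled when both loops run once.
    native_ratio = train_batch_size / (
        rollout_fragment_length * num_envs_per_worker * max(num_workers + 1, 1)
    )
    weight = training_intensity / native_ratio
    if weight < 1:
        # Sample several times for every training step.
        return [int(round(1 / weight)), 1]
    # Train several times for every sampling step.
    return [1, int(round(weight))]


# native_ratio = 32 / (4 * 1 * 2) = 4
print(rr_weights(None))  # no training intensity set: alternate 1 to 1
print(rr_weights(8))     # 8 / 4 = 2    -> train twice per sampling step
print(rr_weights(1))     # 1 / 4 = 0.25 -> sample four times per training step
```

With the config used in this tutorial, `training_intensity` is not set, so `training_step` samples once and trains once per iteration of its loops.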

    

Now let's train Curl !

In [None]:
import ray 
from ray_course.custom_callbacks import CustomCallbacks
import wandb
from datetime import datetime

ray.shutdown()
ray.init() #re-init
CONFIG = {
    # Common config
    "env": CustomEnv,
    # "env_config": ENV_CONFIG,
    "num_gpus": 0,
    "num_workers": 1, # int(ressources['CPU'])
    "explore": True,
    "framework": "tf2", # I prefer TensorFlow but feel free to use PyTorch
    # "callbacks": CustomCallbacks,
    # Rainbow config
    # (each key appears once: in a Python dict literal a repeated key silently
    # keeps only its last value)
    "env_config": {},  # deterministic
    "gamma": 0.99,
    "lr": .0001,
    "replay_buffer_config": {
        "type": "MultiAgentPrioritizedReplayBuffer",
        "capacity": 50000,
    },
    "num_steps_sampled_before_learning_starts": 500, #10000
    "rollout_fragment_length": 4,
    "train_batch_size": 32,
    "exploration_config": {
        "epsilon_timesteps": 200000,
        "final_epsilon": .01,
    },
    "model": {
        "custom_model": AtariModel,
        "grayscale": True,
        "zero_mean": False,
        "dim": 42,
    },
    # we should set compress_observations to True because few machines
    # would be able to contain the replay buffers in memory otherwise
    "compress_observations": True,
}
algo = Curl(config=CONFIG)
# now = datetime.now()
# name = now.strftime("_%m_%d_%Y_%H_%M_%S")
# wandb.run.name='rainbow_custom_model'+name
for k in range(2):
	result=algo.train()
	print('epoch : ',k)
    # print(pretty_print(result))
 
checkpoint_dir = algo.save() #save the model 
print(f"Checkpoint saved in directory {checkpoint_dir}") 
ray.shutdown()

**Questions** :

One hyper-parameter is really important for CURL to work. Can you guess which one it is ? 

We made an error in our implementation. Can you guess what it is ? (hint : it's in the loss definition)

Modify the implementation so that it matches CURL exactly.


Try to use the **learn_on_batch** function to perform the EMA update : [here's where you begin](https://github.com/ray-project/ray/blob/ec3243d78726a2840f1323f997a210d1f33e5656/rllib/policy/tf_policy.py)
