Getting started with deep reinforcement learning is not easy. 

There are a host of challenges, from terminology that differs from supervised learning or optimization, to developing a simulation, and, of course, the alphabet soup of algorithms to choose from and the Greek alphabet soup of hyperparameters to fiddle with.

Moreover, RL tends to be *extremely* data hungry, requiring thousands if not millions of simulations in order to learn a good policy. Even if you don't mind jumping into the papers and implementing the algorithms yourself, you'll find that optimizing them and taking advantage of parallelization are essential for getting results.

That's where [Ray](https://ray.readthedocs.io/en/latest/index.html) comes in. Developed by [UC Berkeley's RISE Lab](https://rise.cs.berkeley.edu/) and around since 2017, Ray is a framework for scalable, parallel computing in Python, and its libraries are designed to bring scalable reinforcement learning to practitioners and researchers without the pain of implementing models yourself.

<img src="https://www.datahubbs.com/wp-content/uploads/2020/04/ray_header_logo.png">

Before we jump into the RL piece, let's spend some time with the basics of Ray and show how we can use it to achieve speed-ups in our computations via parallel computing.

## TL;DR

We introduce Ray and show how to use it to parallelize a few different functions for better performance.

## First Parallelization with Ray

Before we get into the full-blown RL models, I want to walk through some simpler examples of parallelization and explain some of the benefits of the technique.

Typically, the Python programs you write run serially, i.e. one step after the next. In many applications this is just fine. However, modern machines, be it your laptop or an AWS server, have multiple CPUs, and you can take advantage of them by breaking problems down into pieces that run simultaneously, in parallel, to get large speed-ups!

<img src="https://www.datahubbs.com/wp-content/uploads/2020/04/parallel_computing.png">

This is the case for many machine learning algorithms, and particularly for reinforcement learning, where Monte Carlo simulation is employed to generate training data. These simulations can be run in parallel and the trajectories sent back to the neural network for updating. This is incredibly useful and can greatly speed up your training.

### Parallelizing a Timer

To get started, we need to install Ray using `pip install ray`. One thing to note: as of this writing, Ray only runs on Linux and macOS machines, and is only compatible with Python 3.5-3.7 ([check the docs for updates](https://ray.readthedocs.io/en/latest/installation.html)). If your machine doesn't meet those requirements, you can hop over to [Google Colab](https://colab.research.google.com) for free access to a notebook where you can run this code.

Let's show the speed-ups from parallelization by starting with a sequential program, timing it, and then moving it to Ray. First, we'll use a standard timing example to show the basics.

```python
import time
import numpy as np
import ray
```

We'll define a timer function that takes an argument, `x`, waits 1 second, then returns `x`. This is utterly useless, but it will illustrate the difference between sequential and parallel execution.

```python
def timer(x):
    time.sleep(1)
    return x
```

Now, timing it:

```python
t0 = time.time()
values = [timer(x) for x in range(4)]
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)
```

```
Time Elapsed:	4.0043
[0, 1, 2, 3]
```


Before we parallelize, we need to initialize Ray with `ray.init()`, where we can set the number of CPUs Ray is allowed to use. If you don't know how many you have available, just run the following:

```python
ray.init()
ray.available_resources()['CPU']
```

```
2020-04-03 11:55:09,481	INFO resource_spec.py:216 -- Starting Ray with 1.61 GiB memory available for workers and up to 0.82 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).

8.0
```

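As you see above, my machine has 8 CPUs I can use to parallelize my process. If I don't pass a specific value when I call `ray.init`, Ray will use all 8. I'm going to try to reinitialize `ray` with only 4 CPUs available to it, so that each CPU handles one of the calls to our timer function independently. Note that calling `ray.init` a second time in the same session, e.g. from a Jupyter notebook, raises an error unless you pass `ignore_reinit_error=True`. Be aware, though, that this flag only suppresses the error: Ray keeps running the session that already exists (which is what the ERROR line in the output below reports), so the new `num_cpus` value is not applied. To really restart Ray with different settings, call `ray.shutdown()` first or restart your kernel.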

```python
ray.init(num_cpus=4, ignore_reinit_error=True)
```

```
2020-04-03 12:01:06,559	ERROR worker.py:679 -- Calling ray.init() again after it has already been called.
```


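To parallelize our function with Ray, we just need to decorate it with `@ray.remote`. Here we give the decorated copy a new name, `timer_ray`, so we can keep the original for comparison.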

```python
@ray.remote
def timer_ray(x):
    time.sleep(1)
    return x
```

Running nearly the same code as above, but calling `timer_ray.remote(x)` in place of `timer(x)`, we have:

```python
t0 = time.time()
values = [timer_ray.remote(x) for x in range(4)]
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)
```

```
Time Elapsed:	0.0025
[ObjectID(7dec8564195ad979ffffffff010000c801000000), ObjectID(0bead116322a6c2bffffffff010000c801000000), ObjectID(b944ee5bb38dd1a5ffffffff010000c801000000), ObjectID(2a124e2070438a75ffffffff010000c801000000)]
```


Hopefully the above looks odd to you. First, the elapsed time is not the expected 1 second, and second, the results look like gibberish. Calling `.remote()` submits a task to Ray and immediately returns an object ID, a reference to the task's future result, while the task itself runs asynchronously in the background. So what we timed is only how long it took to submit the four tasks, not how long they took to run, and printing `values` shows the object IDs rather than the results. To wait for the tasks to finish and retrieve their results, we need to call `ray.get()`.

```python
ray.get(values)
```

```
[0, 1, 2, 3]
```

So, to get both the timing and the expected results, we'll wrap our list comprehension in `ray.get()` and try again.

```python
t0 = time.time()
values = ray.get([timer_ray.remote(x) for x in range(4)])
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)
```

```
Time Elapsed:	1.0106
[0, 1, 2, 3]
```


Now we get the expected output!

Let's turn to a more useful example to show how we can leverage this in actual computation.

### Parallelizing Moving Averages

There are numerous financial measurements and strategies that require computing moving averages. Sometimes you want a simple 90-day moving average, other times a 10-day one, or some other window. That's not so bad if you only have a few time series to deal with. But, as is often the case, you might need to calculate simple moving averages for thousands of different securities on a regular basis. In that case, parallelization can save us a lot of time.

Let's start by generating 1,000 random time series, each 1,000 points long, to show how this works.

```python
data = np.random.normal(size=(1000, 1000))
```

From here, we'll implement a sequential calculation that returns a simple 10-day moving average. If too little data is available, as over the first 9 data points, it just averages over the days available so far.
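To make the trailing window concrete, here is a small sketch of the same kind of average computed with cumulative sums instead of a Python loop (a vectorization trick, not the approach used in this post). `trailing_moving_average` is a hypothetical helper; it assumes the window covers the current point plus up to `window - 1` earlier points, shrinking near the start of the series.

```python
import numpy as np

def trailing_moving_average(row, window=10):
    """Hypothetical helper: mean of the current point and up to
    (window - 1) previous points, via cumulative sums."""
    row = np.asarray(row, dtype=float)
    # c[k] is the sum of the first k values, so c[j + 1] - c[start]
    # is the sum of row[start:j + 1].
    c = np.concatenate(([0.0], np.cumsum(row)))
    j = np.arange(len(row))
    # Near the start of the series fewer than `window` points exist,
    # so the window is clipped at index 0.
    start = np.maximum(0, j - window + 1)
    return (c[j + 1] - c[start]) / (j + 1 - start)

print(trailing_moving_average([1, 2, 3, 4, 5], window=3).tolist())
# [1.0, 1.5, 2.0, 3.0, 4.0]
```

With `window=3`, the first value averages just one point and the second averages two; from the third point on, each value is the mean of the last three.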

The function below will give us the desired result.

```python
def calc_moving_average(data, window=10):
    # Each row of `data` is one time series
    ma_data = np.zeros(data.shape)
    for i, row in enumerate(data):
        # Average the current point and up to (window - 1) previous points;
        # early in the series the window is shorter
        ma_data[i] = np.array(
            [np.mean(row[max(0, j - window + 1):j + 1])
             for j in range(len(row))])
    return ma_data
```

Our `calc_moving_average` function takes each individual time series (a row in our data) and returns the moving average at each step. If you plot the result, you'll see a smoothed version of the original series.

We'll time it as we did for the examples above.

```python
t0 = time.time()
ma_data = calc_moving_average(data)
seq_time = time.time() - t0
print('Time Elapsed:\t{:.4f}'.format(seq_time))
```

```
Time Elapsed:	7.9067
```

Nearly 8 seconds elapse to calculate this. Let's see if we can do better by adding our `@ray.remote` decorator to the function.

```python
@ray.remote
def calc_moving_average_ray(data, window=10):
    ma_data = np.zeros(data.shape)
    for i, row in enumerate(data):
        ma_data[i] = np.array(
            [np.mean(row[max(0, j - window + 1):j + 1])
             for j in range(len(row))])
    return ma_data
```

```python
t0 = time.time()
# The whole dataset is processed by a single remote task
ma_data_ray = ray.get(calc_moving_average_ray.remote(data))
par_time = time.time() - t0
print('Time Elapsed:\t{:.4f}'.format(par_time))
print('Speed up:\t{:.1f}X'.format(seq_time / par_time))
print("Results match:\t{}".format(np.allclose(ma_data, ma_data_ray)))
```

```
Time Elapsed:	7.6218
Speed up:	1.0X
Results match:	True
```

Our implementation didn't work out so well versus the baseline: we saved ~0.3 seconds, not quite the boost we were looking for. The reason is that we didn't break the moving average calculation down into pieces that can run in parallel. Decorating the whole function just ships the entire algorithm to a single remote task, which still runs on one CPU, rather than splitting it into the pieces that make the most sense.

We can make a slight adjustment, as shown below, and see what the speed-up looks like.

```python
@ray.remote
def calc_moving_average_ray(row, window=10):
    # Moving average for a single time series (one row)
    return np.array([np.mean(row[max(0, j - window + 1):j + 1])
                     for j in range(len(row))])
```

```python
t0 = time.time()
# One remote task per row (time series)
ma_data_ray = np.array(ray.get(
    [calc_moving_average_ray.remote(row) for row in data]))
par_time = time.time() - t0
print('Time Elapsed:\t{:.4f}'.format(par_time))
print('Speed up:\t{:.1f}X'.format(seq_time / par_time))
print("Results match:\t{}".format(np.allclose(ma_data, ma_data_ray)))
```

```
Time Elapsed:	2.2801
Speed up:	3.5X
Results match:	True
```


Now we have a 3.5X speed-up over running on a single processor. All we had to change was how we pass data to the function: by passing each row separately, we create one task per time series, so Ray can spread the work across the available CPUs. In other words, we've parallelized at a lower and more meaningful level for this algorithm.

We don't get a speed-up equal to the number of CPUs because there is overhead associated with scheduling each task and moving data to and from the worker processes. Typically, the more information that needs to be moved around, the more overhead we incur. This means we want to avoid lots of very small operations, because passing information back and forth between the cores can cost more time than we gain by parallelizing.

## Ray for RL

Ray also has libraries built on top of it, including RLlib, for reinforcement learning, and Tune, for hyperparameter tuning, both of which are incredibly powerful for reinforcement learning work. They take advantage of the parallelization we discussed here, and I'll introduce those libraries and their key functionality in subsequent posts.