# **Ray: Parallel & Distributed Computing**

In [1]:
# Record the author plus the Python, ray and modin versions used for this run.
%load_ext watermark
%watermark -a 'NavinKumarMNK' -v -p ray,modin

Author: NavinKumarMNK

Python implementation: CPython
Python version       : 3.8.10
IPython version      : 7.34.0

ray  : 2.4.0
modin: 0.23.1



In [2]:
import functools
import os
from typing import List

import modin.pandas as pd  # use pandas as usual
import numpy as np
import ray

# Start a local Ray instance with 8 CPUs and the web dashboard; on re-run,
# ignore_reinit_error=True makes a second ray.init() call a no-op instead of an error.
# The dashboard host defaults to this container's IP; set RAY_DASHBOARD_HOST to
# override it when the notebook runs on a different machine/container.
ray.init(
    num_cpus=8,
    include_dashboard=True,
    dashboard_port=5000,
    dashboard_host=os.environ.get('RAY_DASHBOARD_HOST', '172.17.0.2'),
    ignore_reinit_error=True,
    # Modin's flag for running on a Ray instance the user started themselves.
    # Worker processes inherit their environment from the Ray runtime, so the
    # flag must be handed over here; assigning os.environ in the driver after
    # ray.init() does not propagate to the already-started Ray processes.
    runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}},
)

2023-08-30 07:18:30,384	INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at http://172.17.0.2:5000


0,1
Python version:,3.8.10
Ray version:,2.4.0
Dashboard:,http://172.17.0.2:5000


In [5]:
# Inspect the cluster via the Ray CLI: healthy nodes and CPU / GPU / memory usage.
!ray status

Node status
---------------------------------------------------------------
Healthy:
 1 node_440ddba3448599796b0ac0b56e960f7cb3e036091078523165c76ebe
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/8.0 CPU
 0.0/1.0 GPU
 0B/8.99GiB memory
 0B/4.50GiB object_store_memory

Demands:
 (no resource demands)
[0m

## **Modin: Pandas on Ray**

In [3]:
# Modin flag intended for a pre-initialised Ray cluster.
# NOTE(review): this runs after ray.init(), so it likely never reaches Ray's
# worker processes; Modin's docs pass it through
# ray.init(runtime_env={'env_vars': {...}}) instead -- verify.
os.environ['__MODIN_AUTOIMPORT_PANDAS__'] = '1'

# 2**10 rows x 2**5 columns of uniform random floats, as a Modin DataFrame.
frame_data = np.random.rand(1024, 32)
df = pd.DataFrame(frame_data)

# Display ten random rows.
df.sample(10)



Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,22,23,24,25,26,27,28,29,30,31
197,0.590155,0.366686,0.281113,0.347351,0.231078,0.609254,0.522115,0.205556,0.968933,0.72096,...,0.545537,0.59927,0.051591,0.969313,0.85071,0.196345,0.385233,0.31497,0.059072,0.438691
710,0.830011,0.653616,0.940365,0.41614,0.553943,0.760723,0.19389,0.989536,0.150194,0.883801,...,0.721271,0.952326,0.463093,0.930931,0.918382,0.894743,0.229851,0.261846,0.389051,0.063115
43,0.440464,0.427074,0.856369,0.841894,0.898912,0.494466,0.200138,0.262988,0.849049,0.175653,...,0.90266,0.847232,0.269528,0.923799,0.652222,0.333671,0.082197,0.143363,0.141131,0.994792
112,0.509619,0.626254,0.102381,0.455701,0.020973,0.618966,0.863451,0.220738,0.637025,0.097074,...,0.228796,0.711734,0.72211,0.992491,0.85774,0.939099,0.889451,0.606937,0.930586,0.031791
52,0.159923,0.039043,0.222332,0.234752,0.829147,0.792671,0.415861,0.73287,0.544096,0.072595,...,0.177587,0.307869,0.647302,0.775092,0.319993,0.009514,0.05154,0.490687,0.101855,0.984882
980,0.330459,0.281059,0.168883,0.023797,0.428298,0.057459,0.397383,0.227264,0.043982,0.251871,...,0.692707,0.903127,0.173158,0.725232,0.662616,0.740453,0.50332,0.263018,0.225527,0.519567
499,0.180457,0.037204,0.377299,0.600312,0.346771,0.280349,0.106837,0.545112,0.879261,0.608881,...,0.84858,0.007381,0.384192,0.841904,0.738344,0.029858,0.453188,0.749496,0.581517,0.997861
568,0.885567,0.587812,0.518483,0.007225,0.998873,0.417804,0.038529,0.856307,0.797117,0.333712,...,0.971606,0.496051,0.763613,0.208033,0.104272,0.70794,0.514815,0.710384,0.345656,0.706582
502,0.043693,0.873747,0.991019,0.887049,0.979308,0.606208,0.128125,0.609924,0.212247,0.404136,...,0.283541,0.639853,0.812362,0.598629,0.200225,0.378576,0.975264,0.9419,0.935829,0.35523
546,0.283171,0.313531,0.338839,0.827231,0.496108,0.771235,0.481972,0.676476,0.750642,0.166865,...,0.137901,0.497713,0.640569,0.504953,0.254756,0.004269,0.450847,0.773059,0.025952,0.950943


## **Map Reduce**

In [4]:
def map_serial(function, xs):
    """Apply ``function`` to every element of ``xs`` eagerly, in order.

    Returns:
        A new list holding ``function(x)`` for each ``x`` in ``xs``.
    """
    return list(map(function, xs))

def map_parallel(function: "ray.remote_function.RemoteFunction", xs: List):
    """Submit one remote ``function`` task per element of ``xs``.

    Nothing is awaited here: the call only schedules the tasks and returns
    immediately, so the real cost is paid later in ``ray.get``.

    Args:
        function: a Ray remote function (the result of decorating with
            ``@ray.remote``), called as ``function.remote(x)``.
        xs: the inputs, one task per element.

    Returns:
        A list of ObjectRefs, in the same order as ``xs``.
    """
    # The annotation is a string: ``ray.remote`` is the decorator, not the type of
    # the object it produces, and a string is not evaluated at definition time.
    return [function.remote(x) for x in xs]

# Functions executed by the map step (plain Python version).
def increment_regular(x):
    """Return ``x`` plus one, computed locally."""
    incremented = x + 1
    return incremented

# Same computation as increment_regular, executed as a Ray task.
# Uses Ray's default per-task resource request (like add_remote below):
# reserving 4 CPUs for a trivial increment would let at most 2 of these tasks
# run at once on the 8-CPU instance started above, defeating the parallel map.
@ray.remote
def increment_remote(x):
    """Return ``x + 1``; invoke as ``increment_remote.remote(x)`` to get an ObjectRef."""
    return x + 1

In [5]:
def reduce_serial(function, xs):
    """Left-fold ``xs`` with the binary ``function``, locally and eagerly.

    Computes ``function(...function(function(xs[0], xs[1]), xs[2])..., xs[-1])``.
    A single-element input is returned unchanged, without calling ``function``.

    Args:
        function: a two-argument callable.
        xs: a non-empty iterable (lists, tuples and generators all work).

    Returns:
        The folded value.

    Raises:
        TypeError: if ``xs`` is empty (raised by ``functools.reduce``).
    """
    # functools.reduce has the same semantics as the hand-written loop it
    # replaces, but also accepts iterators and fails clearly on empty input
    # instead of with an IndexError from ``xs[0]``.
    return functools.reduce(function, xs)

def reduce_parallel(function: "ray.remote_function.RemoteFunction", xs: List):
    """Left-fold ``xs`` through a chain of remote ``function`` calls.

    Each ``function.remote(acc, x)`` receives the previous call's ObjectRef, so
    the calls depend on one another and form a chain of ``len(xs) - 1`` steps.
    See ``reduce_parallel_tree`` for a shallower, tree-shaped alternative.

    Args:
        function: a Ray remote function taking two arguments.
        xs: a non-empty iterable of values or ObjectRefs.

    Returns:
        An ObjectRef for the reduced value. When ``xs`` has exactly one element,
        that element is returned as-is (it is an ObjectRef only if it was one).

    Raises:
        TypeError: if ``xs`` is empty (raised by ``functools.reduce``).
    """
    # String annotation: ``ray.remote`` is the decorator, not the object's type.
    return functools.reduce(lambda acc, x: function.remote(acc, x), xs)

def add_regular(x, y):
    """Return ``x + y``, computed locally."""
    total = x + y
    return total

# Remote counterpart of add_regular; Ray resolves ObjectRef arguments before calling.
@ray.remote
def add_remote(x, y):
    """Return ``x + y``; invoke as ``add_remote.remote(x, y)`` to get an ObjectRef."""
    total = x + y
    return total


### **Tree Reduce: More Parallelism**

In [6]:
# Note: the recursion runs in the driver, which adds Python call overhead per level.
def reduce_parallel_tree(function: "ray.remote_function.RemoteFunction", xs: List):
    """Reduce ``xs`` with a balanced binary tree of remote ``function`` calls.

    Unlike ``reduce_parallel`` (a chain of ``len(xs) - 1`` dependent calls),
    the two halves are reduced independently, so their remote calls can run
    concurrently and the dependency depth is about ``ceil(log2(len(xs)))``.

    Args:
        function: a Ray remote function taking two arguments.
        xs: a non-empty, sliceable sequence of values or ObjectRefs.

    Returns:
        An ObjectRef for the reduced value, or ``xs[0]`` itself when ``xs`` has
        a single element.

    Raises:
        ValueError: if ``xs`` is empty (which would otherwise recurse forever).
    """
    if not xs:
        raise ValueError("reduce_parallel_tree() of an empty sequence")
    if len(xs) == 1:
        return xs[0]

    # Split at the true midpoint so both subtrees have (nearly) equal size;
    # splitting at len // 2 + 1 skews the tree and deepens the dependency chain.
    # For two elements this yields function.remote(xs[0], xs[1]) directly.
    mid = len(xs) // 2
    return function.remote(
        reduce_parallel_tree(function, xs[:mid]),
        reduce_parallel_tree(function, xs[mid:]),
    )

In [7]:
xs = [x for x in range(100)]

# map
%time results_serial = map_serial(increment_regular, xs)
%time results_ids = map_parallel(increment_remote, xs)
%time results_parallel = ray.get(results_ids)  # retrieving from multiple cpu's taking time

# reduce
%time results_serial = reduce_serial(add_regular, results_serial)
print(f"{results_serial = }")
%time results_ids = reduce_parallel(add_remote, results_parallel)
%time results_ids = reduce_parallel_tree(add_remote, results_parallel)
%time results_parallel = ray.get(results_ids)
print(f"{results_parallel = }")

CPU times: user 8 µs, sys: 5 µs, total: 13 µs
Wall time: 16.7 µs
CPU times: user 16.5 ms, sys: 3.21 ms, total: 19.7 ms
Wall time: 14.6 ms
CPU times: user 9.22 ms, sys: 1.21 ms, total: 10.4 ms
Wall time: 34.3 ms
CPU times: user 8 µs, sys: 5 µs, total: 13 µs
Wall time: 16.9 µs
results_serial = 5050
CPU times: user 17.4 ms, sys: 0 ns, total: 17.4 ms
Wall time: 12.5 ms
CPU times: user 18.7 ms, sys: 4.53 ms, total: 23.2 ms
Wall time: 12.4 ms
CPU times: user 4.96 ms, sys: 659 µs, total: 5.62 ms
Wall time: 5.89 ms
results_parallel = 5050


## **Sharded Parameter Server**

### **Parameters Actor Class**

In [8]:
dim = 32


# Actor that owns a single shared parameter vector.
@ray.remote
class ParameterServer:
    """Ray actor storing a float vector of length ``dim``, initialised to zeros."""

    def __init__(self, dim):
        self.parameters = np.zeros(dim)

    def get_parameters(self):
        """Return the current parameter vector."""
        return self.parameters

    def update_parameters(self, update):
        """Add ``update`` element-wise to the stored vector, in place."""
        self.parameters += update


ps = ParameterServer.remote(dim)
ps

Actor(ParameterServer, bdf955914fcdfebbd2e709c701000000)

In [9]:
@ray.remote
def worker(ps, dim, num_iters):
    """Repeatedly pull parameters from ``ps`` and push a noisy update back.

    Each update is ``1e-3 * current + uniform noise`` of length ``dim``. Updates
    are fire-and-forget: the ObjectRef from ``update_parameters.remote`` is
    never waited on.
    """
    for _ in range(num_iters):
        current = ray.get(ps.get_parameters.remote())
        ps.update_parameters.remote(1e-3 * current + np.random.rand(dim))


# Run a single iteration and wait for it to finish.
ray.get(worker.remote(ps, dim, 1))

# Launch two long-running workers without waiting on them.
worker_results = [worker.remote(ps, dim, 1000) for _ in range(2)]

# Read the shared parameters three times while the workers keep updating them;
# the printed values change between reads.
for _ in range(3):
    print(ray.get(ps.get_parameters.remote()))

[0.86997183 0.38078646 0.51639991 0.63380989 0.60641403 0.97978094
 0.08672282 0.0688103  0.77695205 0.60829419 0.15903358 0.25133237
 0.07067603 0.62364895 0.09768661 0.71086845 0.01646831 0.8174532
 0.87253001 0.06293315 0.52057789 0.58756335 0.15279611 0.82141561
 0.71294029 0.60065556 0.4666836  0.701576   0.19315387 0.85952172
 0.69749131 0.60679272]
[1.6283211  0.78869254 0.62559568 0.94954334 0.94908555 1.0805241
 0.90717342 0.41385069 1.5136838  1.30451049 0.36562445 0.74481619
 0.61539246 1.42749858 0.46357057 1.60540301 0.35531057 0.92277364
 1.13546958 0.4302055  1.15819236 0.68687314 0.62135154 1.66067664
 1.59130577 1.23998963 1.0298331  0.72597512 0.57890773 1.82847374
 0.82303174 1.02215407]
[2.03007428 1.35840809 1.47721654 1.06936497 1.41795414 2.00223951
 1.15104459 1.39177137 2.17924427 2.17587889 1.19526039 1.29528281
 1.41899835 2.05345196 0.58989004 2.40787156 0.87424841 1.653977
 1.69704377 0.5072282  1.36240005 1.51995838 1.10432979 2.1537851
 2.05355629 1.70689

### **Sharding Parameter Server**

> As the number of workers increases, the volume of updates being sent to the parameter server will increase. At some point, the network bandwidth into the parameter server machine, or the computation done by the parameter server, may become a bottleneck.

> Suppose you have $N$ workers and $1$ parameter server, and suppose each of these is an actor that lives on its own machine. Furthermore, suppose the model size is $M$ bytes. Then sending all of the parameters from the workers to the parameter server will mean that $N \cdot M$ bytes in total are sent to the parameter server. If $N = 100$ and $M = 10^8$, then the parameter server must receive $10^{10}$ bytes (ten gigabytes), which, assuming a network bandwidth of 10 giga*bits* (1.25 gigabytes) per second, would take 8 seconds. This would be prohibitive.

> On the other hand, if the parameters are sharded (that is, split) across $K$ parameter servers with $K = 100$, and each parameter server lives on a separate machine, then each parameter server needs to receive only $N \cdot M / K = 10^8$ bytes (100 megabytes), which can be done in 80 milliseconds. This is much better.


In [10]:
@ray.remote
class ParameterServerShard:
    """Ray actor holding one contiguous slice of the full parameter vector."""

    def __init__(self, sharded_dim):
        self.parameters = np.zeros(sharded_dim)

    def get_parameters(self):
        """Return this shard's slice of the parameters."""
        return self.parameters

    def update_parameters(self, update):
        """Add ``update`` (expected to match the shard length) to the slice in place."""
        self.parameters += update


# 10**8 bytes of float64 parameters (8 bytes per value), i.e. M = 10^8 bytes.
total_dim = (10**8) // 8
num_shards = 2

# The shards must be equally sized, because the update is split with np.split.
assert total_dim % num_shards == 0, 'Parameters should totaly divide'

ps_shards = [
    ParameterServerShard.remote(total_dim // num_shards)
    for _ in range(num_shards)
]
ps_shards

[Actor(ParameterServerShard, d1ab1405f587f7168b71392f01000000),
 Actor(ParameterServerShard, 636c1d4fa7696f11c4699b0901000000)]

In [11]:
@ray.remote
def worker_task(total_dim, num_iters, *ps_shards):
    """Pull every shard, then push a random update split evenly across the shards.

    Args:
        total_dim: length of the full (unsharded) parameter vector.
        num_iters: number of pull/push rounds.
        *ps_shards: ParameterServerShard actor handles, in shard order.
    """
    shard_count = len(ps_shards)
    for _ in range(num_iters):
        # Request all shards at once, then block until every one has arrived.
        pulled = ray.get([shard.get_parameters.remote() for shard in ps_shards])
        # Reassemble the full vector (not used by the random update below).
        full_parameters = np.concatenate(pulled)
        update_pieces = np.split(np.random.rand(total_dim), shard_count)
        # Fire-and-forget: the update ObjectRefs are never awaited.
        for shard, piece in zip(ps_shards, update_pieces):
            shard.update_parameters.remote(piece)


# One worker, 40 rounds, waited on to completion.
ray.get(worker_task.remote(total_dim, 40, *ps_shards))

**NOTE:** Because these processes are all running on the same machine, network bandwidth will not be a limitation and sharding the parameter server will not help. To see the difference, you would need to run the application on multiple machines. There are still regimes where sharding a parameter server can help speed up computation on the same machine (by parallelizing the computation that the parameter server processes have to do). If you want to see this effect, you should implement a synchronous training application. In the asynchronous setting, the computation is staggered and so speeding up the parameter server usually does not matter.

In [12]:
%%time

# Four concurrent workers, 10 rounds each, all sharing the same parameter shards;
# ray.get blocks until every worker task has returned.
num_workers = 4
ray.get([worker_task.remote(total_dim, 10, *ps_shards) for _ in range(num_workers)]) 

CPU times: user 29.1 ms, sys: 4.76 ms, total: 33.9 ms
Wall time: 3.26 s


[None, None, None, None]