In [1]:
import os
import time
import ray

# Normal Python
def fibonacci_local(sequence_size):
    """Build the first ``sequence_size`` Fibonacci numbers in the current process.

    The list of numbers is thrown away once built: the function hands back
    ``sequence_size`` itself, not the sequence. In this notebook it is only
    used as a workload to be timed.
    """
    terms = []
    for position in range(sequence_size):
        if position >= 2:
            # Every later term is the sum of the two terms before it.
            next_term = terms[-1] + terms[-2]
        else:
            # The first two terms are 0 and 1, i.e. the position itself.
            next_term = position
        terms.append(next_term)
    return sequence_size

# Ray task
@ray.remote
def fibonacci_distributed(sequence_size):
    """Ray-task version of the Fibonacci workload.

    Builds the first ``sequence_size`` Fibonacci numbers, discards them, and
    returns ``sequence_size`` unchanged.
    """
    terms = []
    for position in range(sequence_size):
        if position >= 2:
            # Sum of the two preceding terms.
            next_term = terms[-1] + terms[-2]
        else:
            # Seed values: 0 and 1.
            next_term = position
        terms.append(next_term)
    return sequence_size

In [2]:
# CPU count reported by the OS; run_local and run_remote launch this many runs each.
os.cpu_count()

4

In [3]:
# Normal Python
def run_local(sequence_size):
    """Time ``fibonacci_local`` executed once per CPU, one call after another.

    Args:
        sequence_size: Length of the Fibonacci sequence each call builds.

    Returns:
        None. The sequence size and elapsed seconds are printed.
    """
    # os.cpu_count() returns None when the count cannot be determined;
    # fall back to a single run rather than failing in range(None).
    num_runs = os.cpu_count() or 1
    # perf_counter() is monotonic, so a system clock adjustment during the
    # run cannot distort the measured interval.
    start_time = time.perf_counter()
    for _ in range(num_runs):
        fibonacci_local(sequence_size)
    duration = time.perf_counter() - start_time
    print('Sequence size: {}, Local execution time: {}'.format(sequence_size, duration))

# Ray
def run_remote(sequence_size):
    """Time ``fibonacci_distributed`` submitted as one Ray task per CPU.

    Args:
        sequence_size: Length of the Fibonacci sequence each task builds.

    Returns:
        None. The sequence size and elapsed seconds are printed.
    """
    # Start Ray only if it is not already running: a second plain ray.init()
    # raises, which made this function fail whenever it was called again.
    if not ray.is_initialized():
        ray.init()
    # os.cpu_count() returns None when the count cannot be determined.
    num_tasks = os.cpu_count() or 1
    # Timing starts after Ray is up, so startup cost is not part of the figure.
    start_time = time.perf_counter()
    pending = [fibonacci_distributed.remote(sequence_size) for _ in range(num_tasks)]
    # Block until every task has finished.
    ray.get(pending)
    duration = time.perf_counter() - start_time
    print('Sequence size: {}, Remote execution time: {}'.format(sequence_size, duration))

In [4]:
# Baseline: the per-CPU runs execute one after another inside this process.
run_local(100000)

Sequence size: 100000, Local execution time: 1.423630714416504


In [5]:
# Same workload submitted as Ray tasks. run_remote() also calls ray.init(),
# before its timer starts, so Ray startup is not part of the reported time.
run_remote(100000)

2022-05-12 17:49:32,173	INFO services.py:1456 -- View the Ray dashboard at http://127.0.0.1:8265


Sequence size: 100000, Remote execution time: 0.5920052528381348


# Task Dependencies

In [6]:
import numpy as np

@ray.remote
def create_matrix(size):
    """Ray task: draw an array of normally distributed random numbers.

    ``size`` is forwarded unchanged as the ``size`` argument of
    ``np.random.normal``; mean and standard deviation are left at NumPy's
    defaults. No seed is set here.
    """
    samples = np.random.normal(size=size)
    return samples

@ray.remote
def multiply_matrices(x, y):
    """Ray task: return ``np.dot(x, y)``."""
    product = np.dot(x, y)
    return product

# Launch two independent matrix-creation tasks. No random seed is set, so the
# printed values differ from run to run.
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
# The handles from the two calls above are passed directly as arguments;
# multiply_matrices applies np.dot to them, so it depends on both results.
z_id = multiply_matrices.remote(x_id, y_id)

# Get the results.
z = ray.get(z_id)
print(z)

[[-43.74338951 -12.16991773 -79.92173803 ... -22.48706205  31.5581209
  -47.13317131]
 [-32.70412478 -30.34191998  -0.75417966 ... -24.39169977 -10.40545103
   31.32188393]
 [ 35.16236425  -3.66729647  -1.90961785 ...  -4.16589092  68.02127893
    5.84063103]
 ...
 [ 18.2684853   22.16242914  26.4647355  ... -13.63078342  -0.14547263
   24.10078844]
 [ 28.8467166   79.31952896  -9.86002844 ... -62.58190737  40.3096995
  -26.27479474]
 [ 68.61577567  -3.47736588  19.3771663  ...   8.01371938 -17.64306138
    3.38061947]]


# From Classes to Actors

In [7]:
@ray.remote
class Counter:
    """Actor that keeps a single integer count in ``x``."""

    def __init__(self):
        # Counting starts from zero.
        self.x = 0

    def inc(self):
        """Raise the count by one; nothing is returned."""
        self.x = self.x + 1

    def get_value(self):
        """Return the current count."""
        return self.x

# Create an actor process.
c = Counter.remote()

# Check the actor's counter value.
print(ray.get(c.get_value.remote()))  # 0

# Increment the counter twice and check the value again.
# The handles returned by the two inc.remote() calls are dropped; only the
# get_value result is fetched with ray.get().
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote()))  # 2

0
2


# The following example creates an actor that stores messages. Several worker tasks repeatedly push messages to the actor, and the main Python script reads the messages periodically.

In [8]:
@ray.remote
class MessageActor:
    """Actor that buffers messages until a reader drains them."""

    def __init__(self):
        # Messages received since the last drain.
        self.messages = []

    def add_message(self, message):
        """Append ``message`` to the buffer."""
        self.messages.append(message)

    def get_and_clear_messages(self):
        """Return everything buffered so far and start a fresh, empty buffer."""
        drained, self.messages = self.messages, []
        return drained


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    """Push 100 numbered messages to ``message_actor``, sleeping one second before each.

    ``j`` is the worker's identifier and is embedded in every message.
    """
    template = "Message {} from worker {}."
    for i in range(100):
        time.sleep(1)
        text = template.format(i, j)
        message_actor.add_message.remote(text)


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
# The handles returned by worker.remote() are not kept, so this cell never
# waits for the workers to finish.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
# Each worker sends 100 messages one second apart, while this loop polls only
# 20 times, so it ends well before the workers have sent everything.
for _ in range(20):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)



New messages: []
New messages: ['Message 0 from worker 2.', 'Message 0 from worker 1.', 'Message 0 from worker 0.']
New messages: ['Message 1 from worker 0.', 'Message 1 from worker 2.', 'Message 1 from worker 1.']
New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
New messages: ['Message 3 from worker 0.', 'Message 3 from worker 2.', 'Message 3 from worker 1.']
New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
New messages: ['Message 6 from worker 1.', 'Message 6 from worker 0.', 'Message 6 from worker 2.']
New messages: ['Message 7 from worker 0.', 'Message 7 from worker 1.', 'Message 7 from worker 2.']
New messages: ['Message 8 from worker 1.', 'Message 8 from worker 0.', 'Message 8 from worker 2.']
New messages: ['Message 9 from worker 0.', 'Message 9 from worker 1.', 'Message 9 from worke

# Ray Datasets

In [9]:
# Create a Dataset of Python objects.
# NOTE: `ds` is rebound to a different dataset by the later from_items cell.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

In [10]:
# Fetch the first five rows as a plain Python list.
ds.take(5)
# -> [0, 1, 2, 3, 4]

[0, 1, 2, 3, 4]

In [11]:
# Total number of rows in the dataset.
ds.count()
# -> 10000

10000

In [12]:
# Create a Dataset of Arrow records.
# NOTE: this rebinds `ds`, replacing the integer-range dataset created earlier;
# every cell run after this one operates on the two-column record dataset.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})


In [13]:
# Print the first five records, one per line.
ds.show(5)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
# -> {'col1': 3, 'col2': '3'}
# -> {'col1': 4, 'col2': '4'}

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
{'col1': 3, 'col2': '3'}
{'col1': 4, 'col2': '4'}


In [14]:
# Column names and types of the dataset currently bound to `ds`.
ds.schema()
# -> col1: int64
# -> col2: string

col1: int64
col2: string