In [1]:
!pip install ray

Collecting ray
  Downloading ray-2.34.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (13 kB)
Downloading ray-2.34.0-cp310-cp310-manylinux2014_x86_64.whl (64.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.9/64.9 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ray
Successfully installed ray-2.34.0


In [2]:
import ray
import time

# Start (or attach to) a local Ray instance.
# ignore_reinit_error=True keeps this cell re-runnable: without it, executing
# the cell a second time in the same kernel fails because Ray is already
# initialised.
ray.init(ignore_reinit_error=True)

@ray.remote
def slow_function():
    """Ray task that pauses for five seconds and then returns "Done!".

    The pause stands in for heavy work so the caller can observe that
    submitting the task does not block.
    """
    pretend_work_seconds = 5  # Stand-in for a genuinely expensive computation
    time.sleep(pretend_work_seconds)
    return "Done!"
# Submit the task. .remote() hands back a future straight away instead of
# the task's return value.
future = slow_function.remote()

# This line runs before slow_function has finished (see the recorded output:
# it is printed ahead of "Done!").
print("I can do other things while waiting!")

# ray.get() blocks until the task's result is available and returns it.
result = ray.get(future)
print(result)

2024-08-18 19:45:25,342	INFO worker.py:1781 -- Started a local Ray instance.


I can do other things while waiting!
Done!


In [4]:
import ray


@ray.remote
class StatefulCounter:
    """Ray actor that remembers how many times ``increment`` has been called."""

    def __init__(self):
        # Running total kept on the actor instance between method calls.
        self.count = 0

    def increment(self):
        """Add one to the stored count and return the updated value."""
        self.count = self.count + 1
        return self.count

@ray.remote
def stateless_increment(count):
    """Ray task that returns ``count + 1``.

    Nothing is stored between calls, so the result depends only on the
    argument that was passed in.
    """
    incremented = count + 1
    return incremented

# Stateful example (an actor; this is NOT how remote functions work).
# Both calls go to the same actor instance, so the second call sees the
# count left behind by the first.
counter = StatefulCounter.remote()
print(ray.get(counter.increment.remote()))  # Prints 1
print(ray.get(counter.increment.remote()))  # Prints 2

# Stateless example (a task; this IS how remote functions work).
# Each call only sees the argument it is given, so the same input gives the
# same output.
print(ray.get(stateless_increment.remote(0)))  # Prints 1
print(ray.get(stateless_increment.remote(0)))  # Prints 1 again

1
2
1
1


### For Actors (stateful):

- Use classes decorated with `@ray.remote`
  - Example: `@ray.remote` on the line above `class MyActor:`
- Maintain state between method calls
- Created with actor = MyActor.remote()
- Methods called with actor.method.remote()


### For Tasks (stateless):

- Use functions decorated with `@ray.remote`
  - Example: `@ray.remote` on the line above `def my_task():`
- Don't maintain state between calls
- Called directly with my_task.remote()

# Parallelism in Ray

In [5]:
def square(x):
    """Return ``x * x`` after a one-second pause.

    The pause simulates computation time so that sequential and parallel
    runs can be compared by wall-clock duration.
    """
    simulated_compute_seconds = 1
    time.sleep(simulated_compute_seconds)
    squared = x * x
    return squared

# Same computation as square(), exposed as a Ray remote function
@ray.remote
def square_ray(x):
    """Ray task version of ``square``: pause one second, then return ``x * x``."""
    simulated_compute_seconds = 1
    time.sleep(simulated_compute_seconds)
    squared = x * x
    return squared

# Number of tasks we want to run
num_tasks = 5

# Elapsed time is measured with time.perf_counter(): it is a monotonic,
# high-resolution clock intended for timing intervals, whereas time.time()
# follows the wall clock and can jump if the system clock is adjusted.

# Normal Python execution: the calls run one after another.
start_time = time.perf_counter()
results_normal = [square(i) for i in range(num_tasks)]
end_time = time.perf_counter()
print(f"Normal Python execution time: {end_time - start_time} seconds")
print(f"Normal Python results: {results_normal}")

# Ray parallel execution: submit every task first, then wait for all of them
# with a single ray.get() so the tasks can overlap.
# NOTE(review): how much they overlap presumably depends on how many CPUs Ray
# has available; with fewer CPUs than tasks, expect more than ~1 second.
start_time = time.perf_counter()
futures = [square_ray.remote(i) for i in range(num_tasks)]
results_ray = ray.get(futures)
end_time = time.perf_counter()
print(f"Ray parallel execution time: {end_time - start_time} seconds")
print(f"Ray parallel results: {results_ray}")


Normal Python execution time: 5.004734992980957 seconds
Normal Python results: [0, 1, 4, 9, 16]
Ray parallel execution time: 3.033726692199707 seconds
Ray parallel results: [0, 1, 4, 9, 16]


## Actors are stateful: unlike tasks, they keep memory between calls

In [7]:
@ray.remote
class Assistant:
    """Ray actor that keeps a running tally of the tasks it has completed."""

    def __init__(self):
        # Tally stored on the actor instance; it persists across do_task calls.
        self.tasks_completed = 0

    def do_task(self):
        """Record one more completed task and return a status message with the total."""
        self.tasks_completed = self.tasks_completed + 1
        return f"Completed task. Total tasks done: {self.tasks_completed}"
# Create one Assistant actor and call it twice; the total in the second
# message is higher because the actor kept its tally from the first call.
alex = Assistant.remote()
print(ray.get(alex.do_task.remote()))  # Output: Completed task. Total tasks done: 1
print(ray.get(alex.do_task.remote()))  # Output: Completed task. Total tasks done: 2

Completed task. Total tasks done: 1
Completed task. Total tasks done: 2


## Each actor maintains their own state

In [9]:
# Two separate Assistant actors. `alex` is rebound to a brand-new actor here,
# so its tally starts again from zero.
alex = Assistant.remote()
bob = Assistant.remote()

# Alex and Bob can work at the same time: both calls are submitted before
# either result is fetched.
alex_task = alex.do_task.remote()
bob_task = bob.do_task.remote()

# Each actor reports its own total, independent of the other.
print(ray.get(alex_task))  # Output: Completed task. Total tasks done: 1
print(ray.get(bob_task))   # Output: Completed task. Total tasks done: 1

Completed task. Total tasks done: 1
Completed task. Total tasks done: 1


## Actors are single-threaded by default: calls to one actor run one at a time

In [10]:
@ray.remote
class Chef:
    """Ray actor with two slow methods, used to show how one actor handles several calls."""

    def make_sandwich(self):
        """Log start and finish around a 3-second pause, then return "Sandwich ready!"."""
        sandwich_seconds = 3  # Pretend it takes 3 seconds to make a sandwich
        print("Starting to make a sandwich")
        time.sleep(sandwich_seconds)
        print("Finished making a sandwich")
        return "Sandwich ready!"

    def make_salad(self):
        """Log start and finish around a 2-second pause, then return "Salad ready!"."""
        salad_seconds = 2  # Pretend it takes 2 seconds to make a salad
        print("Starting to make a salad")
        time.sleep(salad_seconds)
        print("Finished making a salad")
        return "Salad ready!"

# Hire our chef (a single actor instance)
chef = Chef.remote()

# Ask the chef to make a sandwich and a salad. Both orders are submitted
# back-to-back, before either result is requested.
sandwich_order = chef.make_sandwich.remote()
salad_order = chef.make_salad.remote()

# Get the results. In the recorded run the salad only starts after the
# sandwich has finished, even though both orders were already submitted.
# NOTE(review): the actor's print() lines can appear slightly out of order
# relative to the returned values (see "Finished making a salad" in the
# recorded output) - presumably because worker logs are streamed back
# separately; confirm.
print(ray.get(sandwich_order))
print(ray.get(salad_order))


[36m(Chef pid=19557)[0m Starting to make a sandwich
[36m(Chef pid=19557)[0m Finished making a sandwich
Sandwich ready!
[36m(Chef pid=19557)[0m Starting to make a salad
Salad ready!
[36m(Chef pid=19557)[0m Finished making a salad


In [12]:
@ray.remote
class Worker:
    """Ray actor exposing one slow method, used to compare one actor against several."""

    def task(self, id):
        """Pause for one second to simulate work, then report that task ``id`` is done."""
        simulated_work_seconds = 1
        time.sleep(simulated_work_seconds)
        return f"Task {id} done"

# Both measurements use time.perf_counter(), a monotonic clock meant for
# timing intervals, and both create their actors BEFORE the timer starts so
# the two timed regions cover the same work: submitting three calls and
# waiting for their results.
# NOTE(review): the first call to a freshly created actor may still include
# some actor start-up time - confirm if precise numbers matter.

# Single actor, multiple calls
single_worker = Worker.remote()
start = time.perf_counter()
future1 = single_worker.task.remote(1)
future2 = single_worker.task.remote(2)
future3 = single_worker.task.remote(3)
results_single = ray.get([future1, future2, future3])
end = time.perf_counter()
print(f"Single actor results: {results_single}")
print(f"Single actor time: {end - start:.2f} seconds")

# Multiple actors, one call each
worker1 = Worker.remote()
worker2 = Worker.remote()
worker3 = Worker.remote()
start = time.perf_counter()
future1 = worker1.task.remote(1)
future2 = worker2.task.remote(2)
future3 = worker3.task.remote(3)
results_multi = ray.get([future1, future2, future3])
end = time.perf_counter()
print(f"Multiple actors results: {results_multi}")
print(f"Multiple actors time: {end - start:.2f} seconds")

Single actor results: ['Task 1 done', 'Task 2 done', 'Task 3 done']
Single actor time: 3.76 seconds
Multiple actors results: ['Task 1 done', 'Task 2 done', 'Task 3 done']
Multiple actors time: 2.78 seconds



1. Single Actor Performance:
   - All three tasks were completed successfully.
   - Total execution time: 3.76 seconds.
   - This is consistent with sequential execution (about 1 second per task).

2. Multiple Actors Performance:
   - All three tasks were completed successfully.
   - Total execution time: 2.78 seconds.
   - This is faster than the single actor, showing some parallelism, but not full parallelism (which would be closer to 1 second).


Key Observations:
1. The multiple actors approach is faster, demonstrating some level of parallelism.
2. However, the multiple actors didn't achieve the theoretical maximum parallelism (which would be close to 1 second), most likely because we are running on Google Colab with limited CPU resources.


Overall, this output shows that using multiple actors does provide a performance benefit, but the system may not have enough resources to run all actors fully in parallel. If Ray prints a resource warning when you run this cell, it suggests that you are approaching the limits of your system's capacity to handle concurrent Ray tasks or actors.

In [13]:
import numpy as np

### Using `ray.put()` to store data in the Ray object store

In [14]:
# Example 1: Storing a simple Python object
data = [1, 2, 3, 4, 5]
# ray.put() stores the list and hands back a reference (an ObjectRef) to it.
data_ref = ray.put(data)
print("ObjectRef for data:", data_ref)
# ray.get() turns the reference back into the stored value.
retrieved_data = ray.get(data_ref)
print("Retrieved data:", retrieved_data)


ObjectRef for data: ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505)
Retrieved data: [1, 2, 3, 4, 5]


In [15]:
# Example 2: Storing a large NumPy array
# One million random floats; only the shape is inspected below, so the
# values themselves (and therefore a fixed seed) do not matter here.
large_array = np.random.rand(1000000)
array_ref = ray.put(large_array)
print("ObjectRef for large array:", array_ref)
# Fetch the array back through its reference and confirm the shape survived.
retrieved_array = ray.get(array_ref)
print("Shape of retrieved array:", retrieved_array.shape)

ObjectRef for large array: ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002e1f505)
Shape of retrieved array: (1000000,)


In [17]:
@ray.remote
def process_data(data):
    """Ray task that returns the sum of the items in ``data``.

    When the caller passes an ObjectRef, ``data`` is already the actual
    data inside the task, so no ray.get() is needed here.
    """
    total = sum(data)
    return total

# Create some data
data = [1, 2, 3, 4, 5]

# Put the data in the object store and get an ObjectRef
data_ref = ray.put(data)

# Call the remote function with the ObjectRef itself; the task receives the
# underlying list (see process_data), not the reference.
result_ref = process_data.remote(data_ref)

# Get the result: the task call returned another reference, so ray.get() is
# needed here to obtain the actual sum.
result = ray.get(result_ref)
print("Sum of data:", result)

Sum of data: 15


Inside remote functions, you receive the actual data, not the ObjectRef. So you don't need to use ray.get() there.
Use ray.get() only when you want to retrieve results from a remote function call.