## The Basics of Ray
Welcome to the Jupyter Notebook on the Basics of Ray! Here, we run a simple Ray experiment built around a computationally intensive task: matrix multiplication. We also look at how many resources are used locally and how those resources are distributed.

In [1]:
# importing all required packages
import ray
import time
import psutil
import numpy as np

### Initializing Ray
We start off by initializing Ray. In a terminal, we run the command `ray start --head --port=6379` to start a Ray head node. Next, we connect to the running Ray instance using `ray.init(address="auto")` and check that the Ray cluster is initialized.

In [2]:
# starting up a ray instance
ray.init(address="auto")

2025-01-09 05:19:38,304	INFO worker.py:1636 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2025-01-09 05:19:38,328	INFO worker.py:1812 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265


Python version: 3.9.21
Ray version: 2.40.0
Dashboard: http://127.0.0.1:8265


In [3]:
# checking if ray cluster is initialized
ray.is_initialized()

True

### Computational Functions
Next, we define two computational tasks as the two functions below. `compute_heavier(x)` is designed to be more computationally intensive, so that we can observe how CPU resources are used. We use `@ray.remote(num_cpus=1)` to tell Ray how many CPUs to reserve for each call of the function.

In [4]:
@ray.remote
def compute_heavy(x):
    time.sleep(1)
    return x * 2

In [5]:
@ray.remote(num_cpus=1)
def compute_heavier(x):
    matrix_size = 7000  
    matrix_a = np.random.rand(matrix_size, matrix_size)
    matrix_b = np.random.rand(matrix_size, matrix_size)
    result = np.dot(matrix_a, matrix_b)
    return np.sum(result)
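Ray treats `num_cpus` as a logical resource used for scheduling: it determines how many tasks may run at the same time, and does not by itself cap the physical CPU a task consumes. The sketch below is a simplified back-of-the-envelope model of that idea, not Ray's actual scheduler. The function names and the 8-CPU cluster are hypothetical and chosen only for illustration.

```python
import math


def max_concurrent_tasks(cluster_cpus: int, num_cpus_per_task: int) -> int:
    """How many tasks fit at once if each one reserves `num_cpus_per_task` CPUs."""
    return cluster_cpus // num_cpus_per_task


def scheduling_waves(n_tasks: int, cluster_cpus: int, num_cpus_per_task: int) -> int:
    """Number of back-to-back batches needed to run `n_tasks` (simplified model)."""
    slots = max_concurrent_tasks(cluster_cpus, num_cpus_per_task)
    if slots == 0:
        raise ValueError("a single task requests more CPUs than the cluster has")
    return math.ceil(n_tasks / slots)


# Hypothetical 8-CPU machine running the 10 tasks used in this notebook.
for per_task in (1, 2):
    print(
        f"num_cpus={per_task}: "
        f"{max_concurrent_tasks(8, per_task)} tasks at once, "
        f"{scheduling_waves(10, 8, per_task)} waves"
    )
```

Under this model, raising `num_cpus` lowers the number of tasks that run side by side, which is why the value is worth choosing deliberately for heavy tasks.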

### Running the `compute_heavy()` Function
The code below calls the `compute_heavy()` function in parallel for 10 inputs. Each call to `compute_heavy.remote(i)` creates a task that executes on a Ray worker and immediately returns a "future" (an object reference). These futures are collected in a list, and `ray.get(futures)` blocks until all tasks are complete, then returns their results in the same order.

In [6]:
# running the compute_heavy() function
futures = [compute_heavy.remote(i) for i in range(10)]

# retrieving all the futures
results = ray.get(futures)

# printing out the results
print(results)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
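The submit-then-collect pattern used above is not specific to Ray. As a rough analogy only, the sketch below reproduces it with the standard library's `concurrent.futures` thread pool in place of Ray: `pool.submit()` plays the role of `.remote()` and `future.result()` the role of `ray.get()`. The names `slow_double` and `run_in_parallel` are hypothetical, and the sleep is shortened to 0.1 s to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x):
    """Stand-in for compute_heavy(): wait briefly, then double the input."""
    time.sleep(0.1)
    return x * 2


def run_in_parallel(n_tasks=10):
    """Submit all tasks first, then block until every result is available."""
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        futures = [pool.submit(slow_double, i) for i in range(n_tasks)]
        # result() blocks per future; the list keeps submission order.
        return [future.result() for future in futures]


start = time.perf_counter()
results = run_in_parallel()
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f} s")
```

Because all tasks are submitted before any result is requested, the waits overlap instead of adding up, which is the same reason the Ray version benefits from collecting futures in a list before calling `ray.get()`.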


In [7]:
# counting worker nodes: every node in the cluster except the head node
workers = len(ray.nodes()) - 1
print(f"Number of Ray workers: {workers}")

Number of Ray workers: 0


### Running the `compute_heavier()` Function
Next, we run the `compute_heavier()` function. We define a `get_cpu_usage()` helper that measures the percentage of CPU being used over a 10-second interval, so that CPU usage can be measured before, during and after running the function with Ray. We also benchmark the function with and without Ray to compare the difference.

In [8]:
# function to retrieve CPU usage for a 10 second interval
def get_cpu_usage():
    return psutil.cpu_percent(interval=10)
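Measuring CPU usage during a run requires sampling while the work is in progress, because `ray.get()` blocks the notebook cell. One possible approach, sketched below under that assumption, is to sample from a background thread. The names `psutil_sampler` and `sample_while_running` are hypothetical helpers and not part of Ray or psutil; only `psutil.cpu_percent(interval=...)` is an existing psutil call.

```python
import threading


def psutil_sampler(interval=0.5):
    """Return system-wide CPU utilisation (%) averaged over `interval` seconds."""
    import psutil  # imported lazily so sample_while_running works with any sampler

    return psutil.cpu_percent(interval=interval)


def sample_while_running(task, sampler):
    """Run `task()` in the calling thread while a helper thread calls `sampler()`.

    Returns a tuple (task result, list of samples collected during the run).
    """
    samples = []
    done = threading.Event()

    def collect():
        # Take at least one sample, then keep sampling until the task finishes.
        while True:
            samples.append(sampler())
            if done.is_set():
                break

    collector = threading.Thread(target=collect, daemon=True)
    collector.start()
    try:
        result = task()
    finally:
        done.set()
        collector.join()
    return result, samples
```

In this notebook, a call such as `sample_while_running(lambda: ray.get(futures), psutil_sampler)` would record usage during the Ray run, with `get_cpu_usage()` called once before and once after for the baseline readings.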

In [9]:
# starting time
start_time = time.perf_counter()

In [10]:
# submitting multiple tasks in parallel
futures = [compute_heavier.remote(i) for i in range(10)]

# waiting for the tasks to finish and collecting the results
results = ray.get(futures)

# printing out the results
print(results)

[np.float64(85744998619.16913), np.float64(85729977894.7692), np.float64(85752560385.48042), np.float64(85755639811.25732), np.float64(85733861895.35637), np.float64(85764190858.20898), np.float64(85754351893.43669), np.float64(85749799377.60217), np.float64(85732425990.52632), np.float64(85745143603.87476)]
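A quick sanity check on these numbers: `np.random.rand` draws values uniformly from [0, 1), with mean 0.5, so each entry of the product of two independent n × n matrices has an expected value of n × 0.25, and the sum over all n² entries is expected to be about n³ / 4. The sketch below compares that estimate with the first recorded value; the helper name `expected_product_sum` is hypothetical.

```python
def expected_product_sum(n: int) -> float:
    """Expected sum of all entries of A @ B for n x n matrices uniform on [0, 1)."""
    return n ** 3 / 4


recorded = 85744998619.16913  # first value in the output above

for n in (7000, 10000):
    expected = expected_product_sum(n)
    relative_gap = abs(recorded - expected) / expected
    print(f"n={n}: expected {expected:.4e}, relative gap {relative_gap:.2%}")
```

The recorded sums sit very close to the estimate for n = 7000, which is consistent with the `matrix_size = 7000` setting shown in the `compute_heavier()` cell.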


In [11]:
# stopping the timer
end_time = time.perf_counter()

# calculating the time elapsed
time_elapsed = end_time - start_time

# printing out the time elapsed
print(time_elapsed)

96.80903122800001


### Benchmarking Without Ray
To see if the parallel processing is truly beneficial, we perform the same experiment without using the Ray library. We run the function `compute_heavier_without_ray()`, which performs the same kind of matrix computation as `compute_heavier()` but as a plain Python function, without the `@ray.remote` decorator and its overhead.

In [12]:
# function that performs computations without using ray
def compute_heavier_without_ray(x):
    matrix_size = 10000  
    matrix_a = np.random.rand(matrix_size, matrix_size)
    matrix_b = np.random.rand(matrix_size, matrix_size)
    result = np.dot(matrix_a, matrix_b)
    return np.sum(result)

In [None]:
# starting the timer
start_time = time.perf_counter()

# performing the computations sequentially (plain results, not futures)
results = [compute_heavier_without_ray(i) for i in range(10)]

# setting a stop to the timer
end_time = time.perf_counter()

# measuring the time elapsed and printing the results out
time_elapsed = end_time - start_time
print(time_elapsed)
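When the timer is started in one cell and stopped in another, the measurement also includes any pause between running the cells. A small context manager, sketched below, keeps the timer and the work in a single block so that both benchmarks are timed the same way. The name `timed` and the `timings` dictionary are hypothetical, and the summation is only a placeholder workload.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Store the wall-clock duration of the enclosed block in timings[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


timings = {}

# Placeholder workload; in the notebook this would be the list comprehension
# over compute_heavier_without_ray(i), or the ray.get(futures) call.
with timed("serial", timings):
    total = sum(i * i for i in range(100_000))

print(f"serial: {timings['serial']:.4f} s")
```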

### Shutting Down Ray
Lastly, we disconnect from Ray using `ray.shutdown()`. We then stop the Ray processes from the terminal with the command `ray stop`.

In [None]:
# shutting down ray instance
ray.shutdown()

## Conclusion
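We performed this experiment with the size of the matrix set to 10,000, because we wanted to see whether there was a significant difference in the time taken for the function to finish running. As we can see, with Ray the process takes about 97 s to finish, whereas the same computations without Ray take around 277 s. Note, however, that the cells above define `compute_heavier()` with `matrix_size = 7000` and `compute_heavier_without_ray()` with `matrix_size = 10000`, so both functions need the same matrix size for these two timings to be directly comparable.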
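The comparison can be summarised as a speedup ratio, the time without Ray divided by the time with Ray. The sketch below uses the 96.8 s recorded in the Ray run above and the 277 s quoted here. It assumes both timings were measured on the same workload and machine, and the helper name `speedup` is hypothetical.

```python
def speedup(baseline_seconds: float, parallel_seconds: float) -> float:
    """Ratio of baseline time to parallel time; values above 1 mean faster."""
    if parallel_seconds <= 0:
        raise ValueError("parallel_seconds must be positive")
    return baseline_seconds / parallel_seconds


ray_seconds = 96.8      # elapsed time printed after the Ray run
serial_seconds = 277.0  # time without Ray quoted in the conclusion

print(f"speedup: {speedup(serial_seconds, ray_seconds):.2f}x")
```

With these two figures the ratio comes out at roughly 2.9.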