### Activity 6: Communicating Ray Actors

(due Monday, December 9, 2024, 4:00 pm)

This is a short exercise to demonstrate how actors can communicate through remote oids.
We are going to break the actors of the ImageNet classification [Example 24](../../examples/24_ex_ray_actors.ipynb) into 
two actors: one that transforms the image into a ResNet50-compatible tensor and one that takes
the tensor as input and returns the classification. 

You have been given two class files that have been written to be instantiated as Ray actors:
  * [rayresnet50_normalize](./rayresnet50_normalize.py)
  * [rayresnet50_classify](./rayresnet50_classify.py)

To complete the exercise you need to populate the following driver code.  Then answer the questions.

Data is from https://github.com/EliSchwartz/imagenet-sample-images.

Note: check your output to make sure that the predictions match the input file.
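
The check in the note above can be sketched in isolation. This is a minimal sketch that mirrors the parsing used in the driver code: it assumes the file names look like `n01440764_tench.JPEG` (an ID, an underscore, then the label words joined by underscores) and that the prediction is a string containing the label. The helper names `expected_label` and `prediction_matches` are hypothetical and not part of the provided class files.

```python
def expected_label(file_name):
    """Return the label words that follow the first underscore in the file name."""
    stem = file_name.split(".")[0]          # drop the extension
    return " ".join(stem.split("_")[1:])    # drop the leading ID, join the rest


def prediction_matches(file_name, prediction):
    """True if the label from the file name appears in the prediction text."""
    return expected_label(file_name) in prediction


# Hypothetical file names and predictions, for illustration only
single = expected_label("n01440764_tench.JPEG")
multi = expected_label("n01484850_great_white_shark.JPEG")
hit = prediction_matches("n01440764_tench.JPEG", "tench, Tinca tinca")
miss = prediction_matches("n01440764_tench.JPEG", "goldfish, Carassius auratus")
print(single, "|", multi, "|", hit, "|", miss)
```

Because the comparison is a substring test, a prediction that only partially agrees with the file name still counts as a miss.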

In [4]:
from rayresnet50_normalize import RRN50Normalize
from rayresnet50_classify import RRN50Classify
import ray
import time
import os

num_actors=4

# script to drive parallel program
ray.init(num_cpus=num_actors, ignore_reinit_error=True)

### TODO instantiate 4 normalization actors
normalize_actors = [RRN50Normalize.remote() for _ in range(num_actors)]

### TODO instantiate 4 classification actors
classify_actors = [RRN50Classify.remote() for _ in range(num_actors)]

directory = '../../data/imagenet1000'
files = os.listdir(directory)

start_time = time.time()  # Get the current time

normalize_oids = []
classify_oids = []

for i in range(len(files)):
    if files[i].endswith(".JPEG"):
        file_path = os.path.join(directory, files[i])

        ### TODO call remote to normalize image into tensor
        normalize_actor = normalize_actors[i % num_actors]
        norm_oid = normalize_actor.normalize_image.remote(file_path)
    
        ### TODO call remote to classify tensor
        classify_actor = classify_actors[i % num_actors]
        class_oid = classify_actor.classify_image.remote(norm_oid)
        
        ### TODO store the oids needed to complete the computation
        normalize_oids.append((files[i], norm_oid))
        classify_oids.append((files[i], class_oid))
err = 0
for file_name, class_oid in classify_oids:
    try:
        ### TODO collect results for each file in a variable preds
        preds = ray.get(class_oid)

        # The expected label is the part of the file name after the first underscore
        parsed = file_name.split(".")[0].split("_")[1:]
        filename = " ".join(parsed)
        if filename not in preds:
            err += 1
    except Exception as e:
        # Report failures instead of silently ignoring them
        print(f"Failed to classify {file_name}: {e}")

end_time = time.time()  # Get the current time again

execution_time = end_time - start_time
print("Execution time: ", execution_time, " seconds")
# Divide by the number of images classified, not by every entry in the directory
print(f"Error rate: {(err / len(classify_oids)):0.4f}")

2024-11-24 17:07:16,073	INFO worker.py:1652 -- Calling ray.init() again after it has already been called.


Execution time:  426.4795763492584  seconds
Error rate: 0.0170


In [3]:
ray.shutdown()

### Questions

Please provide short answers (2-3 sentences) to the following questions.  Submit just the written answers (no code) to Gradescope.

* Question 1: Does the computation for a single input file (normalization and classification) run in serial or parallel?  If serially, how is the dependency enforced?

The computation runs serially, because the classification step needs the output of the normalization step. Ray enforces this dependency when we pass `norm_oid` as an argument to the remote classification call: Ray does not start the classification until the object behind `norm_oid` is available, so classification is always scheduled after normalization completes.

* Question 2: Does the computation of different files run in serial or parallel?  If parallel, explain why they are independent.

The computation for different files runs in parallel. Each file is sent to one of several Ray actors, and each actor works on its own file independently of the others. Since the normalization and classification of file A do not depend on the normalization or classification of file B, different actors can process these files at the same time.
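
The driver assigns file `i` to actor `i % num_actors`, so each actor receives a disjoint subset of the files. A minimal sketch of that assignment, using hypothetical file names and a hypothetical helper `assign_round_robin` (no Ray involved):

```python
from collections import defaultdict


def assign_round_robin(file_names, num_actors):
    """Map each actor index to the files it receives under i % num_actors."""
    assignment = defaultdict(list)
    for i, name in enumerate(file_names):
        assignment[i % num_actors].append(name)
    return dict(assignment)


# Hypothetical file names, for illustration only
files = [f"img_{k}.JPEG" for k in range(10)]
assignment = assign_round_robin(files, num_actors=4)

for actor_index, names in sorted(assignment.items()):
    print(actor_index, names)
```

No file appears under two actors, which is why the per-file pipelines do not need to coordinate with each other.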

* Question 3: This version has about the same runtime as the version in Activity 22 that does normalization and classification in one actor.

    * (part a) In what configuration would it be faster to do them together?
 
      With a small dataset there is little parallelism to exploit, so the overhead of creating and coordinating many actors can outweigh the speedup. Likewise, if normalization and classification are computationally cheap, a single actor can be faster than splitting the work, because passing tensors between actors adds delay. In terms of target hardware, one combined actor is preferable when CPU/GPU resources are limited, since extra actors would have nothing to run on.
      
    * (part b) In what configuration would it be faster to do them separately?

      Conversely, with a large dataset, separate actors can give a significant speedup, since more work is available to run on multiple actors at once. If the normalization and classification tasks are computationally expensive, splitting the work among many actors lets as much of it as possible run in parallel. In terms of target hardware, separate actors are preferable when compute resources are plentiful, such as distributed/cloud compute, efficient GPUs, or multi-core CPUs. Better hardware can translate into a speedup if the number of actors is matched to the available resources.

(By configuration, I mean data properties or target hardware system on which this would be preferable.) 