In [None]:
import ray
import os

def initialize_ray():
    ray.shutdown()
    
    if not ray.is_initialized():
        service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
        service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
        # Ray temp directory; set to a dataset path
        _temp_dir = "/domino/datasets/local/{}/".format(os.environ["DOMINO_PROJECT_NAME"])
        # ray.util.connect(f"{service_host}:{service_port}")
        address = f"ray://{service_host}:{service_port}"
        ray.init(address=address, _temp_dir=_temp_dir)

In [None]:
import ray
import time
import numpy as np

initialize_ray()

# ---- CONFIG ----
USE_ACTOR = True     # Set to False to use Ray function
NUM_EVALS = 200      # Simulate Ray Tune sending 200 eval requests
SLEEP_TIME = 0.1     # Simulate work
ARRAY_SIZE_MB = 50   # Simulate memory pressure

# ---- MEMORY PRESSURE OBJECT ----
def make_big_array():
    return np.ones((ARRAY_SIZE_MB * 250_000,), dtype=np.float32)  # ~50 MB

# ---- ACTOR VERSION ----
@ray.remote
class EvaluatorActor:
    def __init__(self):
        self.cache = []  # Persistent memory (grows over time)

    def evaluate(self, x):
        arr = make_big_array()
        self.cache.append(arr)  # Causes memory accumulation
        time.sleep(SLEEP_TIME)
        return x * 2

# ---- FUNCTION VERSION ----
@ray.remote
def evaluate_fn(x):
    arr = make_big_array()  # Ephemeral
    time.sleep(SLEEP_TIME)
    return x * 2


In [None]:
# Uncomment the next line to use the stateless function instead of actors
# USE_ACTOR = False
# ---- DISPATCH ----
print(f"Use Actor? {USE_ACTOR}")
if USE_ACTOR:
    actors = [EvaluatorActor.remote() for _ in range(10)]  # Fewer due to higher cost
    futures = [actors[i % len(actors)].evaluate.remote(i) for i in range(NUM_EVALS)]
else:
    futures = [evaluate_fn.remote(i) for i in range(NUM_EVALS)]

results = ray.get(futures)
print("Sample results:", results[:5])
print("Total results count:", len(results))

## What is going on?

Ray actors can fail unexpectedly in pipelines such as Ray Tune feeding an evaluation stage because of the **accumulation of long-lived state, memory leaks, or scheduling contention**.
Stateless Ray functions are not affected by any of these in the same way.

### Memory leak via Actor Internal State
```python
self.cache.append(arr)
```
- Leak: each `evaluate` call creates a large array (~50 MB) and stores it in `self.cache`.

- Since the actor lives across all calls, `self.cache` grows linearly with the number of calls.

- Nothing is deleted or evicted, and Python’s GC cannot free the arrays because `self.cache` still references them.

- Ray cannot reclaim this memory because it lives in the actor process's own heap as actor state, not in the object store.

- Over time, this causes unbounded memory growth - a classic application-level memory leak.
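
One way to bound the growth described above is to cap what the actor retains. The sketch below is only an illustration under stated assumptions, not the notebook's method: `BoundedEvaluator`, `max_cached`, and `buffer_bytes` are hypothetical names, a small `bytearray` stands in for the NumPy array so the example has no dependencies, and the class is plain Python. Wrapping it with `ray.remote(BoundedEvaluator)` should turn it into an actor with the same retention behavior.

```python
from collections import deque


class BoundedEvaluator:
    """Hypothetical evaluator that keeps at most `max_cached` buffers."""

    def __init__(self, max_cached=3, buffer_bytes=1024):
        # deque(maxlen=N) drops the oldest entry once N items are held,
        # so retained memory is capped at about max_cached * buffer_bytes.
        self.cache = deque(maxlen=max_cached)
        self.buffer_bytes = buffer_bytes

    def evaluate(self, x):
        buf = bytearray(self.buffer_bytes)  # stand-in for make_big_array()
        self.cache.append(buf)              # evicts the oldest buffer when full
        return x * 2


evaluator = BoundedEvaluator(max_cached=3)
results = [evaluator.evaluate(i) for i in range(10)]
retained_bytes = sum(len(buf) for buf in evaluator.cache)
print("cached buffers:", len(evaluator.cache), "retained bytes:", retained_bytes)
```

The cap trades completeness of the cache for a fixed memory ceiling; if nothing needs to be retained at all, dropping the `append` is simpler still.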

### In contrast (Ray Function Version):
```python
evaluate_fn.remote(i)
```
- Each task runs, completes, and memory is released.

- No retained state, no scheduler blocking beyond what’s necessary.

- Ray can schedule tasks dynamically and aggressively without saturation.



### What is queue saturation?

Queue saturation in Ray means that the queues of pending tasks or actor method calls hold more work than the system can process efficiently, which leads to stalled progress or failure to schedule new work.

### In the Actor Case:
- You have 10 actors.

- You submit 200 method calls.

- Each actor can only execute one method at a time (unless max_concurrency is set).

- So:

  - 10 calls begin executing.

  - 190 calls are placed in method call queues (per actor or at the Raylet).

- These queues saturate:

  - Memory usage grows (each queued call holds metadata and serialized args).

  - Raylet may stall or delay new scheduling decisions.

  - If the actor crashes or memory exceeds limits, pending calls fail or are retried, depending on the actor's restart and retry settings, which increases pressure further.
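
The numbers in the actor case can be worked out directly from the notebook's configuration. This is a back-of-the-envelope sketch that assumes the round-robin assignment used in the dispatch cell (`i % len(actors)`) and ignores all scheduling and serialization overhead; the variable names other than the config constants are illustrative.

```python
import math

NUM_EVALS = 200      # method calls submitted
NUM_ACTORS = 10      # each actor runs one call at a time by default
SLEEP_TIME = 0.1     # seconds of simulated work per call
ARRAY_SIZE_MB = 50   # retained in self.cache per call

calls_per_actor = math.ceil(NUM_EVALS / NUM_ACTORS)
initially_queued = NUM_EVALS - NUM_ACTORS           # calls waiting at submission time
min_wall_time_s = calls_per_actor * SLEEP_TIME      # lower bound, no overhead
retained_per_actor_mb = calls_per_actor * ARRAY_SIZE_MB
retained_total_mb = NUM_EVALS * ARRAY_SIZE_MB

print(f"calls per actor:      {calls_per_actor}")
print(f"initially queued:     {initially_queued}")
print(f"minimum wall time:    {min_wall_time_s:.1f} s")
print(f"retained per actor:   {retained_per_actor_mb} MB")
print(f"retained in total:    {retained_total_mb} MB")
```

Even at 200 calls, each actor ends up holding about 1 GB and the cluster about 10 GB, which is why the leak and the queue depth have to be addressed together.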


### In the 200,000 Tasks Case:
- 200,000 Ray tasks are submitted at once.

- The driver floods the head node’s scheduler.

- Ray runs into internal limits (for example on in-flight requests, network buffers, and GCS table size).

- Result: system hangs, freezes, or drops tasks.

### Similarity

| Feature              | Actor Queue Saturation                       | Task Flood Saturation                        |
|----------------------|----------------------------------------------|----------------------------------------------|
| Source of Saturation | Too many pending actor method calls          | Too many submitted tasks at once             |
| Queued in            | Per-actor queues / Raylet scheduler          | Head node GCS + Raylet                       |
| Resource block       | Limited actor concurrency                    | Global CPU/memory task limits                |
| Failure mode         | Actor crashes, timeouts                      | Driver freeze, dashboard hangs               |
| Mitigation           | Control method rate, cleanup state           | Throttle task submission, use `ray.wait()`   |

### Is managing memory in actor state enough?

Managing memory alone is not sufficient in the actor case. You must also address concurrency and scheduling contention, or the system will still degrade or stall.

| Problem Type           | Cause                                              | Result                                      |
|------------------------|----------------------------------------------------|---------------------------------------------|
| Concurrency bottleneck | Actors run one method at a time by default         | Queues build up; calls wait behind earlier ones |
| Scheduling contention  | Too many method calls queued; Raylet saturated     | New calls delayed or dropped                |
| Resource exhaustion    | Each actor holds CPU/memory for entire lifetime    | System can't schedule even idle work        |
| Backpressure buildup   | Client or scheduler can’t dispatch fast enough     | Control plane stalls                         |


Required Fixes (Cumulative, Not Isolated):
1. **Bounded Submission**
   Use `ray.wait()` or controlled loops to limit in-flight method calls.

2. **Actor Memory Hygiene**
   Explicitly free memory (`del`, `gc.collect()`) and avoid large retained state.

3. **Actor Pooling or Recycling**
   Use a pool of short-lived actors or periodically destroy and recreate them.

4. **Tune max_concurrency**
   Set `@ray.remote(max_concurrency=N)` if the methods are thread-safe or async.

5. **System Monitoring**
   Use the Ray dashboard, the `ray memory` command, and logs to catch bottlenecks early.
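
The recycling idea in fix 3 can be sketched without Ray. Everything here is hypothetical and for illustration only: `RecyclingWorker`, `LeakyEvaluator`, `factory`, `dispose`, and `max_calls` are made-up names, and plain Python objects stand in for actors. With Ray, `factory` might be `lambda: EvaluatorActor.remote()` and `dispose` might be `ray.kill`, on the assumption that all outstanding calls on the old actor have been collected before it is killed.

```python
class RecyclingWorker:
    """Hypothetical wrapper that replaces its worker every `max_calls` calls."""

    def __init__(self, factory, max_calls, dispose=None):
        self._factory = factory      # callable that builds a fresh worker
        self._dispose = dispose      # optional cleanup hook for the old worker
        self._max_calls = max_calls
        self._worker = factory()
        self._calls = 0
        self.generations = 1         # how many workers have been created so far

    def get(self):
        """Return the current worker, replacing it once its call budget is spent."""
        if self._calls >= self._max_calls:
            if self._dispose is not None:
                self._dispose(self._worker)
            self._worker = self._factory()  # old state becomes unreachable
            self._calls = 0
            self.generations += 1
        self._calls += 1
        return self._worker


class LeakyEvaluator:
    """Stand-in for EvaluatorActor: retains one buffer per call."""

    def __init__(self):
        self.cache = []

    def evaluate(self, x):
        self.cache.append(bytearray(1024))
        return x * 2


recycler = RecyclingWorker(LeakyEvaluator, max_calls=4)
results = []
peak_cached = 0
for i in range(10):
    worker = recycler.get()
    results.append(worker.evaluate(i))
    peak_cached = max(peak_cached, len(worker.cache))

print("generations:", recycler.generations, "peak cached buffers:", peak_cached)
```

Recycling caps retained state at `max_calls` buffers per worker even when the worker itself leaks, at the cost of paying worker start-up again on each replacement.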

Actors introduce control-plane load, resource pinning, and queue buildup risks. Managing memory prevents crashes, but only concurrency-aware submission and system-aware orchestration prevent hangs.


**Control-plane** load refers to the overhead of managing task scheduling, metadata, object references, and actor lifecycle. It is not the computation itself but the orchestration of it: the management traffic and coordination cost inside Ray.

| Concept               | Description                                                                                   |
|-----------------------|-----------------------------------------------------------------------------------------------|
| Control-plane load    | Cost of managing tasks, actors, object refs, dependencies, scheduling decisions, and metadata |
| Head node saturation  | When the head node’s CPU, memory, or I/O is overwhelmed, often due to too much control-plane activity |

Key Distinctions:

| Aspect      | Control-plane Load                                 | Head Node Saturation                                  |
|-------------|----------------------------------------------------|--------------------------------------------------------|
| Scope       | Logical overhead: task metadata, scheduling        | Physical resource exhaustion (CPU, RAM, etc.)          |
| Causes      | Too many tasks, actors, object refs                | Too much control-plane + other loads combined          |
| Location    | GCS, Raylet, client-server comms                   | Entire head node (OS-level saturation)                 |
| Symptoms    | Slow scheduling, delayed actor init                | Dashboard hangs, node unresponsive                     |
| Mitigation  | Throttle submissions, batch, minimize state        | Distribute load, autoscale, reduce GCS pressure        |


**Example:**

Submitting 200,000 tasks at once causes high control-plane load.

If this overwhelms the Raylet or GCS on the head node, it leads to head node saturation.

Control-plane load leading to head node saturation is a causal chain, not two isolated issues. Saturation is the physical manifestation of excessive logical overhead.
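
Batching, listed as a control-plane mitigation in the table above, reduces the number of tasks the scheduler has to track. The sketch below is a dependency-free illustration: `chunked`, `evaluate_batch`, and `BATCH_SIZE` are hypothetical names and values, and the batches are processed locally. With Ray, each batch would presumably become a single task, for example by wrapping `evaluate_batch` with `ray.remote` and calling `.remote(batch)`.

```python
def chunked(items, batch_size):
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def evaluate_batch(xs):
    """Hypothetical batched counterpart of evaluate_fn: one task, many inputs."""
    return [x * 2 for x in xs]


NUM_EVALS = 200_000
BATCH_SIZE = 1_000  # assumed value; tune it to the per-item work

inputs = list(range(NUM_EVALS))
batches = list(chunked(inputs, BATCH_SIZE))

# Processed locally here; with Ray each batch would be one submitted task.
results = [y for batch in batches for y in evaluate_batch(batch)]

print("tasks to schedule:", len(batches), "results:", len(results))
```

The scheduler then tracks 200 tasks instead of 200,000, while the amount of computation stays the same.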




In [None]:
# Mitigation: throttle submission with ray.wait()

import ray
import time
import numpy as np

# Reuses an existing connection instead of raising; call initialize_ray() to connect to the cluster
ray.init(ignore_reinit_error=True)

# ---- CONFIG ----
USE_ACTOR = True
NUM_EVALS = 200_000
SLEEP_TIME = 0.1
ARRAY_SIZE_MB = 50
MAX_IN_FLIGHT = 1000  # throttle limit

# ---- MEMORY PRESSURE OBJECT ----
def make_big_array():
    return np.ones((ARRAY_SIZE_MB * 250_000,), dtype=np.float32)

@ray.remote
class EvaluatorActor:
    def __init__(self):
        self.cache = []
    def evaluate(self, x):
        arr = make_big_array()
        self.cache.append(arr)  # still unbounded: throttling alone does not fix this leak
        time.sleep(SLEEP_TIME)
        return x * 2

@ray.remote
def evaluate_fn(x):
    arr = make_big_array()
    time.sleep(SLEEP_TIME)
    return x * 2

# ---- DISPATCH ----
print(f"Use Actor? {USE_ACTOR}")

if USE_ACTOR:
    actors = [EvaluatorActor.remote() for _ in range(10)]
    def submit_task(i):
        return actors[i % len(actors)].evaluate.remote(i)
else:
    def submit_task(i):
        return evaluate_fn.remote(i)

# ---- THROTTLED SUBMISSION ----
pending = [submit_task(i) for i in range(min(MAX_IN_FLIGHT, NUM_EVALS))]
results = []
next_task_id = len(pending)

while pending:
    # Block until at least one in-flight call has finished
    done, pending = ray.wait(pending, num_returns=1)
    results.extend(ray.get(done))  # results arrive in completion order
    if next_task_id < NUM_EVALS:
        pending.append(submit_task(next_task_id))
        next_task_id += 1

print("Sample results:", results[:5])
print("Total results count:", len(results))