# Ray Core: Zero to Hero

Ray has **three primitives**. Everything else is built on top:

```
┌─────────────────────────────────────────────────────────────────────────┐
│                              RAY CORE                                   │
├───────────────────────┬───────────────────────┬─────────────────────────┤
│         TASKS         │        ACTORS         │      OBJECT STORE       │
│                       │                       │                         │
│  Stateless functions  │  Stateful objects     │  Shared memory that     │
│  that run in parallel │  that live on workers │  holds all the data     │
├───────────────────────┴───────────────────────┴─────────────────────────┤
│                                                                         │
│  "I want to run this    "I need state that     "How do tasks and        │
│   function 1000 times"   persists across calls" actors share data?"     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
```

---

## 1. The Problem: Sequential Python is Slow

In [None]:
import time

def slow_square(x):
    time.sleep(0.5)
    return x * x

start = time.time()
results = [slow_square(i) for i in range(8)]
print(f"Local (sequential): {time.time() - start:.1f}s")
print(f"Results: {results}")

Local (sequential): 4.0s
Results: [0, 1, 4, 9, 16, 25, 36, 49]


---

## 2. Starting Ray

When you call `ray.init()`, Ray starts a local cluster:

```
┌─────────────────────────────────────────────────────────────────────────┐
│                              YOUR MACHINE                               │
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                         HEAD NODE                                │   │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │   │
│  │  │  GCS (metadata) │  │     Raylet      │  │  Object Store   │  │   │
│  │  │                 │  │   (scheduler)   │  │ (shared memory) │  │   │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                    │                                    │
│            ┌───────────────────────┼───────────────────────┐            │
│            ▼                       ▼                       ▼            │
│     ┌───────────┐           ┌───────────┐           ┌───────────┐      │
│     │  Worker   │           │  Worker   │           │  Worker   │      │
│     │ (Python)  │           │ (Python)  │           │ (Python)  │      │
│     └───────────┘           └───────────┘           └───────────┘      │
│                                                                         │
│  Key: Same code scales to 1000-node cluster without changes!            │
└─────────────────────────────────────────────────────────────────────────┘
```

In [None]:
# Start Ray
import ray

ray.init(
    num_cpus=4,  # Number of CPUs Ray may use; adjust for your machine
    # Limit the object store to 1 GiB for this example
    object_store_memory=1 * 1024 * 1024 * 1024,
    ignore_reinit_error=True,  # Don't raise if ray.init() was already called in this session
)

print(ray.cluster_resources())  # Print the cluster resources

2026-02-01 21:00:22,292	INFO worker.py:1839 -- Calling ray.init() again after it has already been called.


{'node:__internal_head__': 1.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 1073741824.0, 'CPU': 4.0, 'memory': 9329328128.0}


---

## 3. Tasks: Parallel Functions

A **task** is a function decorated with `@ray.remote`. Calling `.remote()` returns immediately with an `ObjectRef` (a future/promise).

```
SEQUENTIAL (Python default)              PARALLEL (Ray)
┌─────────────────────────────┐         ┌─────────────────────────────┐
│                             │         │                             │
│  task(0) ████               │         │  task(0) ████               │
│  task(1)     ████           │         │  task(1) ████               │
│  task(2)         ████       │         │  task(2) ████               │
│  task(3)             ████   │         │  task(3) ████               │
│                             │         │                             │
│  Total: 4 units             │         │  Total: 1 unit              │
└─────────────────────────────┘         └─────────────────────────────┘
```

**The pattern:**
```python
# 1. Decorate with @ray.remote
@ray.remote
def my_function(x):
    return x * 2

# 2. Call with .remote() - returns immediately with ObjectRef
ref = my_function.remote(5)  # Non-blocking!

# 3. Get result with ray.get() - blocks until ready
result = ray.get(ref)  # 10
```

In [None]:
@ray.remote
def slow_square_ray(x):
    """Same function, but runs on Ray workers"""
    time.sleep(0.5)
    return x * x

# Run 8 tasks on Ray (in parallel)
start = time.time()
futures = [slow_square_ray.remote(i) for i in range(8)]  # Schedule all 8 instantly
# This blocks until the results are ready
results = ray.get(futures)  # Wait for all to finish
print(f"Ray (parallel): {time.time() - start:.1f}s")
print(f"Results: {results}")

Ray (parallel): 1.0s
Results: [0, 1, 4, 9, 16, 25, 36, 49]
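
The run takes 1.0s rather than 0.5s because of the cluster size: `ray.init(num_cpus=4)` provides four CPU slots and each task requests one CPU by default, so eight 0.5s tasks run in two waves. A back-of-the-envelope estimate in plain Python, ignoring scheduling overhead (the helper name `estimate_wall_time` is illustrative and not part of Ray):

```python
import math

def estimate_wall_time(num_tasks, num_cpus, seconds_per_task):
    """Rough wall-clock estimate for equal-length tasks that each need one CPU."""
    waves = math.ceil(num_tasks / num_cpus)  # tasks run in batches of num_cpus
    return waves * seconds_per_task

sequential = 8 * 0.5
parallel = estimate_wall_time(num_tasks=8, num_cpus=4, seconds_per_task=0.5)
print(f"sequential: {sequential:.1f}s, parallel on 4 CPUs: {parallel:.1f}s")
```

With `num_cpus=8` the same estimate drops to 0.5s, which is the idealized picture in the diagram at the start of this section.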


### Task Dependencies (DAGs)

Tasks can depend on other tasks. Ray automatically figures out the execution order.

```
The execution graph:

fetch("db")  ──► process() ──┐
                             │
fetch("api") ──► process() ──┼──► combine() ──► result
                             │
fetch("cache")─► process() ──┘

Time: ─────────────────────────────────────────────────►
      |── 1s fetch ──|── 1s process ──|── 0.5s ──|
      (all parallel)   (all parallel)   combine
      
Total: 2.5s (not 6.5s if sequential!)
```
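
The totals for this graph can be checked with a little arithmetic. The sketch below uses the stage durations from the diagram and assumes at least three free CPUs, so the three branches really do overlap. The variable names are illustrative.

```python
# Stage durations taken from the diagram (seconds).
FETCH, PROCESS, COMBINE = 1.0, 1.0, 0.5
NUM_SOURCES = 3  # db, api, cache

# Sequential: every fetch and every process runs one after another.
sequential_total = NUM_SOURCES * FETCH + NUM_SOURCES * PROCESS + COMBINE

# Parallel: the three fetch -> process branches overlap, so only the
# longest branch (the critical path) plus combine() counts.
critical_path = max(FETCH + PROCESS for _ in range(NUM_SOURCES)) + COMBINE

print(f"sequential: {sequential_total}s, parallel: {critical_path}s")
```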

In [None]:
# Task Dependencies (DAG)
# Tasks can depend on other tasks: fetch -> process -> combine

import datetime

def timestamp():
    return datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]

@ray.remote
def fetch_data(source):
    print(f"[{timestamp()}] Fetching from {source}...")
    time.sleep(1)
    print(f"[{timestamp()}] Fetched {source}")
    return f"data_from_{source}"

@ray.remote
def process(data):
    print(f"[{timestamp()}] Processing {data}...")
    time.sleep(1)
    print(f"[{timestamp()}] Processed {data}")
    return f"processed({data})"

@ray.remote
def combine(results):
    # ObjectRefs nested inside a list are not auto-resolved, so fetch them explicitly
    resolved = ray.get(results)
    print(f"[{timestamp()}] Combining: {resolved}")
    time.sleep(0.5)
    print(f"[{timestamp()}] Combined!")
    return f"combined({resolved})"

start = time.time()
data_refs = [fetch_data.remote(s) for s in ["db", "api", "cache"]]
processed_refs = [process.remote(d) for d in data_refs]
final = combine.remote(processed_refs)

result = ray.get(final)
print(f"\nTotal time: {time.time() - start:.1f}s")
print(f"Result: {result}")

(fetch_data pid=8068) [21:01:08.949] Fetching from api...
(fetch_data pid=8071) [21:01:08.949] Fetching from cache...
(fetch_data pid=8070) [21:01:08.949] Fetching from db...
(fetch_data pid=8068) [21:01:09.950] Fetched api
(process pid=8068) [21:01:09.957] Processing data_from_db...
(fetch_data pid=8071) [21:01:09.950] Fetched cache
(process pid=8071) [21:01:09.957] Processing data_from_cache...
(fetch_data pid=8070) [21:01:09.950] Fetched db
(process pid=8070) [21:01:09.954] Processing data_from_api...
(combine pid=8069) [21:01:10.960] Combining: ['processed(data_from_db)', 'processed(data_from_api)', 'processed(data_from_cache)']
(process pid=8068) [21:01:10.958] Processed data_from_db
(process pid=8071) [21:01:10.958] Processed data_from_cache
(process pid=8070) [21:01:10.955] Processed data_from_api

Total time: 2.5s
Result: combined(['processed(data_from_db)', 'pro

(combine pid=8069) [21:01:11.461] Combined!


### `ray.wait()`: Process Results as They Arrive

`ray.get(refs)` blocks until **ALL** results are ready. Sometimes you want to process results as they complete:

```
ray.get([refs])              vs           ray.wait(refs)
┌────────────────────────┐               ┌────────────────────────┐
│ task1 ████             │               │ task1 ████ -> process! │
│ task2 ████████████     │               │ task2 ████████████     │
│ task3 ██████           │               │ task3 ██████ -> process│
│                        │               │        ...             │
│ (wait for ALL)    ▼    │               │ (process as ready) ▼   │
│ process results        │               │ task2 done -> process! │
└────────────────────────┘               └────────────────────────┘
```

In [None]:
# Don't block on everything finishing
@ray.remote
def variable_time_task(i):
    import time, random
    time.sleep(random.uniform(0.1, 2.0))
    return i

refs = [variable_time_task.remote(i) for i in range(10)]

# Process results as they complete
while refs:
    # Wait for the next task to complete
    ready, refs = ray.wait(refs, num_returns=1)
    result = ray.get(ready[0])
    print(f"Got result: {result}")


Got result: 3
Got result: 0
Got result: 2
Got result: 1
Got result: 5
Got result: 4
Got result: 6
Got result: 8
Got result: 7
Got result: 9
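
If you already know `concurrent.futures` from the standard library, the loop above has a close analogue there. The sketch below is an analogy in plain Python, not Ray code: `wait(..., return_when=FIRST_COMPLETED)` plays the role of `ray.wait(refs, num_returns=1)` and also returns a finished set and a still-pending set. One difference is that the stdlib call may hand back several finished futures at once.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def variable_time_task(i):
    time.sleep(0.05 * (3 - i))  # higher i sleeps less, so it tends to finish earlier
    return i

results = []
with ThreadPoolExecutor(max_workers=4) as pool:
    pending = {pool.submit(variable_time_task, i) for i in range(4)}
    while pending:
        # Like ray.wait: split into finished and still-running futures.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:  # may contain more than one finished future
            results.append(future.result())

print(results)  # completion order, not submission order
```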


---

## 4. Actors: Stateful Objects

Tasks are stateless - each invocation is independent. But sometimes you need:
- State that persists across calls (counters, caches, ML models)
- A resource that shouldn't be recreated (database connections, GPU memory)
- Coordination between operations (locks, queues)

**Actors** are the answer: classes decorated with `@ray.remote`.

```
TASKS (stateless)                        ACTORS (stateful)
┌─────────────────────────────┐         ┌─────────────────────────────┐
│                             │         │                             │
│  @ray.remote                │         │  @ray.remote                │
│  def func(x):               │         │  class Counter:             │
│      return x * 2           │         │      def __init__(self):    │
│                             │         │          self.count = 0     │
│  # Each call independent    │         │                             │
│  # No memory between calls  │         │      def increment(self):   │
│                             │         │          self.count += 1    │
│                             │         │          return self.count  │
│                             │         │                             │
│                             │         │  # State persists!          │
└─────────────────────────────┘         └─────────────────────────────┘
```

### Tasks vs Actors: When to Use What

| Use **Tasks** when...              | Use **Actors** when...              |
|------------------------------------|-------------------------------------|
| Function is pure (no side effects) | Need state across calls             |
| Each call is independent           | Need to coordinate operations       |
| Can recreate any required state    | Hold expensive resources (GPU, DB)  |
| Want automatic load balancing      | Need predictable placement          |
| Embarrassingly parallel workloads  | Stateful services (caches, models)  |

In [None]:
@ray.remote
class Counter:
    def __init__(self, start=0):
        self.value = start

    def increment(self):
        self.value += 1
        return self.value

    def get(self):
        return self.value
    
# Create a counter actor
# Pass starting value = 10
counter = Counter.remote(start=10)
# call methods on the actor
ref = counter.increment.remote()
print(f"Counter after increment: {ray.get(ref)}")

# Multiple calls maintain state
refs = [counter.increment.remote() for _ in range(5)]
print(ray.get(refs))  # [12, 13, 14, 15, 16]

Counter after increment: 11
[12, 13, 14, 15, 16]


In [None]:
# Actors: Shared State Across Workers
#
# Unlike tasks (stateless), actors keep state in a dedicated worker process.
# Multiple workers can send messages to the same actor to read/write shared data.
#
# ┌────────────────────────────────────────────────────────────────┐
# │  Worker Process (dedicated to this actor)                      │
# │  ┌──────────────────────────────────────┐                      │
# │  │  DataStore instance                  │                      │
# │  │  self.data = {"key_0": 0, "key_1": 1, ...}                  │
# │  └──────────────────────────────────────┘                      │
# └────────────────────────────────────────────────────────────────┘
#         ^           ^           ^
#         |           |           |
#    store.put    store.put   store.get
#    .remote()    .remote()   .remote()
#         |           |           |
# ┌───────┴───┐ ┌─────┴─────┐ ┌───┴───────┐
# │ worker 0  │ │ worker 1  │ │  driver   │
# └───────────┘ └───────────┘ └───────────┘

@ray.remote
class DataStore:
    """An actor that holds a dictionary - state persists across all method calls."""
    def __init__(self):
        self.data = {}  # This lives for the actor's lifetime

    def put(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)

@ray.remote
def worker(store, key, value):
    """A task that writes to the shared DataStore actor."""
    # store is an ActorHandle, not the object itself
    # .remote() sends an async message to the actor (doesn't wait)
    ray.get(store.put.remote(key, value))  # Wait for put to complete
    return f"Stored {key}"

# 1. Create ONE DataStore instance (lives in its own worker process)
store = DataStore.remote()  # Returns ActorHandle, not the object

# 2. Launch 10 workers - all share the SAME store
#    Each worker sends a put() message to the actor
refs = [worker.remote(store, f"key_{i}", i) for i in range(10)]
ray.get(refs)  # Wait for all workers to finish

# 3. All data is now in the actor's self.data dict
#    store.get.remote() sends a message to the actor and returns a Future
print(ray.get(store.get.remote("key_5")))  # 5
print(ray.get(store.get.remote("key_10")))  # None

5
None


In [None]:
# Actor Lifecycle
#
# REGULAR ACTORS: tied to the driver that created them
# ┌─────────────────┐                    ┌─────────────────┐
# │  Driver Process │  Worker.remote()   │  Worker Process │
# │  worker = ──────┼───────────────────>│  Actor instance │
# │  (driver exits) │                    │  (actor dies)   │
# └─────────────────┘                    └─────────────────┘
#
# NAMED + DETACHED ACTORS: survive driver exit, retrievable by name
# ┌─────────────────┐                    ┌─────────────────┐
# │  Driver 1       │  options(name=..., │  Worker Process │
# │  creates actor──┼─ lifetime=detached)│  Actor instance │
# │  (driver exits) │                    │  (STILL ALIVE!) │
# └─────────────────┘                    │                 │
# ┌─────────────────┐                    │                 │
# │  Driver 2       │  get_actor(name)   │                 │
# │  retrieves ─────┼───────────────────>│  Same instance! │
# └─────────────────┘                    └─────────────────┘
#
# | Actor Type | Dies When                  |
# |------------|----------------------------|
# | Regular    | Driver exits or ray.kill() |
# | Named      | Driver exits or ray.kill() |
# | Detached   | Only ray.kill()            |

@ray.remote
class Worker:
    def __init__(self, name):
        self.name = name
        print(f"Worker {name} started")
    
    def work(self):
        return f"{self.name} did work"

# 1. Create a regular actor (dies when driver exits)
worker = Worker.remote("worker-1")
print(ray.get(worker.work.remote()))

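# 2. Kill the actor. ray.kill() is forceful: it terminates the actor
#    immediately, and any running or pending calls fail.
#    For a graceful exit that lets queued calls finish first, use
#    worker.__ray_terminate__.remote() instead.
ray.kill(worker, no_restart=True)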

# 3. Create a NAMED + DETACHED actor
#    - name: retrievable from anywhere in cluster via ray.get_actor()
#    - lifetime="detached": survives even if this notebook closes
named_worker = Worker.options(
    name="global_worker",
    lifetime="detached",
    # Other options:
    # num_cpus=2,          # Reserve 2 CPUs
    # num_gpus=1,          # Reserve 1 GPU
    # max_restarts=3,      # Auto-restart on failure
    # max_concurrency=10,  # Handle 10 concurrent calls (async)
).remote("global")

print(ray.get(named_worker.work.remote()))

# 4. Get existing actor by name (works from any driver or worker in the same namespace)
same_worker = ray.get_actor("global_worker")
print(ray.get(same_worker.work.remote()))  # Same actor!

# 5. Clean up - must explicitly kill detached actors
ray.kill(named_worker, no_restart=True)

worker-1 did work
(Worker pid=23469) Worker worker-1 started
(raylet) It looks like you're creating a detached actor in an anonymous namespace. In order to access this actor in the future, you will need to explicitly connect to this namespace with ray.init(namespace="0034eab4-36b3-4217-a570-6c9e892633e5", ...)
global did work
global did work


(Worker pid=23472) Worker global started


---

## 5. Object Store: The Secret Sauce

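The **Object Store** (Plasma) is shared memory that all workers on a node can access. Workers read large objects from it in place instead of copying them between processes, which is a large part of what makes Ray fast.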

```
┌─────────────────────────────────────────────────────────────────────────┐
│                              SINGLE NODE                                │
│                                                                         │
│    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐            │
│    │ Worker  │    │ Worker  │    │ Worker  │    │ Worker  │            │
│    └────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘            │
│         │              │              │              │                  │
│         └──────────────┴──────┬───────┴──────────────┘                  │
│                               │                                         │
│                    ┌──────────▼──────────┐                              │
│                    │    OBJECT STORE     │                              │
│                    │   (Shared Memory)   │                              │
│                    │                     │                              │
│                    │  [array1] [array2]  │                              │
│                    │  [model]  [config]  │                              │
│                    └─────────────────────┘                              │
│                                                                         │
│    Key insight: Workers don't copy data. They read directly from        │
│    shared memory. NumPy arrays are ZERO-COPY!                           │
└─────────────────────────────────────────────────────────────────────────┘
```

### ObjectRef: The Pointer

When a task returns, the result goes into the object store. You get an `ObjectRef` back - it's just a pointer, not the data itself.

In [None]:
# The object store
@ray.remote
def create_array():
    import numpy as np
    return np.zeros((10000, 10000))  # ~800MB

ref = create_array.remote()  # Returns immediately with ObjectRef
print(ref)  # ObjectRef(...)

# The 800MB array is in the object store, not in your process
# ref is just a pointer to it
# obtain the actual object with ray.get()
array = ray.get(ref)  # Blocks until the object is ready
print(array.shape)  # (10000, 10000)

ObjectRef(e5b2fae6e42f6bbaffffffffffffffffffffffff0100000001000000)
(10000, 10000)
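
The `~800MB` figure follows directly from the array shape and dtype. The sketch below redoes that arithmetic in plain Python and compares it with the 1 GiB `object_store_memory` passed to `ray.init()` earlier. What Ray does once the store fills up (for example, spilling objects to disk) depends on version and configuration and is not modeled here.

```python
import struct

BYTES_PER_FLOAT64 = struct.calcsize("d")  # 8 bytes, the default dtype of np.zeros
rows, cols = 10_000, 10_000

array_bytes = rows * cols * BYTES_PER_FLOAT64
store_bytes = 1 * 1024 * 1024 * 1024  # object_store_memory passed to ray.init()

print(f"array: {array_bytes / 1e6:.0f} MB ({array_bytes / 2**20:.0f} MiB)")
print(f"store: {store_bytes / 2**20:.0f} MiB")
print(f"arrays that fit at once: {store_bytes // array_bytes}")
```

Under these settings only one such array fits in the store at a time, which is worth keeping in mind when running the larger examples in this section.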


### Zero-Copy Reads (NumPy/Arrow)

For NumPy arrays and Arrow tables, Ray uses **zero-copy deserialization**. Multiple workers read the same data without copying:

```
┌─────────────────────────────────────────────────────────────────────────┐
│  Object Store                                                           │
│  ┌───────────────────────────────────────────────────────┐              │
│  │  numpy_array (800MB)                                   │              │
│  │  [████████████████████████████████████████████████]   │              │
│  └───────────────────────────────────────────────────────┘              │
│         ▲              ▲              ▲              ▲                   │
│         │              │              │              │                   │
│    (zero-copy)    (zero-copy)   (zero-copy)    (zero-copy)              │
│         │              │              │              │                   │
│    ┌────┴────┐    ┌────┴────┐   ┌────┴────┐    ┌────┴────┐             │
│    │ Worker1 │    │ Worker2 │   │ Worker3 │    │ Worker4 │             │
│    │ arr.sum │    │ arr.mean│   │ arr.std │    │ arr.max │             │
│    └─────────┘    └─────────┘   └─────────┘    └─────────┘             │
│                                                                         │
│    No copies made! All workers read the same memory location.           │
└─────────────────────────────────────────────────────────────────────────┘
```

In [None]:
# Zero-copy Reads
import numpy as np

@ray.remote
def create_data():
    data = np.random.random((10000, 10000))  # ~800MB
    return data

@ray.remote
def process_data(data):
    # data is not copied - zero-copy read from object store
    return np.sum(data)

data_ref = create_data.remote()
print("data_ref: ", data_ref)
# Launch 5 tasks that all read the same data
# NO COPYING - all 5 tasks read from the same shared memory
refs = [process_data.remote(data_ref) for _ in range(5)]
results = ray.get(refs)
print(results)  # Five identical sums, roughly 5e7 for uniform [0, 1) data


data_ref:  ObjectRef(278af2822d9a793affffffffffffffffffffffff0100000001000000)
[np.float64(49999601.18804898), np.float64(49999601.18804898), np.float64(49999601.18804898), np.float64(49999601.18804898), np.float64(49999601.18804898)]


In [None]:
# Put data in object store explicitly
#
# ray.put() stores data once, then you pass the ref to many tasks.
# Ray AUTO-RESOLVES refs when passed to tasks - no ray.get() needed inside!
#
# ┌─────────────────────────────────────────────────────────────────┐
# │  Object Store (shared memory)                                   │
# │  ┌─────────────────────────────────────┐                        │
# │  │  large_array (800MB, stored ONCE)   │                        │
# │  └─────────────────────────────────────┘                        │
# └─────────────────────────────────────────────────────────────────┘
#         ^           ^           ^
#         |           |           |  (zero-copy reads)
#    process()   process()   process()
#
# GOOD: ray.put() once, pass ref 10 times  -> 1 copy in memory
# BAD:  pass array directly 10 times       -> 10 copies serialized
#
# Object store config:
#   ray.init(object_store_memory=10 * 1024 * 1024 * 1024)  # 10GB
#   Default: 30% of RAM
#
# Reference counting:
#   ref = ray.put(data)
#   del ref  # Object becomes eligible for garbage collection

import numpy as np

large_array = np.random.rand(10000, 10000)  # ~800MB
ref = ray.put(large_array)  # Store once in object store

@ray.remote
def process(data):
    # data is ALREADY the numpy array (Ray auto-resolved the ref)
    # DO NOT call ray.get() here!
    return data.sum()

# Bad: Pass data directly (serialized once per call, 5 times here!)
start = time.time()
results = ray.get([process.remote(large_array) for _ in range(5)])
bad_time = time.time() - start
print(f"Bad (pass data):  {bad_time:.2f}s")

# Good: Pass ref - Ray resolves it automatically, zero-copy
start = time.time()
results = ray.get([process.remote(ref) for _ in range(5)])
good_time = time.time() - start
print(f"Good (pass ref):  {good_time:.2f}s")
print(f"Speedup: {bad_time / good_time:.1f}x")
print(f"Results (all same): {results[0]:.2f}")

Bad (pass data):  16.23s
Good (pass ref):  2.30s
Speedup: 7.1x
Results (all same): 49998857.32
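
One way to see where the gap comes from is to count how many bytes each approach writes to the object store. The sketch below uses a deliberately simple model: one stored copy per call when the array is passed directly, and one copy in total with `ray.put()`. The helper `bytes_stored` is illustrative, not a Ray API, and real timings also depend on serialization cost and how full the store is.

```python
ARRAY_BYTES = 10_000 * 10_000 * 8  # float64 array from the cell above
NUM_CALLS = 5

def bytes_stored(num_calls, array_bytes, pass_by_ref):
    """Bytes written to the object store under the simple model:
    one copy per call when passing the array, one copy total with ray.put()."""
    copies = 1 if pass_by_ref else num_calls
    return copies * array_bytes

direct = bytes_stored(NUM_CALLS, ARRAY_BYTES, pass_by_ref=False)
by_ref = bytes_stored(NUM_CALLS, ARRAY_BYTES, pass_by_ref=True)
print(f"pass array directly:  {direct / 1e9:.1f} GB")
print(f"ray.put() + pass ref: {by_ref / 1e9:.1f} GB")
```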


---

## 6. Summary: Patterns & Anti-Patterns

### The Good (Do This)

```python
# 1. Batch submissions - submit all, then wait
refs = [task.remote(i) for i in range(1000)]
results = ray.get(refs)

# 2. Use ray.put() for shared data
data_ref = ray.put(large_data)  # 1 serialization
refs = [process.remote(data_ref) for _ in range(100)]  # Just passing refs

# 3. Process results as they arrive
while refs:
    ready, refs = ray.wait(refs, num_returns=1)
    handle_result(ray.get(ready[0]))

# 4. Use actors for expensive resources
@ray.remote(num_gpus=1)
class ModelServer:
    def __init__(self):
        self.model = load_heavy_model()  # Only loaded once!
```

### The Bad (Don't Do This)

```python
# 1. DON'T submit and wait one at a time
for i in range(1000):
    result = ray.get(task.remote(i))  # Blocks each time!

# 2. DON'T pass large data directly (serialized N times!)
refs = [process.remote(large_data) for _ in range(100)]  # 100 copies!

# 3. DON'T create too many tiny tasks
refs = [add_one.remote(i) for i in range(1_000_000)]  # Overhead > work

# 4. DON'T call ray.get() inside tasks (blocks a worker)
@ray.remote
def bad_task():
    return ray.get(other_task.remote())  # Wastes a worker!
```
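
For anti-pattern 3, the usual remedy is to batch the work so each task does enough to outweigh its scheduling overhead. The sketch below shows only the chunking logic in plain Python. `chunked` and `add_one_batch` are illustrative names, and the chunk size of 10,000 is an assumption to tune for your workload.

```python
def chunked(items, chunk_size):
    """Yield consecutive slices of `items` with at most `chunk_size` elements."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]

def add_one_batch(batch):
    """Plain-Python stand-in for the remote function: one call handles a whole chunk.
    With Ray, this is the function you would decorate with @ray.remote."""
    return [x + 1 for x in batch]

items = list(range(1_000_000))
chunks = list(chunked(items, chunk_size=10_000))  # chunk size is an assumption
print(f"{len(items)} tiny tasks -> {len(chunks)} batched tasks")

# With Ray: refs = [add_one_batch.remote(c) for c in chunks]; nested = ray.get(refs)
nested = [add_one_batch(c) for c in chunks]
flat = [y for batch in nested for y in batch]
```

The results come back as one list per chunk, so they are flattened at the end to match what the one-task-per-item version would have returned.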

---