In [1]:
import ray, sys, platform
ray.shutdown()
ray.init()


  from .autonotebook import tqdm as notebook_tqdm
2025-09-08 18:22:04,350	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2025-09-08 18:22:07,555	INFO worker.py:1951 -- Started a local Ray instance.


0,1
Python version:,3.12.10
Ray version:,2.49.1


In [None]:
#Adding @remote turns a function into a task
#calling .remote() returns a future
#Ray tasks run concurrent cross CPUs

import ray, time, os

@ray.remote
def slow_square(x):
    time.sleep(0.2)
    return (x*x, os.getpid())

t0 = time.perf_counter()
futs = [slow_square.remote(i) for i in range(8)]
vals = ray.get(futs)

print(f"Results: {vals}")
print(f"TIme: {time.perf_counter() - t0:.2f}s")

Results: [(0, 46664), (1, 41872), (4, 52524), (9, 47132), (16, 4000), (25, 26024), (36, 31560), (49, 42408)]
TIme: 0.21s


In [20]:
# Ray put and Ray get

big = list(range(1000000))
ref = ray.put(big)

@ray.remote
def sum_ref(r): return sum(r)

futs = [sum_ref.remote(ref) for _ in range(4)]

print(ray.get(futs)[:2], "... (all sums equal)")

[499999500000, 499999500000] ... (all sums equal)


In [None]:
# Ray wait lets you stream results as they come. Perfect for pipelines

import random

@ray.remote
def jitter(x):
    time.sleep(random.uniform(0.0, 0.5))
    return x

futs = [jitter.remote(i) for i in range(10)]
done, pending = ray.wait(futs, num_returns=3)
print(f"First Done {ray.get(done)}")

First Done [1, 3, 6]


In [26]:
# Retreis, timeouts and cancellations

@ray.remote(max_retries=2)
def flaky(i):
    if i % 3 == 0:
        raise RuntimeError("boom")
    return i

futs = [flaky.remote(i) for i in  range(6)]

try:
    print(ray.get(futs))
except ray.exceptions.RayTaskError as e:
    print("Some tasks failed after retries:", e)
    
    

# timeout & cancel example
@ray.remote
def sleepy(): time.sleep(5); return "done"

f = sleepy.remote()
ready, pending = ray.wait([f], timeout=1.0)
if pending:
    ray.cancel(f, force=True)
    print("Cancelled long task.")
    
    

Some tasks failed after retries: [36mray::flaky()[39m (pid=52524, ip=127.0.0.1)
  File "python\\ray\\_raylet.pyx", line 1936, in ray._raylet.execute_task
  File "C:\Users\fiona\AppData\Local\Temp\ipykernel_1392\1460381954.py", line 6, in flaky
RuntimeError: boom
Cancelled long task.


2025-09-08 18:45:07,471	ERROR worker.py:429 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::flaky()[39m (pid=26024, ip=127.0.0.1)
  File "python\\ray\\_raylet.pyx", line 1936, in ray._raylet.execute_task
  File "C:\Users\fiona\AppData\Local\Temp\ipykernel_1392\1460381954.py", line 6, in flaky
RuntimeError: boom


In [32]:
# Resource aware scheduling

@ray.remote(num_cpus = 4)
def heavy(i):
    time.sleep(0.5)
    return i

t0 = time.perf_counter()
futs = [heavy.remote(i) for i in range(8)]
ray.get(futs)

print(f"Wall time groups by 4-CPU slots -> {time.perf_counter()-t0:.2f}s")


Wall time groups by 4-CPU slots -> 1.01s


In [33]:
#Statefull agents

@ray.remote
class Counter:
    def __init__(self): self.n = 0
    def inc(self, k=1):
        self.n += k
        return self.n
    def get(self): return self.n
    
c = Counter.remote()
print(ray.get([c.inc.remote() for _ in range(3)]))

print(f"Final {ray.get(c.get.remote())}")


[1, 2, 3]
Final 3


In [38]:
# Actor Concurrency
@ray.remote(max_concurrency = 4)
class Adder:
    def add(self, x, y):
        time.sleep(0.2)
        return x + y
    
a = Adder.remote()
futs = [a.add.remote(i,i) for i in range(12)]
print("First 5:", ray.get(futs[:3]))


First 5: [0, 2, 4]


In [43]:
@ray.remote(num_gpus=1)
class GPUHolder:
    def where(self):
        import os
        return (os.getpid(), "I hold the GPU")
g = GPUHolder.remote()
print(ray.get(g.where.remote()))

(12652, 'I hold the GPU')


In [45]:
# A batched gateway pattern
import asyncio

@ray.remote
class Batcher:
    def __init__(self, max_batch=8, flush_ms=50):
        self.queue = []
        self.max_batch = max_batch
        self.flush_ms = flush_ms

    async def submit(self, x):
        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        self.queue.append((x, fut))
        if len(self.queue) >= self.max_batch:
            await self._flush()
        else:
            # timer-based flush
            loop.call_later(self.flush_ms/1000.0, lambda: asyncio.create_task(self._flush()))
        return await fut

    async def _flush(self):
        if not self.queue: 
            return
        batch, self.queue = self.queue, []
        xs = [x for x,_ in batch]
        # "process" the batch in one go (here just square)
        ys = [x*x for x in xs]
        for (_, fut), y in zip(batch, ys):
            if not fut.done(): fut.set_result(y)

b = Batcher.remote(max_batch=4, flush_ms=30)
futs = [b.submit.remote(i) for i in range(10)]
print("Batched results:", ray.get(futs))


Batched results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [47]:
%pip install -qU "ray[tune]"

import ray, time, random
from ray import tune
from ray.air import session

ray.shutdown(); ray.init(ignore_reinit_error=True, include_dashboard=False)

def objective(cfg):
    x = cfg["x"]
    time.sleep(0.1)
    loss = (x - 3.0)**2 + random.uniform(-0.02, 0.02)
    session.report({"loss": loss})   # <-- critical: report the metric

search_space = {"x": tune.uniform(-5, 5)}
tuner = tune.Tuner(
    objective,
    param_space=search_space,
    tune_config=tune.TuneConfig(num_samples=20, metric="loss", mode="min"),
)
results = tuner.fit()

print("Completed trials:", results.num_completed_trials)
best = results.get_best_result(metric="loss", mode="min")
print("Best config:", best.config, "| best loss:", best.metrics["loss"])


0,1
Current time:,2025-09-08 19:14:12
Running for:,00:00:05.20
Memory:,13.7/15.7 GiB

Trial name,status,loc,x,iter,total time (s),loss
objective_9c643_00000,TERMINATED,127.0.0.1:9244,-0.578285,1,0.102486,12.795
objective_9c643_00001,TERMINATED,127.0.0.1:39616,-0.843782,1,0.10178,14.7661
objective_9c643_00002,TERMINATED,127.0.0.1:36932,0.170927,1,0.100817,7.99845
objective_9c643_00003,TERMINATED,127.0.0.1:51920,0.924318,1,0.101796,4.32576
objective_9c643_00004,TERMINATED,127.0.0.1:5180,-0.284215,1,0.10123,10.799
objective_9c643_00005,TERMINATED,127.0.0.1:48628,-4.03419,1,0.101094,49.4921
objective_9c643_00006,TERMINATED,127.0.0.1:28500,-2.08403,1,0.102021,25.8505
objective_9c643_00007,TERMINATED,127.0.0.1:15060,1.54617,1,0.101331,2.10525
objective_9c643_00008,TERMINATED,127.0.0.1:25920,-2.8935,1,0.101118,34.7225
objective_9c643_00009,TERMINATED,127.0.0.1:5776,2.93857,1,0.101298,-0.000407724


2025-09-08 19:14:12,990	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to 'C:/Users/fiona/ray_results/objective_2025-09-08_19-14-07' in 0.0428s.
2025-09-08 19:14:13,000	INFO tune.py:1041 -- Total run time: 5.21 seconds (5.15 seconds for the tuning loop).


AttributeError: 'ResultGrid' object has no attribute 'num_completed_trials'

In [48]:
%pip install -qU ray
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer   # we’ll just use it as a generic trainer

import numpy as np

def train_loop_per_worker(config):
    # y = 2x + 1 with noise; fit w via GD
    rng = np.random.default_rng(0)
    X = rng.normal(size=(2000, 1))
    y = 2*X + 1 + 0.1 * rng.normal(size=(2000, 1))

    w = np.zeros((1,1))
    b = np.zeros((1,))
    lr = config.get("lr", 0.1)
    steps = config.get("steps", 200)
    bs = config.get("batch_size", 64)

    for t in range(steps):
        idx = rng.integers(0, len(X), size=(bs,))
        xb, yb = X[idx], y[idx]
        preds = xb @ w + b
        err = preds - yb
        # grads
        gw = (xb.T @ err) / bs
        gb = err.mean(axis=0)
        # update
        w -= lr * gw
        b -= lr * gb
    # report simple metric
    final_mse = float(((X @ w + b - y)**2).mean())
    return {"mse": final_mse, "w": float(w[0,0]), "b": float(b[0])}

ray.shutdown(); ray.init(ignore_reinit_error=True)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1, use_gpu=False),  # keep Windows-happy
    train_loop_config={"lr":0.2, "steps":300, "batch_size":128},
)
result = trainer.fit()
print("Train result:", result)
ray.shutdown()


2025-09-08 19:16:16,965	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Note: you may need to restart the kernel to use updated packages.


2025-09-08 19:16:19,397	INFO worker.py:1951 -- Started a local Ray instance.
2025-09-08 19:16:20,943	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949
2025-09-08 19:16:20,950	INFO data_parallel_trainer.py:339 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.


== Status ==
Current time: 2025-09-08 19:16:21 (running for 00:00:00.12)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/20 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: C:/Users/fiona/AppData/Local/Temp/ray/session_2025-09-08_19-16-17_734065_1392/artifacts/2025-09-08_19-16-20/TorchTrainer_2025-09-08_19-16-20/driver_artifacts
Number of trials: 1/1 (1 PENDING)




[36m(TorchTrainer pid=35244)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.


== Status ==
Current time: 2025-09-08 19:16:26 (running for 00:00:05.14)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/20 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: C:/Users/fiona/AppData/Local/Temp/ray/session_2025-09-08_19-16-17_734065_1392/artifacts/2025-09-08_19-16-20/TorchTrainer_2025-09-08_19-16-20/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




[36m(RayTrainWorker pid=37128)[0m Setting up process group for: env:// [rank=0, world_size=1]
[36m(RayTrainWorker pid=37128)[0m [W908 19:16:27.000000000 socket.cpp:755] [c10d] The client socket has failed to connect to [kubernetes.docker.internal]:53506 (system error: 10049 - The requested address is not valid in its context.).
2025-09-08 19:16:27,902	ERROR tune_controller.py:1331 -- Trial task failed for trial TorchTrainer_ebc24_00000
Traceback (most recent call last):
  File "c:\Users\fiona\Documents\GitHub\Transformers\transformers-mini\.venv312\Lib\site-packages\ray\air\execution\_internal\event_manager.py", line 110, in resolve_future
    result = ray.get(future)
             ^^^^^^^^^^^^^^^
  File "c:\Users\fiona\Documents\GitHub\Transformers\transformers-mini\.venv312\Lib\site-packages\ray\_private\auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "c:\Users\fiona\Documents\GitHub\Transformers\transformers-m

== Status ==
Current time: 2025-09-08 19:16:27 (running for 00:00:06.97)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/20 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Result logdir: C:/Users/fiona/AppData/Local/Temp/ray/session_2025-09-08_19-16-17_734065_1392/artifacts/2025-09-08_19-16-20/TorchTrainer_2025-09-08_19-16-20/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                                                                             |
|--------------------------+--------------+---------------------

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TorchTrainer.restore("C:/Users/fiona/ray_results/TorchTrainer_2025-09-08_19-16-20")`.
To start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

In [None]:
%pip install -qU ray[serve]
import ray
from ray import serve

ray.shutdown(); ray.init(ignore_reinit_error=True)
serve.shutdown()
serve.start(detached=False)

@serve.deployment(num_replicas=2)   # scale horizontally
class Doubler:
    def __call__(self, x: int) -> int:
        return x * 2

Doubler.deploy()
handle = Doubler.get_handle()
print(await handle.remote(21))   # in notebooks, you can `await` directly

await serve.shutdown()
ray.shutdown()
