In [131]:
import numpy as np
import aiohttp
import time
import ray
import os 
import sys
import torch

# Select the small-model configuration via the CONFIG_PATH environment variable.
# NOTE(review): presumably read by neural_net (or a config loader it uses) at
# import time, which would be why it is set before the import below — confirm.
os.environ["CONFIG_PATH"] = "../configs/small_model.toml"
# Add ../src (relative to the kernel's working directory) to the module search
# path; presumably this is where neural_net lives — verify.
sys.path.append("../src")

from neural_net import NeuralNet


In [123]:
BOARD_SIZE = 10

NUM_EVALUATIONS = 500
random_boards = np.random.random((NUM_EVALUATIONS, 4, BOARD_SIZE, BOARD_SIZE))

async with aiohttp.ClientSession() as session:
    start_time = time.perf_counter()
    for _ in range(5):    
        async with session.get(
            "http://localhost:8000",
            data=random_boards.tobytes(),
        ) as response:
            if response.status != 200:
                print("uh oh", response.status)

            content = await response.read()

            values_bytes, policy_logits_bytes = content.split(b"|||divider|||")
            values = np.frombuffer(values_bytes, dtype=np.float32).reshape((-1, 4))
            policy_logits = np.frombuffer(policy_logits_bytes, dtype=np.float32).reshape((-1, 6233))

            assert values.shape[1] == 4
            assert policy_logits.shape[1] == 6233

    end_time = time.perf_counter()

print((end_time - start_time) / (500 * 5))

0.00040913026640191676


In [133]:
import ray
import asyncio

@ray.remote
class Actor:
    """Ray actor that holds one NeuralNet on the "mps" device and runs it on board batches."""

    def __init__(self):
        """Build the network and move it to the "mps" device."""
        net = NeuralNet()
        self.model = net.to("mps")

    def run(self, boards):
        """Run the model on ``boards`` and return NumPy results.

        Args:
            boards: array-like batch; it is copied into a fresh NumPy array
                before being converted to a float tensor on "mps".

        Returns:
            A ``(values, policy_logits)`` tuple of NumPy arrays: the model's
            first output after a softmax over dim 1, and its second output
            unchanged.
        """
        host_batch = np.array(boards)
        device_batch = torch.from_numpy(host_batch).to(dtype=torch.float, device="mps")

        # Forward pass only; no autograd state is recorded.
        with torch.inference_mode():
            raw_values, raw_policy = self.model(device_batch)

        # Bring both outputs back to host memory as NumPy arrays.
        value_probs = raw_values.softmax(dim=1).cpu().numpy()
        policy_array = raw_policy.cpu().numpy()
        return value_probs, policy_array
        

actor = Actor.remote()

await actor.run.remote()

TypeError: missing a required argument: 'boards'

[36m(TemporaryActor pid=25920)[0m Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::Actor.__init__()[39m (pid=25920, ip=127.0.0.1, actor_id=f72a1e94e64475a2080eaade01000000, repr=<__main__.FunctionActorManager._create_fake_actor_class.<locals>.TemporaryActor object at 0x103a48c10>)
[36m(TemporaryActor pid=25920)[0m            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[36m(TemporaryActor pid=25920)[0m RuntimeError: The actor with name Actor failed to import on the worker. This may be because needed library dependencies are not installed in the worker environment:
[36m(TemporaryActor pid=25920)[0m 
[36m(TemporaryActor pid=25920)[0m [36mray::Actor.__init__()[39m (pid=25920, ip=127.0.0.1, actor_id=f72a1e94e64475a2080eaade01000000, repr=<__main__.FunctionActorManager._create_fake_actor_class.<locals>.TemporaryActor object at 0x103a48c10>)
[36m(TemporaryActor pid=25920)[0m                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[36m

In [128]:
import os 
# Process id of the notebook kernel itself (shown as the cell output); useful
# for telling it apart from the worker pids that appear in Ray's log lines.
os.getpid()

25751

In [145]:
import ray

# Workload for the actor loop: number of calls and boards per call.
NUM_EVALUATIONS, BATCH_SIZE = 1000, 100

# One reusable batch of random boards, shape (BATCH_SIZE, 4, BOARD_SIZE, BOARD_SIZE),
# with values drawn uniformly from [0, 1).
random_boards = np.random.random(size=(BATCH_SIZE, 4, BOARD_SIZE, BOARD_SIZE))

@ray.remote
class Actor:
    """Ray actor that holds one NeuralNet on the "mps" device and evaluates board batches.

    NOTE(review): the model is constructed inside the Ray worker process, so
    `neural_net` must be importable there — confirm the worker environment
    (an earlier run in this notebook logged an import failure on the worker).
    """

    def __init__(self):
        """Create the model that `run` uses.

        `run` reads ``self.model``; leaving this constructor empty makes every
        call fail with AttributeError, so the network is built here and moved
        to the "mps" device.
        """
        self.model = NeuralNet().to("mps")

    def run(self, boards):
        """Run the model on ``boards`` and return NumPy results.

        Args:
            boards: array-like batch; copied into a fresh NumPy array and
                converted to a float tensor on "mps".

        Returns:
            ``(values, policy_logits)``: the model's first output after a
            softmax over dim 1, and its second output unchanged, both as
            NumPy arrays on the host.
        """
        # np.array(...) makes a fresh copy of the input before it is handed to
        # torch.from_numpy.
        boards_tensor = torch.from_numpy(np.array(boards)).to(dtype=torch.float, device="mps")
        with torch.inference_mode():
            values_logits_tensor, policy_logits_tensor = self.model(boards_tensor)

        values = torch.softmax(values_logits_tensor, dim=1).cpu().numpy()
        policy_logits = policy_logits_tensor.cpu().numpy()

        return values, policy_logits
        

actor = Actor.remote()

for i in range(NUM_EVALUATIONS):
    await actor.run.remote(random_boards.copy())