# Efficient Parallel Task Execution with *ray*
Since a *ray.Pool* appears to process tasks in batches, and hence always waits for the longest task of a batch to finish, we need a more efficient way.

We have seen that policy evaluation varies little in duration, so round robin is a good strategy for that use case. Trajectory evaluation, however, varies greatly. Fortunately, it can be executed in a non-functional way: workers don't need to return the result to the caller and can instead store it anywhere. Overall task completion can then be polled in a classical check-and-wait loop. The approach illustrated in this little tutorial takes little more than the average task duration multiplied by the number of tasks per worker, i.e. it is almost perfectly efficient. Hence we'll implement that approach for the self-play actors.
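To make the efficiency argument concrete, here is a small simulation that does not involve Ray at all. It assumes a simplified model of batching (each batch of `n_workers` tasks waits for its slowest member) and compares it with workers that pull the next task as soon as they are idle. The function names are made up for this sketch, and the durations mimic the 1-4 s pattern produced by the dummy dispatcher further down.

```python
import heapq


def batched_makespan(durations, n_workers):
    """Total time if tasks run in fixed batches and each batch waits for its slowest task."""
    return sum(
        max(durations[i:i + n_workers])
        for i in range(0, len(durations), n_workers)
    )


def pull_makespan(durations, n_workers):
    """Total time if every worker pulls the next task as soon as it becomes idle."""
    free_at = [0.0] * n_workers  # min-heap of the times at which workers become idle
    heapq.heapify(free_at)
    for duration in durations:
        start = heapq.heappop(free_at)          # earliest idle worker takes the task
        heapq.heappush(free_at, start + duration)
    return max(free_at)


# Same effort pattern as the dummy dispatcher: seqno % 4 + 1 for seqno = 1..32
durations = [seqno % 4 + 1 for seqno in range(1, 33)]
ideal = sum(durations) / 8                      # perfect load balancing on 8 workers

print(batched_makespan(durations, 8))           # 16
print(pull_makespan(durations, 8))              # 12.0
print(ideal)                                    # 10.0
```

Under these assumptions the pull-based scheme stays within 20% of the ideal, while the batched model needs 60% more time. The real order in which Ray actors pick up tasks is not deterministic, so measured times will differ somewhat from the simulated 12 s.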

In [1]:
import ray
from alphazero.ray.generic import TaskMonitor, RayFilePickler, SimpleCountingDispatcher
from alphazero.gomoku_game import RandomBoardInitializer, GomokuGame
from alphazero.interfaces import MctsParams, PolicySpec
from alphazero.self_play import SelfPlay
from alphazero.policies.ray_impl import HeuristicRayPolicy
from alphazero.ray.trainer import create_pool, PolicyRef, SelfPlayDelegator

### Some Dummy implementations
Workers pick up tasks from a common dispatcher and hand their results to the collector. The dispatcher here acts as a common counter, but it could just as well enumerate input resources.

In [2]:
import uuid
import time

@ray.remote
class Worker:
    def __init__(self, wid, dispatcher, collector):
        self.wid = wid
        self.collector = collector
        self.dispatcher = dispatcher

    def init(self, *args, **kwargs):
        pass

    def work(self):
        """
        fetch task, report result - until all jobs are done
        """
        while True:
            task = ray.get(self.dispatcher.get_task.remote())
            if task is None:
                break
            seqno, effort = task
            time.sleep(effort)
            the_result = str(uuid.uuid4())
            self.collector.collect.remote(self.wid, seqno, the_result)

@ray.remote
class Collector:

    def __init__(self, monitor):
        self.result = []
        self.monitor = monitor

    def collect(self, wid, seqno, load):
        """
        collect all results
        """
        self.result.append((wid, seqno, load))
        self.monitor.report.remote()

    def get_result(self):
        return self.result


@ray.remote
class Dispatcher:

    def __init__(self, num_tasks):
        self.seqno = 0
        self.num_tasks = num_tasks

    def get_task(self):
        """
        Provide tasks of different 'effort' until exhausted
        """
        self.seqno = self.seqno + 1
        if self.seqno <= self.num_tasks:
            return self.seqno, self.seqno % 4 + 1
        else:
            return None
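As noted above, the dispatcher could just as well enumerate input resources instead of counting. A minimal sketch of that variant follows; `ResourceDispatcher` is a hypothetical name, and the class is shown without the `@ray.remote` decorator so that it runs on its own. It keeps the same contract as `Dispatcher.get_task`: a `(seqno, payload)` tuple, or `None` once exhausted.

```python
class ResourceDispatcher:
    """Hypothetical variant of Dispatcher: hands out items of a fixed list, then None."""

    def __init__(self, resources):
        # Pair every resource with a 1-based sequence number, like the counting version
        self._items = iter(enumerate(resources, start=1))

    def get_task(self):
        """Return (seqno, resource), or None when all resources have been handed out."""
        return next(self._items, None)


dispatcher = ResourceDispatcher(["board_a", "board_b"])
first = dispatcher.get_task()    # (1, 'board_a')
second = dispatcher.get_task()   # (2, 'board_b')
third = dispatcher.get_task()    # None
```

Because an actor processes its method calls one at a time by default, decorating such a class with `@ray.remote` should hand each resource to exactly one worker.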


In [3]:
rctx = ray.init(ignore_reinit_error=True)

2022-08-05 07:49:36,717	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265


In [4]:
N_WORKERS = 8
N_TASKS = 32

In [5]:
the_monitor = TaskMonitor.remote()
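`TaskMonitor` is imported from `alphazero.ray.generic` and its source is not shown here. Judging only from how it is used in this notebook (`report()` is called by the collector, `get_status()` is compared against `N_TASKS`), it behaves like a completion counter. The following stand-in is an assumption about that behaviour, not the actual implementation; `CountingMonitor` is a made-up name.

```python
class CountingMonitor:
    """Hypothetical stand-in for TaskMonitor: counts reported task completions."""

    def __init__(self):
        self.completed = 0

    def report(self):
        """Called once for every finished task."""
        self.completed += 1

    def get_status(self):
        """Return the number of tasks reported as finished so far."""
        return self.completed


monitor = CountingMonitor()
monitor.report()
monitor.report()
status = monitor.get_status()  # 2
```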

In [6]:
the_collector = Collector.remote(the_monitor)

With $N_t$ tasks of duration 1-4 s (average $t = 2.5$ s) and $N_w$ workers, we expect a total duration of about $N_t / N_w \cdot t$ seconds; for $N_t = 32$ and $N_w = 8$ that is 10 s.

In [7]:
the_dispatcher = Dispatcher.remote(N_TASKS)

In [8]:
workers = [Worker.remote(wid, the_dispatcher, the_collector) for wid in range(N_WORKERS)]

In [9]:
from alphazero.utils import Timer

# Start all workers; work.remote() returns immediately
for worker in workers:
    worker.work.remote()

with Timer(verbose=True):
    while True:
        time.sleep(.5)
        status = ray.get(the_monitor.get_status.remote())
        if status == N_TASKS:
            break

elapsed time: 11655.386852 ms
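The check-and-wait loop above never terminates if a worker dies before reporting its task. A possible hardening, sketched here under the assumption that a plain timeout is acceptable, wraps the loop in a helper with a deadline. `wait_until` is a hypothetical helper, not part of `alphazero` or Ray.

```python
import time


def wait_until(get_count, target, poll_interval=0.5, timeout=60.0):
    """Poll get_count() until it reaches target.

    Returns True if the target was reached, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_count() >= target:
            return True
        time.sleep(poll_interval)
    # One last check so a completion right at the deadline is not missed
    return get_count() >= target


# In this notebook the call could look like this (not executed here):
# wait_until(lambda: ray.get(the_monitor.get_status.remote()), N_TASKS)
```

Using `>=` rather than `==` also keeps the loop from spinning forever if the counter ever overshoots the expected number.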


A quick check confirms that actor order is sufficiently random to rule out systematic inefficiencies.

In [10]:
result = ray.get(the_collector.get_result.remote())
result

[(5, 4, 'f28fe873-a59f-4c16-b356-48a747623595'),
 (4, 8, '5d6746be-80a9-4154-a742-a4ac986abc80'),
 (1, 1, 'ba5eff57-99dd-4523-b61e-5487ef8ddfcd'),
 (0, 5, '1e10c386-bc58-4faa-b4d0-15088f3cbd59'),
 (6, 2, 'df8307e1-209f-48f5-94a2-11a9ac29640d'),
 (2, 6, '1b41d957-27d9-40c8-87c1-e3805c589c2d'),
 (5, 9, '667647a9-90ae-4979-a433-d98285d1a094'),
 (0, 12, '3454fbd6-05d5-41b8-ba75-6588aa6ada30'),
 (7, 3, 'e35bc9da-b1e5-413c-a293-5daac6cda282'),
 (0, 16, '38fae090-489e-4f6a-bb77-718ce3087e21'),
 (3, 7, '42eceadb-c171-4e08-b815-9cb323322391'),
 (4, 10, '08284df5-02dd-4fec-a490-0770cd1a8280'),
 (6, 13, 'fa5116cd-d791-41ea-9800-bdf7d57a4d73'),
 (4, 20, '73372d5b-f5de-41b1-b047-624aff2129a0'),
 (1, 11, 'f236f9a4-e870-45b3-be2f-24c1a7640cd6'),
 (7, 17, '964851dd-b742-4402-aed9-d6b27c4ea0eb'),
 (2, 14, '8f8f0019-5859-4cc7-ab72-8ba44decdcdf'),
 (6, 21, 'c252881e-dedd-452b-8266-8e1ead159140'),
 (5, 15, '0321b600-d079-4b26-8529-98652a2da0d7'),
 (7, 24, '7c0b4ed0-4ddf-4074-9e1e-830c0ad033fc'),
 (0, 18, 

In [11]:
ray.shutdown()

---
# Collecting worker results in a file

In [12]:
rctx = ray.init(ignore_reinit_error=True)

2022-08-05 07:50:06,248	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265


In [13]:
import os
filename = os.path.join(os.getcwd(), "tmp.pickle")
f = RayFilePickler.remote(filename, 'wb+')

In [14]:
f.write.remote([1,2,3])
f.write.remote([1,2,3]);

In [15]:
f.close.remote();

On the ray dashboard, you can observe that closing the file also removes the actor.
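The source of `RayFilePickler` is not part of this notebook. The file it produces can be read back as a sequence of independent pickles, so a plain-Python stand-in might look like the sketch below. `FilePickler` is a hypothetical name, and the removal of the actor on close is not modelled.

```python
import os
import pickle
import tempfile


class FilePickler:
    """Hypothetical stand-in for RayFilePickler: appends one pickle per write()."""

    def __init__(self, filename, mode="wb+"):
        self._fp = open(filename, mode)

    def write(self, obj):
        """Serialize obj and append it to the file."""
        pickle.dump(obj, self._fp)

    def close(self):
        """Flush and close the underlying file."""
        self._fp.close()


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "tmp.pickle")
    writer = FilePickler(path)
    writer.write([1, 2, 3])
    writer.write([1, 2, 3])
    writer.close()

    # Read the entries back one pickle at a time until the file is exhausted
    rows = []
    with open(path, "rb") as fp:
        while True:
            try:
                rows.append(pickle.load(fp))
            except EOFError:
                break

print(rows)  # [[1, 2, 3], [1, 2, 3]]
```

Funnelling all writes through a single object (in Ray, a single actor) keeps the pickles from interleaving, since only one `write` runs at a time.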

### Check the result

In [16]:
!ls -lt *tmp*

-rw-r--r--  1 wgiersche  staff  44 Aug  5 07:50 tmp.pickle


In [17]:
from pickle import Unpickler
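def read_entries(file_name):
    """Read all consecutively pickled objects from file_name into a list."""
    rows = []
    # The context manager makes sure the file is closed again
    with open(file_name, 'rb') as fp:
        while True:
            try:
                rows.append(Unpickler(fp).load())
            except EOFError:
                return rows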

In [18]:
read_entries(file_name=filename)

[[1, 2, 3], [1, 2, 3]]

### Clean up

In [19]:
!rm -f tmp.pickle

In [30]:
ray.shutdown()

---
# Efficient Parallel SelfPlay

In [31]:
ray.init(ignore_reinit_error=True);

2022-08-05 07:53:17,317	INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265


In [32]:
filename = os.path.join(os.getcwd(), "tmp.pickle")
the_writer = RayFilePickler.remote(filename, 'wb+')

In [33]:
the_counter = SimpleCountingDispatcher.remote(2)

### The *Business Logic*

In [34]:
params = MctsParams(
    cpuct = 1.0,
    num_simulations=100,
    model_threshold=.2)

# Number of self-play workers
N_SP = 1

# Number of policy workers
N_P = 1

rbi = RandomBoardInitializer(15, 4, 5, 9, 5, 9)
game = GomokuGame(15, rbi)

In [35]:
the_dispatcher = create_pool(num_workers=N_P, policy=HeuristicRayPolicy(),
                         board_size=15, cut_off = 0.5)
selfplays = [SelfPlay.remote(mcts_params=params) for _ in range(N_SP)]
for selfplay in selfplays:
    selfplay.init.remote(15, game, PolicySpec(pool_ref=PolicyRef(the_dispatcher)))

[2m[36m(PolicyWorker pid=32915)[0m 2022-08-05 07:53:27.720722: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
[2m[36m(PolicyWorker pid=32915)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [36]:
workers = [SelfPlayDelegator.remote(1, the_writer, the_counter, selfplay) for selfplay in selfplays]

In [37]:
for worker in workers:
    worker.work.remote()

In [38]:
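# Run this only after the workers have finished: work.remote() returns immediately,
# so closing too early may drop results that are still being written.
the_writer.close.remote();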

In [39]:
!ls -lt *tmp*

-rw-r--r--  1 wgiersche  staff  10497 Aug  5 07:54 tmp.pickle


In [40]:
ray.shutdown()

In [54]:
trajectories = read_entries('the_heuristic_eight.pickle')

In [56]:
len(trajectories), len(trajectories[0])

(4, 19)