In [None]:
%load_ext autoreload
%autoreload 2

# Who am I?

```ipython
In [1]: whoami
Out[1]: Amine Dirhoussi, 🇲🇦 freelance Data scientist/Data engineer.
        Currently working on Document AI. 
        Loves distributed systems. 
        Amateur Grappler and Kickboxer.

In [2]: whereis(amine)
Out[2]: 🤖 github.com/aminediro
        twitch.tv/aminediro
        📖 medium.com/@aminedirhoussi1
        linkedin.com/in/ahmed-amine-dirhoussi-45213886/

```

# DaskQueue: a lightweight persistent distributed task queue based on Dask



The `daskqueue` Python library leverages Dask Actors to implement distributed task queues. It provides a **simple load-balanced** `QueuePool` and a `ConsumerPool` class to consume `Messages` from these queues.

Actors were introduced in `dask.distributed` *1.23.0*. An actor points to a user-defined object living on a remote worker.
Anyone with that actor handle can call methods on that remote object.

Why are actors the essential building blocks of **daskqueue**?

1. Actors are **stateful**: they can hold on to state and update it in place, which is very useful when spawning distributed queues!

2. **NO CENTRAL SCHEDULING NEEDED**: operations on actors sidestep the central scheduler, so they do not contribute to the ~4000 tasks/second scheduler overhead. They also avoid an extra network hop and therefore have lower latencies. Actors can communicate with each other in a peer-to-peer manner, which is pretty neat when there is a huge number of queue-to-consumer connections.


You can simply `pip install` it and start using it right away on your Dask cluster. The only dependency is `dask[distributed]`.

In [1]:
%pip install daskqueue


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m23.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
from daskqueue import ConsumerPool, QueuePool, Durability
from distributed import Client, LocalCluster
from pprint import pprint

Let's define a simple task we want to distribute:

In [3]:
def process_item():
    return sum(i * i for i in range(10**2))
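Every call returns the same number, which makes the results later in the notebook easy to check: `process_item` sums i² for i = 0 to 99, and the closed form (n−1)·n·(2n−1)/6 with n = 100 gives the same value. A quick local sanity check:

```python
def process_item():
    return sum(i * i for i in range(10**2))


n = 10**2
closed_form = (n - 1) * n * (2 * n - 1) // 6  # sum of i*i for i in 0..n-1
print(process_item(), closed_form)  # 328350 328350
```

This is the `328350` that appears for every message in the consumer results.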

## Connect to a dask cluster

`daskqueue` uses Dask actors under the hood. Starting a Dask client with `direct_to_workers=True` lets the client talk to the workers directly instead of going through the scheduler.

In [4]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:46107",direct_to_workers=True)
client

Client: connection method Direct, dashboard http://127.0.0.1:8787/status
Scheduler: tcp://127.0.0.1:46107, 4 workers, 12 threads total, 31.29 GiB total memory, started 20 minutes ago

| Worker comm | Dashboard | Nanny | Threads | Memory | Memory usage | Read / write bytes | Local directory |
|---|---|---|---|---|---|---|---|
| tcp://127.0.0.1:34763 | http://127.0.0.1:39739/status | tcp://127.0.0.1:35003 | 3 | 7.82 GiB | 99.02 MiB | 786.86 kiB / 221.67 kiB | /tmp/dask-worker-space/worker-hpaezvfy |
| tcp://127.0.0.1:33607 | http://127.0.0.1:46365/status | tcp://127.0.0.1:40531 | 3 | 7.82 GiB | 99.18 MiB | 834.70 kiB / 222.07 kiB | /tmp/dask-worker-space/worker-soflq2z7 |
| tcp://127.0.0.1:38489 | http://127.0.0.1:32817/status | tcp://127.0.0.1:43445 | 3 | 7.82 GiB | 99.19 MiB | 835.16 kiB / 218.77 kiB | /tmp/dask-worker-space/worker-_c3p2emt |
| tcp://127.0.0.1:40197 | http://127.0.0.1:38017/status | tcp://127.0.0.1:36767 | 3 | 7.82 GiB | 98.59 MiB | 787.05 kiB / 222.84 kiB | /tmp/dask-worker-space/worker-15pige2e |

All workers: 0 tasks executing, ready, in memory or in flight; 0 B spilled; CPU usage 2.0%; last seen just now.


## DaskQueue: getting started

<center>
  <img src="https://miro.medium.com/v2/resize:fit:720/format:webp/1*ov_HTf-Bzhu6UFIo9ZO38A.jpeg" />
</center>

### Instantiate the queue pool
- The `QueuePool` manages `n_queues` queue actors.
- The `QueuePool` distributes work across the `n_queues` queues using simple round-robin.
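A minimal sketch of round-robin dispatching, assuming plain Python lists stand in for the queue actors. `RoundRobinPool` is a hypothetical name used for illustration, not daskqueue's API:

```python
from itertools import cycle


class RoundRobinPool:
    """Hypothetical stand-in for QueuePool's dispatching, with lists as queues."""

    def __init__(self, n_queues):
        self.queues = [[] for _ in range(n_queues)]
        self._next = cycle(range(n_queues))  # 0, 1, ..., n_queues - 1, 0, 1, ...

    def submit(self, msg):
        # Each new message goes to the next queue in the cycle.
        self.queues[next(self._next)].append(msg)


pool = RoundRobinPool(n_queues=2)
for k in range(10):
    pool.submit(k)
print([len(q) for q in pool.queues])  # [5, 5]
```

With 10 submissions over 2 queues, each queue ends up holding 5 messages, the same split as the `5 pending items` per queue reported after the submission step.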

In [5]:
n_queues = 2
queue_pool = QueuePool(client,n_queues=n_queues)
queue_pool

2023-04-20 17:20:37,770,770 INFO: Created 2 queues in Cluster and one QueueManager.


QueuePool : 2 queue(s)
	TransientQueue-05169e3a-fa33-4c94-b9b2-07e6f7f7efc4: 0 pending items
	TransientQueue-95729a11-f3ba-4c55-a7d8-8de60ec2f1fa: 0 pending items

### Instantiate a consumer pool

Each spawned consumer contacts the `QueuePool` to get an assigned queue to pull from. The `QueuePool` assigns queues to consumers in round-robin fashion, so we can't have `n_consumers < n_queues`: some queues would then have no consumer pulling from them.
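The constraint follows directly from round-robin assignment. Here is a sketch assuming consumer `i` is paired with queue `i % n_queues` (an assumed mapping; `assign_queues` and `orphaned_queues` are hypothetical helpers, not part of daskqueue):

```python
def assign_queues(n_consumers, n_queues):
    """Hypothetical round-robin assignment: consumer i pulls from queue i % n_queues."""
    return {f"consumer-{i}": i % n_queues for i in range(n_consumers)}


def orphaned_queues(n_consumers, n_queues):
    """Queues no consumer is assigned to, i.e. work that would never be processed."""
    assigned = set(assign_queues(n_consumers, n_queues).values())
    return sorted(set(range(n_queues)) - assigned)


print(assign_queues(2, 2))    # {'consumer-0': 0, 'consumer-1': 1}
print(orphaned_queues(5, 2))  # []
print(orphaned_queues(1, 3))  # [1, 2]
```

Under this mapping, 4 consumers on 2 queues pair consumers 0 and 2 with one queue and consumers 1 and 3 with the other. That is consistent with the final per-consumer totals at the end of this notebook (25,000 + 25,000 and 35,000 + 15,000, i.e. 50,000 per queue).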

In [6]:
n_consumers=2
consumer_pool = ConsumerPool(client,queue_pool,n_consumers=n_consumers)
consumer_pool

Consumers : 2 Consumers(s)
	GeneralConsumer-0: 0 received, 0 pending tasks
	GeneralConsumer-1: 0 received, 0 pending tasks

### Submit work

We're going to submit 10 tasks to the `QueuePool`.

In [7]:
N = 10
for _ in range(N): 
    queue_pool.submit(process_item)

In [8]:
queue_pool

QueuePool : 2 queue(s)
	TransientQueue-05169e3a-fa33-4c94-b9b2-07e6f7f7efc4: 5 pending items
	TransientQueue-95729a11-f3ba-4c55-a7d8-8de60ec2f1fa: 5 pending items

In [9]:
# Start the consumers: each one polls work from its assigned queue
consumer_pool.start()

2023-04-20 17:21:12,733,733 INFO: Starting 2 consumers


In [10]:
consumer_pool.join()

2023-04-20 17:21:14,327,327 INFO: Waiting for the 2 consumers to process all items in queue_pool...
2023-04-20 17:21:14,336,336 INFO: All consumers are done ! 10 items processed. 
2023-04-20 17:21:14,337,337 INFO: Cancelling 2 consumers.
2023-04-20 17:21:14,339,339 INFO: Consumer state : None


In [11]:
result = consumer_pool.results()

We can retrieve the results by calling `.results()` on the consumer pool. The method returns a dict mapping each consumer to a `defaultdict` of `{hash(msg): result}`.

In [12]:
pprint(result)

{'GeneralConsumer-0': defaultdict(<function GeneralConsumer.__init__.<locals>.<lambda> at 0x7fd955dc93f0>,
                                  {692322777257066378: 328350,
                                   708434128201522032: 328350,
                                   1339246389826661926: 328350,
                                   1502975474082217614: 328350,
                                   1507791329801986328: 328350}),
 'GeneralConsumer-1': defaultdict(<function GeneralConsumer.__init__.<locals>.<lambda> at 0x7fd955dc9510>,
                                  {149798472518062484: 328350,
                                   822411524003274166: 328350,
                                   1826531207948055505: 328350,
                                   2079226159229595093: 328350,
                                   2105286298775364967: 328350})}
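Because the results are nested one level per consumer, aggregating them takes a small flatten step. A sketch on a hand-built stand-in with the same shape, using a few keys copied from the output above (plain dicts replace the `defaultdict`s; real code would call `consumer_pool.results()` instead):

```python
# Plain dicts standing in for consumer_pool.results():
# consumer name -> {hash(msg): result}
results = {
    "GeneralConsumer-0": {692322777257066378: 328350, 708434128201522032: 328350},
    "GeneralConsumer-1": {149798472518062484: 328350},
}

# Total number of processed messages across all consumers.
n_processed = sum(len(per_consumer) for per_consumer in results.values())
# Every distinct result value, regardless of which consumer produced it.
distinct_values = {v for per_consumer in results.values() for v in per_consumer.values()}
print(n_processed, distinct_values)  # 3 {328350}
```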


### Batch submission


Nothing new here! Dask can already do this (very well, I might add).

> **So why does daskqueue exist?**

For all its greatness, Dask relies on a **central scheduler** that is involved in every decision, which can become a **central bottleneck**. This is a serious limitation when trying to use Dask in **high-throughput** situations.

A simple task queue is usually the best approach when trying to distribute thousands or millions of tasks.
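A back-of-the-envelope estimate makes the bottleneck concrete. Assuming the central scheduler tops out around the ~4000 tasks/second figure quoted earlier (a rough number; real throughput depends on the cluster and the tasks), scheduling overhead alone grows linearly with the number of tasks:

```python
SCHEDULER_TASKS_PER_SECOND = 4_000  # rough ceiling quoted above; an assumption, not a measurement

overhead_seconds = {
    n_tasks: n_tasks / SCHEDULER_TASKS_PER_SECOND
    for n_tasks in (10_000, 100_000, 1_000_000)
}
for n_tasks, seconds in overhead_seconds.items():
    print(f"{n_tasks:>9,} tasks -> ~{seconds:,.1f} s of scheduler time")
```

That is about 2.5 s for 10,000 tasks, 25 s for 100,000 and 250 s for a million, before any actual work runs.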



In [13]:
n_consumers = 5
batch_size = 1000
consumer_pool = ConsumerPool(client,queue_pool,n_consumers=n_consumers,batch_size=batch_size)
consumer_pool

Consumers : 
	5 Consumers(s), 
	0 received 
	0 pending 

We can even start the consumers before submitting messages to the `QueuePool`: they asynchronously fetch work from their dedicated queue as it arrives.
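The idea of consumers idling on their dedicated queues before any work arrives can be sketched with `asyncio` from the standard library. This is an in-process analogy, not how daskqueue's actor-based consumers are implemented; the `consumer` coroutine and the `None` stop sentinel are assumptions made for the sketch:

```python
import asyncio


def process_item():
    return sum(i * i for i in range(10**2))


async def consumer(name, queue, results):
    """Pull callables from one dedicated queue until a None sentinel arrives."""
    while True:
        item = await queue.get()  # suspends while the queue is empty
        if item is None:
            break
        results.append((name, item()))


async def main(n_queues=2, n_msgs=10):
    queues = [asyncio.Queue() for _ in range(n_queues)]
    results = []
    # Start the consumers first: they idle on their empty queues.
    workers = [
        asyncio.create_task(consumer(f"consumer-{i}", q, results))
        for i, q in enumerate(queues)
    ]
    # Then submit work round-robin, plus one stop sentinel per queue.
    for k in range(n_msgs):
        await queues[k % n_queues].put(process_item)
    for q in queues:
        await q.put(None)
    await asyncio.gather(*workers)
    return results


results = asyncio.run(main())
print(len(results))  # 10
```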

In [14]:
n_msgs = 100_000
batch_size = 10_000

In [15]:
msgs = queue_pool.batch_submit([(process_item,) for _ in range(n_msgs)],batch_size=batch_size)
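How 100,000 messages could end up split evenly across two queues can be sketched as follows, assuming `batch_submit` chunks the list into batches of `batch_size` and hands whole batches to the queues in round-robin order (an assumption; the library may split work differently). `chunk` and `spread_batches` are hypothetical helpers:

```python
def chunk(items, batch_size):
    """Split items into consecutive batches of at most batch_size elements."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def spread_batches(batches, n_queues):
    """Assign whole batches to queues round-robin; return the message count per queue."""
    per_queue = [0] * n_queues
    for i, batch in enumerate(batches):
        per_queue[i % n_queues] += len(batch)
    return per_queue


batches = chunk(list(range(100_000)), batch_size=10_000)
print(len(batches), spread_batches(batches, n_queues=2))  # 10 [50000, 50000]
```

An even 50,000 messages per queue is also what the durable-queue section reports right after an identical 100,000-message submission.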

In [16]:
consumer_pool.start()

2023-04-20 17:22:48,479,479 INFO: Starting 5 consumers


In [17]:
# We can watch consumer progress
consumer_pool.join(progress=True)

2023-04-20 17:22:48,663,663 INFO: Waiting for the 5 consumers to process all items in queue_pool...
2023-04-20 17:22:50,832,832 INFO: QueuePool : 2 queue(s)
	TransientQueue-05169e3a-fa33-4c94-b9b2-07e6f7f7efc4: 48000 pending items
	TransientQueue-95729a11-f3ba-4c55-a7d8-8de60ec2f1fa: 50000 pending items
2023-04-20 17:22:50,832,832 INFO: Consumers : 
	5 Consumers(s), 
	4000 received 
	5 pending 
2023-04-20 17:22:53,663,663 INFO: QueuePool : 2 queue(s)
	TransientQueue-05169e3a-fa33-4c94-b9b2-07e6f7f7efc4: 32000 pending items
	TransientQueue-95729a11-f3ba-4c55-a7d8-8de60ec2f1fa: 39000 pending items
2023-04-20 17:22:53,663,663 INFO: Consumers : 
	5 Consumers(s), 
	27863 received 
	8 pending 
2023-04-20 17:22:55,969,969 INFO: QueuePool : 2 queue(s)
	TransientQueue-05169e3a-fa33-4c94-b9b2-07e6f7f7efc4: 18000 pending items
	TransientQueue-95729a11-f3ba-4c55-a7d8-8de60ec2f1fa: 31000 pending items
2023-04-20 17:22:55,970,970 INFO: Consumers : 
	5 Consumers(s), 
	49110 received 
	10 pending 

In [18]:
consumer_pool

Consumers : 
	5 Consumers(s), 
	100000 received 
	0 pending 

## Durable vs Transient Queues


Another major feature of daskqueue is **queue durability**. In environments and use cases where durability matters, applications must use durable queues and make sure that published messages are persisted.

- Transient queues store messages in memory.
- Durable queues use on-disk data structures to save messages.
- Each durable queue points to its own directory.
- Durable queues are recovered when the cluster restarts.


I wrote an in-depth blog post discussing the various implementation details: https://medium.com/@aminedirhoussi1/daskqueue-dask-based-distributed-task-queue-6fb95517dfea

<center>
  <img src="https://miro.medium.com/v2/resize:fit:720/format:webp/1*mQgpVYJ5bbM2srxLkJ0RNg.jpeg" />
</center>


In [19]:
!rm -rf queue_dir

_  = client.restart()

In [20]:
n_queues = 2
n_consumers = 4
n_msgs = 100_000
batch_size = 10_000

In [21]:
queue_pool = QueuePool(client, n_queues, durability="durable",dirpath='notebooks/queue_dir')
queue_pool

2023-04-20 17:23:47,347,347 INFO: Created 2 queues in Cluster and one QueueManager.


QueuePool : 2 queue(s)
	DurableQueue-3d59b47d-e2ee-4c30-b470-d3fde2a00d18: 0 pending items
	DurableQueue-84f0f3ec-f15e-4b17-b166-9f362b115ea8: 0 pending items

What just happened? Let's take a look at the created directory: `daskqueue` created a dedicated directory per queue, each containing a `LogSegment` file and an `IndexSegment` file.

```bash
queue_dir
├── default-queue-0
│   ├── 00000000000000000000.log
│   └── default-queue-0.index
└── default-queue-1
    ├── 00000000000000000000.log
    └── default-queue-1.index
```
The `IndexSegment` file is a [**bitcask**](https://docs.riak.com/riak/kv/2.2.3/setup/planning/backend/bitcask/index.html) data structure that is read into memory on startup. Bitcask provides:
- Low latency per item read or written
- Predictable lookup and insert performance
- Fast, bounded crash recovery and easy backup
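To make the Bitcask idea concrete, here is a toy sketch. It is not daskqueue's `IndexSegment` code, and it stores keys and values in a single file for brevity; `TinyBitcask` and its record layout are made up for illustration. An in-memory *keydir* maps each key to the position of its latest value in an append-only file, so a read is one dict lookup plus one seek, and recovery is one sequential scan:

```python
import os
import struct
import tempfile
from typing import Dict, Tuple

HEADER = struct.Struct("<II")  # (key length, value length) before each record


class TinyBitcask:
    """Toy Bitcask-style store: an append-only data file plus an in-memory keydir."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.keydir: Dict[bytes, Tuple[int, int]] = {}  # key -> (value offset, value size)
        open(path, "ab").close()  # make sure the data file exists
        self._rebuild()

    def _rebuild(self) -> None:
        """Recovery: scan the log once, keeping the latest position of each key."""
        with open(self.path, "rb") as f:
            data = f.read()
        pos = 0
        while pos + HEADER.size <= len(data):
            klen, vlen = HEADER.unpack_from(data, pos)
            key_start = pos + HEADER.size
            value_start = key_start + klen
            if value_start + vlen > len(data):
                break  # torn write at the tail: ignore the partial record
            self.keydir[data[key_start:value_start]] = (value_start, vlen)
            pos = value_start + vlen

    def put(self, key: bytes, value: bytes) -> None:
        """Append a record; existing bytes are never modified."""
        with open(self.path, "ab") as f:
            offset = f.seek(0, os.SEEK_END)
            f.write(HEADER.pack(len(key), len(value)) + key + value)
        self.keydir[key] = (offset + HEADER.size + len(key), len(value))

    def get(self, key: bytes) -> bytes:
        """One dict lookup, then one seek + read on disk."""
        value_offset, size = self.keydir[key]
        with open(self.path, "rb") as f:
            f.seek(value_offset)
            return f.read(size)


path = os.path.join(tempfile.mkdtemp(), "store.data")
db = TinyBitcask(path)
db.put(b"msg-1", b"process_item")
db.put(b"msg-1", b"process_item-v2")  # the newer write shadows the older one
recovered = TinyBitcask(path)          # simulate a restart: keydir rebuilt from disk
print(recovered.get(b"msg-1"))         # b'process_item-v2'
```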

The `LogSegment` file stores the actual messages and is memory-mapped (`mmap`).
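The memory-mapping part can be sketched with the standard-library `mmap` module. This assumes a simple layout of 4-byte length-prefixed records in a preallocated segment file. daskqueue's actual `LogSegment` format is not shown in this notebook, so the layout and the `append`/`read` helpers are hypothetical:

```python
import mmap
import os
import struct
import tempfile

SEGMENT_SIZE = 1024 * 1024    # hypothetical 1 MiB segment, preallocated up front
HEADER = struct.Struct("<I")  # 4-byte little-endian length prefix per record


def append(mm: mmap.mmap, offset: int, payload: bytes) -> int:
    """Write a length-prefixed record at `offset` and return the next free offset."""
    start = offset + HEADER.size
    HEADER.pack_into(mm, offset, len(payload))
    mm[start:start + len(payload)] = payload
    return start + len(payload)


def read(mm: mmap.mmap, offset: int) -> bytes:
    """Read back the record stored at `offset`."""
    (length,) = HEADER.unpack_from(mm, offset)
    start = offset + HEADER.size
    return mm[start:start + length]


path = os.path.join(tempfile.mkdtemp(), "00000000000000000000.log")
with open(path, "wb") as f:
    f.truncate(SEGMENT_SIZE)  # an empty file cannot be mapped, so size it first

with open(path, "r+b") as f:
    mm = mmap.mmap(f.fileno(), SEGMENT_SIZE)
    off1 = 0
    off2 = append(mm, off1, b"task-1")
    append(mm, off2, b"task-2")
    mm.flush()  # push dirty pages back to the file
    records = [read(mm, off1), read(mm, off2)]
    mm.close()

print(records)  # [b'task-1', b'task-2']
```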

We can now submit work to the `queue_pool` as before, but now the tasks are persisted to disk.

In [22]:
tasks = [(process_item,) for _ in range(n_msgs)]

In [23]:
msgs = queue_pool.batch_submit(tasks, batch_size=batch_size)
queue_pool

QueuePool : 2 queue(s)
	DurableQueue-3d59b47d-e2ee-4c30-b470-d3fde2a00d18: 50000 pending items
	DurableQueue-84f0f3ec-f15e-4b17-b166-9f362b115ea8: 50000 pending items

In [24]:
!pdu --max-depth 3 queue_dir/

 52.4M     ┌──default-queue-0.index   │██████░░░░░░░░░░░░                  │ 17%
104.9M     ├──00000000000000000000.log│████████████░░░░░░                  │ 33%
157.3M   ┌─┴default-queue-0           │██████████████████                  │ 50%
 52.4M   │ ┌──default-queue-1.index   │██████░░░░░░░░░░░░                  │ 17%
104.9M   │ ├──00000000000000000000.log│████████████░░░░░░                  │ 33%
157.3M   ├─┴default-queue-1           │██████████████████                  │ 50%
314.6M ┌─┴queue_dir/                  │████████████████████████████████████│100%


----
Let's simulate a sudden cluster shutdown. After restarting the cluster, we try to access the `queue_pool` handle to confirm that the actor is gone.

In [25]:
client.restart()

try:
    print(queue_pool)
except Exception as e:
    print(e)

Worker holding Actor was lost.  Status: cancelled


Now, let's respawn the **QueuePool** with the same config and see the queue content. 

In [26]:
queue_pool = QueuePool(client, n_queues, durability="durable",dirpath='notebooks/queue_dir')
queue_pool

2023-04-20 17:25:11,926,926 INFO: Created 2 queues in Cluster and one QueueManager.


QueuePool : 2 queue(s)
	DurableQueue-c69ce240-fe25-4fea-87a3-d47a0c516127: 50000 pending items
	DurableQueue-f74d83a3-277b-4b9c-8ab4-9b6f151b4071: 50000 pending items

> **We see that we didn't lose any items !** 😍 

In [27]:
consumer_pool = ConsumerPool(client,queue_pool,n_consumers=n_consumers,batch_size=1000)
consumer_pool

Consumers : 4 Consumers(s)
	GeneralConsumer-0: 0 received, 0 pending tasks
	GeneralConsumer-1: 0 received, 0 pending tasks
	GeneralConsumer-2: 0 received, 0 pending tasks
	GeneralConsumer-3: 0 received, 0 pending tasks

In [28]:
consumer_pool.start()
consumer_pool.join(progress=True)

2023-04-20 17:25:16,615,615 INFO: Starting 4 consumers
2023-04-20 17:25:16,616,616 INFO: Waiting for the 4 consumers to process all items in queue_pool...
2023-04-20 17:25:18,950,950 INFO: QueuePool : 2 queue(s)
	DurableQueue-c69ce240-fe25-4fea-87a3-d47a0c516127: 43000 pending items
	DurableQueue-f74d83a3-277b-4b9c-8ab4-9b6f151b4071: 43000 pending items
2023-04-20 17:25:18,951,951 INFO: Consumers : 4 Consumers(s)
	GeneralConsumer-0: 4580 received, 2 pending tasks
	GeneralConsumer-1: 5000 received, 1 pending tasks
	GeneralConsumer-2: 3894 received, 2 pending tasks
	GeneralConsumer-3: 2405 received, 2 pending tasks
2023-04-20 17:25:21,467,467 INFO: QueuePool : 2 queue(s)
	DurableQueue-c69ce240-fe25-4fea-87a3-d47a0c516127: 33000 pending items
	DurableQueue-f74d83a3-277b-4b9c-8ab4-9b6f151b4071: 33000 pending items
2023-04-20 17:25:21,468,468 INFO: Consumers : 4 Consumers(s)
	GeneralConsumer-0: 9062 received, 2 pending tasks
	GeneralConsumer-1: 12041 received, 2 pending tasks

In [29]:
consumer_pool

Consumers : 4 Consumers(s)
	GeneralConsumer-0: 25000 received, 0 pending tasks
	GeneralConsumer-1: 35000 received, 0 pending tasks
	GeneralConsumer-2: 25000 received, 0 pending tasks
	GeneralConsumer-3: 15000 received, 0 pending tasks

In [None]:
assert sum(len(r) for r in consumer_pool.results().values()) == n_msgs