# mini cluster of ESP32 - test
![Broccoli](https://raw.githubusercontent.com/Wei1234c/Broccoli/master/jpgs/Broccoli_cluster_cover.gif)

ref: [Celery Canvas](http://docs.celeryproject.org/en/latest/userguide/canvas.html)

### Imports

In [1]:
import os
import sys
import time

sys.path.append(os.path.abspath(os.path.join('..', '..', 'codes', 'broccoli', 'client')))
sys.path.append(os.path.abspath(os.path.join('..', '..', 'codes', 'broccoli', 'node')))
sys.path.append(os.path.abspath(os.path.join('..', '..', '..', 'external', 'mqtt_network')))

import client
from collections import OrderedDict

from canvas import *
import tasks

My name is Client_366


### Start client
啟動 client 物件。  

在本機上有一個 Broker 物件，負責：
- 管理本機上的 task queue
- 接受使用者發出的運算要求，並將之排入本機上的 task queue
- 通知遠端的 workers 協助處理 tasks
- 將工作發送給 workers 做處理
- 收集 workers 傳回的運算結果
- 將運算結果整合之後，傳回給使用者

而我們透過 client 物件來與 Broker 物件溝通

In [2]:
the_client = client.Client()
the_client.start()

while not the_client.status['Is connected']:            
    time.sleep(1)
    print('Node not ready yet.')


Sending 281 bytes
Message:
OrderedDict([('command', 'set connection name'), ('correlation_id', '2018-04-05 17:48:10.213166 1'), ('kwargs', {'name': 'Client_366'}), ('message_id', '2018-04-05 17:48:10.213166 1'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


[Connected: ('123.240.210.68', 1883)]
[Listen to messages]
Node not ready yet.


### Reset workers
如果需要確保 workers 的狀態都一致，或者需要重新 depoly Python module files 到 workers 上面去，可以發送指令給遠端的 workers，要求做 reboot 回到最初的狀態。

In [3]:
the_client.reset_workers()
time.sleep(15)  # wait until workers ready.


Sending 236 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:10.964663 10'), ('message_id', '2018-04-05 17:48:10.964663 10'), ('message_type', 'exec'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366'), ('to_exec', 'import machine;machine.reset()')])


Data received: 236 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:10.964663 10'), ('message_id', '2018-04-05 17:48:10.964663 10'), ('message_type', 'exec'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366'), ('to_exec', 'import machine;machine.reset()')])

No module named 'machine'

Data received: 263 bytes
Message:
OrderedDict([('command', 'set connection name'), ('correlation_id', '6515'), ('kwargs', {'name': 'NodeMCU_b4e62d890c49'}), ('message_id', '6515'), ('message_type', 'command'), ('need_result', True), ('receiver', 'Hub'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 263 bytes
Message:
OrderedDict([('

### Upload tasks module file
Reboot 之後的 workers 只有最基本的功能，如果我們需要 workers 執行額外的功能 (functions)，則需要先將定義這些 functions 的 module 檔案上傳給 workers 並且要求它們 import。  

例如： 我們上傳給每一個 worker 一個`tasks.py`的檔案，其中定義幾個 functions:
```
from canvas import Task

@Task
def add(x, y, op=None):
    return op(x, y) if op else x + y

@Task
def xsum(x):
    return sum(x)

@Task
def mul(x, y, op=None):
    return op(x, y) if op else x * y

@Task
def mapper(word):
    return (word, 1) if len(word) > 3 else None

```

Workers 收到這個`tasks.py`檔案之後，會做`import tasks`的動作，所以就可以呼叫 tasks.add() 這個 function。

In [4]:
tasks_file = os.path.join('..', '..', 'codes', 'broccoli', 'client', 'tasks.py')
the_client.sync_file(tasks_file, load_as_tasks = True)


Sending 550 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:26.045000 25'), ('kwargs', {'filename': 'tasks.py', 'file': 'from canvas import Task\n\n@Task\ndef add(x, y, op=None):\n    return op(x, y) if op else x + y\n\n@Task\ndef xsum(x):\n    return sum(x)\n\n@Task\ndef mul(x, y, op=None):\n    return op(x, y) if op else x * y\n\n@Task\ndef mapper(word):\n    return (word, 1) if len(word) > 3 else None\n', 'load_as_tasks': True}), ('message_id', '2018-04-05 17:48:26.045000 25'), ('message_type', 'file'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])



<font color='blue'>
## DEMOs

### Chains
Celery [Chains](http://docs.celeryproject.org/en/latest/userguide/canvas.html#chains) 的主要作用是把多個運算**串聯**起來，前一個運算的結果是下一個運算的參數，這樣就可以組成一個完整的運算過程，例如下例中用`chain`組成一個 ((4+4) * 8) * 10  = 640 的計算過程
```
>>> from celery import chain
>>> from proj.tasks import add, mul

>>> # (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))()
>>> res.get()
640
```

我們可以在 ESP32 cluster 上面也做同樣的事情：

In [5]:
ch = chain(tasks.add.s(4, 4), tasks.mul.s(8), tasks.mul.s(10))
ch.get()


Sending 257 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:26.144669 29'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:26.144669 29'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 550 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:26.045000 25'), ('kwargs', {'filename': 'tasks.py', 'file': 'from canvas import Task\n\n@Task\ndef add(x, y, op=None):\n    return op(x, y) if op else x + y\n\n@Task\ndef xsum(x):\n    return sum(x)\n\n@Task\ndef mul(x, y, op=None):\n    return op(x, y) if op else x * y\n\n@Task\ndef mapper(word):\n    return (word, 1) if len(word) > 3 else None\n', 'load_as_tasks': True}), ('message_id', '2018-04-05 17:48:26.045000 25'), ('message_type', 'file'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 257 bytes
Message:
OrderedDict([('correlation_id', '20


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '15450'), ('function', 'dequeue_task'), ('message_id', '15450'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '15450'), ('message_id', '2018-04-05 17:48:28.673897 141'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 208 bytes
Message:
OrderedDict([('correlation_id', '15450'), ('message_id', '2018-04-05 17:48:28.673897 141'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '15459'), ('function', 'dequeue_task'), ('message_id', '15459'), ('message_type', 'function'), ('need_result', True), ('rec

640

### Groups
Celery [Groups](http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups) 的主要作用是把多個運算**併聯**起來，把很多同質性的運算同時發送給許多遠端的 workers 協助處理，再收集 workers 傳回來的結果彙整成為一個結果集，例如下例中用`group`同時計算 (2+2) 和 (4+4)，結果是 [4, 8]
```
>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(2, 2), add.s(4, 4))
(proj.tasks.add(2, 2), proj.tasks.add(4, 4))
If you call the group, the tasks will be applied one after another in the current process, and a GroupResult instance is returned that can be used to keep track of the results, or tell how many tasks are ready and so on:

>>> g = group(add.s(2, 2), add.s(4, 4))
>>> res = g()
>>> res.get()
[4, 8]
```
我們可以在 ESP32 cluster 上面也做同樣的事情：

In [6]:
gp = group([tasks.add.s(2, 2), tasks.add.s(4, 4)])
gp.get()


Sending 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:30.147152 216'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:30.147152 216'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:30.147152 216'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:30.147152 216'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '17744'), ('function', 'dequeue_task'), ('message_id', '17744'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '17744'), ('messa

[4, 8]

我們可以用 iterators:
```
>>> group(add.s(i, i) for i in xrange(10))()
```

In [7]:
gp = group([tasks.add.s(i, i) for i in range(10)])
gp.get()


Sending 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:31.363503 273'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:31.363503 273'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:31.363503 273'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:31.363503 273'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '19033'), ('function', 'dequeue_task'), ('message_id', '19033'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '19033'), ('messa


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '19701'), ('function', 'dequeue_task'), ('message_id', '19701'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '19701'), ('message_id', '2018-04-05 17:48:32.730661 339'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:31.408888 279', 'function': 'tasks.add', 'args': (5, 5), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:31.408888 279', 'task_id': '2018-04-05 17:48:31.408888 279'}, 4)), ('sender', 'Client_366')])


Sending 489 bytes
Message:
OrderedDict([('correlation_id', '19701'), ('message_id', '2018-04-05 17:48:32.730661 339'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e6


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '20634'), ('function', 'dequeue_task'), ('message_id', '20634'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '20634'), ('message_id', '2018-04-05 17:48:33.895365 390'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 208 bytes
Message:
OrderedDict([('correlation_id', '20634'), ('message_id', '2018-04-05 17:48:33.895365 390'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Data received: 526 bytes
Message:
OrderedDict([('correlation_id', '20826'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 14, 'function': 'tasks.add', 'args': [7, 7], 'r

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

### Chords
Celery [Chords](http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords) 的主要作用是由兩段運算所組成的，第一段是一個`Groups`運算，其運算的結果會傳給第二段中的運算，作為其運算所需的參數。  

其作用可以用以下的例子來說明，`header`運算的結果會傳給`callback`做進一步的處理：
```
>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(10)]
>>> result = chord(header)(callback)
>>> result.get()
9900
```
我們可以在 ESP32 cluster 上面也做同樣的事情：

In [8]:
callback = tasks.xsum.s()
header = [tasks.add.s(i, i) for i in range(10)]
async_result = chord(header)(callback)
async_result.get()


Sending 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:34.720288 429'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:34.720288 429'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '22106'), ('function', 'dequeue_task'), ('message_id', '22106'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '22106'), ('message_id', '2018-04-05 17:48:34.867152 443'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:34.720237 428', 'function': 'tasks.add', 'args': (0, 0), 'need_result': True, 'reply_to': '


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '23417'), ('function', 'dequeue_task'), ('message_id', '23417'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '23417'), ('message_id', '2018-04-05 17:48:36.341813 505'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:34.776143 436', 'function': 'tasks.add', 'args': (5, 5), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:34.776143 436', 'task_id': '2018-04-05 17:48:34.776143 436'}, 4)), ('sender', 'Client_366')])


Sending 489 bytes
Message:
OrderedDict([('correlation_id', '23417'), ('message_id', '2018-04-05 17:48:36.341813 505'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e6

Message:
OrderedDict([('correlation_id', '24326'), ('function', 'dequeue_task'), ('message_id', '24326'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '24326'), ('message_id', '2018-04-05 17:48:37.482078 553'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 208 bytes
Message:
OrderedDict([('correlation_id', '24326'), ('message_id', '2018-04-05 17:48:37.482078 553'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Data received: 526 bytes
Message:
OrderedDict([('correlation_id', '24561'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 14, 'function': 'tasks.add', 'args': [7, 7], 'reply_to': 'Client_366', 'm

Message:
OrderedDict([('correlation_id', '28400'), ('function', 'dequeue_task'), ('message_id', '28400'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '28400'), ('message_id', '2018-04-05 17:48:41.523419 760'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 208 bytes
Message:
OrderedDict([('correlation_id', '28400'), ('message_id', '2018-04-05 17:48:41.523419 760'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '28406'), ('function', 'dequeue_task'), ('message_id', '28406'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('r

上述的運算可以直接寫成：
```
chord(add.s(i, i) for i in xrange(10))(tsum.s()).get()
```
我們可以在 ESP32 cluster 上面也做同樣的事情：

In [9]:
async_result = chord([tasks.add.s(i, i) for i in range(10)])(tasks.xsum.s())
async_result.get()


Sending 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:43.720981 882'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:43.720981 882'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 259 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:43.720981 882'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:43.720981 882'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '31484'), ('function', 'dequeue_task'), ('message_id', '31484'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '31484'), ('messa


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '32492'), ('function', 'dequeue_task'), ('message_id', '32492'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '32492'), ('message_id', '2018-04-05 17:48:45.913267 992'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:43.786616 889', 'function': 'tasks.add', 'args': (6, 6), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:43.786616 889', 'task_id': '2018-04-05 17:48:43.786616 889'}, 3)), ('sender', 'Client_366')])


Sending 489 bytes
Message:
OrderedDict([('correlation_id', '32492'), ('message_id', '2018-04-05 17:48:45.913267 992'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e6

Message:
OrderedDict([('correlation_id', '34244'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 16, 'function': 'tasks.add', 'args': [8, 8], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:43.786651 891', 'message_type': 'result', 'task_id': '2018-04-05 17:48:43.786651 891', 'correlation_id': '2018-04-05 17:48:43.786651 891', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '34244'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '34349'), ('function', 'dequeue_task'), ('message_id', '34349'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '34349'), ('message_id', '2018-04-05 17:48:47.496301 1065'), ('message_type', 'result'), ('rec

90

### Map & Starmap
Celery [Map & Starmap](http://docs.celeryproject.org/en/latest/userguide/canvas.html#map-starmap) 的主要作用和 Python 中的`map`指令一樣，會對一個 list 中的每個 element 做指定的運算，例如下例會分別對`range(10)`,`range(100)`做`sum`運算：
```
>>> ~xsum.map([range(10), range(100)])
[45, 4950]
```
我們可以在 ESP32 cluster 上面也做同樣的事情，但是須先使用`list()`對`range`物件做展開：

In [10]:
gp = tasks.xsum.map([list(range(10)), list(range(100))])
gp.get()


Sending 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:49.449756 1163'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:49.449756 1163'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '35585'), ('function', 'dequeue_task'), ('message_id', '35585'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '35585'), ('message_id', '2018-04-05 17:48:49.759007 1173'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:49.449700 1162', 'function': 'tasks.xsum', 'args': ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],), 'n

[45, 4950]


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '38197'), ('function', 'dequeue_task'), ('message_id', '38197'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])



`starmap`的作用和`map`指令一樣，會對一個 list 中的每個 element 做指定的運算，只是會先做 star展開，將一個`list`展開成為 positional arguments：
```
>>> ~add.starmap(zip(range(10), range(10)))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
我們可以在 ESP32 cluster 上面也做同樣的事情，但是須先使用`list()`對`zip`物件做展開：

In [11]:
gp = tasks.add.starmap(list(zip(range(10), range(10))))
gp.get()


Processed result:
OrderedDict([('correlation_id', '38197'), ('message_id', '2018-04-05 17:48:51.273174 1234'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 209 bytes
Message:
OrderedDict([('correlation_id', '38197'), ('message_id', '2018-04-05 17:48:51.273174 1234'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:51.314332 1237'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:51.314332 1237'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:51.314332 1237'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Cli


Data received: 528 bytes
Message:
OrderedDict([('correlation_id', '41171'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 6, 'function': 'tasks.add', 'args': [3, 3], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:51.415987 1241', 'message_type': 'result', 'task_id': '2018-04-05 17:48:51.415987 1241', 'correlation_id': '2018-04-05 17:48:51.415987 1241', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '41171'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '41274'), ('function', 'dequeue_task'), ('message_id', '41274'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '41274'), ('message_id', '2018-04-05 17:48:54.224012 1339'), ('mes


Data received: 529 bytes
Message:
OrderedDict([('correlation_id', '42631'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 12, 'function': 'tasks.add', 'args': [6, 6], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:51.416048 1244', 'message_type': 'result', 'task_id': '2018-04-05 17:48:51.416048 1244', 'correlation_id': '2018-04-05 17:48:51.416048 1244', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '42631'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '42736'), ('function', 'dequeue_task'), ('message_id', '42736'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '42736'), ('message_id', '2018-04-05 17:48:56.585402 1410'), ('me

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

### Chunks
Celery [Chunks](http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks) 的主要作用是把一大串的資料切成指定的份數，分發給遠端的 workers 協處處理，例如：
```
>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
```
我們可以在 ESP32 cluster 上面也做同樣的事情，但是須先使用`list()`對`zip`物件做展開：

In [12]:
ck = tasks.add.chunks(list(zip(range(100), range(100))), 10)
async_result = ck()
async_result.get()


Sending 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:57.559084 1450'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:57.559084 1450'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:48:57.559084 1450'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:48:57.559084 1450'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '45274'), ('function', 'dequeue_task'), ('message_id', '45274'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '45274'), ('m


Data received: 528 bytes
Message:
OrderedDict([('correlation_id', '45949'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 0, 'function': 'tasks.add', 'args': [0, 0], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.559018 1449', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.559018 1449', 'correlation_id': '2018-04-05 17:48:57.559018 1449', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '45949'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '46053'), ('function', 'dequeue_task'), ('message_id', '46053'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '46053'), ('message_id', '2018-04-05 17:48:59.559769 1625'), ('mes


Sending 494 bytes
Message:
OrderedDict([('correlation_id', '47848'), ('message_id', '2018-04-05 17:49:00.723347 1666'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629095 1460', 'function': 'tasks.add', 'args': (9, 9), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.629095 1460', 'task_id': '2018-04-05 17:48:57.629095 1460'}, 90)), ('sender', 'Client_366')])


Data received: 529 bytes
Message:
OrderedDict([('correlation_id', '48265'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 16, 'function': 'tasks.add', 'args': [8, 8], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629076 1459', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629076 1459', 'correlation_id': '2018-04-05 17:48:57.629076 1459', 'need_result': True, 'sender': 'Client_366'}}), ('message_id'


Data received: 531 bytes
Message:
OrderedDict([('correlation_id', '49409'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 22, 'function': 'tasks.add', 'args': [11, 11], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629135 1462', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629135 1462', 'correlation_id': '2018-04-05 17:48:57.629135 1462', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '49409'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '49514'), ('function', 'dequeue_task'), ('message_id', '49514'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '49514'), ('message_id', '2018-04-05 17:49:02.502887 1743'), ('

Message:
OrderedDict([('correlation_id', '50811'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 32, 'function': 'tasks.add', 'args': [16, 16], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629215 1467', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629215 1467', 'correlation_id': '2018-04-05 17:48:57.629215 1467', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '50811'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 531 bytes
Message:
OrderedDict([('correlation_id', '51118'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 34, 'function': 'tasks.add', 'args': [17, 17], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629240 1468', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629240 1468', 'correlation_id': '2018-04-05 17:48:57.629240 1468', 'need_result': True, 'sender': 'Client_

Message:
OrderedDict([('correlation_id', '51992'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 36, 'function': 'tasks.add', 'args': [18, 18], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629295 1469', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629295 1469', 'correlation_id': '2018-04-05 17:48:57.629295 1469', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '51992'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '52098'), ('function', 'dequeue_task'), ('message_id', '52098'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '52098'), ('message_id', '2018-04-05 17:49:05.447579 1856'), ('message_type', 'result'), 


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '53411'), ('function', 'dequeue_task'), ('message_id', '53411'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '53411'), ('message_id', '2018-04-05 17:49:06.466819 1897'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629668 1480', 'function': 'tasks.add', 'args': (29, 29), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.629668 1480', 'task_id': '2018-04-05 17:48:57.629668 1480'}, 70)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '53411'), ('message_id', '2018-04-05 17:49:06.466819 1897'), ('message_type', 'result'), ('receiver', 'Node


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '54719'), ('function', 'dequeue_task'), ('message_id', '54719'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '54719'), ('message_id', '2018-04-05 17:49:08.065560 1950'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629769 1485', 'function': 'tasks.add', 'args': (34, 34), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.629769 1485', 'task_id': '2018-04-05 17:48:57.629769 1485'}, 65)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '54719'), ('message_id', '2018-04-05 17:49:08.065560 1950'), ('message_type', 'result'), ('receiver', 'Node


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '56291'), ('function', 'dequeue_task'), ('message_id', '56291'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '56291'), ('message_id', '2018-04-05 17:49:09.620722 1999'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629862 1490', 'function': 'tasks.add', 'args': (39, 39), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.629862 1490', 'task_id': '2018-04-05 17:48:57.629862 1490'}, 60)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '56291'), ('message_id', '2018-04-05 17:49:09.620722 1999'), ('message_type', 'result'), ('receiver', 'Node


Processed result:
OrderedDict([('correlation_id', '57586'), ('message_id', '2018-04-05 17:49:11.244763 2053'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629939 1494', 'function': 'tasks.add', 'args': (43, 43), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.629939 1494', 'task_id': '2018-04-05 17:48:57.629939 1494'}, 56)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '57586'), ('message_id', '2018-04-05 17:49:11.244763 2053'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.629939 1494', 'function': 'tasks.add', 'args': (43, 43), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:5

Message:
OrderedDict([('correlation_id', '59312'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 88, 'function': 'tasks.add', 'args': [44, 44], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629959 1495', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629959 1495', 'correlation_id': '2018-04-05 17:48:57.629959 1495', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '59312'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '59417'), ('function', 'dequeue_task'), ('message_id', '59417'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '59417'), ('message_id', '2018-04-05 17:49:12.765521 2106'), ('message_type', 'result'), 

Message:
OrderedDict([('correlation_id', '60716'), ('function', 'dequeue_task'), ('message_id', '60716'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '60716'), ('message_id', '2018-04-05 17:49:13.840719 2149'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.630097 1503', 'function': 'tasks.add', 'args': (52, 52), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.630097 1503', 'task_id': '2018-04-05 17:48:57.630097 1503'}, 47)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '60716'), ('message_id', '2018-04-05 17:49:13.840719 2149'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('repl


Data received: 532 bytes
Message:
OrderedDict([('correlation_id', '61932'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 106, 'function': 'tasks.add', 'args': [53, 53], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630112 1504', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630112 1504', 'correlation_id': '2018-04-05 17:48:57.630112 1504', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '61932'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '62037'), ('function', 'dequeue_task'), ('message_id', '62037'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '62037'), ('message_id', '2018-04-05 17:49:15.097305 2202'), (

Message:
OrderedDict([('correlation_id', '62777'), ('function', 'dequeue_task'), ('message_id', '62777'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '62777'), ('message_id', '2018-04-05 17:49:16.256524 2244'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.630264 1512', 'function': 'tasks.add', 'args': (61, 61), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.630264 1512', 'task_id': '2018-04-05 17:48:57.630264 1512'}, 38)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '62777'), ('message_id', '2018-04-05 17:49:16.256524 2244'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('repl


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '64571'), ('function', 'dequeue_task'), ('message_id', '64571'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '64571'), ('message_id', '2018-04-05 17:49:17.659626 2295'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.630344 1517', 'function': 'tasks.add', 'args': (66, 66), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.630344 1517', 'task_id': '2018-04-05 17:48:57.630344 1517'}, 33)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '64571'), ('message_id', '2018-04-05 17:49:17.659626 2295'), ('message_type', 'result'), ('receiver', 'Node


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '66021'), ('function', 'dequeue_task'), ('message_id', '66021'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '66021'), ('message_id', '2018-04-05 17:49:19.175594 2347'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.630431 1522', 'function': 'tasks.add', 'args': (71, 71), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.630431 1522', 'task_id': '2018-04-05 17:48:57.630431 1522'}, 28)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '66021'), ('message_id', '2018-04-05 17:49:19.175594 2347'), ('message_type', 'result'), ('receiver', 'Node

Message:
OrderedDict([('correlation_id', '67643'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 144, 'function': 'tasks.add', 'args': [72, 72], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630451 1523', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630451 1523', 'correlation_id': '2018-04-05 17:48:57.630451 1523', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '67643'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '67748'), ('function', 'dequeue_task'), ('message_id', '67748'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '67748'), ('message_id', '2018-04-05 17:49:21.195062 2435'), ('message_type', 'result'),


Data received: 532 bytes
Message:
OrderedDict([('correlation_id', '69601'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 154, 'function': 'tasks.add', 'args': [77, 77], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630537 1528', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630537 1528', 'correlation_id': '2018-04-05 17:48:57.630537 1528', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '69601'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '69706'), ('function', 'dequeue_task'), ('message_id', '69706'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '69706'), ('message_id', '2018-04-05 17:49:22.805299 2502'), (


Data received: 532 bytes
Message:
OrderedDict([('correlation_id', '71109'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 164, 'function': 'tasks.add', 'args': [82, 82], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630621 1533', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630621 1533', 'correlation_id': '2018-04-05 17:48:57.630621 1533', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '71109'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '71214'), ('function', 'dequeue_task'), ('message_id', '71214'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '71214'), ('message_id', '2018-04-05 17:49:24.487515 2563'), (


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '72713'), ('function', 'dequeue_task'), ('message_id', '72713'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '72713'), ('message_id', '2018-04-05 17:49:26.221595 2625'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:48:57.630758 1541', 'function': 'tasks.add', 'args': (90, 90), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:48:57.630758 1541', 'task_id': '2018-04-05 17:48:57.630758 1541'}, 9)), ('sender', 'Client_366')])


Sending 495 bytes
Message:
OrderedDict([('correlation_id', '72713'), ('message_id', '2018-04-05 17:49:26.221595 2625'), ('message_type', 'result'), ('receiver', 'NodeM


Data received: 532 bytes
Message:
OrderedDict([('correlation_id', '74080'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 182, 'function': 'tasks.add', 'args': [91, 91], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630777 1542', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630777 1542', 'correlation_id': '2018-04-05 17:48:57.630777 1542', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '74080'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '74185'), ('function', 'dequeue_task'), ('message_id', '74185'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '74185'), ('message_id', '2018-04-05 17:49:27.706257 2690'), (


Processed result:
OrderedDict([('correlation_id', '76009'), ('message_id', '2018-04-05 17:49:28.911442 2744'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Sending 209 bytes
Message:
OrderedDict([('correlation_id', '76009'), ('message_id', '2018-04-05 17:49:28.911442 2744'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', (None, 0)), ('sender', 'Client_366')])


Data received: 532 bytes
Message:
OrderedDict([('correlation_id', '76171'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 194, 'function': 'tasks.add', 'args': [97, 97], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.630883 1548', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.630883 1548', 'correlation_id': '2018-04-05 17:48:57.630883 1548', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '76171'), ('message_typ

[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, None, 44, None, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [None, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

### Word Count
最後我們以 Hadoop 領域中的 "Hello World" 範例 "Word Count" 來測試。  

我們會把一個文字檔的內容拆解成 words 並將每個 word 發送給 workers 處理，workers 要做的主要是一個`mapper`處理：
```
def mapper(word):
    return (word, 1) if len(word) > 3 else None
```
worker 會將處理的結果傳回來，client 這邊會有一個`reduce`function 將結果彙整。  

我們可以在 ESP32 cluster 上面這樣做：

In [13]:
import word_count

text_file = os.path.join('..', '..', 'codes', 'broccoli', 'client', 'test.txt')
words_count, counts = word_count.count_words(text_file)
print('********** result:\nwords count: {}\n\n{}\n**********'.format(words_count, counts[:10]))


Sending 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:49:30.141453 2806'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:49:30.141453 2806'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 261 bytes
Message:
OrderedDict([('correlation_id', '2018-04-05 17:49:30.141453 2806'), ('function', 'fetch_task'), ('kwargs', {'broker': 'Client_366'}), ('message_id', '2018-04-05 17:49:30.141453 2806'), ('message_type', 'function'), ('receiver', 'Hub'), ('reply_to', 'Client_366'), ('sender', 'Client_366')])


Data received: 531 bytes
Message:
OrderedDict([('correlation_id', '77132'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': 42, 'function': 'tasks.add', 'args': [21, 21], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:48:57.629351 1472', 'message_type': 'result', 'task_id': '2018-04-05 17:48:57.629351 1472', 'correlation_id'


Processed result:
OrderedDict([('correlation_id', '78644'), ('message_id', '2018-04-05 17:49:31.784822 2966'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.195978 2812', 'function': 'tasks.mapper', 'args': ('Fyler',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.195978 2812', 'task_id': '2018-04-05 17:49:30.195978 2812'}, 88)), ('sender', 'Client_366')])


Sending 500 bytes
Message:
OrderedDict([('correlation_id', '78644'), ('message_id', '2018-04-05 17:49:31.784822 2966'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.195978 2812', 'function': 'tasks.mapper', 'args': ('Fyler',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-

Timeout: no result returned for request with id 2018-04-05 17:49:30.141388 2805

Data received: 535 bytes
Message:
OrderedDict([('correlation_id', '80487'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': None, 'function': 'tasks.mapper', 'args': ['and'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196046 2816', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196046 2816', 'correlation_id': '2018-04-05 17:49:30.196046 2816', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '80487'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '80594'), ('function', 'dequeue_task'), ('message_id', '80594'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '81888'), ('function', 'dequeue_task'), ('message_id', '81888'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '81888'), ('message_id', '2018-04-05 17:49:34.895806 3102'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196141 2822', 'function': 'tasks.mapper', 'args': ('a',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196141 2822', 'task_id': '2018-04-05 17:49:30.196141 2822'}, 78)), ('sender', 'Client_366')])


Sending 496 bytes
Message:
OrderedDict([('correlation_id', '81888'), ('message_id', '2018-04-05 17:49:34.895806 3102'), ('message_type', 'result'), ('receiver', 'Nod


Data received: 543 bytes
Message:
OrderedDict([('correlation_id', '83314'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['with', 1], 'function': 'tasks.mapper', 'args': ['with'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196126 2821', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196126 2821', 'correlation_id': '2018-04-05 17:49:30.196126 2821', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '83314'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Data received: 547 bytes
Message:
OrderedDict([('correlation_id', '83069'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['astray', 1], 'function': 'tasks.mapper', 'args': ['astray'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196178 2824', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196178 2824', 'correlation_id': '2018-04-05 17:49:30


Data received: 543 bytes
Message:
OrderedDict([('correlation_id', '84236'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['fold', 1], 'function': 'tasks.mapper', 'args': ['fold'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196229 2827', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196229 2827', 'correlation_id': '2018-04-05 17:49:30.196229 2827', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '84236'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '84384'), ('function', 'dequeue_task'), ('message_id', '84384'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '84384'), ('message_id', '2018-04-05 17:49:37.40920

Message:
OrderedDict([('correlation_id', '85777'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['hands', 1], 'function': 'tasks.mapper', 'args': ['hands'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196326 2833', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196326 2833', 'correlation_id': '2018-04-05 17:49:30.196326 2833', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '85777'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '85884'), ('function', 'dequeue_task'), ('message_id', '85884'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '85884'), ('message_id', '2018-04-05 17:49:38.732374 3269'), ('message_type


Data received: 534 bytes
Message:
OrderedDict([('correlation_id', '87031'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': None, 'function': 'tasks.mapper', 'args': ['to'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196390 2837', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196390 2837', 'correlation_id': '2018-04-05 17:49:30.196390 2837', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '87031'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '87136'), ('function', 'dequeue_task'), ('message_id', '87136'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '87136'), ('message_id', '2018-04-05 17:49:39.997098 3324'),

Message:
OrderedDict([('correlation_id', '88107'), ('function', 'dequeue_task'), ('message_id', '88107'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '88107'), ('message_id', '2018-04-05 17:49:41.565641 3394'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196555 2846', 'function': 'tasks.mapper', 'args': ('the',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196555 2846', 'task_id': '2018-04-05 17:49:30.196555 2846'}, 54)), ('sender', 'Client_366')])


Sending 498 bytes
Message:
OrderedDict([('correlation_id', '88107'), ('message_id', '2018-04-05 17:49:41.565641 3394'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('r

Message:
OrderedDict([('correlation_id', '89705'), ('function', 'dequeue_task'), ('message_id', '89705'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '89705'), ('message_id', '2018-04-05 17:49:43.126149 3446'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196641 2851', 'function': 'tasks.mapper', 'args': ('him',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196641 2851', 'task_id': '2018-04-05 17:49:30.196641 2851'}, 49)), ('sender', 'Client_366')])


Sending 498 bytes
Message:
OrderedDict([('correlation_id', '89705'), ('message_id', '2018-04-05 17:49:43.126149 3446'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('r


Processed result:
OrderedDict([('correlation_id', '91086'), ('message_id', '2018-04-05 17:49:44.245054 3490'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196706 2855', 'function': 'tasks.mapper', 'args': ('him:',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196706 2855', 'task_id': '2018-04-05 17:49:30.196706 2855'}, 45)), ('sender', 'Client_366')])


Sending 499 bytes
Message:
OrderedDict([('correlation_id', '91086'), ('message_id', '2018-04-05 17:49:44.245054 3490'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196706 2855', 'function': 'tasks.mapper', 'args': ('him:',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '92577'), ('function', 'dequeue_task'), ('message_id', '92577'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '92577'), ('message_id', '2018-04-05 17:49:45.629332 3547'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890c49'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196793 2860', 'function': 'tasks.mapper', 'args': ('grossly',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196793 2860', 'task_id': '2018-04-05 17:49:30.196793 2860'}, 40)), ('sender', 'Client_366')])


Sending 502 bytes
Message:
OrderedDict([('correlation_id', '92577'), ('message_id', '2018-04-05 17:49:45.629332 3547'), ('message_type', 'result'), ('receiver'

Message:
OrderedDict([('correlation_id', '93982'), ('function', 'dequeue_task'), ('message_id', '93982'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '93982'), ('message_id', '2018-04-05 17:49:47.064128 3605'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196878 2865', 'function': 'tasks.mapper', 'args': ('bleated',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196878 2865', 'task_id': '2018-04-05 17:49:30.196878 2865'}, 35)), ('sender', 'Client_366')])


Sending 502 bytes
Message:
OrderedDict([('correlation_id', '93982'), ('message_id', '2018-04-05 17:49:47.064128 3605'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'),

Message:
OrderedDict([('correlation_id', '95264'), ('function', 'dequeue_task'), ('message_id', '95264'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '95264'), ('message_id', '2018-04-05 17:49:48.389443 3659'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.196963 2870', 'function': 'tasks.mapper', 'args': ('mournful',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.196963 2870', 'task_id': '2018-04-05 17:49:30.196963 2870'}, 30)), ('sender', 'Client_366')])


Sending 503 bytes
Message:
OrderedDict([('correlation_id', '95264'), ('message_id', '2018-04-05 17:49:48.389443 3659'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371')


Data received: 551 bytes
Message:
OrderedDict([('correlation_id', '96641'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['mournful', 1], 'function': 'tasks.mapper', 'args': ['mournful'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.196963 2870', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.196963 2870', 'correlation_id': '2018-04-05 17:49:30.196963 2870', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '96641'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '96774'), ('function', 'dequeue_task'), ('message_id', '96774'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '96774'), ('message_id', '2018-04-05 17:49:


Data received: 223 bytes
Message:
OrderedDict([('correlation_id', '97829'), ('function', 'dequeue_task'), ('message_id', '97829'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d891371'), ('sender', 'NodeMCU_b4e62d891371')])


Processed result:
OrderedDict([('correlation_id', '97829'), ('message_id', '2018-04-05 17:49:51.160583 3777'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d891371'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.197130 2880', 'function': 'tasks.mapper', 'args': ('said',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.197130 2880', 'task_id': '2018-04-05 17:49:30.197130 2880'}, 20)), ('sender', 'Client_366')])


Sending 499 bytes
Message:
OrderedDict([('correlation_id', '97829'), ('message_id', '2018-04-05 17:49:51.160583 3777'), ('message_type', 'result'), ('receiver', '


Data received: 543 bytes
Message:
OrderedDict([('correlation_id', '99496'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['Wolf', 1], 'function': 'tasks.mapper', 'args': ['Wolf'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.197165 2882', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.197165 2882', 'correlation_id': '2018-04-05 17:49:30.197165 2882', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '99496'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Data received: 535 bytes
Message:
OrderedDict([('correlation_id', '99152'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': None, 'function': 'tasks.mapper', 'args': ['the'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.197148 2881', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.197148 2881', 'correlation_id': '2018-04-05 17:49:30.197148 2881


Data received: 538 bytes
Message:
OrderedDict([('correlation_id', '101329'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': None, 'function': 'tasks.mapper', 'args': ['"No'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.197261 2888', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.197261 2888', 'correlation_id': '2018-04-05 17:49:30.197261 2888', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '101329'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Data received: 225 bytes
Message:
OrderedDict([('correlation_id', '101435'), ('function', 'dequeue_task'), ('message_id', '101435'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])


Processed result:
OrderedDict([('correlation_id', '101435'), ('message_id', '2018-04-05 17:49:54.542205 3



Data received: 225 bytes
Message:
OrderedDict([('correlation_id', '102889'), ('function', 'dequeue_task'), ('message_id', '102889'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890499'), ('sender', 'NodeMCU_b4e62d890499')])


Processed result:
OrderedDict([('correlation_id', '102889'), ('message_id', '2018-04-05 17:49:55.740529 3979'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890499'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.197386 2894', 'function': 'tasks.mapper', 'args': ('Lamb',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.197386 2894', 'task_id': '2018-04-05 17:49:30.197386 2894'}, 6)), ('sender', 'Client_366')])


Sending 499 bytes
Message:
OrderedDict([('correlation_id', '102889'), ('message_id', '2018-04-05 17:49:55.740529 3979'), ('message_type', 'result'), ('receiver


Data received: 225 bytes
Message:
OrderedDict([('correlation_id', '103901'), ('function', 'dequeue_task'), ('message_id', '103901'), ('message_type', 'function'), ('need_result', True), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890a95'), ('sender', 'NodeMCU_b4e62d890a95')])


Processed result:
OrderedDict([('correlation_id', '103901'), ('message_id', '2018-04-05 17:49:57.029533 4033'), ('message_type', 'result'), ('receiver', 'NodeMCU_b4e62d890a95'), ('reply_to', 'Client_366'), ('result', ({'sender': 'Client_366', 'message_type': 'function', 'message_id': '2018-04-05 17:49:30.197473 2899', 'function': 'tasks.mapper', 'args': ('tasted',), 'need_result': True, 'reply_to': 'Client_366', 'correlation_id': '2018-04-05 17:49:30.197473 2899', 'task_id': '2018-04-05 17:49:30.197473 2899'}, 1)), ('sender', 'Client_366')])


Sending 501 bytes
Message:
OrderedDict([('correlation_id', '103901'), ('message_id', '2018-04-05 17:49:57.029533 4033'), ('message_type', 'result'), ('receive


Data received: 551 bytes
Message:
OrderedDict([('correlation_id', '105191'), ('function', 'enqueue_result'), ('kwargs', {'message': {'result': ['grass"', 1], 'function': 'tasks.mapper', 'args': ['grass"'], 'reply_to': 'Client_366', 'message_id': '2018-04-05 17:49:30.197492 2900', 'message_type': 'result', 'task_id': '2018-04-05 17:49:30.197492 2900', 'correlation_id': '2018-04-05 17:49:30.197492 2900', 'need_result': True, 'sender': 'Client_366'}}), ('message_id', '105191'), ('message_type', 'function'), ('receiver', 'Client_366'), ('reply_to', 'NodeMCU_b4e62d890c49'), ('sender', 'NodeMCU_b4e62d890c49')])

********** result:
words count: 94

[('Lamb', 5), ('grossly', 3), ('Wolf', 2), ('year', 1), ('with', 1), ('voice', 1), ('violent', 1), ('tone', 1), ('thus', 1), ('then', 1)]
**********


In [14]:
# Stopping
the_client.stop()
the_client = None
print('\n[________________ Demo stopped ________________]\n')

[Closed: ('123.240.210.68', 1883)]

[________________ Demo stopped ________________]

