# Chapter 5 — Distributed Python
Examples for Celery, Dramatiq, and SCOOP.
> Many cells require external services (Redis/RabbitMQ) and won't run in an offline notebook.

## Celery — minimal app & tasks

In [None]:

from celery import Celery

# Choose your broker/backend; here we default to Redis on localhost.
# For RabbitMQ, set broker='pyamqp://guest@localhost//' and a suitable backend (e.g. 'rpc://', or Redis).
# The app is named 'tasks'; the client cells below import from a module of the
# same name (`from tasks import ...`).
app = Celery('tasks', broker='redis://localhost//', backend='redis://localhost')
# Helpful for startup races with broker containers
app.conf.broker_connection_retry_on_startup = True

@app.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total

@app.task
def mul(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product

@app.task
def xsum(numbers):
    """Celery task: add up every item of ``numbers``, starting from 0."""
    total = 0
    for value in numbers:
        # Plain `+` (not `+=`) mirrors how the built-in sum() accumulates.
        total = total + value
    return total


### Client calls (requires running worker)

In [None]:

from tasks import add
# Shorthand enqueue: positional arguments are forwarded to the task.
r1 = add.delay(4, 4)
# Explicit enqueue: the task arguments are passed through `args`.
r2 = add.apply_async(args=[10, 5])

print("Queued tasks. If you have a result backend configured, you can .get():")
try:
    # Wait up to 10 seconds for each result, in submission order.
    for label, pending in (("add(4,4) =", r1), ("add(10,5) =", r2)):
        print(label, pending.get(timeout=10))
except Exception as exc:
    # Demo boundary: report any failure instead of crashing the cell.
    print("Could not fetch results (did you configure a backend?):", exc)


## Celery flows: group, chain, chord (Redis backend for chord)

In [None]:

from celery import group, chain, chord
from tasks import add, mul, xsum

# GROUP: run many independent tasks and collect all results
# Ten add(i, i) signatures are built lazily; the trailing () submits the group.
# Expected results: [0, 2, 4, ..., 18].
g = group(add.s(i, i) for i in range(10))()
print("GROUP results:", g.get())

# CHAIN: pipe result from one task into the next
# mul.s(8) is a partial signature: the result of add(4, 4) is supplied as its
# first argument, so this computes (4 + 4) * 8 = 64.
c = chain(add.s(4, 4) | mul.s(8))()
print("CHAIN result:", c.get())

# CHORD: fan-out (group) + fan-in (callback)
# Requires a backend that supports chords (Redis, Database, Memcached, ...).
# xsum receives the list of all add(i, i) results; expected total: 90.
ch = chord((add.s(i, i) for i in range(10)), xsum.s())()
print("CHORD result:", ch.get())


## Monte Carlo π with Celery

In [None]:

from celery import Celery, group, chord
import random

# Celery app for the Monte Carlo example: broker and result backend are both a
# local Redis. The run_simulation task below uses a chord, which needs a result backend.
app = Celery('pi_calculator', broker='redis://localhost//', backend='redis://localhost')
# Keep retrying the broker connection at startup instead of failing immediately.
app.conf.broker_connection_retry_on_startup = True

@app.task
def simulate_points(n_points: int):
    """Sample ``n_points`` random points in the unit square and count the hits.

    A point is a hit when ``x*x + y*y <= 1.0``, i.e. it lies inside the
    quarter circle of radius 1.

    Returns:
        A dict with ``'inside'`` (number of hits) and ``'total'`` (``n_points``).
    """
    def _lands_inside():
        # Draw x first, then y, one pair per sampled point.
        x = random.random()
        y = random.random()
        return x*x + y*y <= 1.0

    hits = sum(1 for _ in range(n_points) if _lands_inside())
    return {'inside': hits, 'total': n_points}

@app.task
def calculate_pi(partials):
    """Combine partial simulation results into one estimate of pi.

    Args:
        partials: items with ``'total'`` and ``'inside'`` keys, as produced by
            ``simulate_points``.

    Returns:
        ``4.0 * inside / total`` summed over all partials.
    """
    totals = [partial['total'] for partial in partials]
    insides = [partial['inside'] for partial in partials]
    thrown = sum(totals)
    hits = sum(insides)
    return 4.0 * hits / thrown

@app.task(bind=True)
def run_simulation(self, n_points_per_task: int, n_tasks: int):
    """Fan out ``n_tasks`` simulations and reduce them to one pi estimate.

    The task replaces itself with a chord (``simulate_points`` header,
    ``calculate_pi`` callback), so ``run_simulation.delay(...).get()`` yields
    the final estimate. Returning the chord's ``AsyncResult`` directly would
    not work: the result backend cannot serialize an ``AsyncResult`` as a task
    return value.

    Args:
        n_points_per_task: number of random points each simulation task samples.
        n_tasks: number of parallel simulation tasks in the chord header.

    Raises:
        ValueError: if either argument is smaller than 1 (an empty header or
            zero samples would leave ``calculate_pi`` dividing by zero).
    """
    if n_points_per_task < 1 or n_tasks < 1:
        raise ValueError("n_points_per_task and n_tasks must both be >= 1")
    sims = group(simulate_points.s(n_points_per_task) for _ in range(n_tasks))
    # replace() hands this task's id over to the chord; execution ends here.
    return self.replace(chord(sims, calculate_pi.s()))


In [None]:

from pi_calculator import run_simulation, simulate_points, calculate_pi
from celery import group

# Two ways to run: 1) group + get + calculate locally; 2) single chord via run_simulation

# 1) Group then reduce manually
n_points_per_task = 200_000
n_tasks = 10
# The trailing () submits the group; .get() waits for all partial results.
grp_res = group(simulate_points.s(n_points_per_task) for _ in range(n_tasks))()
partials = grp_res.get()
# Calling the task object directly runs it in this process, not on a worker.
pi1 = calculate_pi(partials)
print("Pi (manual reduce):", pi1)

# 2) Single chord
# The fan-out and the reduction both happen on the workers; the client only
# enqueues run_simulation and waits for its stored result.
chord_res = run_simulation.delay(n_points_per_task, n_tasks)
print("Pi (chord):", chord_res.get())


## Celery Queues and Routing

In [None]:

from celery import Celery
from kombu import Queue, Exchange

# Celery app used by the routing example; broker and result backend are a local Redis.
app = Celery('myapp', broker='redis://localhost//', backend='redis://localhost/')
app.conf.broker_connection_retry_on_startup = True

# Define queues and exchanges
# NOTE(review): the exchanges are created without an explicit type, while the
# routing keys use the '#' wildcard, which AMQP only pattern-matches on topic
# exchanges — confirm whether Exchange('priority', type='topic') was intended.
app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('priority'), routing_key='high.#'),
    Queue('low_priority',  Exchange('priority'), routing_key='low.#'),
)

# Routing rules
# Keys are fully qualified task names ('<module>.<function>'); each task is
# routed by queue name.
app.conf.task_routes = {
    'tasks.send_email':      {'queue': 'high_priority'},
    'tasks.process_data':    {'queue': 'low_priority'},
    'tasks.backup_database': {'queue': 'default'},
}

# Misc
# Have the worker import the 'tasks' module so the tasks above get registered.
app.conf.imports = ('tasks',)
# Tasks without an explicit route fall back to the 'default' queue/exchange/key.
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
# Result expiry, in seconds (3600 = one hour).
app.conf.result_expires = 3600


In [None]:

from celery_app import app
import time

@app.task(queue='high_priority', priority=0)  # max priority
def send_email(recipient, subject, body):
    """Simulate sending an email: log it, pause two seconds, return a receipt.

    ``body`` is accepted but not used by the simulation.
    """
    simulated_send_seconds = 2
    log_line = f"Sent email to {recipient} with subject '{subject}'"
    print(log_line)
    time.sleep(simulated_send_seconds)
    receipt = f"Email sent to {recipient}"
    return receipt

@app.task(queue='low_priority', priority=9)   # min priority
def process_data(data):
    """Simulate a slow job: announce it, pause ten seconds, return a summary."""
    simulated_work_seconds = 10
    start_line = f"Start processing data: {data}"
    print(start_line)
    time.sleep(simulated_work_seconds)
    summary = f"Processed Data: {data}"
    return summary

@app.task(queue='default')
def backup_database():
    """Simulate a database backup: announce it, pause five seconds, report completion."""
    simulated_backup_seconds = 5
    status = "Backup completed"
    print("Backup of database is running...")
    time.sleep(simulated_backup_seconds)
    return status


In [None]:

from tasks import send_email, process_data, backup_database

if __name__ == "__main__":
    print("Starting tasks...")
    # Enqueue in a fixed order: both emails, then both data jobs, then the backup.
    outgoing_emails = (
        ("user@example.com", "Welcome!", "Thanks for registration"),
        ("admin@example.com", "Alarm", "Critical problem found"),
    )
    for recipient, subject, body in outgoing_emails:
        send_email.delay(recipient, subject, body)
    for dataset in ("dataset_1", "dataset_2"):
        process_data.delay(dataset)
    backup_database.delay()
    print("Tasks sent")


## Dramatiq — actors

In [None]:

import dramatiq
import time

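# Tip: this module never calls dramatiq.set_broker(), so the actor binds to
# Dramatiq's default global broker (RabbitMQ on localhost when its dependencies
# are installed — verify for your Dramatiq version). To use Redis, create a
# RedisBroker and pass it to dramatiq.set_broker() before declaring actors.
# Start a worker for this module with `dramatiq dramaserver`.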

@dramatiq.actor
def wait(t, n):
    """Dramatiq actor: sleep for ``t`` seconds, then print a line naming actor ``n``."""
    pause_seconds = t
    time.sleep(pause_seconds)
    report = f"I am the actor {n} and I will wait for {t} secs"
    print(report)


In [None]:

from dramaserver import wait

# Enqueue ten messages without waiting for any of them; message k sleeps k seconds.
for seconds in range(10):
    label = f"#{seconds}"
    wait.send(seconds, label)

print("End Program")


In [None]:

from dramatiq import group
from dramaserver import wait

# One message per (delay in seconds, label) pair, enqueued together as a group.
schedule = [(10, 'A'), (5, 'B'), (4, 'C'), (7, 'D')]
g = group([wait.message(seconds, label) for seconds, label in schedule]).run()

print("End Program")


### Dramatiq with results via Redis

In [None]:

import time
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# Use a local Redis as the global broker for the actors declared below.
broker = RedisBroker(host="localhost")
dramatiq.set_broker(broker)
# Store actor return values in Redis as well; the Results middleware is what
# makes `store_results=True` on an actor effective.
result_backend = RedisBackend(host="localhost")
broker.add_middleware(Results(backend=result_backend))

@dramatiq.actor(store_results=True)
def wait(t, n):
    """Dramatiq actor: sleep ``t`` seconds, print a line naming actor ``n``, return a summary.

    The returned string is kept by the result backend (``store_results=True``).
    """
    pause_seconds = t
    time.sleep(pause_seconds)
    report = f"I am the actor {n} and I will wait for {t} secs"
    print(report)
    outcome = f"I waited for {t} secs"
    return outcome


In [None]:

from dramatiq import group
from dramaserver_results import wait
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
import dramatiq

# Client-side broker: a fresh RedisBroker becomes the global broker used to
# enqueue the group below.
# NOTE(review): importing dramaserver_results above presumably already set up a
# broker with the Results middleware — confirm whether this second setup is needed.
broker = RedisBroker(host="localhost")
dramatiq.set_broker(broker)
# The Results middleware must be present on this broker so results can be fetched.
result_backend = RedisBackend(host="localhost")
broker.add_middleware(Results(backend=result_backend))

# One message per (delay in seconds, label) pair, enqueued together as a group.
schedule = [(10, 'A'), (5, 'B'), (4, 'C'), (7, 'D')]
g = group([wait.message(seconds, label) for seconds, label in schedule]).run()

# Block until the results arrive; the timeout is 12000 (milliseconds in
# Dramatiq's API), which exceeds the longest 10-second actor.
stored_results = g.get_results(block=True, timeout=12000)
for res in stored_results:
    print(res)

print("End Program")


## SCOOP — basic mapping and reduction

In [None]:

from scoop import futures

def worker(value):
    """Print a greeting that identifies this worker by ``value``."""
    greeting = "I am the Worker %s" % value
    print(greeting)

if __name__ == "__main__":
    # Drain the mapped results so the program waits for every worker to finish.
    for _ in futures.map(worker, range(4)):
        pass


In [None]:

from scoop import futures
import math
import numpy as np

def func(value):
    """Print and return the square root of ``value``."""
    root = math.sqrt(value)
    report = "The value %s and the elaboration is %s" % (value, root)
    print(report)
    return root

if __name__ == "__main__":
    samples = np.array([10, 3, 6, 1, 4, 8, 25, 9])
    # list() collects every mapped result before any summary line is printed.
    roots = list(futures.map(func, samples))
    for root in roots:
        print("This is the result: %s" % root)


In [None]:

from scoop import futures
import math
import numpy as np
import operator

def func(value):
    """Compute the square root of ``value``, print a progress line, and return it."""
    square_root = math.sqrt(value)
    print("The value %s and the elaboration is %s" % (value, square_root))
    return square_root

if __name__ == "__main__":
    data = np.array([10,3,6,1,4,8,25,9])

    # Single reduce: map in parallel, then sum the results in this process.
    total = sum(futures.map(func, data))
    print("This is the reduction result:", total)

    # Using mapReduce: the mapping and the pairwise operator.add reduction are
    # both handed to SCOOP.
    total2 = futures.mapReduce(func, operator.add, data)
    print("This is the reduction result (mapReduce):", total2)

    # Chained mapping then reduction (mean of int(sqrt(x)))
    # numpy is already imported at the top of the cell, so no re-import is needed here.
    mean_val = np.mean(list(futures.map(int, futures.map(func, data))))
    print("Mean of int(sqrt(x)):", mean_val)


### Monte Carlo π with SCOOP

In [None]:

from scoop import futures
import random

def simulate_points(n_points):
    """Count how many of ``n_points`` random unit-square points satisfy x*x + y*y <= 1."""
    # Lazy pairs: x is drawn before y, one pair per sampled point.
    draws = ((random.random(), random.random()) for _ in range(n_points))
    return sum(1 for x, y in draws if x*x + y*y <= 1)

def main(n_points_total=1_000_000, n_workers=4):
    """Estimate pi by Monte Carlo sampling spread over SCOOP workers.

    Args:
        n_points_total: requested number of random points (default 1e6; raise
            it for more accuracy).
        n_workers: number of parallel ``simulate_points`` calls.

    Raises:
        ValueError: if ``n_workers`` is below 1 or there are fewer points than
            workers (each worker would then sample nothing).
    """
    if n_workers < 1 or n_points_total < n_workers:
        raise ValueError("need n_workers >= 1 and n_points_total >= n_workers")
    n_points_per_worker = n_points_total // n_workers
    # Integer division may drop a remainder, so divide by the number of points
    # actually simulated rather than the requested total.
    n_points_simulated = n_points_per_worker * n_workers
    results = futures.map(simulate_points, [n_points_per_worker] * n_workers)
    total_inside = sum(results)
    pi_estimate = 4 * total_inside / n_points_simulated
    print(f"Estimate value of pi: {pi_estimate}")

if __name__ == "__main__":
    main()
