

Powerhose is a single-master, multiple-worker ZeroMQ (zmq) library that can be used to push work to specialized workers.

Powerhose uses Circus to manage the life of workers.

WARNING: This is still a work in progress - no stable version has been released yet

WARNING 2: consider using the fork until it's merged into gevent-zeromq



Let's create a worker that knows how to calculate the square of a number:

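import sys
from powerhose.client.worker import Worker

endpoint = sys.argv[1]
workpoint = sys.argv[2]

def square(*args):
    # the value to square arrives as a string in the job data
    number = int(args[0][1])
    return str(number * number)

worker = Worker(endpoint, workpoint, target=square)
try:
    # NOTE: the body of this block was lost from this listing;
    # run() and stop() are assumed method names
    worker.run()
except KeyboardInterrupt:
    worker.stop()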

The program can then be called like this:

$ python ipc://master.ipc ipc://routing.ipc

In this example, the Worker is instantiated with:

  • the endpoint, which is the socket where the master listens for workers that want to register.
  • the workpoint, which is the socket where the worker gets its jobs.
  • the target, which is the callable that receives the jobs to perform.

The square function receives the value as a string and has to return the result as a string, which is sent back to the master. Of course, you would use a real serializer/deserializer when you operate on more complex data structures.
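
For structured data, the string payload could carry JSON. The sketch below is only an illustration: it assumes the target receives its arguments in the same shape as in the square example (the payload string at args[0][1]), and the function name and the "number"/"result" keys are made up for this example.

```python
import json


def square_json(*args):
    # Assumption: as in the square example, the payload string is at args[0][1].
    payload = json.loads(args[0][1])
    number = payload["number"]
    # The result goes back to the master as a string, so serialize it again.
    return json.dumps({"number": number, "result": number * number})


# Simulated call, using a made-up job tuple whose second item is the payload.
reply = square_json(("1", json.dumps({"number": 12})))
print(reply)
```

The master side would then decode the returned string with json.loads before using the result.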


Now we want to run several workers. Let's create another script for this:

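from powerhose.client.workers import Workers
import sys

cmd = '%s examples/ ipc://worker-routing-$WID.ipc'

workers = Workers(cmd % sys.executable)
try:
    # NOTE: the body of this block was lost from this listing;
    # run() and stop() are assumed method names
    workers.run()
except KeyboardInterrupt:
    workers.stop()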

The Workers class will take care of creating 5 workers (the default value) by running the provided command. Notice the $WID value: it will be replaced with an id that's unique per worker.
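
To make the $WID substitution concrete, here is a minimal sketch of how one command line per worker could be derived from such a template. It is not how Powerhose or Circus implement it; expand_commands, the worker.py script name and the numbering from 1 are assumptions made for the illustration.

```python
from string import Template


def expand_commands(cmd, numworkers=5):
    # One command line per worker; $WID becomes that worker's id.
    # Numbering from 1 is an assumption made for this sketch.
    return [Template(cmd).safe_substitute(WID=wid)
            for wid in range(1, numworkers + 1)]


# worker.py is a hypothetical script name used only for this example.
template = "python worker.py ipc://master.ipc ipc://worker-routing-$WID.ipc"
for line in expand_commands(template):
    print(line)
```

Because each worker gets its own id, each one also gets its own socket path, so no two workers listen on the same workpoint.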

Running the workers is then simply done with:

$ python

The script uses the Circus library, which takes care of making sure the workers are respawned in case they die.


The master can look like this:

from powerhose.jobrunner import JobRunner
import time
import random
import sys

endpoint = "ipc:///tmp/master-routing.ipc"

runner = JobRunner(endpoint)

try:
    # wait to have at least 1 worker
    print('Waiting for some workers to register -- run')
    while len(runner.workers) < 1:
        # loop body lost from this listing; a short sleep is assumed
        time.sleep(1.)

    # keep sending random numbers to the workers
    while True:
        print(runner.execute('1', str(random.randrange(1000))))
except KeyboardInterrupt:
    print('bye')

The master is run by creating a JobRunner instance tied to an endpoint. Jobs are then sent via the execute method.



  • The system is based on a single master and multiple workers.
  • The worker registers itself with the master, provides a socket, and waits for work on it.
  • Workers perform the work synchronously and send back the result immediately.
  • The master uses a simple round-robin strategy to send work to the workers. If all of them are busy, it waits a bit before it times out.
  • The worker pings the master on a regular basis and exits if it's unable to reach it. It attempts to reconnect several times, to give the master a chance to come back.
  • Workers are language agnostic.
  • The system is not responsible for respawning a master or a worker that dies. A supervisor such as daemontools can be used for this.

Registering a worker

  • The Master binds an endpoint and waits for workers to connect to it.
  • The Worker connects to the master and provides its own socket.
  • The Master adds the worker to the list of available workers, and connects to the worker socket.

W                          M
--- PING + endpoint -->   Register the Worker
<-- PONG            ---

A worker can also unregister itself:

W                      M
--- REMOVE       -->   Unregister the Worker
<-- REMOVED      ---
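
The two exchanges above can be sketched as a small piece of master-side bookkeeping. This is an illustration only, with no sockets involved; WorkerRegistry and its handle method are hypothetical names, not part of the Powerhose API.

```python
class WorkerRegistry(object):
    """Hypothetical master-side bookkeeping for registered workers."""

    def __init__(self):
        self.workers = []  # endpoints of available workers, in arrival order

    def handle(self, command, endpoint):
        if command == "PING":
            # A ping from a worker that is not known yet registers it.
            if endpoint not in self.workers:
                self.workers.append(endpoint)
            return "PONG"
        if command == "REMOVE":
            # Unregistering an unknown worker is simply ignored here.
            if endpoint in self.workers:
                self.workers.remove(endpoint)
            return "REMOVED"
        raise ValueError("unknown command: %r" % command)


registry = WorkerRegistry()
print(registry.handle("PING", "ipc://worker-routing-1.ipc"))
print(registry.handle("REMOVE", "ipc://worker-routing-1.ipc"))
```

Treating PING as "register if unknown" keeps registration idempotent, so a worker can send the same message again without being listed twice.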

Performing a task

  • The Master chooses the next worker in the queue of available workers.
  • Once the master has a worker, it removes it from the queue and sends work to it.
  • The worker performs the job synchronously, then returns the result.
  • The master waits for the result. After a certain timeout, it asks another worker and removes the laggy worker from the queue.
  • The master gets back the result, and puts the worker back in the queue.

M                 W
--- JOB       --> do the job
<-- JOBRES    ---
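
The dispatch steps above can be simulated without any sockets. The sketch below is a simplification under stated assumptions: execute, send and WorkerTimeout are hypothetical names, a timeout is modelled as an exception, and the function fails right away when the queue is empty instead of waiting for a worker to become free.

```python
from collections import deque


class WorkerTimeout(Exception):
    """Raised by a simulated worker that does not answer in time."""


def execute(queue, job, send):
    """Try workers in queue order until one returns a result.

    `send(worker, job)` is a hypothetical callable that returns the
    result or raises WorkerTimeout.
    """
    while queue:
        worker = queue.popleft()      # take the next available worker
        try:
            result = send(worker, job)
        except WorkerTimeout:
            continue                  # the laggy worker stays out of the queue
        queue.append(worker)          # put the worker back after the job
        return result
    raise RuntimeError("no worker available")


def fake_send(worker, job):
    # Simulated transport: the worker named "slow" never answers in time.
    if worker == "slow":
        raise WorkerTimeout(worker)
    return "%s:%d" % (worker, int(job) ** 2)


workers = deque(["slow", "w1", "w2"])
print(execute(workers, "4", fake_send))
print(list(workers))
```

Appending the worker at the end of the queue after each job is what gives the round-robin behaviour: the next job goes to the worker that has been idle the longest.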


  • The worker pings the master every N seconds.
  • If the master fails to answer after several attempts, the worker exits.
  • A master that receives a ping from an unknown worker registers it by adding it to the queue.

W                           M
--- PING + endpoint   -->   possibly: Register the Worker
<-- PONG              ---
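
The worker side of this exchange can be sketched as a loop that gives up after a number of consecutive failures. This is an illustration, not the Powerhose implementation: heartbeat, the ping callable, and the interval and max_attempts defaults are all assumptions.

```python
import time


def heartbeat(ping, interval=1.0, max_attempts=3, sleep=time.sleep):
    """Ping the master until it fails `max_attempts` times in a row.

    `ping` is a hypothetical callable returning True when a PONG came back.
    Returns the number of successful pings before giving up.
    """
    failures = 0
    successes = 0
    while failures < max_attempts:
        if ping():
            successes += 1
            failures = 0              # the master answered, reset the counter
        else:
            failures += 1
        sleep(interval)
    return successes                  # the caller would now exit the worker


# Simulated master: answers twice, then disappears. No real sleeping is done.
answers = iter([True, True])
print(heartbeat(lambda: next(answers, False), sleep=lambda s: None))
```

Resetting the failure counter on every PONG is what gives the master a chance to come back: only an uninterrupted run of missed answers makes the worker exit.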