Runs workers via zmq to perform any kind of task

Powerhose

Powerhose is a single-master/multiple-worker library built on ZeroMQ (zmq) that can be used to push work to specialized workers.

Powerhose uses Circus to manage the life of workers.

WARNING: This is still a work in progress; no stable version has been released yet.

WARNING 2: consider using the https://github.com/traviscline/gevent-zeromq fork until its changes are merged into gevent-zeromq.

Example

Worker

Let's create a worker that knows how to calculate the square of a number:

import sys
from powerhose.client.worker import Worker

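endpoint = sys.argv[1]   # socket where the master accepts worker registrations
workpoint = sys.argv[2]  # socket on which this worker receives its jobs

def square(*args):
    # the number to square is read from args[0][1], as a string
    number = int(args[0][1])
    # the result is sent back to the master as a string too
    return str(number * number)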

worker = Worker(endpoint, workpoint, target=square)
try:
    worker.run()
except KeyboardInterrupt:
    worker.stop()

The program can then be called like this:

$ python worker.py ipc://master.ipc ipc://routing.ipc

In this example, the Worker is instantiated with:

  • the endpoint, which is the socket where the master listens for workers that want to register.
  • the workpoint, the socket where the worker receives its jobs.
  • the target, which is the callable that receives jobs to perform.

The square function receives its input as a string and must return its result as a string, which is sent back to the master. With more complex data structures, you would of course use a real serializer/deserializer.
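As a sketch of what such a serializer could look like, the target below decodes a JSON payload and returns a JSON-encoded result, using only the standard library. It assumes, as in the square example, that the payload string sits in args[0][1]; the sum_of_squares target, its payload format and the sample job message are hypothetical, not part of Powerhose.

```python
import json


def sum_of_squares(*args):
    """Hypothetical target: the payload is a JSON list of numbers."""
    # Decode the payload string (same position as in the square example).
    numbers = json.loads(args[0][1])
    # Encode the structured result back into a string for the master.
    return json.dumps({"count": len(numbers),
                       "sum_of_squares": sum(n * n for n in numbers)})


# Hypothetical job message; the master would JSON-encode its side the same way.
job = ["1", json.dumps([1, 2, 3])]
print(sum_of_squares(job))
```

Because both sides agree on JSON, the worker code no longer has to parse ad hoc strings by hand.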

Workers

Now we want to run several workers, so let's create another script for this:

from powerhose.client.workers import Workers
import sys

# command run for each worker; $WID is replaced by a unique id per worker
cmd = '%s examples/square_worker.py ipc://worker-routing-$WID.ipc'

workers = Workers(cmd % sys.executable)
try:
    workers.run()
except KeyboardInterrupt:
    workers.stop()

The Workers class takes care of creating 5 workers (the default value) by running the provided command. Notice the $WID placeholder: it is replaced with an id that is unique to each worker.
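To make the $WID substitution concrete, here is a minimal sketch that expands one command template into one command line per worker with the standard library's string.Template. It only illustrates the idea; Circus performs this substitution itself, and the expand_commands helper as well as the choice to number ids from 1 are assumptions of this sketch.

```python
import sys
from string import Template


def expand_commands(cmd, numprocesses=5):
    """Hypothetical helper: one command line per worker, $WID filled in.

    Illustration only; Circus does this substitution internally.
    """
    # Ids are numbered from 1 here (an assumption of this sketch).
    return [Template(cmd).substitute(WID=wid)
            for wid in range(1, numprocesses + 1)]


cmd = '%s examples/square_worker.py ipc://worker-routing-$WID.ipc'
for line in expand_commands(cmd % sys.executable, numprocesses=3):
    print(line)
```

Each worker thus gets its own workpoint socket, so no two workers try to bind the same ipc path.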

Running the workers is then simply done with:

$ python workers.py

The script uses the Circus library, which makes sure the workers are respawned if they die.
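The respawning behaviour can be pictured with a toy supervisor loop: start the worker process, wait for it to exit, and start it again. This is a simplified stand-in written with the standard library's subprocess module, not Circus's API or implementation; the supervise function, the max_starts cap and the restart delay are assumptions added so the sketch terminates.

```python
import subprocess
import sys
import time


def supervise(args, max_starts=3, delay=0.1):
    """Toy supervisor: run `args`, restarting it every time it exits.

    Stops after `max_starts` runs (a cap added for this sketch) and
    returns the exit codes observed. Not Circus's implementation.
    """
    exit_codes = []
    for _ in range(max_starts):
        proc = subprocess.Popen(args)
        exit_codes.append(proc.wait())  # block until the worker dies
        time.sleep(delay)               # short pause before respawning it
    return exit_codes


# A "worker" that crashes immediately is respawned until the cap is reached.
codes = supervise([sys.executable, "-c", "import sys; sys.exit(1)"])
print(codes)
```

A real process manager adds much more (several processes, backoff, signal handling), which is why the example delegates this job to Circus.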

Master

The master can look like this:

from powerhose.jobrunner import JobRunner
import time
import random
import sys

endpoint = "ipc:///tmp/master-routing.ipc"

runner = JobRunner(endpoint)
runner.start()

try:
    # wait until at least 1 worker has registered
    print('Waiting for some workers to register -- run square_worker.py')
    while len(runner.workers) < 1:
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1.)

    # keep sending jobs, each carrying a random number to square
    while True:
        print(runner.execute('1', str(random.randrange(1000))))
except KeyboardInterrupt:
    runner.stop()
    print('bye')

The master is run by creating a JobRunner instance bound to an endpoint. Jobs are then sent via its execute method.

Protocol

Principles:

  • The system is based on a single master and multiple workers.
  • The worker registers itself with the master, provides a socket, and waits for work on it.
  • Workers perform the work synchronously and send back the result immediately.
  • The master uses a simple round-robin strategy to send work to the workers. If all of them are busy, it waits a bit before timing out.
  • The worker pings the master on a regular basis and exits if it is unable to reach it. It makes several reconnection attempts first, to give the master a chance to come back.
  • Workers are language-agnostic.
  • The system is not responsible for respawning a master or a worker that dies; a tool such as daemontools can be used for this.

Registering a worker

  • The Master binds an endpoint and waits for workers to connect to it.
  • The Worker connects to the master and provides its own socket.
  • The Master adds the worker to the list of available workers and connects to the worker's socket.
W                          M
--- PING + endpoint -->   Register the Worker
<-- PONG            ---

A worker can also unregister itself:

W                      M
--- REMOVE       -->   Unregister the Worker
<-- REMOVED      ---
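The master's side of these two exchanges boils down to keeping a list of worker endpoints. The sketch below models messages as plain tuples and leaves out the sockets entirely; the Registry class and the tuple format are hypothetical, and it assumes REMOVE identifies the worker by its endpoint, which the diagram does not specify.

```python
class Registry(object):
    """Hypothetical master-side bookkeeping for worker registration.

    Messages are modelled as (command, endpoint) tuples; the real wire
    format is not shown in this README.
    """

    def __init__(self):
        self.workers = []  # endpoints of available workers, in order

    def handle(self, message):
        command, endpoint = message
        if command == "PING":
            # Register an unknown worker; a real master would also connect
            # to the worker's socket at this point.
            if endpoint not in self.workers:
                self.workers.append(endpoint)
            return "PONG"
        if command == "REMOVE":
            # Assumes REMOVE carries the endpoint of the leaving worker.
            if endpoint in self.workers:
                self.workers.remove(endpoint)
            return "REMOVED"
        raise ValueError("unknown command: %r" % command)


registry = Registry()
print(registry.handle(("PING", "ipc://worker-routing-1.ipc")))
print(registry.workers)
print(registry.handle(("REMOVE", "ipc://worker-routing-1.ipc")))
```

Making PING idempotent means the same message can double as a heartbeat from an already-registered worker.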

Performing a task

  • The Master chooses the next worker in the queue of available workers
  • Once the master has a worker, it removes it from the queue and sends work to it.
  • The worker performs the job synchronously, then returns the result.
  • The master waits for the result; if a timeout expires first, it asks another worker and removes the laggy worker from the queue.
  • The master gets back the result and puts the worker back in the queue.
M                 W
  --> JOB         --> do the job
  <-- JOBRES      ---
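The steps above can be sketched with a deque as the queue of available workers and a thread pool to enforce the timeout. Workers are plain Python callables here instead of sockets, the Dispatcher class and its parameters are hypothetical, and as a simplification it raises immediately when no worker is left instead of waiting a bit as the principles describe.

```python
import collections
import concurrent.futures
import time


class Dispatcher(object):
    """Hypothetical round-robin dispatcher with a per-job timeout.

    Workers are callables here; in Powerhose they sit behind sockets.
    """

    def __init__(self, workers, timeout=1.0):
        self.queue = collections.deque(workers)  # available workers
        self.timeout = timeout
        self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def execute(self, job):
        while self.queue:
            worker = self.queue.popleft()        # next worker in line
            future = self.pool.submit(worker, job)
            try:
                result = future.result(timeout=self.timeout)
            except concurrent.futures.TimeoutError:
                continue                         # laggy: drop it, try the next
            self.queue.append(worker)            # back to the end of the queue
            return result
        raise RuntimeError("no worker answered in time")


def fast(job):
    return str(int(job) ** 2)


def slow(job):
    time.sleep(0.5)
    return "too late"


dispatcher = Dispatcher([slow, fast], timeout=0.1)
print(dispatcher.execute("12"))  # slow times out and is dropped; fast answers
```

Taking a worker off the left of the deque and appending it back on the right after a successful job is what makes the dispatch round-robin.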

Heartbeat

  • The worker pings the master every N seconds.
  • If the master fails to answer after several attempts, the worker exits.
  • When the master receives a ping from an unknown worker, it registers it by adding it to the queue.
W                      M
--- PING + endpoint   -->   possibly : Register the Worker
<-- PONG              ---
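On the worker side, the heartbeat is a loop that pings every N seconds and gives up after several consecutive failures. In this sketch ping is any callable returning True when a PONG came back, interval plays the role of N, and the heartbeat function, max_failures and the max_beats cap (added so the loop can be stopped in a test) are all hypothetical names. The master's half, registering an unknown worker on PING, is the same logic as in the registration sketch.

```python
import time


def heartbeat(ping, interval=1.0, max_failures=3, max_beats=None):
    """Hypothetical worker-side heartbeat loop.

    `ping` returns True when the master answered PONG. Returns True when
    `max_failures` consecutive pings failed (the worker should then exit),
    or False if the loop stopped because `max_beats` pings were sent.
    """
    failures = 0
    beats = 0
    while max_beats is None or beats < max_beats:
        beats += 1
        if ping():
            failures = 0          # master is alive: reset the counter
        else:
            failures += 1         # give the master a chance to come back
            if failures >= max_failures:
                return True       # master considered gone
        time.sleep(interval)
    return False
```

Counting only consecutive failures means a single lost PONG does not kill a worker whose master is otherwise healthy.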