Lin Zhao edited this page Aug 2, 2013 · 1 revision


Messagebus::Swarm Overview

The swarm is an abstraction on top of the Messagebus consumers. It provides 3 things:

1) a mapping of destinations to the workers that process the actual messages

  "jms.topic.deal.v1.created" => DealCreatedWorker

2) a set of conventions around how to process a message

Instead of writing message consumers, you just define the actual message processing logic. The Drone handles consuming the messages and passing them on to you. Basically:

      destination_name = 'jms.topic.dealUpdated'
      work_processor = destination_to_worker_mapping[destination_name]
      work_processor.perform(message_payload)
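
The lookup above can be sketched as plain, runnable Ruby that doesn't need the messagebus gem. DealUpdatedWorker, the "id" payload key and the dispatch helper are hypothetical illustrations, not the gem's API; the sketch only shows the idea of a hash from destination name to worker class.

```ruby
# Hypothetical sketch of the destination -> worker dispatch described above.
# DealUpdatedWorker and the "id" payload key are illustrative only.
class DealUpdatedWorker
  def self.perform(message_payload)
    "updated deal #{message_payload['id']}"
  end
end

# Maps each destination name to the worker class that processes its messages.
DESTINATION_TO_WORKER_MAPPING = {
  "jms.topic.dealUpdated" => DealUpdatedWorker
}.freeze

def dispatch(destination_name, message_payload)
  # fetch raises KeyError for a destination with no mapped worker
  work_processor = DESTINATION_TO_WORKER_MAPPING.fetch(destination_name)
  work_processor.perform(message_payload)
end

puts dispatch("jms.topic.dealUpdated", { "id" => 42 }) # prints "updated deal 42"
```

Using Hash#fetch makes an unmapped destination fail loudly with a KeyError instead of calling perform on nil; how the real swarm treats unmapped destinations isn't described on this page.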

There are 2 styles of workers. The simplest kind just takes a message payload and processes it:

  class DealCreatedWorker
    def self.perform(message_payload)
      # message_payload is the message body, e.g. { "some_key" => "some_value" }
    end
  end

Or, to handle several different topics with one worker, define perform_on_destination, which also receives the destination name:

  class DealCreatedWorker
    def self.perform_on_destination(message_payload, destination_name)
      # message_payload is the message body, e.g. { "some_key" => "some_value" }
      # destination_name is where it arrived, e.g. "jms.topic.deal.v1.created"
    end
  end
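
Here is a minimal sketch of how a drone could choose between the two styles. It assumes that a worker defining perform_on_destination is called with the destination name and that every other worker gets perform; DealEventsWorker and invoke_worker are hypothetical names, and the gem's real selection logic may differ.

```ruby
# Hypothetical sketch: pick the worker style at dispatch time.
# Assumption: perform_on_destination wins if defined; the gem may differ.
class DealCreatedWorker
  def self.perform(message_payload)
    "perform: #{message_payload['some_key']}"
  end
end

# Illustrative multi-topic worker (name is hypothetical).
class DealEventsWorker
  def self.perform_on_destination(message_payload, destination_name)
    "#{destination_name}: #{message_payload['some_key']}"
  end
end

def invoke_worker(worker, message_payload, destination_name)
  if worker.respond_to?(:perform_on_destination)
    worker.perform_on_destination(message_payload, destination_name)
  else
    worker.perform(message_payload)
  end
end

payload = { "some_key" => "some_value" }
puts invoke_worker(DealCreatedWorker, payload, "jms.topic.deal.v1.created")
# prints "perform: some_value"
puts invoke_worker(DealEventsWorker, payload, "jms.topic.deal.v1.created")
# prints "jms.topic.deal.v1.created: some_value"
```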

When you successfully process a message, all you need to do is let your perform method return.

  class DealCreatedWorker
    def self.perform(message_payload)
      # success
    end
  end

When you hit an error processing a message, let the exception propagate out of perform. It will be logged as an error, and the message will be retried later (per your queue/topic's configuration).

  class DealCreatedWorker
    def self.perform(message_payload)
      raise "OMG this is broken, retry later"
    end
  end

If you want to skip processing a message for now, raise AbortProcessing. As with an error, the message will be retried later, but it is not treated as an error: it simply isn't acknowledged (ack'd).

  class DealCreatedWorker
    def self.perform(message_payload)
      # i_cant_do_this_now stands in for your own condition
      raise Messagebus::Swarm::Drone::AbortProcessing.new if i_cant_do_this_now
    end
  end
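
The three outcomes above (return, raise, raise AbortProcessing) can be sketched as one function that reports what a drone would do with the message. This is a hypothetical illustration: AbortProcessing is defined locally as a stand-in for Messagebus::Swarm::Drone::AbortProcessing so the code runs without the gem, and :ack/:no_ack are made-up labels. It models only the behaviour described here, where a failed or skipped message is left unacknowledged so it can be retried.

```ruby
# Hypothetical stand-in for Messagebus::Swarm::Drone::AbortProcessing,
# defined locally so this sketch runs without the messagebus gem.
class AbortProcessing < StandardError; end

# Runs one worker and reports what a drone would do with the message:
# :ack (done) or :no_ack (left unacknowledged, so it is redelivered later).
def process_one(worker, message_payload, logger: $stderr)
  worker.perform(message_payload)
  :ack
rescue AbortProcessing
  :no_ack # skipped on purpose: not logged as an error
rescue StandardError => e
  logger.puts("ERROR processing message: #{e.message}")
  :no_ack # real failure: logged, then retried later
end

class SuccessWorker
  def self.perform(_message_payload); end
end

class BrokenWorker
  def self.perform(_message_payload)
    raise "OMG this is broken, retry later"
  end
end

class BusyWorker
  def self.perform(_message_payload)
    raise AbortProcessing
  end
end

[SuccessWorker, BrokenWorker, BusyWorker].each do |worker|
  puts "#{worker}: #{process_one(worker, { 'some_key' => 'some_value' })}"
end
```

Running it prints SuccessWorker: ack, BrokenWorker: no_ack and BusyWorker: no_ack, and only BrokenWorker writes an ERROR line to the logger. Rescuing AbortProcessing before StandardError matters, since AbortProcessing is itself a StandardError here.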

3) a script for starting/stopping a series of drones

The messagebus gem includes a binary called messagebus_swarm. When you run it, messagebus_swarm creates a series of drones that process received messages. Let's assume we have a config with 3 drones:

config/messagebus.yml

  ....
  :workers:
    -
      :destination: jms.topic.deals.v1.dealCreated
      :worker: DealCreatedWorker
      :subscription_id: deal_created_worker_id
      :ack_on_error: false
      :drones: 3
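
To see how the keys in this file come through in Ruby, the worker section can be loaded with the standard yaml library. This sketch is not how messagebus_swarm reads its config (that isn't documented here); it only shows that keys written as :destination: parse as Ruby symbols, which Psych's safe_load accepts only when Symbol is explicitly permitted (the permitted_classes keyword needs Ruby 2.6 or later).

```ruby
require "yaml"

# The worker section from config/messagebus.yml (the elided "...." part omitted).
CONFIG_YAML = <<~CONFIG
  :workers:
    -
      :destination: jms.topic.deals.v1.dealCreated
      :worker: DealCreatedWorker
      :subscription_id: deal_created_worker_id
      :ack_on_error: false
      :drones: 3
CONFIG

# Keys written as ":name:" load as Ruby symbols, so Symbol must be permitted.
config = YAML.safe_load(CONFIG_YAML, permitted_classes: [Symbol])

config[:workers].each do |worker|
  puts "#{worker[:destination]} -> #{worker[:worker]} " \
       "(#{worker[:drones]} drones, subscription #{worker[:subscription_id]})"
end
```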

When you boot up your swarm:

  messagebus_swarm -c config/messagebus.yml start &
  export SWARM_PID=$!

Three drones will boot, each set up to process messages arriving on the dealCreated topic, in parallel.

For a given subscription_id, HornetQ sends only one copy of each message. With this example configuration you can therefore process at most 3 of these messages at once, and each drone will be processing a different message.
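
The one-copy-per-subscription behaviour can be simulated in plain Ruby by treating the subscription as a single shared queue that the drones compete for. This is a hypothetical model built on Thread::Queue, not HornetQ or the messagebus gem; simulate_subscription and the deal_id payload key are made-up names.

```ruby
# Hypothetical simulation, not HornetQ itself: drones sharing one
# subscription_id act like competing consumers on one queue, so every
# message is handed to exactly one drone.
def simulate_subscription(drone_count:, message_count:)
  subscription = Queue.new
  message_count.times { |i| subscription << { "deal_id" => i } }
  drone_count.times { subscription << :stop } # one stop marker per drone

  processed = Queue.new
  drones = Array.new(drone_count) do |drone_number|
    Thread.new do
      # Each pop removes the message, so no other drone can receive it.
      while (message = subscription.pop) != :stop
        processed << [drone_number, message["deal_id"]]
      end
    end
  end
  drones.each(&:join)

  Array.new(processed.size) { processed.pop }
end

results = simulate_subscription(drone_count: 3, message_count: 10)
deal_ids = results.map(&:last).sort
puts deal_ids == (0...10).to_a # prints "true": each message processed exactly once
```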

Later, you can gracefully shut down the swarm:

  messagebus_swarm stop $SWARM_PID

Process Layout

Depending on how your config is set up, the underlying drones will be either processes or threads. By default, messagebus_swarm uses threads. This is most appropriate if you're using a Ruby implementation without a GIL (JRuby, Rubinius). If you're using a Ruby that has a GIL (MRI), you'll likely want to use process mode instead.

Thread mode (Default)

Everything runs in a single process.

  :swarm_config:
    # Don't fork; use threads. This is the default, so it can be omitted.
    :fork: false
  :workers:
    -
      :destination: jms.topic.deals.v1.dealCreated
      :worker: DealCreatedWorker
      :subscription_id: deal_created_worker_id
      :ack_on_error: false
      :drones: 3

Process mode

One master process with each drone running in its own forked subprocess.

  :swarm_config:
    # Fork off to run drones
    :fork: true
  :workers:
    -
      :destination: jms.topic.deals.v1.dealCreated
      :worker: DealCreatedWorker
      :subscription_id: deal_created_worker_id 
      :ack_on_error: false
      :drones: 3
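
A minimal sketch of what the :fork switch changes, assuming nothing about the gem's internals: start_drones is a hypothetical helper that runs drones either as threads or as forked child processes. The demo shows the practical difference: threads share the parent's memory, while each forked drone works on its own copy. Process.fork is not available on Windows or JRuby.

```ruby
# Hypothetical sketch of the :fork switch; not the gem's implementation.
# Runs drone_count copies of drone_body as threads (fork: false) or as
# forked child processes (fork: true), then waits for all of them.
def start_drones(swarm_config, drone_count, &drone_body)
  if swarm_config[:fork]
    pids = Array.new(drone_count) do |n|
      fork do
        status = begin
          drone_body.call(n)
          0
        rescue StandardError
          1
        end
        exit!(status) # leave the child without running the parent's at_exit hooks
      end
    end
    pids.map { |pid| Process.wait2(pid).last.success? }
  else
    threads = Array.new(drone_count) { |n| Thread.new { drone_body.call(n) } }
    threads.each(&:join) # re-raises if a drone thread failed
    Array.new(drone_count, true)
  end
end

shared = Queue.new
start_drones({ fork: false }, 3) { |n| shared << n }
puts shared.size # prints 3: threads write to the parent's memory

shared = Queue.new
start_drones({ fork: true }, 3) { |n| shared << n }
puts shared.size # prints 0: each forked drone writes to its own copy
```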

Execute Code After Process Fork

If you need to run code after a drone subprocess has forked (e.g., to re-establish shared sockets or file descriptors for ActiveRecord, Memcached, etc.), pass a block to Messagebus::Swarm::Controller.after_fork:

  Messagebus::Swarm::Controller.after_fork do
    ActiveRecord::Base.establish_connection
  end
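
The point of after_fork is that the block runs inside each child, after the fork and before any work. MiniController and spawn_drone below are hypothetical stand-ins (not Messagebus::Swarm::Controller) that make this visible: the hook and the drone each write a line through a pipe, and the hook reports the child's pid rather than the parent's.

```ruby
# Hypothetical controller illustrating an after_fork hook; MiniController and
# spawn_drone are illustrative names, not the messagebus API.
class MiniController
  @after_fork_hooks = []

  class << self
    # Register a block to run inside every forked drone, before it does work.
    def after_fork(&block)
      @after_fork_hooks << block
    end

    def spawn_drone
      fork do
        # Runs in the child: the place to reopen DB connections, sockets, etc.
        @after_fork_hooks.each(&:call)
        yield
        exit!(0)
      end
    end
  end
end

reader, writer = IO.pipe
writer.sync = true # exit! skips flushing Ruby's buffers, so write unbuffered

MiniController.after_fork { writer.puts("hook ran in pid #{Process.pid}") }
pid = MiniController.spawn_drone { writer.puts("drone working") }
writer.close
Process.wait(pid)
output = reader.read
reader.close
puts output
```

In a real app the hook body would reconnect resources, as with ActiveRecord::Base.establish_connection above, because connections opened in the parent shouldn't be shared across forked children.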

messagebus_swarm stop <pid>

messagebus_swarm start kicks off a long-running process. It does not daemonize. To stop the swarm, use the stop command:

  messagebus_swarm stop <PARENT_PROCESS_PID>

This is a graceful stop: it tells the workers to shut down and not pick up any additional jobs. It does not abort an in-progress job.

This sends a TERM signal to the parent process. The parent process receives that signal and automatically relays it to all the drone child processes, which then shut down gracefully.

TERM was chosen somewhat arbitrarily: it's the signal kill sends by default, and defaulting to a graceful stop seems like a safe idea.
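
The stop sequence can be sketched with Ruby's Signal.trap and Process.kill. This is a hypothetical illustration, not the gem's code: run_swarm_and_stop sends TERM to its own pid in place of running messagebus_swarm stop, the parent relays TERM to each forked drone, and each drone only sets a flag so that the job in progress (here a short sleep) finishes before it exits.

```ruby
# Hypothetical sketch of the graceful stop; not the messagebus gem's code.
# Each drone only sets a flag on TERM, so the message in progress finishes.
def start_graceful_drone(ready_writer)
  fork do
    stopping = false
    Signal.trap("TERM") { stopping = true } # never abort the current job
    ready_writer.write("r")                 # tell the parent the trap is installed
    ready_writer.close
    sleep 0.05 until stopping               # each sleep stands in for one message
    exit!(0)                                # clean exit once the current job is done
  end
end

def run_swarm_and_stop(drone_count)
  reader, writer = IO.pipe
  child_pids = Array.new(drone_count) { start_graceful_drone(writer) }
  writer.close
  reader.read(drone_count) # wait until every drone has installed its trap
  reader.close

  # Parent: relay TERM to every drone child process.
  previous_handler = Signal.trap("TERM") do
    child_pids.each { |pid| Process.kill("TERM", pid) }
  end

  # Stands in for `messagebus_swarm stop <PARENT_PROCESS_PID>`, which sends TERM.
  Process.kill("TERM", Process.pid)

  child_pids.map { |pid| Process.wait2(pid).last }
ensure
  Signal.trap("TERM", previous_handler) if previous_handler
end

statuses = run_swarm_and_stop(3)
puts statuses.all?(&:success?) # prints "true" when every drone exited cleanly
```

Having the signal handler only set a flag is what makes the stop graceful: nothing interrupts a drone mid-message, and the flag is checked only between units of work.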