task_tempest

Framework for building threaded asynchronous job processors.

Description

task_tempest lets you build glorified loops that read messages from a queue and dispatch them to handler classes. In short, it's just another background job processor for Ruby.

task_tempest is based on the idea of a thread pool; each background job is executed on a thread. This can achieve high throughput, but your code will need to be threadsafe.
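The thread pool idea can be pictured in a few lines of plain Ruby. This is only an illustrative sketch built on the standard library's Thread::Queue and Mutex, not task_tempest's implementation; the names jobs, results, and lock, and the pool size, are made up for the example. It shows why state shared between jobs needs a lock.

```ruby
# Illustrative thread pool in plain Ruby (not task_tempest internals).
jobs    = Thread::Queue.new          # thread-safe FIFO from the standard library
results = []                         # shared state, so access is guarded by a lock
lock    = Mutex.new

[[1, 2], [3, 4], [5, 6]].each { |pair| jobs << pair }
jobs.close                           # pop returns nil once the queue is drained

workers = Array.new(2) do            # a pool of two threads
  Thread.new do
    while (pair = jobs.pop)          # each job runs on whichever thread is free
      sum = pair.sum
      lock.synchronize { results << sum }
    end
  end
end

workers.each(&:join)
```

The order of results depends on thread scheduling, so code that relies on ordering has to sort or otherwise coordinate.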

task_tempest can also run in fibered mode in conjunction with EventMachine. This can achieve high(er?) throughput, but your code will need to be fiber aware.

task_tempest is queue agnostic; you can use whatever queue you like, as long as it supports two operations: enqueue, dequeue.

task_tempest is used in production at Onespot and processes over 1 million jobs a day.

Quickstart

Run the code below and both MathTempest.log and MathTempest.task.log will be created in your current working directory. To stop the tempest, send the process SIGINT (for example, Ctrl-C) or SIGTERM.

require "task_tempest"

class MemoryQueue
  def initialize; @array = []; end
  def enqueue(message); @array.push(message); end
  def dequeue; @array.shift; end
end

class MathTempest < TaskTempest::Engine
  configure do
    threads 2
    queue MemoryQueue.new
  end
end

class Adder
  extend TaskTempest::Task
  def process(a, b)
    c = a + b
    task_logger.info "#{a} + #{b} = #{c}"
  end
end

10.times do
  a, b = rand(10), rand(10)
  MathTempest.submit(Adder, a, b)
end

MathTempest.run

Queue

The queue object given in the TaskTempest::Engine configure block must define two methods: enqueue and dequeue.

task_tempest doesn't care what kind of queue you use or what you actually store in it, as long as dequeue returns a message as described here. You are responsible for any serialization and deserialization that may be required.

A message is an array with the following format:

[task_id, task_class_name, arg1, arg2, ...]

An example message is:

[nil, "Adder", 2, 5]

The queue's enqueue method will receive a message as its first argument, plus any additional arguments given to submit.

The queue's dequeue method must return a message.

If the task_id of a message is nil, then task_tempest will automatically assign an id.
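As a rough sketch of the serialization responsibility described above, the queue below stores each message as a JSON string and parses it again on the way out. JsonQueue is a hypothetical class, not part of task_tempest. It assumes only what this section states: enqueue receives the message plus any extra arguments, and dequeue returns a message array. Returning nil from an empty queue mirrors the MemoryQueue in the Quickstart and is an assumption about what the engine tolerates.

```ruby
require "json"

# Hypothetical queue that stores messages as JSON strings.
class JsonQueue
  def initialize
    @items = []
    @lock  = Mutex.new               # tasks may run on several threads
  end

  # Extra arguments passed to submit are accepted and ignored here.
  def enqueue(message, *_extra)
    payload = JSON.generate(message) # serialize before storing
    @lock.synchronize { @items.push(payload) }
  end

  def dequeue
    payload = @lock.synchronize { @items.shift }
    payload && JSON.parse(payload)   # nil when the queue is empty
  end
end

queue = JsonQueue.new
queue.enqueue([nil, "Adder", 2, 5])
message = queue.dequeue
```

Only arguments that survive a JSON round trip (numbers, strings, arrays, hashes with string keys, nil, booleans) come back unchanged; symbols and custom objects would need a different serializer.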

Tasks

Messages are dispatched to a task class to handle. A task class must extend TaskTempest::Task and define a process method. The process method will be called with the arguments from the message being dispatched.

class Averager
  extend TaskTempest::Task
  configure_task do
    ...
  end
  def process(*args)
    # Start from 0.0 so the division is not truncated to an integer.
    avg = args.inject(0.0) { |sum, n| sum + n } / args.length
    task_logger.info "The average of #{args.inspect} is #{avg}"
  end
end

MathTempest.submit(Averager, 2, 4)
# Will produce a message like ["9ac4f", "Averager", 2, 4]
# And eventually Averager.process(2, 4) will be called.

MathTempest.submit(Averager, 2, 4, 6)
# Will produce a message like ["b733c", "Averager", 2, 4, 6]
# And eventually Averager.process(2, 4, 6) will be called.

See TaskTempest::Task::Configuration for the options that can be set in a task's configure_task block.
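To make the message-to-task mapping concrete, here is a minimal sketch of what dispatching a message could look like. The dispatch helper and the Doubler class are hypothetical and are not task_tempest's code; in particular, the id format and the choice to instantiate the class before calling process are assumptions made for the example.

```ruby
require "securerandom"

# Stand-in handler; a real task would extend TaskTempest::Task.
class Doubler
  def process(n)
    n * 2
  end
end

# Hypothetical dispatch helper, not task_tempest's actual code.
def dispatch(message)
  task_id, class_name, *args = message
  task_id ||= SecureRandom.hex(3)          # assign an id when the message has none
  handler = Object.const_get(class_name)   # "Doubler" -> Doubler
  [task_id, handler.new.process(*args)]    # instantiating here is an assumption
end

id, result = dispatch([nil, "Doubler", 21])
```

Because the class is looked up by name, the task class has to be loaded in the process that runs the tempest, not just in the process that submits the job.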

Logging

task_tempest logs to a main log and a task log.

The main log shows a high-level view of all the tasks that are run (when they started, when they finished, if they failed, if they timed out, etc).

The task log shows detailed information about each task. Each line in the task log is prefixed with a task id. This is so that if you see a task failed in the main log, you can grep for its id in the task log.

Any logging done in a task using task_logger will be written to the task log.
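The id-prefix convention can be reproduced with Ruby's standard Logger. The sketch below is not how task_tempest builds task_logger; task_log_for is a hypothetical helper and the line format is assumed. It shows why a per-line prefix makes one task's output easy to isolate.

```ruby
require "logger"
require "stringio"

# Hypothetical helper: a Logger whose lines all start with the task id.
def task_log_for(task_id, io)
  logger = Logger.new(io)
  logger.formatter = proc do |severity, _time, _progname, msg|
    "[#{task_id}] #{severity}: #{msg}\n"
  end
  logger
end

io = StringIO.new                    # stands in for the task log file
task_log_for("9ac4f", io).info("2 + 5 = 7")
task_log_for("b733c", io).info("1 + 1 = 2")

# Same idea as running grep for the id on the task log.
failed_lines = io.string.lines.grep(/\A\[9ac4f\]/)
```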

Callbacks

Each task class can define callbacks for success, failure, and timeout of a task.

class Averager
  extend TaskTempest::Task
  configure_task do
    timeout 1.5
    after_failure proc { |exception|
      HoptoadNotifier.notify(exception)
    }
    after_timeout proc {
      task_logger.warn "Crap, I took longer than 1.5 seconds"
    }
  end
  ...
end

See TaskTempest::Task::Configuration for the names of each callback and arguments yielded.
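As a rough mental model of how a timeout and the three callbacks fit together, consider the sketch below. run_with_callbacks is a hypothetical helper built on the standard Timeout module, not task_tempest's implementation, and after_success is an assumed callback name; check TaskTempest::Task::Configuration for the real ones.

```ruby
require "timeout"

# Hypothetical runner, not task_tempest's implementation.
# `callbacks` maps :after_success, :after_failure, and :after_timeout to procs.
def run_with_callbacks(limit, callbacks)
  Timeout.timeout(limit) { yield }
  callbacks[:after_success]&.call
  :success
rescue Timeout::Error
  callbacks[:after_timeout]&.call
  :timeout
rescue StandardError => e
  callbacks[:after_failure]&.call(e) # the exception is handed to the callback
  :failure
end

events = []
callbacks = {
  after_failure: proc { |e| events << "failed: #{e.message}" },
  after_timeout: proc { events << "timed out" }
}

failure = run_with_callbacks(1.5, callbacks) { raise ArgumentError, "bad input" }
timeout = run_with_callbacks(0.05, callbacks) { sleep 1 }
```

Timeout::Error is rescued before StandardError because it is itself a StandardError; with the clauses reversed, a timeout would be reported as an ordinary failure.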

EventMachine + Fibers

The fibers option puts task_tempest into fibered mode and also defines the size of the fiber pool.

class FiberedTempest < TaskTempest::Engine
  configure do
    fibers 5
    ...
  end
  ...
end

Now when FiberedTempest.run is called, the EventMachine reactor will be started and each task will be dispatched on a fiber.

If you run in fibered mode, you need to install fiber_storm.
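What "fiber aware" means in practice is that a task must hand control back while it waits, because all fibers share one thread. The sketch below uses only Ruby's built-in Fiber class, with no EventMachine or fiber_storm involved, and Fiber.yield stands in for waiting on non-blocking I/O. It is an illustration of cooperative scheduling, not of task_tempest's fiber pool.

```ruby
# Plain-Ruby illustration of cooperative scheduling.
log = []

make_task = lambda do |name|
  Fiber.new do
    log << "#{name}: start"
    Fiber.yield                      # stand-in for waiting on non-blocking I/O
    log << "#{name}: finish"
  end
end

pool = [make_task.call("a"), make_task.call("b")]
pool.each(&:resume)                  # run each task up to its first yield
pool.each(&:resume)                  # resume each task to completion
```

Task b starts before task a finishes only because a yields. A task that blocks without yielding (for example, a plain sleep or a blocking socket read) would hold up every other fiber on the same thread.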

Daemonizing

There is no code in task_tempest to run as a daemon; that is left to you. It's easy with the Daemons gem, though.

Assuming your tempest is defined in my_tempest.rb, just put the following code at the bottom of the file.

if $0 == __FILE__
  require "daemons"
  Daemons.run_proc(MyTempest.conf.name, :log_output => true) do
    MyTempest.run
  end
end

Now you can run it as a daemon from the command line.

ruby my_tempest.rb start
ruby my_tempest.rb stop
ruby my_tempest.rb run # Run in foreground

See the Daemons rdoc for more info.

Rails

When task_tempest is used as an asynchronous task processor for a Rails app, you probably want to load the Rails environment so you can have access to models, configuration, etc.

You can load the Rails environment however you like, but if you choose to load it in one of task_tempest's initialization callbacks, then there are some caveats to be aware of.

class MathTempest < TaskTempest::Engine
  configure do
    root{ Rails.root }
    after_initialize proc {
      require "config/environment" # Load Rails.
    }
  end
end

That won't work: it raises an exception saying Rails isn't defined, because the root configuration option is evaluated during initialization, before the after_initialize callback is called. To fix the error, change after_initialize to before_initialize.
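The ordering problem described above can be reproduced without Rails or task_tempest. In this sketch, boot is a hypothetical stand-in for the initialization sequence (before callback, then root, then after callback), and FakeApp plays the role of the Rails constant. It is a model of the described behavior, not the engine's code.

```ruby
# Stand-in for the initialization order described above; NOT task_tempest's code.
def boot(root:, before_initialize: nil, after_initialize: nil)
  before_initialize&.call
  resolved_root = root.call          # root is evaluated during initialization
  after_initialize&.call
  resolved_root
end

# Pretend "loading the app" defines a constant, the way loading Rails defines Rails.
load_app = proc do
  Object.const_set(:FakeApp, Module.new { def self.root; "/srv/app"; end })
end
root = proc { FakeApp.root }

late = begin
  boot(root: root, after_initialize: load_app)
rescue NameError
  :name_error                        # FakeApp did not exist yet when root ran
end

early = boot(root: root, before_initialize: load_app)
```

Loading the environment in the earlier callback means the constant exists by the time root is evaluated, which is why the second call succeeds.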

Complete examples

See the examples directory.

Copyright

Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.
