Skip to content

Commit

Permalink
added loads of missing method and class documentation, as well as som…
Browse files Browse the repository at this point in the history
…e further documentation of starling client config options.
  • Loading branch information
purzelrakete committed Oct 2, 2008
1 parent a85ca1e commit 9c63144
Show file tree
Hide file tree
Showing 18 changed files with 153 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGES.markdown
@@ -1,4 +1,4 @@
Version 0.3, 25.10.08
Version 0.3, 25.09.08
- added backgroundjob runner
- added automatic detection of starling, spawn and backgroundjob to set default runner
- made logging of exceptions more consistent across runners.
Expand Down
8 changes: 7 additions & 1 deletion README.markdown
Expand Up @@ -108,12 +108,18 @@ Workling copies a file called starling.yml into your applications config directo

Notice that the default production port is 15151. This means you'll need to start Starling with -p 15151 on production.

You can also use this config file to pass configuration options to the memcache client which workling uses to connect to starling:
You can also use this config file to pass configuration options to the memcache client which workling uses to connect to starling. use the key 'memcache_options' for this.

development:
listens_on: localhost:22122
sleep_time: 2
reset_time: 30
memcache_options:
<memcachesetting>: <value>
Sleep time determines the wait time between polls against polls. A single poll will do one .get on every queue (there is a corresponding queue for each worker method).

If there is a memcache error, the Poller will hang for a bit to give it a chance to fire up again and reset the connection. The wait time can be set with the key reset_time.

## Seeing what Starling is doing

Expand Down
4 changes: 4 additions & 0 deletions lib/workling.rb
@@ -1,3 +1,6 @@
#
# I can haz am in your Workling are belong to us!
#
module Workling
class WorklingError < StandardError; end
class WorklingNotFoundError < WorklingError; end
Expand All @@ -9,6 +12,7 @@ def initialize

mattr_accessor :load_path
@@load_path = File.expand_path(File.join(File.dirname(__FILE__), '../../../../app/workers'))
VERSION = "0.3"

#
# determine the runner to use if nothing is specifically set. workling will try to detect
Expand Down
3 changes: 3 additions & 0 deletions lib/workling/discovery.rb
@@ -1,3 +1,6 @@
#
# Discovery is responsible for loading workers in app/workers.
#
module Workling
class Discovery
cattr_accessor :discovered
Expand Down
5 changes: 5 additions & 0 deletions lib/workling/remote.rb
Expand Up @@ -5,8 +5,13 @@

require 'digest/md5'

#
# Scoping Module for Runners.
#
module Workling
module Remote

# set the desired runner here. this is initialized with Workling.default_runner.
mattr_accessor :dispatcher
@@dispatcher = Workling.default_runner

Expand Down
8 changes: 8 additions & 0 deletions lib/workling/remote/runners/backgroundjob_runner.rb
@@ -1,5 +1,10 @@
require 'workling/remote/runners/base'

#
# Use Ara Howards BackgroundJob to run the work. BackgroundJob loads Rails once per requested Job.
# It persists over the database, and there is no requirement for separate processes to be started.
# Since rails has to load before each request, it takes a moment for the job to run.
#
module Workling
module Remote
module Runners
Expand All @@ -11,6 +16,9 @@ def initialize
Workling::Starling::Routing::ClassAndMethodRouting.new
end

# passes the job to bj by serializing the options to xml and passing them to
# ./script/bj_invoker.rb, which in turn routes the deserialized args to the
# appropriate worker.
def run(clazz, method, options = {})
stdin = @@routing.queue_for(clazz, method) +
" " +
Expand Down
23 changes: 22 additions & 1 deletion lib/workling/remote/runners/base.rb
@@ -1,3 +1,23 @@
#
# Base class for Workling Runners.
#
# Runners must subclass this and implement the method
#
# Workling::Remote::Runners::Base#run(clazz, method, options = {})
#
# which is responsible for pushing the requested job into the background. Depending
# on the Runner, this may require other code to dequeue the job. The actual
# invocation of the runner should be done like this:
#
# Workling.find(clazz, method).dispatch_to_worker_method(method, options)
#
# This ensures for consistent logging and handling of propagated exceptions. You can
# also call the convenience method
#
# Workling::Remote::Runners::Base#dispatch!(clazz, method, options)
#
# which invokes this for you.
#
module Workling
module Remote
module Runners
Expand All @@ -8,7 +28,8 @@ def logger
Workling::Base.logger
end

# find the worker instance and invoke it.
# find the worker instance and invoke it. Invoking the worker method like this ensures for
# consistent logging and handling of propagated exceptions.
def dispatch!(clazz, method, options)
Workling.find(clazz, method).dispatch_to_worker_method(method, options)
end
Expand Down
7 changes: 7 additions & 0 deletions lib/workling/remote/runners/not_remote_runner.rb
@@ -1,9 +1,16 @@
require 'workling/remote/runners/base'

#
# directly dispatches to the worker method, in-process. options are first marshalled then dumped
# in order to simulate the sideeffects of a remote call.
#
module Workling
module Remote
module Runners
class NotRemoteRunner < Workling::Remote::Runners::Base

# directly dispatches to the worker method, in-process. options are first marshalled then dumped
# in order to simulate the sideeffects of a remote call.
def run(clazz, method, options = {})
options = Marshal.load(Marshal.dump(options)) # get this to behave more like the remote runners
dispatch!(clazz, method, options)
Expand Down
15 changes: 15 additions & 0 deletions lib/workling/remote/runners/spawn_runner.rb
@@ -1,5 +1,19 @@
require 'workling/remote/runners/base'

#
# Run the job over the spawn plugin. Refer to the README for instructions on
# installing Spawn.
#
# Spawn forks the entire process once for each job. This means that the job starts
# with a very low latency, but takes up more memory for each job.
#
# It's also possible to configure Spawn to start a Thread for each job. Do this
# by setting
#
# Workling::Remote::Runners::SpawnRunner.options = { :method => :thread }
#
# Have a look at the Spawn README to find out more about the characteristics of this.
#
module Workling
module Remote
module Runners
Expand All @@ -8,6 +22,7 @@ class SpawnRunner < Workling::Remote::Runners::Base
@@options = { :method => :fork }
include Spawn if Workling.spawn_installed?

# dispatches to Spawn, using the :fork option.
def run(clazz, method, options = {})
spawn(SpawnRunner.options) do # exceptions are trapped in here.
dispatch!(clazz, method, options)
Expand Down
13 changes: 13 additions & 0 deletions lib/workling/remote/runners/starling_runner.rb
@@ -1,5 +1,17 @@
require 'workling/remote/runners/base'

#
# Runs Jobs over Starling. For exact Instructions on using this runner, see the README.
#
# Starling is Blaine Cook's Ruby Queueing Server. Jobs are dispatched by placing
# the on Starling Queues. The Runner takes care of mapping of queue names to worker code.
# this is done with Workling::ClassAndMethodRouting, but you can use your own by sublassing Workling::Routing.
# Don’t worry about any of this if you’re not dealing directly with the queues.
#
# There’s a workling-starling-client daemon that polls Starling and then dispatches jobs these to the
# responsible workers. If you intend to run this on a remote machine, then just check out your rails project
# there and start up the starling client.
#
module Workling
module Remote
module Runners
Expand All @@ -12,6 +24,7 @@ def initialize
StarlingRunner.routing = Workling::Starling::Routing::ClassAndMethodRouting.new
end

# enqueues the job onto Starling.
def run(clazz, method, options = {})
StarlingRunner.client.set(@@routing.queue_for(clazz, method), options)

Expand Down
14 changes: 13 additions & 1 deletion lib/workling/return/store/base.rb
@@ -1,21 +1,33 @@
#
# Basic interface for getting and setting Data which needs to be passed between Workers and
# client code.
#
module Workling
module Return
module Store
mattr_accessor :instance

# set a value in the store with the given key. delegates to the returnstore.
def self.set(key, value)
self.instance.set(key, value)
end

# get a value from the store. this should be destructive. delegates to the returnstore.
def self.get(key)
self.instance.get(key)
end

#
# Base Class for Return Stores. Subclasses need to implement set and get.
#
class Base

# set a value in the store with the given key.
def set(key, value)
raise NotImplementedError.new("set(key, value) not implemented in #{ self.class }")
end


# get a value from the store. this should be destructive.
def get(key)
raise NotImplementedError.new("get(key) not implemented in #{ self.class }")
end
Expand Down
4 changes: 3 additions & 1 deletion lib/workling/return/store/memory_return_store.rb
@@ -1,6 +1,8 @@
require 'workling/return/store/base'

# this is for tests only - not for production use. aight?
#
# Stores directly into memory. This is for tests only - not for production use. aight?
#
module Workling
module Return
module Store
Expand Down
6 changes: 6 additions & 0 deletions lib/workling/return/store/starling_return_store.rb
@@ -1,6 +1,10 @@
require 'workling/return/store/base'
require 'workling/starling/client'

#
# Recommended Return Store if you are using the Starling Runner. This
# Simply sets and gets values against queues. 'key' is the name of the respective Queue.
#
module Workling
module Return
module Store
Expand All @@ -11,10 +15,12 @@ def initialize
self.client = Workling::Starling::Client.new
end

# set a value in the queue 'key'.
def set(key, value)
self.class.client.set(key, value)
end

# get a value from starling queue 'key'.
def get(key)
self.class.client.get(key)
end
Expand Down
24 changes: 20 additions & 4 deletions lib/workling/starling/client.rb
@@ -1,11 +1,30 @@
require 'workling/starling'

#
# Wrapper for the starling connection. The connection is made using fiveruns-memcache-client,
# or memcache-client, if this is not available. See the README for a discussion of the memcache
# clients.
#
# method_missing delegates all messages through to the underlying memcache connection.
#
module Workling
module Starling
class Client

# the url with which the memcache client expects to reach starling
attr_accessor :starling_url

# the memcache connection object
attr_accessor :connection

#
# the client attempts to connect to starling using the configuration options found in
#
# Workling::Starling.config. this can be configured in config/starling.yml.
#
# the initialization code will raise an exception if memcache-client cannot connect
# to starling.
#
def initialize
@starling_url = Workling::Starling.config[:listens_on]
options = [self.starling_url, Workling::Starling.config[:memcache_options]].compact
Expand All @@ -14,14 +33,11 @@ def initialize
raise_unless_connected!
end

# delegates directly through to the memcache connection.
def method_missing(method, *args)
@connection.send(method, *args)
end

def stats
@connection.stats
end

private
# make sure we can actually connect to starling on the given port
def raise_unless_connected!
Expand Down
16 changes: 10 additions & 6 deletions lib/workling/starling/poller.rb
@@ -1,12 +1,17 @@
require 'workling/starling'

#
# Polls Starling and dispatches jobs onto the correct workers.
#
module Workling
module Starling

class Poller

cattr_accessor :sleep_time # Seconds to sleep before looping
cattr_accessor :reset_time # Seconds to wait while resetting connection
# Seconds to sleep before looping
cattr_accessor :sleep_time

# Seconds to wait while resetting connection
cattr_accessor :reset_time

def initialize(routing)
Poller.sleep_time = Workling::Starling.config[:sleep_time] || 2
Expand All @@ -16,9 +21,8 @@ def initialize(routing)
@workers = ThreadGroup.new
end

def logger
Workling::Base.logger
end
# returns the Workling::Base.logger
def logger; Workling::Base.logger; end

def listen

Expand Down
4 changes: 4 additions & 0 deletions lib/workling/starling/routing/base.rb
@@ -1,3 +1,7 @@
#
# Base Class for Routing. Routing takes the worker method TestWorker#something,
# and serializes the signature in some way.
#
module Workling
module Starling
module Routing
Expand Down
9 changes: 9 additions & 0 deletions lib/workling/starling/routing/class_and_method_routing.rb
@@ -1,32 +1,41 @@
require 'workling/starling/routing/base'

#
# Holds a hash of routes. Each Worker method has a corresponding hash entry after building.
#
module Workling
module Starling
module Routing
class ClassAndMethodRouting < Base

# initializes and builds routing hash.
def initialize
super

build
end

# returns the worker method name, given the routing string.
def method_name(queue)
queue.split("__").last
end

# returns the routing string, given a class and method. delegating.
def queue_for(clazz, method)
ClassAndMethodRouting.queue_for(clazz, method)
end

# returns the routing string, given a class and method.
def self.queue_for(clazz, method)
"#{ clazz.to_s.tableize }/#{ method }".split("/").join("__") # Don't split with : because it messes up memcache stats
end

# returns all routed
def queue_names
self.keys
end

# dare you to remove this! go on!
def queue_names_routing_class(clazz)
self.select { |x, y| y.is_a?(clazz) }.map { |x, y| x }
end
Expand Down

0 comments on commit 9c63144

Please sign in to comment.