touchlocal / remote_processing

An RPC mechanism that uses Workling as its transport

dansketcher (author)
Mon Nov 23 17:55:45 -0800 2009
commit  6eed1634ce1701e7a93d8d7f9f46a1281a425268
tree    3afff6ca4a34213eb38fc2d0048c381976cbae7e
parent  678e6f4aee49e0a171a14761900e3b216662d95e

RemoteProcessing

RemoteProcessing is an RPC (Remote Procedure Call) mechanism that uses Workling as its transport. You get all the benefits of Workling for running code in the background, outside of your request, plus some mechanisms that make doing so simpler and more pleasant.

In particular, Workling provides some rudimentary handling for getting response data back from a worker process, but you have to deal with the nitty-gritty yourself. RemoteProcessing aims not to get in the way of what Workling does well, but to wrap up the complicated or unpleasant parts so that they are simpler to use. RemoteProcessing does not restrict how you configure Workling, except that a ReturnStore must be configured if you want to use the RemoteProcessing features that read responses back.
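To make the return-store idea concrete, here is a minimal in-memory sketch, assuming a store keyed by job uid that exposes set and get in the same shape as the Workling.return.set and Workling.return.get calls used later in this README. InMemoryReturnStore is hypothetical: it only works inside one Ruby process, whereas a real ReturnStore has to be reachable from both the caller and the worker process.

```ruby
require 'thread'

# Hypothetical in-memory return store keyed by job uid. A real ReturnStore
# must be shared between the calling process and the worker process; this
# one only works inside a single Ruby process.
class InMemoryReturnStore
  def initialize
    @values = {}
    @lock = Mutex.new
  end

  # Worker side: record a result (or an exception object) for a uid.
  def set(uid, value)
    @lock.synchronize { @values[uid] = value }
  end

  # Caller side: returns nil until a value has been set. This sketch also
  # removes the value once it has been read (an assumption, not Workling's contract).
  def get(uid)
    @lock.synchronize { @values.delete(uid) }
  end
end

store = InMemoryReturnStore.new
worker = Thread.new { store.set("job-1", "pong") }  # plays the worker process
worker.join
puts store.get("job-1").inspect  # prints "pong"
puts store.get("job-1").inspect  # prints nil
```

Check your configured store's read semantics before relying on consume-on-read behaviour like this.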

Installing RemoteProcessing

The easiest way of getting started with RemoteProcessing is like this:

script/plugin install git://github.com/touchlocal/remote_processing.git

See the Workling documentation for details on configuring Workling itself. It is best to set up Workling and make sure that it works FIRST, and only then configure RemoteProcessing.

Configuring RemoteProcessing

As RemoteProcessing is designed to be a plug-in enhancement of Workling, it does not require any environment-level configuration. It does, however, require some small changes both to the worker class and to the code that invokes the worker.

Take, for example, a standard Workling worker class:

class PingPongWorker < Workling::Base

  def ping(options)
    begin
      puts("PingPongWorker#ping about to return for #{options[:uid]}")
      Workling.return.set(options[:uid], "pong")
      return true
    rescue Exception => e
      puts("PingPongWorker#ping about to RAISE for #{options.inspect} : #{e.message}")
      Workling.return.set(options[:uid], e) unless options[:uid].nil?
    end
  end

end

Ordinarily, you would invoke the PingPongWorker asynchronously with code similar to this:

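parameters = {}
uid = PingPongWorker.async_ping(parameters)  # Workling returns the uid it assigned to this job
sleep(0.2)                                   # crude wait: give the worker time to store its result
results = Workling.return.get(uid)           # nil if no result has been stored yet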

Note that if you use the same parameters hash for more than one call to an async_* method, you should .clone it beforehand, because Workling injects a :uid key into the hash on each async_* call.
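The effect of reusing one parameters hash can be sketched without Workling at all. The lambda fake_async_ping below is a hypothetical stand-in that, like the behaviour described above, writes a :uid key into whatever hash it receives and returns that uid; it is not Workling's dispatcher.

```ruby
# Hypothetical stand-in for a Workling async_* call: it writes a :uid key
# into the options hash it receives and returns that uid.
counter = 0
fake_async_ping = lambda do |options|
  counter += 1
  options[:uid] = "ping-#{counter}"
end

# Reusing one hash: every call writes into the same object, so the caller's
# hash ends up carrying whichever uid was written last.
shared = { :id => 1 }
fake_async_ping.call(shared)
fake_async_ping.call(shared)
puts shared[:uid]            # prints ping-2

# Cloning first: each call gets its own hash and the original stays clean.
params = { :id => 1 }
uids = 2.times.map { fake_async_ping.call(params.clone) }
puts uids.inspect            # prints ["ping-3", "ping-4"]
puts params.key?(:uid)       # prints false
```

Note that clone is shallow: nested hashes or arrays inside the parameters are still shared between calls.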

To make this process less convoluted, include RemoteProcessing::Responder in the worker; it wraps up the uid handling and related bookkeeping:

class PingPongWorker < Workling::Base

  include RemoteProcessing::Responder
  remote_processing_responder :ping

  # ...

end
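One way a mixin like this can remove the begin/rescue/return-store boilerplate from a worker is sketched below. It is an illustration under stated assumptions, not RemoteProcessing's actual implementation: SketchResponder, sketch_responder and the RESULTS hash (standing in for Workling.return) are all hypothetical. The wrapper stores each method's return value, or any StandardError it raises, under options[:uid]. It needs Ruby 2.0 or later for Module#prepend.

```ruby
# Hypothetical shared result store standing in for Workling.return.
RESULTS = {}

# Hypothetical responder mixin (not RemoteProcessing's code).
module SketchResponder
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Wraps each named method so its return value, or any StandardError it
    # raises, is stored under options[:uid]. Prepending means the wrapper
    # also applies to methods defined after this macro is called.
    def sketch_responder(*method_names)
      wrapper = Module.new do
        method_names.each do |name|
          define_method(name) do |options|
            begin
              result = super(options)
            rescue StandardError => e
              result = e
            end
            RESULTS[options[:uid]] = result unless options[:uid].nil?
            result
          end
        end
      end
      prepend(wrapper)
    end
  end
end

class SketchPingPongWorker
  include SketchResponder
  sketch_responder :ping, :boom

  def ping(options)
    "pong"
  end

  def boom(options)
    raise ArgumentError, "bad input"
  end
end

worker = SketchPingPongWorker.new
worker.ping(:uid => "u1")
worker.boom(:uid => "u2")
puts RESULTS["u1"]        # prints pong
puts RESULTS["u2"].class  # prints ArgumentError
```

Prepending a generated module is what lets the macro appear above the def, as remote_processing_responder :ping does in the example; wrapping with alias_method at that point would fail because the method would not exist yet.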

Now you can invoke the worker like this:

delegate = PingPongWorker.remote_delegate
delegate.async_ping(parameters)
delegate.synchronize # equivalent to the sleep(0.2) above, but will retry if no data is yet present, up to a timeout
results = delegate.response
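The retry-until-timeout behaviour of synchronize can be sketched as a polling loop. This shows the general technique under assumptions, not RemoteProcessing's code: poll_for_response is hypothetical, a hash guarded by a Mutex stands in for the return store, and a thread plays the worker.

```ruby
# Hypothetical polling helper: calls the block until it returns a non-nil
# value or the timeout (in seconds) expires, sleeping between attempts.
def poll_for_response(timeout, interval = 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    value = yield
    return value unless value.nil?
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(interval)
  end
end

# Simulated return store, filled by a "worker" thread after 0.1 seconds.
store = {}
lock = Mutex.new
worker = Thread.new do
  sleep(0.1)
  lock.synchronize { store["uid-1"] = "pong" }
end

found = poll_for_response(1.0) { lock.synchronize { store["uid-1"] } }
missing = poll_for_response(0.2) { lock.synchronize { store["missing"] } }
worker.join
puts found.inspect    # prints "pong"
puts missing.inspect  # prints nil
```

Unlike a fixed sleep(0.2), the loop returns as soon as data appears and only waits the full timeout when nothing arrives.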

Batch Processing

The real power of RemoteProcessing shows when you have several tasks that can be processed in the background in parallel. The same include also allows the use of a Collection of Delegates, which can all be called and synchronized together:

# :autosynchronise defaults to false
# :timeout defaults to RemoteProcessing::MAX_TIMOUT (5 seconds)
collection = RemoteProcessing.execute(:autosynchronise => true, :timeout => 1) do |batch|
  PingPongWorker.remote_delegate(batch).async_ping({:id => 1})
  PingPongWorker.remote_delegate(batch).async_ping({:id => 2})
  PingPongWorker.remote_delegate(batch).async_ping({:id => 3})
end
# collection now holds an ordered list of Delegates, each with an
# (initially nil) response.
# If :autosynchronise is true, the collection starts to synchronize at
# the end of the supplied block. If you can, leave :autosynchronise as
# false, do any other work you can first (to give Workling time to
# populate the return store), and then synchronize at the last opportunity.

# collection.synchronize - Not necessary as :autosynchronise is used.
# .synchronize(timeout) can be called more than once if desired -
# the timeout can be different each time.

if collection.synchronized?
  # synchronized is GOOD, not synchronized is BAD as it means something went wrong or timed out.
  response1 = collection.shift.response
  response2 = collection.shift.response
  response3 = collection.shift.response
end
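A minimal sketch of how a collection might synchronize several pending requests under one shared deadline while keeping responses in submission order. SketchCollection is hypothetical and reads from a plain hash in place of the return store; it illustrates why synchronized? is false whenever any single response is missing, and why synchronize can be called again with a different timeout.

```ruby
# Hypothetical collection of pending requests (not RemoteProcessing's class).
class SketchCollection
  attr_reader :responses

  def initialize(uids, store, lock)
    @uids = uids
    @store = store
    @lock = lock
    @responses = Array.new(uids.size)  # one slot per request, in order
  end

  # Polls every still-missing response until all have arrived or the shared
  # deadline passes. Returns synchronized?, so it can be retried later.
  def synchronize(timeout, interval = 0.02)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    loop do
      @uids.each_with_index do |uid, i|
        @responses[i] ||= @lock.synchronize { @store.delete(uid) }
      end
      break if synchronized?
      break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      sleep(interval)
    end
    synchronized?
  end

  # True only when every request has produced a (non-nil) response.
  def synchronized?
    @responses.none?(&:nil?)
  end
end

store = {}
lock = Mutex.new
workers = [1, 2, 3].map do |id|
  Thread.new do
    sleep(0.05 * id)  # responses arrive at different times
    lock.synchronize { store["job-#{id}"] = "pong #{id}" }
  end
end

batch = SketchCollection.new(%w[job-1 job-2 job-3], store, lock)
ok = batch.synchronize(1.0)
workers.each(&:join)
puts ok                       # prints true
puts batch.responses.inspect  # prints ["pong 1", "pong 2", "pong 3"]

late = SketchCollection.new(%w[never-answered], store, lock)
puts late.synchronize(0.05)   # prints false
```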

Durable Queues

The ability of queues and messages to survive a messaging-server crash is important in an enterprise environment. Not every message sent via this mechanism is worth persisting: search RPC calls may not be worth keeping after a crash, whereas logging information probably is. To support this need, RemoteProcessing lets you mark individual worker methods as durable.

Let's begin by adding a new method to our class:

class PingPongWorker < Workling::Base

  include RemoteProcessing::Responder
  remote_processing_responder :ping
  durable_methods :pang

  def ping(options)
    begin
      puts("PingPongWorker#ping about to return for #{options[:uid]}")
      Workling.return.set(options[:uid], "pong")
      return true
    rescue Exception => e
      puts("PingPongWorker#ping about to RAISE for #{options.inspect} : #{e.message}")
      Workling.return.set(options[:uid], e) unless options[:uid].nil?
    end
  end

  def pang(options)
    puts "PingPongWorker#pang"
  end

end

The new method pang (invoked as async_pang) is declared with durable_methods :pang. This tells RemoteProcessing to send its messages over the durable queues in Workling/RabbitMQ. Although the "Configuring RemoteProcessing" section above showed that the standard setup needs no changes to the default Workling configuration, this feature does require some configuration changes.
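A declaration like durable_methods :pang can be implemented as a class-level macro that simply records method names for later lookup. The sketch below assumes that pattern; SketchDurability, sketch_durable_methods and durable? are hypothetical names, not RemoteProcessing's API.

```ruby
require 'set'

# Hypothetical mixin that records which worker methods should be sent
# over durable queues, so a dispatcher can ask the class later.
module SketchDurability
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def sketch_durable_methods(*names)
      durable_method_names.merge(names.map(&:to_sym))
    end

    def durable_method_names
      @durable_method_names ||= Set.new
    end

    def durable?(name)
      durable_method_names.include?(name.to_sym)
    end
  end
end

class SketchWorker
  include SketchDurability
  sketch_durable_methods :pang
end

puts SketchWorker.durable?(:pang)  # prints true
puts SketchWorker.durable?(:ping)  # prints false
```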

Here is the example from the Workling documentation for configuring Workling to use RabbitMQ:

# AMQP (RabbitMQ) SYNCHRONOUS INTERFACE implementation. Default port 5672
# Choose ONE of the Bunny or Carrot lines (a second assignment would override the first)
Workling::Clients::SyncAmqpClient.client_class = Bunny # requires : config.gem "celldee-bunny", :lib => "bunny", :source => "http://gems.github.com"
# Workling::Clients::SyncAmqpClient.client_class = Carrot # requires : config.gem "famoseagle-carrot", :lib => "carrot", :source => "http://gems.github.com"
Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
Workling::Remote.dispatcher.client = Workling::Clients::SyncAmqpClient.new
Workling::Return::Store.instance = Workling::Return::Store::SyncAmqpReturnStore.new

To use the durable queues, change:

Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
# to
Workling::Remote.dispatcher = RemoteProcessing::ClientRunner.new

and add:

Workling::Remote.dispatcher.durable_client = Workling::Clients::SyncAmqpClient.new(:durable_client)
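Conceptually, the replacement runner holds two clients and chooses between them per method. The sketch below assumes that routing rule; SketchRunner, RecordingClient, the publish method and the worker__method queue-name format are all hypothetical and do not correspond to Workling's or RemoteProcessing's real classes.

```ruby
# Stand-in client that just records what it was asked to send.
class RecordingClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def publish(queue_name, message)
    @sent << [queue_name, message]
  end
end

# Hypothetical runner with a default client and an optional durable client.
class SketchRunner
  attr_accessor :client, :durable_client

  def initialize(durable_methods)
    @durable_methods = durable_methods.map(&:to_sym)
  end

  # Routes durable methods to durable_client (when one is configured) and
  # everything else to the default client.
  def run(worker_name, method_name, options)
    use_durable = durable_client && @durable_methods.include?(method_name.to_sym)
    target = use_durable ? durable_client : client
    target.publish("#{worker_name}__#{method_name}", options)
  end
end

runner = SketchRunner.new([:pang])
runner.client = RecordingClient.new
runner.durable_client = RecordingClient.new
runner.run("ping_pong_worker", :ping, { :uid => "a" })
runner.run("ping_pong_worker", :pang, { :uid => "b" })
puts runner.client.sent.size          # prints 1
puts runner.durable_client.sent.size  # prints 1
```

Falling back to the default client when no durable client is set keeps dispatch working in this sketch, but without durability, so both clients should be configured.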

Your workling.yml must also contain a durable_client section, for example:

sync_amqp_options:
  client: # default requirement
    queue_options:
      durable: false
    message_options:
      persistent: false
  durable_client:
    queue_options:
      durable: true
    message_options:
      persistent: true
  returnstore: # default requirement
    queue_options:
      durable: false
    message_options:
      persistent: false
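A quick way to sanity-check a workling.yml fragment like the one above is to load it with Ruby's standard YAML library and confirm that only durable_client is marked durable and persistent. This check is a hypothetical helper, not something Workling or RemoteProcessing provides.

```ruby
require 'yaml'

# The sync_amqp_options fragment from above, inlined for the example.
config_text = <<-CONFIG
sync_amqp_options:
  client:
    queue_options:
      durable: false
    message_options:
      persistent: false
  durable_client:
    queue_options:
      durable: true
    message_options:
      persistent: true
  returnstore:
    queue_options:
      durable: false
    message_options:
      persistent: false
CONFIG

amqp_options = YAML.safe_load(config_text).fetch("sync_amqp_options")

# A section counts as durable only if both the queue and its messages are.
def durable_section?(section)
  section.fetch("queue_options")["durable"] == true &&
    section.fetch("message_options")["persistent"] == true
end

durable_sections = amqp_options.select { |_name, section| durable_section?(section) }.keys
puts amqp_options.keys.inspect  # prints ["client", "durable_client", "returnstore"]
puts durable_sections.inspect   # prints ["durable_client"]
```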

At the moment, the durable queue mechanism is supported only with RabbitMQ, as RabbitMQ is the only one of these transports that needs, and is able, to declare some queues durable and others not. Starling, for example, cannot do this, whereas Amazon SQS persists everything.

Everything else is as before, except that when you call a durable async_* method through RemoteProcessing, the message is persisted on the server until the Workling worker picks it up and removes it from the queue.

Copyright

The MIT License

Copyright (c) 2009 TouchLocal Ltd

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.