Skip to content

Commit

Permalink
LocalParticpant : added a reject(workitem) method
Browse files Browse the repository at this point in the history
  • Loading branch information
jmettraux committed Apr 19, 2010
1 parent 3aae82b commit 7b97228
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.txt
Expand Up @@ -4,6 +4,7 @@

== ruote - 2.1.10 not yet released

- LocalParticpant : added a reject(workitem) method
- participant exp : dispatched = true set right after dispatch


Expand Down
1 change: 1 addition & 0 deletions CREDITS.txt
Expand Up @@ -38,6 +38,7 @@ Richard Jennings
Feedback
--------

Avishai Shalom - discussion and ideas about participant/worker locality
Olle (foenixx) - barley issue with Sinatra 1.0
Gonzalo Suarez - many help
Francisco Kiko - many help
Expand Down
2 changes: 1 addition & 1 deletion TODO.txt
Expand Up @@ -195,6 +195,7 @@
[o] CompositeStorage.new('msgs' => AmqpStorage.new(''), ...)
[x] let the storage participant leverage Ruote::FlowExpressionId.from_id(s)
[o] Andrew's technique http://groups.google.com/group/openwferu-users/browse_thread/thread/c2aa4b53d1664d45/8523a1a5ee98fd71
[o] Avishai : LocalParticipant : repost dispatch message

[ ] exp : exp (restricted form of eval ?)
[ ] exp : case (is it necessary ?)
Expand Down Expand Up @@ -341,6 +342,5 @@
and perhaps also on
http://ruote.rubyforge.org/implementing_participants.html (Avish)

[ ] Avishai : LocalParticipant : repost dispatch message
[ ] Avishai : Worker : hook for rejecting the dispatch message

37 changes: 37 additions & 0 deletions lib/ruote/part/local_participant.rb
Expand Up @@ -31,6 +31,8 @@ module Ruote
# Assumes the class that includes this module has a #context method
# that points to the worker or engine ruote context.
#
# It's "local" because it has access to the ruote storage.
#
module LocalParticipant

attr_accessor :context
Expand All @@ -47,6 +49,41 @@ def reply_to_engine (workitem)
'workitem' => workitem.h,
'participant_name' => workitem.participant_name)
end

# WARNING : this method is only for 'stateless' participants, ie
# participants that are registered in the engine by passing their class
# and a set of options, like in
#
# engine.register_participant 'alpha', MyParticipant, 'info' => 'none'
#
# This reject method replaces the workitem in the [internal] message queue
# of the ruote engine (since it's a local participant, it has access to
# the storage and it's thus easy).
# The idea is that another worker will pick up the workitem and
# do the participant dispatching.
#
# This is an advanced technique. It was requested by people who
# want to have multiple workers and have only certain worker/participants
# do the handling.
# Using reject is not the best method, it's probably better to implement
# this by re-opening the Ruote::Worker class and changing the
# cannot_handle(msg) method.
#
# reject could be useful anyway, not sure now, but one could imagine
# scenarii where some participants reject workitems temporarily (while
# the same participant on another worker would accept it).
#
# Well, here it is, use with care.
#
def reject (workitem)

@context.storage.put_msg(
'dispatch',
'fei' => workitem.h.fei,
'workitem' => workitem.h,
'participant_name' => workitem.participant_name,
'rejected' => true)
end
end
end

46 changes: 46 additions & 0 deletions test/functional/ft_38_participant_reject.rb
@@ -0,0 +1,46 @@

#
# testing ruote
#
# Mon Apr 19 14:38:54 JST 2010
#
# Qcon Tokyo, special day
#

require File.join(File.dirname(__FILE__), 'base')

require 'ruote/part/local_participant'


class FtParticipantRejectTest < Test::Unit::TestCase
include FunctionalBase

class DifficultParticipant
include Ruote::LocalParticipant
def initialize (opts)
end
def consume (workitem)
context.tracer << "diff\n"
if workitem.fields['rejected'].nil?
workitem.fields['rejected'] = true
reject(workitem)
else
reply_to_engine(workitem)
end
end
end

def test_workitems_dispatching_message

pdef = Ruote.process_definition do
alpha
end

@engine.register_participant :alpha, DifficultParticipant

#noisy

assert_trace(%w[ diff diff ], pdef)
end
end

0 comments on commit 7b97228

Please sign in to comment.