TODO: Write a gem description
Add this line to your application's Gemfile:
gem 'ElmerFudd'
And then execute:
$ bundle
Or install it yourself as:
$ gem install ElmerFudd
#!/usr/bin/env ruby
# Example app/worker/test_worker.rb in rails
require_relative "../../config/environment"
class TestLogger
def initialize(stream)
@stream = stream
end
def call(_env, message)
@stream.puts "[#{Time.now}] received on #{message.delivery_info.routing_key} payload: #{message.payload.inspect}"
end
end
class TestWorker < ElmerFudd::Worker
default_filters(ElmerFudd::JsonFilter)
handle_cast(Route('test.print')) do |env, message|
puts %{message: #{message.payload["text"]}}
end
handle_call(Route('test.ping')) do |env, message|
%{pong: #{message.payload["text"]}}
end
handle_call(Route('test.ping')) do |env, message|
%{pong: #{message.payload["text"]}}
end
handle_event(Route('test.log', 'a_topic_exchange' => 'test.#'),
handler: TestLogger.new($stdout))
end
if $PROGRAM_NAME == __FILE__
trap("TERM", "DEFAULT") { exit 0 }
$connection = Bunny.new
TestWorker.new($connection, concurrency: 4).start
loop { sleep(1) }
end
$connection = Bunny.new
$rabbit = ElmerFudd::JsonPublisher.new($connection)
$rabbit.cast('test.print', text: 'hello') # will print "message: hello"
$rabbit.call('test.ping', text: 'echo') #=> "pong: echo"
$rabbit.notify('a_topic_exchange', 'test.log.some.event', count: 1) # will print "[current time here] received on test.log.some.event payload: {count: 1}"
If you want to consume an event in all listening processes (instead of just first available one), pass an empty string as queue name:
handle_event(Route('', 'a_topic_exchange' => 'test.some.event'),
handler: TestLogger.new($stdout))
Filters allow to mutate an incoming message in consumer before it hits appropiate handler or to modify return value (or handle errors) after the handler finishes its job. You can define default filters which will be applied to all handlers in consumer or activate them on handler level by passing filters list as a param, i.e.:
handle_cast(Route('math.divide'),
filter: [DropFailedFilter.new(exception: ZeroDivisionError)]) do |env, message|
puts "#{payload['a'] / payload['b']}"
end
JsonFilter
- deserializes incoming messages using json and serializes call responses to jsonDropFailedFilter.new(exception: Exception, exception_message_matches: /.*/)
- ignore the message if handler raises matching exceptionAirbrakeFilter
- notify airbrake if handler raises exception and reraises it in filter chainActiveRecordConnectionPoolFilter
- allows to use worker with higher concurrency (takes a connection from pool for each message)RetryFilter.new(2, exception: Exception, exception_message_matches: /.*/)
- retries 2 times if handler raises a matching exception
- Fork it ( http://github.com//ElmerFudd/fork )
- Create your feature branch (
git checkout -b my-new-feature
) - Commit your changes (
git commit -am 'Add some feature'
) - Push to the branch (
git push origin my-new-feature
) - Create new Pull Request
** Credits