<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>lib/rosetta_queue/exception_handler.rb</filename>
    </added>
    <added>
      <filename>spec/rosetta_queue/exception_handler_spec.rb</filename>
    </added>
    <added>
      <filename>spec/rosetta_queue/message_handler_spec.rb</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -4,3 +4,4 @@ tmp/*
 *.swp
 log/*
 pkg
+features/support/tmp</diff>
      <filename>.gitignore</filename>
    </modified>
    <modified>
      <diff>@@ -1,6 +1,9 @@
-== 0.2.x (git)
+== 0.4.x (git)
 
+== 0.4.0
  === New features
+ * Allows for custom exception handling logic to be registered for publishing and consuming actions. See ExceptionHandler for more info. (Ben Mabey)
+  * RQ was doing a blanket rescue for publishing and consuming and then logging the errors.  If you still want this behaviour you will need to register a block/class to do that. Again, see ExceptionHandler for more details.
  * Added core time extension when ActiveSupport not present. (Chris Wyckoff)
  === Bufixes
  * Closing the connection after publishing message now instead of unsubscribing. (Chris Wyckoff)
@@ -29,7 +32,7 @@
     * Allows user to purge a queue.
     * Works for AMQP adapters only.
   * Beanstalk Adaper (David Brady)
-    * Still needs some work to have it take advantage of beanstalk's subscribe funtionality.
+    * Still needs some work to have it take advantage of beanstalk's subscribe functionality.
 
 == 0.1.0 / 2008-01-28 - Initial Release
 RosettaQueue was realased in the wild! RQ's initial development was primarily sponsored by Alliance Health Networks (thanks!!).  The original authors were Chris Wyckoff and Ben Mabey. The initial release included adapters for stomp and amqp, in addition to a null and fake adapters for testing.</diff>
      <filename>History.txt</filename>
    </modified>
    <modified>
      <diff>@@ -1,10 +1,10 @@
 require 'rubygems'
 require 'spec/rake/spectask'
 require 'cucumber/rake/task'
- 
+
 Cucumber::Rake::Task.new do |t|
   t.cucumber_opts = &quot;--format pretty&quot;
-end   
+end
 
 desc &quot;Run the specs under spec&quot;
 Spec::Rake::SpecTask.new do |t|</diff>
      <filename>Rakefile</filename>
    </modified>
    <modified>
      <diff>@@ -11,6 +11,7 @@ require 'rosetta_queue/exceptions'
 require 'rosetta_queue/filters'
 require 'rosetta_queue/logger'
 require 'rosetta_queue/message_handler'
+require 'rosetta_queue/exception_handler'
 require 'rosetta_queue/producer'
 require 'rosetta_queue/consumer_managers/base'
 require 'rosetta_queue/consumer_managers/threaded'</diff>
      <filename>lib/rosetta_queue.rb</filename>
    </modified>
    <modified>
      <diff>@@ -68,7 +68,7 @@ module RosettaQueue
           ack = @options[:ack]
           queue.subscribe(@options) do |header, msg|
             RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-            message_handler.on_message(Filters.process_receiving(msg))
+            message_handler.handle_message(msg)
             header.ack if ack
           end
         end
@@ -107,7 +107,7 @@ module RosettaQueue
 
           queue.bind(exchange).subscribe(@options) do |header, msg|
             RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-            message_handler.on_message(Filters.process_receiving(msg))
+            message_handler.handle_message(msg)
             header.ack if ack
           end
         end</diff>
      <filename>lib/rosetta_queue/adapters/amqp_evented.rb</filename>
    </modified>
    <modified>
      <diff>@@ -68,7 +68,7 @@ module RosettaQueue
           @queue = conn.queue(destination, @options)
           @queue.subscribe(@options) do |msg|
             RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-            message_handler.on_message(Filters.process_receiving(msg))
+            message_handler.handle_message(msg)
             @queue.ack if ack
           end
         end
@@ -100,7 +100,7 @@ module RosettaQueue
           @queue.bind(exchange)
           @queue.subscribe(@options) do |msg|
             RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-            message_handler.on_message(Filters.process_receiving(msg))
+            message_handler.handle_message(msg)
             @queue.ack if ack
           end
         end</diff>
      <filename>lib/rosetta_queue/adapters/amqp_synch.rb</filename>
    </modified>
    <modified>
      <diff>@@ -10,7 +10,7 @@ module RosettaQueue
         end
 
         def destination_for(message_handler)
-          raise DestinationNotFound.new(&quot;Missing destination!&quot;) unless message_handler.destination
+          raise DestinationNotFound.new(&quot;Missing destination on message handler #{message_handler.inspect}.&quot;) unless message_handler.destination
           @dest ||= Destinations.lookup(message_handler.destination.to_sym)
         end
 </diff>
      <filename>lib/rosetta_queue/adapters/base.rb</filename>
    </modified>
    <modified>
      <diff>@@ -36,7 +36,7 @@ module RosettaQueue
         running do
           msg = receive.body
           RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-          message_handler.on_message(filter_receiving(msg))
+          message_handler.handle_message(msg)
         end
       end
 </diff>
      <filename>lib/rosetta_queue/adapters/beanstalk.rb</filename>
    </modified>
    <modified>
      <diff>@@ -47,7 +47,7 @@ module RosettaQueue
           msg = receive(options).body
           Thread.current[:processing] = true
           RosettaQueue.logger.info(&quot;Receiving from #{destination} :: #{msg}&quot;)
-          message_handler.on_message(filter_receiving(msg))
+          message_handler.handle_message(msg)
           Thread.current[:processing] = false
         end
       end</diff>
      <filename>lib/rosetta_queue/adapters/stomp.rb</filename>
    </modified>
    <modified>
      <diff>@@ -37,6 +37,22 @@ module RosettaQueue
         @receiving.call(message)
       end
 
+      def safe_process_sending(message)
+        safe(:process_sending, message)
+      end
+
+      def safe_process_receiving(message)
+        safe(:process_receiving, message)
+      end
+
+      private
+
+      def safe(filter_call, message)
+        send(filter_call)
+      rescue StandardError
+        message
+      end
+
     end
   end
 end</diff>
      <filename>lib/rosetta_queue/filters.rb</filename>
    </modified>
    <modified>
      <diff>@@ -2,7 +2,6 @@ module RosettaQueue
   module MessageHandler
 
     module ClassMethods
-
       attr_reader :destination, :options_hash
 
       def options(options = {})
@@ -20,20 +19,34 @@ module RosettaQueue
 
     def self.included(receiver)
       receiver.extend(ClassMethods)
-      attr_accessor :adapter_proxy
+    end
 
-      def destination
-        self.class.destination
-      end
+    attr_accessor :adapter_proxy
 
-      def options_hash
-        self.class.options_hash
-      end
+    def destination
+      self.class.destination
+    end
 
-      def ack
-        adapter_proxy.ack unless adapter_proxy.nil?
+    def options_hash
+      self.class.options_hash
+    end
+
+    def handle_message(unfiltered_message)
+      ExceptionHandler::handle(:publishing,
+        lambda {
+          { :message =&gt; Filters.safe_process_receiving(unfiltered_message),
+            :destination =&gt; destination,
+            :action =&gt; :consuming,
+            :options =&gt; options_hash
+          }
+        } ) do
+        on_message(Filters.process_receiving(unfiltered_message))
       end
+    end
 
+    def ack
+      adapter_proxy.ack unless adapter_proxy.nil?
     end
+
   end
 end</diff>
      <filename>lib/rosetta_queue/message_handler.rb</filename>
    </modified>
    <modified>
      <diff>@@ -4,11 +4,18 @@ module RosettaQueue
     include MessageHandler
 
     def self.publish(destination, message, options = {})
-      RosettaQueue::Adapter.instance.send_message(Destinations.lookup(destination), Filters.process_sending(message), options)
-
-      rescue Exception =&gt; e
-        RosettaQueue.logger.error(&quot;Caught exception in Consumer.publish: #{$!}\n&quot; + e.backtrace.join(&quot;\n\t&quot;))
+      ExceptionHandler::handle(:publishing,
+        lambda {
+          {:message =&gt; Filters.safe_process_sending(message),
+           :action =&gt; :publishing,
+           :destination =&gt; destination,
+           :options =&gt; options}
+        }) do
+        RosettaQueue::Adapter.instance.send_message(
+          Destinations.lookup(destination),
+          Filters.process_sending(message),
+          options)
+      end
     end
-
   end
 end</diff>
      <filename>lib/rosetta_queue/producer.rb</filename>
    </modified>
    <modified>
      <diff>@@ -28,7 +28,7 @@ module RosettaQueue
     # Example:
     # consume_once_with ClientStatusConsumer
     def consume_once_with(consumer)
-      consumer.new.on_message(RosettaQueue::Consumer.receive(consumer.destination))
+      consumer.new.handle_message(RosettaQueue::Consumer.receive(consumer.destination))
     end
 
     # Consumes the first message on queue and returns it.</diff>
      <filename>lib/rosetta_queue/spec_helpers/helpers.rb</filename>
    </modified>
    <modified>
      <diff>@@ -20,10 +20,9 @@ module RosettaQueue::Gateway
 
     describe &quot;#do_exchange&quot; do
 
-      it &quot;should filter the message and forward it to the handler&quot; do
+      it &quot;should delegate message handling to the message handler&quot; do
         when_receiving_exchange {
-          ::RosettaQueue::Filters.should_receive(:process_receiving).with(@msg).and_return(&quot;Filtered Message&quot;)
-          @handler.should_receive(:on_message).with(&quot;Filtered Message&quot;)
+          @handler.should_receive(:handle_message).with(&quot;Hello World!&quot;)
         }
       end
     end
@@ -35,7 +34,7 @@ module RosettaQueue::Gateway
       RosettaQueue.logger.stub!(:info)
       @msg = &quot;Hello World!&quot;
       @adapter = AmqpSynchAdapter.new({:user =&gt; &quot;foo&quot;, :password =&gt; &quot;bar&quot;, :host =&gt; &quot;localhost&quot;})
-      @handler = mock(&quot;handler&quot;, :on_message =&gt; true, :destination =&gt; :foo, :options_hash =&gt; {:durable =&gt; true})
+      @handler = mock(&quot;handler&quot;, :handle_message =&gt; true, :destination =&gt; :foo, :options_hash =&gt; {:durable =&gt; true})
     end
 
     describe AmqpSynchAdapter do
@@ -68,7 +67,7 @@ module RosettaQueue::Gateway
         end
 
         before(:each) do
-          @handler = mock(&quot;handler&quot;, :on_message =&gt; true, :destination =&gt; :foo, :options_hash =&gt; {:durable =&gt; true })
+          @handler = mock(&quot;handler&quot;, :handle_message =&gt; true, :destination =&gt; :foo, :options_hash =&gt; {:durable =&gt; true })
         end
 
         it &quot;should pass message handler to exchange strategy&quot; do
@@ -97,7 +96,7 @@ module RosettaQueue::Gateway
         @queue = mock(&quot;Bunny::Queue&quot;, :pop =&gt; @msg, :publish =&gt; true, :unsubscribe =&gt; true)
         Bunny.stub!(:new).and_return(@conn = mock(&quot;Bunny::Client&quot;, :queue =&gt; @queue, :exchange =&gt; @exchange, :status =&gt; :connected, :stop =&gt; nil))
         @queue.stub!(:subscribe).and_yield(@msg)
-        @handler = mock(&quot;handler&quot;, :on_message =&gt; true, :destination =&gt; :foo)
+        @handler = mock(&quot;handler&quot;, :handle_message =&gt; true, :destination =&gt; :foo)
         @exchange = SynchExchange::DirectExchange.new({:user =&gt; 'user', :password =&gt; 'pass', :host =&gt; 'host', :opts =&gt; {:vhost =&gt; &quot;foo&quot;}})
       end
 
@@ -170,7 +169,7 @@ module RosettaQueue::Gateway
         @queue = mock(&quot;Bunny::Queue&quot;, :pop =&gt; @msg, :bind =&gt; @bound_queue = mock(&quot;Bunny::Queue&quot;, :pop =&gt; @msg), :publish =&gt; true, :unbind =&gt; true)
         Bunny.stub!(:new).and_return(@conn = mock(&quot;Bunny::Client&quot;, :queue =&gt; @queue, :exchange =&gt; @exchange, :status =&gt; :connected))
         @queue.stub!(:subscribe).and_yield(@msg)
-        @handler = mock(&quot;handler&quot;, :on_message =&gt; true, :destination =&gt; :foo, :options =&gt; {:durable =&gt; false})
+        @handler = mock(&quot;handler&quot;, :handle_message =&gt; true, :destination =&gt; :foo, :options =&gt; {:durable =&gt; false})
       end
 
       def do_receiving_exchange
@@ -214,7 +213,7 @@ module RosettaQueue::Gateway
 
         it &quot;should forward the message body onto the handler&quot; do
           when_receiving_exchange {
-            @handler.should_receive(:on_message).with(&quot;Hello World!&quot;)
+            @handler.should_receive(:handle_message).with(&quot;Hello World!&quot;)
           }
         end
 </diff>
      <filename>spec/rosetta_queue/adapters/amqp_synchronous_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -9,7 +9,7 @@ module RosettaQueue
 
       before(:each) do
         @msg = &quot;Hello World!&quot;
-        @handler = mock('handler', :on_message =&gt; &quot;&quot;, :destination =&gt; :foo)
+        @handler = mock('handler', :handle_message =&gt; &quot;&quot;, :destination =&gt; :foo)
         @msg_obj = mock(&quot;message&quot;, :body =&gt; @msg, :delete =&gt; true)
         @conn = mock(&quot;Beanstalk::Pool&quot;, :put =&gt; true, :reserve =&gt; @msg_obj)
         ::Beanstalk::Pool.stub!(:new).and_return(@conn)</diff>
      <filename>spec/rosetta_queue/adapters/beanstalk_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -9,7 +9,7 @@ module RosettaQueue
 
       before(:each) do
         @msg = &quot;Hello World!&quot;
-        @handler = mock('handler', :destination =&gt; :foo, :options_hash =&gt; {:persistent =&gt; false, :ack =&gt; &quot;client&quot;}, :on_message =&gt; &quot;&quot;)
+        @handler = mock('handler', :destination =&gt; :foo, :options_hash =&gt; {:persistent =&gt; false, :ack =&gt; &quot;client&quot;}, :handle_message =&gt; &quot;&quot;)
         @msg_obj = mock(&quot;message&quot;, :body =&gt; @msg, :headers =&gt; {&quot;message-id&quot; =&gt; 2})
         @conn = mock(&quot;Stomp::Connection&quot;, :ack =&gt; true, :send =&gt; true, :subscribe =&gt; true, :receive =&gt; @msg_obj, :unsubscribe =&gt; true, :disconnect =&gt; true)
         ::Stomp::Connection.stub!(:open).and_return(@conn)
@@ -69,7 +69,7 @@ module RosettaQueue
         describe &quot;no ack&quot; do
 
           before(:each) do
-            @handler = mock('handler', :destination =&gt; :foo, :options_hash =&gt; {:persistent =&gt; false}, :on_message =&gt; &quot;&quot;)
+            @handler = mock('handler', :destination =&gt; :foo, :options_hash =&gt; {:persistent =&gt; false}, :handle_message =&gt; &quot;&quot;)
           end
 
           it &quot;should not acknowledge client&quot; do</diff>
      <filename>spec/rosetta_queue/adapters/stomp_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -22,6 +22,19 @@ module RosettaQueue
       end
     end
 
+    ['sending', 'receiving'].each do |action|
+      describe &quot;#safe_process_#{action}&quot; do
+        it &quot;returns the orginal message if an exception occurs while filtering&quot; do
+          Filters.define do |f|
+            f.send(action) { |message| raise &quot;foo&quot; }
+          end
+
+          Filters.send(&quot;safe_process_#{action}&quot;, &quot;Bar&quot;).should == &quot;Bar&quot;
+        end
+
+      end
+    end
+
 
     describe &quot;#process_sending&quot; do
       it &quot;should process the passed in message with the defined sending filter&quot; do</diff>
      <filename>spec/rosetta_queue/filters_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -30,12 +30,6 @@ module RosettaQueue
         RosettaQueue::Adapter.stub!(:instance).and_return(@adapter)
       end
 
-      # it &quot;should look up the destination defined on the class&quot; do
-      #   Destinations.should_receive(:lookup).with(:test_queue).and_return(&quot;/queue/test_queue&quot;)
-      #   # when
-      #    @gateway.publish('some message')
-      # end
-
       it &quot;should publish messages to queue with the options defined in the class&quot; do
         # TO DO: REFACTOR #publish METHOD SO THAT YOU PASS IN MESSAGE HANDLER AS WITH CONSUMER
         pending
@@ -48,18 +42,35 @@ module RosettaQueue
     end
 
     describe &quot;.publish&quot; do
-      # it &quot;should look up the destination defined on the class&quot; do
-      #   Destinations.should_receive(:lookup).with(:test_queue).and_return(&quot;/queue/test_queue&quot;)
-      #   # when
-      #   Producer.publish(:test_queue, &quot;blah&quot;)
-      # end
-
       it &quot;should send the message to the adpater along with the options&quot; do
         # expect
         @adapter.should_receive(:send_message).with(&quot;/queue/test_queue&quot;, &quot;Hello World!&quot;, {:persistent =&gt; true})
         # when
         Producer.publish(:test_queue, &quot;Hello World!&quot;, {:persistent =&gt; true})
       end
+
+      it &quot;delgates exception handling to the ExceptionHandler for :publishing&quot; do
+        ExceptionHandler.should_receive(:handle).with(:publishing, anything)
+        Producer.publish(:test_queue, &quot;Hello World!&quot;, {:persistent =&gt; true})
+      end
+
+      it &quot;wraps the publishing in an ExceptionHandler::handler block&quot; do
+        @adapter.should_not_receive(:send_message)
+        ExceptionHandler.stub!(:handle).and_return(&quot;I was wrapped&quot;)
+        Producer.publish(:test_queue, &quot;m&quot;).should == &quot;I was wrapped&quot;
+      end
+
+      it &quot;provides additional message information to the ExceptionHandler&quot; do
+        ExceptionHandler.should_receive(:handle).with do |_, hash_proc|
+          hash_proc.call.should == {
+          :message =&gt; &quot;message&quot;,
+          :action =&gt; :publishing,
+          :destination =&gt; :test_queue,
+          :options =&gt; {:persistent =&gt; true}}
+        end
+        Producer.publish(:test_queue, &quot;message&quot;, {:persistent =&gt; true})
+      end
+
     end
 
   end</diff>
      <filename>spec/rosetta_queue/producer_spec.rb</filename>
    </modified>
  </modified>
  <removed type="array">
    <removed>
      <filename>features/support/tmp/barconsumer.log</filename>
    </removed>
    <removed>
      <filename>features/support/tmp/fooconsumer.log</filename>
    </removed>
    <removed>
      <filename>features/support/tmp/point-to-point.log</filename>
    </removed>
    <removed>
      <filename>features/support/tmp/pub-sub.log</filename>
    </removed>
  </removed>
  <parents type="array">
    <parent>
      <id>896758e1eaac8e7731c544e566d8c02ad510cea9</id>
    </parent>
  </parents>
  <author>
    <name>Ben Mabey</name>
    <email>ben@benmabey.com</email>
  </author>
  <url>http://github.com/bmabey/rosetta_queue/commit/944c9778bcecbbeeb5040fd5b476b535d91fdcca</url>
  <id>944c9778bcecbbeeb5040fd5b476b535d91fdcca</id>
  <committed-date>2009-09-28T15:18:05-07:00</committed-date>
  <authored-date>2009-09-28T15:12:33-07:00</authored-date>
  <message>Allowing registration of custom exception handling.

See lib/rosetta_queue/exception_handler.rb for more info.
This removes some default exception handling that was
present in RQ prior to this for publishing and consuming.

Since every app tends to want to handle exceptions differently
for messaging but are generally pretty consistent across
consumers/publishers this feature gives the needed flexibility
and helps reduce duplication within apps.</message>
  <tree>a051043428b040e765f9728c9d15734bc689c0f2</tree>
  <committer>
    <name>Ben Mabey</name>
    <email>ben@benmabey.com</email>
  </committer>
</commit>
