New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discussion: Fault tolerance for errors on event handlers #111

Open
andrzejsliwa opened this Issue Sep 15, 2017 · 13 comments

Comments

3 participants
@andrzejsliwa
Copy link
Member

andrzejsliwa commented Sep 15, 2017

Current implementation of PubSub Broker is not fault tolerant, which means that if one error handler will fail the "broadcasting" will stop processing event by other handlers which were added after failing one.

https://github.com/RailsEventStore/rails_event_store/blob/master/ruby_event_store/lib/ruby_event_store/pub_sub/broker.rb#L26

Because of current implementation of the broker I had to install such error isolation layer:

Configuration:

...
  def setup_event_handler_strategy
    Rails.configuration.event_handler_strategy =
      %w[development test].include?(Rails.env) ? :throw : :notify
  end
....

Usage of Error handling strategy:

module Infra
  module EventHandlers
    include Contracts::Core
    include Contracts::Builtin

    class UnknownHandlerStrategy < StandardError
    end

    Contract Config => Any
    def connect(event_handler_config)
      @configs ||= []
      @configs << event_handler_config.handlers
    end

    Contract KeywordArgs[event_store: RailsEventStore::Client,
                         event_handler_error_strategy: Optional[EventHandlerErrorStrategy]] => Any
    def register(event_store: Rails.configuration.core.event_store,
                 event_handler_error_strategy: EventHandlerErrorStrategy.new)
      configs.flatten.each do |c|
        events  = c.fetch(:events)
        handler = event_handler_error_strategy.build_error_handler(c.fetch(:handler), events)
        event_store.subscribe(handler, events)
      end
    end

    attr_reader :configs
  end
end

Implementation of Error handling strategy :

module Infra
  class EventHandlerErrorStrategy
    def initialize(event_handler_strategy: Rails.configuration.event_handler_strategy)
      @strategy = event_handler_strategy
    end

    def build_error_handler(handler, events)
      send_handler_info(handler, events)
      case strategy
      when :notify
        ->(event) { notify_error_handler(handler, event) }
      when :throw
        ->(event) { handler.call(event) }
      else
        raise UnknownHandlerStrategy
      end
    end

    private

    def send_handler_info(handler, events)
      for_events    = "  - for events: #{events.inspect}"
      with_strategy = "  - with '#{strategy}' strategy"
      message = "event handler registered #{handler}\n#{for_events}\n#{with_strategy}"
      if %w[development test].include?(Rails.env)
        puts message
      else
        Rails.logger.info message
      end
    end

    def notify_error_handler(handler, event)
      handler.call(event)
    rescue StandardError => ex
      Airbrake.notify(ex)
    end

    attr_reader :strategy
  end
end
@andrzejsliwa

This comment has been minimized.

Copy link
Member Author

andrzejsliwa commented Sep 15, 2017

I would like to discuss here how to apply this or similar design to base gem.

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 15, 2017

Current implementation of PubSub Broker is not fault tolerant, which means that if one error handler will fail the "broadcasting" will stop processing event by other handlers which were added after failing one.

That's correct and on purpose. It's up to the developers of sync handlers to decide if in case of an error they should swallow it and let the rest of process proceed or propagate and undo whole operation (this might be especially useful in legacy apps where you extract side-effects into handlers but you want to keep the whole process still fully transactional)

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 15, 2017

I would like to discuss here how to apply this or similar design to base gem.

Current status more or less:

module RailsEventStore
  Event                     = RubyEventStore::Event
  InMemoryRepository        = RubyEventStore::InMemoryRepository
  EventBroker               = RubyEventStore::PubSub::Broker
module RailsEventStore
  class Client < RubyEventStore::Client
    def initialize(repository: RailsEventStore.event_repository,
                   event_broker: EventBroker.new,
RailsEventStore.event_repository = RailsEventStoreActiveRecord::EventRepository.new
module RubyEventStore
  module PubSub
    class Broker
      DEFAULT_DISPATCHER = Dispatcher.new

      def initialize(dispatcher: DEFAULT_DISPATCHER)
        # ...
      end

      def notify_subscribers(event)
        all_subscribers_for(event.class).each do |subscriber|
          dispatcher.call(subscriber, event)
        end
      end
module RubyEventStore
  module PubSub
    class Broker
      class Dispatcher
        def call(subscriber, event)
          ensure_method_defined(subscriber)
          subscriber.call(event)
        end

solution 1

How about:

  • inheriting from a Broker and defining new notify_subscribers method in which exceptions from dispatcher.call(subscriber, event) are rescued
class SwallowExceptionsBroker < Broker
      def notify_subscribers(event)
        all_subscribers_for(event.class).each do |subscriber|
          begin
            dispatcher.call(subscriber, event)
          rescue => e
            AirBrake.notify(e)
          end
        end
    end
end

and

RailsEventStore.event_repository = RailsEventStoreActiveRecord::EventRepository.new(event_broker: SwallowExceptionsBroker.new)

It could be good to extract dispatcher.call(subscriber, event) into it's own method to make it easier and only change this one call.

solution 2

Alternatively I would look into whether you can provide your own dispatcher to a broker:

module RubyEventStore
  module PubSub
    class Broker
      class Dispatcher
        def call(subscriber, event)
          ensure_method_defined(subscriber)
          subscriber.call(event)
        end

It seems that you would need to only inherit and catch exceptions in call and do:

RailsEventStore.event_repository = RailsEventStoreActiveRecord::EventRepository.new(
  event_broker: RailsEventStore::EventBroker.new(MySwallowDispatcher.new)
)

What do you think?

@andrzejsliwa

This comment has been minimized.

Copy link
Member Author

andrzejsliwa commented Sep 18, 2017

@paneq I like solution 1, but advantage of my design is that I can define error handling strategy per event handler, have them configured depending from requirements, replacing Broker is to much global in his range (have impact on whole application in our case, I'm avoid having multiple instance of event_stores, command_buses etc. Thats why I have one CoreConfiguration in application.rb:

    config.to_prepare do
      Rails.configuration.core = CoreConfiguration.new
    end

this lets me safely reload whole configuration without risk that handlers were registered multiple times etc...

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

I would like to discuss here how to apply this or similar design to base gem.

What API would you like to see? I think this is a good starting point to imagine a new feature. Then you can think later about how to achieve it.

event_store.subscribe(handler, events, on_exception: :notify)
event_store.subscribe(handler, events, on_exception: :raise)

Does that look good for sync handlers?

What about async handlers? #103

@andrzejsliwa

This comment has been minimized.

Copy link
Member Author

andrzejsliwa commented Sep 18, 2017

what about:

event_store.subscribe(handler, events, on_exception: some_lambda_or_proc) 
# or  
event_store.subscribe(handler, events, on_exception: ErrorHandling.method(:notify)) 

To keep it open for extension

@andrzejsliwa

This comment has been minimized.

Copy link
Member Author

andrzejsliwa commented Sep 18, 2017

What about async handlers? #103

In case of implementing it as ActiveJob, we don't need anything more than standard way of dealing with it by providers of implementation. In my case I'm using https://github.com/tawan/active-elastic-job, which will just put failing message in dead letter queue on SQS, with notifications on Newrelic and Airbrake

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

I wonder what's the benefit of

event_store.subscribe(handler, events, on_exception: some_lambda_or_proc)

over

module NotifyOnException
  def call(event)
    super
  rescue => e
    # raise (default so not really needed)
    # or
    Airbrake.notify(e)
  end
end

and doing:

handler.prepend(NotifyOnException)

but maybe the convenience is worth it.

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

It would be nice if the exception handler was provided with exception, event, handler probably.

@andrzejsliwa

This comment has been minimized.

Copy link
Member Author

andrzejsliwa commented Sep 18, 2017

It would be nice if the exception handler was provided with exception, event, handler probably.

would be great to just implement it as decorator, with having full control over how to rescue and recover (local or distributed transactions handling, notifications or even emitting new event)

Some people recommending emting Events also on exceptions/errors in system, because everything going then to the source of true as series of events (which brings me to the parent_id: would be great to start linking events together, to improve insights)

@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

which brings me to the parent_id: would be great to start linking events together, to improve insights

You can put correlation_id into Event's metadata for that. I remember that @pawelpacana and @mpraglowski were working on putting request.id in event's metadata but I am not sure if that's finished.

module RailsEventStore
  class Middleware
    def initialize(app, request_metadata_proc)
      @app = app
      @request_metadata_proc = request_metadata_proc
    end

    def call(env)
      Thread.current[:rails_event_store] = @request_metadata_proc.(env)
      @app.call(env)
    ensure
      Thread.current[:rails_event_store] = nil
    end
  end
end
module RailsEventStore
  class Client < RubyEventStore::Client
    def initialize(repository: RailsEventStore.event_repository,
                   event_broker: EventBroker.new,
                   page_size: PAGE_SIZE)
      capture_metadata = ->{ Thread.current[:rails_event_store] }
      super(repository: repository,
            event_broker: event_broker,
            page_size: page_size,
            metadata_proc: capture_metadata)
    end
  end
end
@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

I think it is

require 'rails/railtie'
require 'rails_event_store/middleware'

module RailsEventStore
  class Railtie < ::Rails::Railtie
    initializer 'rails_event_store.middleware' do |rails|
      rails.middleware.use(::RailsEventStore::Middleware, RailsConfig.new(rails.config).request_metadata)
    end

    class RailsConfig
      def initialize(config)
        @config = config
      end

      def request_metadata
        return default_request_metadata unless @config.respond_to?(:rails_event_store)
        @config.rails_event_store.fetch(:request_metadata, default_request_metadata)
      end

      private
      def default_request_metadata
        ->(env) do
          request = ActionDispatch::Request.new(env)
          { remote_ip:  request.remote_ip,
            request_id: request.uuid,
          }
        end
      end
    end
  end
end
@paneq

This comment has been minimized.

Copy link
Member

paneq commented Sep 18, 2017

would be great to just implement it as decorator, with having full control over how to rescue and recover (local or distributed transactions handling, notifications or even emitting new event)

PR welcomed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment