Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESPRunner hook for responding to event processor failures #216

Merged
merged 10 commits into from Jul 10, 2019

Conversation

Projects
None yet
7 participants
@orien
Copy link
Member

commented Jul 9, 2019

We're encountering a problem when running our event processors via the ESPRunner. When one of the event processors running in a child process fails and terminates prematurely, the ESPRunner just ignores the problem. Eventually, our event processor lag monitor will raise the alert to the on-call developer, who in turn can manually restart the ESPRunner.

The process status list looks something like this:

> ps aux
USER     PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
app        1  0.0  1.6  90992 65204 ?        Ssl  03:40   0:02 ruby /app/script/processors
app        2  0.1  1.7  93624 68692 ?        Sl   03:40   0:04 MyApp::Queries::Projector1
app        3  0.0  0.0      0     0 ?        Z    03:40   0:03 [ruby] <defunct>
app        4  0.1  1.6  93640 67660 ?        Sl   03:40   0:05 MyApp::Queries::Projector3

We explored adding an extra option where the ESPRunner would shutdown in such a scenario in #215.

Change

This PR proposes a more extensible solution: provide a hook for the event processor failure. This'll allow teams to choose and implement an appropriate response as they see fit. Here're a few examples:

Report to Rollbar

EventSourcery::EventProcessing::ESPRunner.new(
  event_processors: processors,
  event_source: source,
  after_subprocess_termination: proc do |processor:, runner:, exit_status:|
    if exit_status != 0
      Rollbar.error("Processor #{processor.processor_name} "\
                    "terminated with exit status #{exit_status}")
    end
  end
).start!

Shutdown the ESPRunner

EventSourcery::EventProcessing::ESPRunner.new(
  event_processors: processors,
  event_source: source,
  after_subprocess_termination: proc do |processor:, runner:, exit_status:|
    runner.shutdown
  end
).start!

Restart the event processor

EventSourcery::EventProcessing::ESPRunner.new(
  event_processors: processors,
  event_source: source,
  after_subprocess_termination: proc do |processor:, runner:, exit_status:|
    runner.start_processor(processor) unless runner.shutdown_requested?
  end
).start!

def shutdown_requested?
@shutdown_requested
end

This comment has been minimized.

Copy link
@orien

orien Jul 9, 2019

Author Member

These three methods have been made public to provide options for responding to event processor failure.

Process.kill(signal, *@pids) unless all_processes_terminated?
return if all_processes_terminated?

logger.info("ESPRunner: Sending #{signal} to [#{@pids.values.map(&:processor_name).join(', ')}]")

This comment has been minimized.

Copy link
@orien

orien Jul 9, 2019

Author Member

Log a message when sending signals to subprocesses.

@exit_status &&= !!status.success?
logger.info("ESPRunner: Process #{event_processor.processor_name} terminated with exit status: #{status.exitstatus.inspect}")

This comment has been minimized.

Copy link
@orien

orien Jul 9, 2019

Author Member

Log a message when a subprocess terminates.

@exit_status &&= !!status.success?
logger.info("ESPRunner: Process #{event_processor.processor_name} terminated with exit status: #{status.exitstatus.inspect}")
@after_subprocess_termination&.call(processor: event_processor, runner: self, exit_status: status.exitstatus)

This comment has been minimized.

Copy link
@orien

orien Jul 9, 2019

Author Member

Call the after_subprocess_termination when a subprocess terminates.

@orien orien requested review from stevehodgkiss, joesustaric and mjward Jul 9, 2019

Restructure loop to better reveal intent
We want to ensure it yields at least once before the break condition.
@stevehodgkiss
Copy link
Member

left a comment

Nice 👏

@mjward

mjward approved these changes Jul 9, 2019

Copy link
Collaborator

left a comment

Looks great

@scottyp-env
Copy link

left a comment

Nice! Love the approach of leaving the action up to the project using the ESPRunner

@ricobl

ricobl approved these changes Jul 10, 2019

Copy link

left a comment

Awesome! Thanks @orien!

end

# Start each Event Stream Processor in a new child process.
def start!
with_logging do
start_processes
listen_for_shutdown_signals
wait_till_shutdown_requested
record_terminated_processes
while_waiting_for_shutdown do

This comment has been minimized.

Copy link
@joesustaric

joesustaric Jul 10, 2019

Contributor

why have you chosen to supply a block to this method?
In terms of reading the code it makes sense.
My only comment (I'm quite sure this is a nitpick) is that if this is the only block being passed to this method.. why can you just replace the yield with record_terminated_processes?

This comment has been minimized.

Copy link
@joesustaric

joesustaric Jul 10, 2019

Contributor

humm to be honest I think it's ok like this.. I think it reads better.. you would have to change the method name to something like
record_terminated_processes_while_waiting_for_shutdown might be a little too verbose.

This comment has been minimized.

Copy link
@orien

orien Jul 10, 2019

Author Member

Yeah, that'd work. I'm aiming to have the contents of this start! method to provide an understandable overview. To do this I try to provide a consistent level of abstraction with the statements. Were we to replace yield with record_terminated_processes, I'd feel this should be communicated via the method name. So I'd probably come up with something like record_terminated_processes_while_waiting_for_shutdown. Which isn't too bad, but perhaps a bit long. That would be how I'd do it in a language that doesn't support blocks. I like how using the block it hints that record_terminated_processes is (could be) called multiple times. Being able to do this is one of the nice things about Ruby.

This comment has been minimized.

Copy link
@orien

orien Jul 10, 2019

Author Member

I'm interested in your reaction though. Is it confusing? Convoluted?

This comment has been minimized.

Copy link
@joesustaric

joesustaric Jul 10, 2019

Contributor

No certainly not confusing or convoluted. It was just a comment regarding if the concern of recording the terminated processes, if it can just belong to while_waiting_for_shutdown or not but I certainly think the way it is, is fine and reads well. all good 👍

This comment has been minimized.

Copy link
@orien

orien Jul 10, 2019

Author Member

Cheers.

@@ -36,7 +85,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.20.0] - 2018-06-21

This comment has been minimized.

Copy link
@joesustaric

joesustaric Jul 10, 2019

Contributor

This library should totally be at v1 by now :italian-hand:

@@ -36,7 +85,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.20.0] - 2018-06-21
### Changed
- Changed signature of `ESPProcess#initialize` to include a default value for `after_fork`. This prevents the
- Changed signature of `ESPProcess#initialize` to include a default value for `after_fork`. This prevents the

This comment has been minimized.

Copy link
@joesustaric

joesustaric Jul 10, 2019

Contributor

Just sanity checking.. this does not break the API for existing code... because we are supplying default values

@joesustaric
Copy link
Contributor

left a comment

Totally agree this is a nicererer option. great work @orien 🏆

@orien orien merged commit 8dd8ccf into master Jul 10, 2019

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
continuous-integration/travis-ci/push The Travis CI build passed
Details

@orien orien deleted the orien/process-management-hooks branch Jul 10, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.