
RestartSource.onFailuresWithBackoff should support failing on (custom) fatal errors #30859

Closed
agemooij opened this issue Nov 5, 2021 · 7 comments
Labels
1 - triaged (Tickets that are safe to pick up for contributing in terms of likeliness of being accepted), help wanted (Issues that the core team will likely not have time to work on), t:stream

Comments

@agemooij
Contributor

agemooij commented Nov 5, 2021

I have several use cases that generally run as follows:

  • we retrieve a valid OAuth token for some API
  • we start a never-ending stream to fetch data from that API (using akka-http) to stay in sync
  • when we run into most errors, it should just restart with backoff, even if it takes a long time (e.g. API/network downtime)
  • but when, for instance, some external factor invalidates the OAuth token, our stream starts failing continuously with 401 or 403 responses.
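The split described above boils down to a predicate that separates "retry" failures from "give up" failures. A minimal sketch, assuming the stream code turns 401/403 responses into a hypothetical `HttpStatusException` (not an akka-http type; `FailureClassification` and `isFatal` are also illustrative names):

```scala
object FailureClassification {
  // Hypothetical exception raised by our own code for unexpected HTTP statuses
  final case class HttpStatusException(status: Int)
      extends RuntimeException(s"Unexpected HTTP status $status")

  // 401/403 mean the OAuth token is no longer valid, so retrying cannot help
  def isFatal(ex: Throwable): Boolean = ex match {
    case HttpStatusException(401 | 403) => true
    case _                              => false // network errors, 5xx, timeouts: restart
  }
}
```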

In such special error cases the stream should clearly be stopped, since there is no use in continuing: we have encountered a fatal error scenario that needs to be handled by some behavior other than mindlessly restarting the stream.

We currently solve this issue using custom graph stages because, as far as we can work out, there is no version of RestartSource (or RestartFlow) that supports retrying only on some errors but failing on others.

It would be great if the various .onFailuresWithBackoff methods (or a new method) would support some way to express which errors should be retried and which ones should lead to failure.

@johanandren
Member

Sounds like a "decider" would be a useful addition to both those operators. It should probably go in the RestartSettings rather than be an additional overload.

johanandren added the labels 1 - triaged, help wanted, and t:stream on Nov 8, 2021
@agemooij
Contributor Author

> Sounds like a "decider" would be a useful addition to both those operators. It should probably go in the RestartSettings rather than be an additional overload.

Yes, that makes sense. An additional overload would indeed make it quite hard to read/use.

The implementation of this in RestartWithBackoffLogic has been a bit of a barrier for us to come up with a PR. The code is not that easy to understand and uses "sub inlets/outlets" and a subFusingMaterializer that are unfamiliar to us.

@agemooij
Contributor Author

agemooij commented Nov 10, 2021

But looking at it with fresh eyes: would it be correct to say that if we can get a "decider" down to the onUpstreamFailure handler in RestartWithBackoffLogic, we would only have to change the logic there to use that decider to either fail or schedule a restart timer?

So something like this:

      override def onUpstreamFailure(ex: Throwable): Unit = {
        // decider.alwaysFailOn is a proposed (not yet existing) hook: true means the error is fatal
        if (finishing || maxRestartsReached() || decider.alwaysFailOn(ex)) {
          fail(out, ex)
        } else {
          logIt(s"Restarting stream due to failure [${restartCount + 1}]: $ex", OptionVal.Some(ex))
          scheduleRestartTimer()
        }
      }

If that is the only code that would have to change, then I could work on an initial PR for it.
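The branch in the proposed handler can be checked in isolation as a pure function. This is a simplified sketch: `FailureDecision`, `Action`, and the `alwaysFailOn` parameter are hypothetical names, and the real `RestartWithBackoffLogic` additionally handles timers, logging, and the restart-window bookkeeping behind `maxRestartsReached()`.

```scala
object FailureDecision {
  sealed trait Action
  case object FailStage extends Action
  case object ScheduleRestart extends Action

  // Same condition as the proposed onUpstreamFailure: fail when the stage is already
  // finishing, the restart budget is used up, or the decider marks ex as fatal
  def onUpstreamFailure(
      ex: Throwable,
      finishing: Boolean,
      maxRestartsReached: Boolean,
      alwaysFailOn: Throwable => Boolean): Action =
    if (finishing || maxRestartsReached || alwaysFailOn(ex)) FailStage
    else ScheduleRestart
}
```

Keeping the decision in one function like this would also make it easy to call the same check from more than one handler.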

@johanandren
Member

Sounds about right. It looks like there is corresponding logic in both the out handler and the in handler, so probably at least both of those places would need to invoke the decider.

@agemooij
Contributor Author

OK. I think it would be better to create a new Decider, inspired by the one in akka.stream.Supervision, rather than reusing the existing one, which wouldn't quite match the semantics; i.e. the only options should be Restart and Fail.
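A sketch of such a restricted decider, following the shape of `akka.stream.Supervision.Decider` (a `Throwable => Supervision.Directive` function) but with only two outcomes. `RestartDirective`, `RestartDecider`, and the sample deciders are hypothetical names, not existing Akka types; `SecurityException` is just a placeholder for an application-specific fatal error.

```scala
object RestartDeciderSketch {
  // Two-valued directive: unlike Supervision.Directive there is no Resume
  sealed trait RestartDirective
  object RestartDirective {
    case object Restart extends RestartDirective
    case object Fail extends RestartDirective
  }

  type RestartDecider = Throwable => RestartDirective

  // Default preserving today's behaviour: every failure leads to a restart
  val alwaysRestart: RestartDecider = _ => RestartDirective.Restart

  // Example decider that treats authorization problems as fatal
  val failOnAuthErrors: RestartDecider = {
    case _: SecurityException => RestartDirective.Fail
    case _                    => RestartDirective.Restart
  }
}
```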

@johanandren
Member

I agree, perhaps even a simple lambda returning a Boolean rather than encoding the return value as an ADT, something like restartOn(ex), with a default that returns true for all exceptions.
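With a plain predicate, the idea fits into a settings object as a single extra field whose default restarts on everything, so existing operator signatures stay unchanged. This is a sketch only: `BackoffSettings`, `restartOn`, and `withRestartOn` are hypothetical names for illustration, not the actual `RestartSettings` API.

```scala
import scala.concurrent.duration._

object PredicateSettingsSketch {
  // Hypothetical settings type; restartOn defaults to "restart on every failure"
  final case class BackoffSettings(
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      restartOn: Throwable => Boolean = _ => true) {
    def withRestartOn(p: Throwable => Boolean): BackoffSettings = copy(restartOn = p)
  }

  val defaults = BackoffSettings(1.second, 30.seconds, 0.2)

  // Restart on anything except authorization failures (placeholder exception type)
  val authAware = defaults.withRestartOn {
    case _: SecurityException => false
    case _                    => true
  }
}
```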

@dwickern
Contributor

You can work around this somewhat by wrapping the stream elements in a Try. Emit any "fatal" errors as a Failure and then unwrap them outside the RestartSource to cancel it from downstream.

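import akka.stream.scaladsl.{RestartSource, Source}
import scala.util.{Failure, Success}

RestartSource.onFailuresWithBackoff(restartSettings) { () =>
  // A "fatal" error is emitted as a final Failure element; the inner source then
  // completes normally, so onFailuresWithBackoff does not restart it
  Source(...).map(Success(_)).recover {
    case ex: InvalidOAuthTokenException => Failure(ex)
  }
}.map(_.get) // .get rethrows the fatal error, failing the stream outside the RestartSource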

Projects
None yet
Development

No branches or pull requests

4 participants