Skip to content

Commit

Permalink
Restart of outbound stream if upstream completed (unexpectedly), #31081
Browse files Browse the repository at this point in the history
* In the logs attached to the issue we can see that an outbound connection is not
  re-established after "Upstream finished" (broken pipe).
* Normally that is handled by the inner RestartFlow around the connection flow, but
  if that has reached it's maxRestarts (3) it will complete the entire stream and
  attachOutboundStreamRestart would not handle that as a restart case.
  • Loading branch information
patriknw committed Mar 10, 2022
1 parent cc762a8 commit f72d448
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions akka-remote/src/main/scala/akka/remote/artery/Association.scala
Expand Up @@ -958,9 +958,14 @@ private[remote] class Association(

implicit val ec = materializer.executionContext
streamCompleted.foreach { _ =>
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
if (transport.isShutdown || isRemovedAfterQuarantined()) {
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
} else {
log.debug("{} to [{}] was completed. It will be restarted if used again.", streamName, remoteAddress)
lazyRestart()
}
}
streamCompleted.failed.foreach {
case ArteryTransport.ShutdownSignal =>
Expand Down Expand Up @@ -1034,7 +1039,7 @@ private[remote] class Association(
} else {
log.error(
cause,
s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName,
remoteAddress,
advancedSettings.OutboundMaxRestarts,
Expand Down

0 comments on commit f72d448

Please sign in to comment.