Skip to content

Commit

Permalink
fix: Avoid turning all stream timeouts to TcpIdleTimeoutException
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Aug 16, 2023
1 parent 9745f88 commit 159717b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 18 deletions.
8 changes: 6 additions & 2 deletions akka-stream/src/main/scala/akka/stream/impl/Timers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ import akka.stream.stage._

}

final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
final class IdleTimeoutBidi[I, O](
val timeout: FiniteDuration,
createFailure: FiniteDuration => Throwable = timeout =>
new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))
extends GraphStage[BidiShape[I, I, O, O]] {
val in1 = Inlet[I]("in1")
val in2 = Inlet[O]("in2")
val out1 = Outlet[I]("out1")
Expand All @@ -170,7 +174,7 @@ import akka.stream.stage._

final override def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))
failStage(createFailure(timeout))

override def preStart(): Unit =
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))
Expand Down
24 changes: 8 additions & 16 deletions akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
package akka.stream.impl.io

import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }

import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Duration, FiniteDuration }

import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.InternalApi
Expand All @@ -22,6 +19,7 @@ import akka.io.Tcp
import akka.io.Tcp._
import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.impl.Timers
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp }
import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding }
Expand Down Expand Up @@ -593,19 +591,13 @@ private[stream] object ConnectionSourceStage {
case Some(address) => s" on connection to [$address]"
case _ => ""
}
BidiFlow.fromGraph(
new Timers.IdleTimeoutBidi(
idleTimeout,
createFailure = _ =>
new TcpIdleTimeoutException(
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
idleTimeout)))

val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
BidiFlow.fromFlows(
Flow[ByteString].mapError {
case _: TimeoutException =>
new TcpIdleTimeoutException(
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
idleTimeout)
},
Flow[ByteString])
val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
toNetTimeout.reversed // now the bottom flow transforms the exception, the top one doesn't (since that one is "fromNet")

fromNetTimeout.atop(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout)).atop(toNetTimeout)
}
}

0 comments on commit 159717b

Please sign in to comment.