Skip to content

Commit

Permalink
Merge pull request http4s/http4s#6250 from armanbilge/integration/0.2…
Browse files Browse the repository at this point in the history
…3-main-20220405

Merge 0.23 -> main
  • Loading branch information
rossabaker committed Apr 5, 2022
2 parents 0217193 + 6517461 commit 61c6987
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 33 deletions.
Expand Up @@ -260,23 +260,9 @@ private final class Http1Connection[F[_]](
idleTimeoutS: F[Either[Throwable, Unit]],
idleRead: Option[Future[ByteBuffer]],
): F[Response[F]] =
F.async[Response[F]] { cb =>
F.delay {
idleRead match {
case Some(read) =>
handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS)
case None =>
handleRead(
channelRead(),
cb,
closeOnFinish,
doesntHaveBody,
"Initial Read",
idleTimeoutS,
)
}
None
}
F.async_[Response[F]] { cb =>
val read = idleRead.getOrElse(channelRead())
handleRead(read, cb, closeOnFinish, doesntHaveBody, "Initial Read", idleTimeoutS)
}

// this method will get some data, and try to continue parsing using the implicit ec
Expand Down
Expand Up @@ -90,18 +90,25 @@ private[http4s] final class IdleTimeoutStage[A](
@tailrec def go(): Unit =
timeoutState.get() match {
case Disabled =>
val newCancel = exec.schedule(timeoutTask, timeout)
if (timeoutState.compareAndSet(Disabled, Enabled(timeoutTask, newCancel))) ()
else {
newCancel.cancel()
go()
tryScheduling(timeoutTask) match {
case Some(newCancel) =>
if (timeoutState.compareAndSet(Disabled, Enabled(timeoutTask, newCancel))) ()
else {
newCancel.cancel()
go()
}
case None => ()
}
case old @ Enabled(_, oldCancel) =>
val newCancel = exec.schedule(timeoutTask, timeout)
if (timeoutState.compareAndSet(old, Enabled(timeoutTask, newCancel))) oldCancel.cancel()
else {
newCancel.cancel()
go()
tryScheduling(timeoutTask) match {
case Some(newCancel) =>
if (timeoutState.compareAndSet(old, Enabled(timeoutTask, newCancel)))
oldCancel.cancel()
else {
newCancel.cancel()
go()
}
case None => ()
}
case _ => ()
}
Expand All @@ -112,18 +119,21 @@ private[http4s] final class IdleTimeoutStage[A](
@tailrec private def resetTimeout(): Unit =
timeoutState.get() match {
case old @ Enabled(timeoutTask, oldCancel) =>
val newCancel = exec.schedule(timeoutTask, timeout)
if (timeoutState.compareAndSet(old, Enabled(timeoutTask, newCancel))) oldCancel.cancel()
else {
newCancel.cancel()
resetTimeout()
tryScheduling(timeoutTask) match {
case Some(newCancel) =>
if (timeoutState.compareAndSet(old, Enabled(timeoutTask, newCancel))) oldCancel.cancel()
else {
newCancel.cancel()
resetTimeout()
}
case None => ()
}
case _ => ()
}

@tailrec def cancelTimeout(): Unit =
timeoutState.get() match {
case old @ IdleTimeoutStage.Enabled(_, cancel) =>
case old @ Enabled(_, cancel) =>
if (timeoutState.compareAndSet(old, Disabled)) cancel.cancel()
else cancelTimeout()
case _ => ()
Expand Down

0 comments on commit 61c6987

Please sign in to comment.