Skip to content

Commit

Permalink
Improved error propagation in ConnectionState
Browse files Browse the repository at this point in the history
ConnectionState stores failures where possible, and uses existing
exception instead of always raising TerminalState.
This improves user experience.
  • Loading branch information
hnaderi committed Apr 6, 2023
1 parent c583a12 commit 88a509f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 7 deletions.
9 changes: 6 additions & 3 deletions modules/client/src/main/scala/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ package lepus.client

import cats.effect.*
import cats.effect.implicits.*
import cats.effect.kernel.Resource.ExitCase.*
import cats.effect.std.Queue
import cats.effect.std.QueueSink
import cats.effect.std.QueueSource
import cats.implicits.*
import fs2.Pipe
import fs2.Stream
Expand Down Expand Up @@ -115,7 +114,11 @@ object Connection {
}
case m: Frame.Method => dispatcher.invoke(m)
case Frame.Heartbeat => state.onHeartbeat
}.onFinalize(state.onClosed).interruptWhen(state.whenClosed)
}.onFinalizeCase {
case Succeeded => state.onClosed
case Errored(e) => state.onFailed(e)
case Canceled => state.onClosed
}.interruptWhen(state.whenClosed)

private[client] def lifetime[F[_]: Temporal](
config: F[NegotiatedConfig],
Expand Down
11 changes: 7 additions & 4 deletions modules/client/src/main/scala/internal/ConnectionState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ import lepus.protocol.Frame
import lepus.protocol.constants.ReplyCode
import lepus.protocol.domains.*

import ConnectionState.TerminalState

private[client] trait ConnectionState[F[_]] extends Signal[F, Status] {
def onConnected(config: NegotiatedConfig): F[Unit]
def onOpened: F[Unit]
def onClosed: F[Unit]
def onFailed(ex: Throwable): F[Unit]
def onClosed: F[Unit] = onFailed(TerminalState)
def onCloseRequest: F[Unit]
def onCloseRequest(req: ConnectionClass.Close): F[Unit]
def onHeartbeat: F[Unit]
Expand Down Expand Up @@ -84,9 +87,9 @@ private[client] object ConnectionState {
)
)

override def onClosed: F[Unit] =
hasOpened.complete(Left(TerminalState)) *>
configDef.complete(Left(TerminalState)) *>
override def onFailed(ex: Throwable): F[Unit] =
hasOpened.complete(Left(ex)) *>
configDef.complete(Left(ex)) *>
output.onClose *>
dispatcher.onClose *>
underlying.set(Status.Closed)
Expand Down
11 changes: 11 additions & 0 deletions modules/client/src/test/scala/internal/ConnectionStateSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import lepus.protocol.ConnectionClass
import lepus.protocol.Frame
import lepus.protocol.constants.ReplyCode
import lepus.protocol.domains.*
import org.scalacheck.Arbitrary
import org.scalacheck.Gen

import java.util.concurrent.TimeoutException
Expand Down Expand Up @@ -95,6 +96,16 @@ class ConnectionStateSuite extends InternalTestSuite {
} yield ()
}

test("config raises underlying error if closed") {
forAllF(Arbitrary.arbitrary[Throwable]) { ex =>
for {
s <- SUT
_ <- s.onFailed(ex)
_ <- s.config.attempt.assertEquals(Left(ex))
} yield ()
}
}

test("Raises error if onConnected is called more than once") {
for {
s <- SUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ final class FakeConnectionState(
override def onClosed: IO[Unit] =
interactions.add(Interaction.Closed) >> state.set(Status.Closed)

override def onFailed(ex: Throwable): IO[Unit] =
interactions.add(Interaction.Failed(ex)) >> state.set(Status.Closed)

override def onOpened: IO[Unit] = state.set(Status.Opened) >>
openedDef.complete(Right(())) >> interactions.add(Interaction.Opened)

Expand All @@ -89,6 +92,7 @@ object FakeConnectionState {
case Connected(config: NegotiatedConfig)
case CloseRequest(close: ConnectionClass.Close)
case ClientCloseRequest, Opened, Closed, Heartbeat
case Failed(ex: Throwable)
}

def apply(
Expand Down

0 comments on commit 88a509f

Please sign in to comment.