Skip to content

Commit

Permalink
MQTT streaming: Fixes wrong Exception thrown when client ping timeouts (
Browse files Browse the repository at this point in the history
#1642)

## Purpose

When mqtt client doesn't send ping request within the expected period (given by the connect keep alive parameter) stream should failed with `ServerConnector.PingFailed` exception but it fails with `ClientConnection.PingFailed` exception. That makes that ActorMqttServerSession command flow doesn't translate the failure to the public api error ActorMqttServerSession.PingFailed (MqttSession.scala::482)

## Background Context

There was an ignored failing unit test due to this bug that however was pointing to a different issue. After this PR this unit test is not failing anymore so I activate it again.
  • Loading branch information
jcroig authored and ennru committed Apr 12, 2019
1 parent e81f031 commit 399819c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.{Done, NotUsed}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, ChildFailed, PostStop, Terminated}
import akka.annotation.InternalApi
import akka.stream.alpakka.mqtt.streaming.impl.ServerConnector.PingFailed
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete}
import akka.util.ByteString
Expand Down Expand Up @@ -619,7 +620,7 @@ import scala.util.{Failure, Success}
local.success(ForwardPingReq)
clientConnected(data)
case (context, ReceivePingReqTimeout) =>
data.remote.fail(PingFailed)
data.remote.fail(ServerConnector.PingFailed)
timer.cancel(ReceivePingreq)
disconnect(context, data.remote, data)
case (context, DisconnectReceivedFromRemote(local)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.testkit._
import akka.util.{ByteString, Timeout}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Span}

import scala.concurrent.{ExecutionContext, Promise}
import scala.concurrent.duration._
Expand Down Expand Up @@ -1514,7 +1515,11 @@ class MqttSessionSpec
server.watchCompletion().foreach(_ => session.shutdown())
}

"close when no ping request received" ignore { // assertAllStagesStopped { // https://github.com/akka/alpakka/issues/1563
"close when no ping request received" in assertAllStagesStopped {
// A longer patience config implicit is provided since minimum client's keep alive time is 1 second, so default
// 150 millis is not enough for the ping request timeout to be triggered and verify the stream fails as expected.
implicit val patienceConfig = PatienceConfig(scaled(Span(3000, Millis)), scaled(Span(15, Millis)))

val session = ActorMqttServerSession(settings)

val client = TestProbe()
Expand Down

0 comments on commit 399819c

Please sign in to comment.