Skip to content

Commit

Permalink
Fix flaky 'disconnect exceptional completion' test
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-sa-schroeder committed Oct 5, 2018
1 parent 092283c commit 92b1192
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,11 @@ class JmsConnectorsSpec extends JmsSpec {
eventually { connectionFactory.cachedConnection shouldBe 'closed }
}

"sink disconnect exceptional completion" in withServer() { ctx =>
"producer disconnect exceptional completion" in withServer() { ctx =>
import system.dispatcher

val url: String = ctx.url
val connectionFactory = new CachedConnectionFactory(url)
val brokerStop = new CountDownLatch(1)

val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer(
JmsProducerSettings(connectionFactory)
Expand All @@ -586,22 +585,26 @@ class JmsConnectorsSpec extends JmsSpec {
.mapAsync(1)(
n =>
Future {
Thread.sleep(500)
brokerStop.await()
Thread.sleep(100)
JmsTextMessage(n.toString)
}
)
.runWith(jmsSink)

ctx.broker.stop()
brokerStop.countDown()

val exception = completionFuture.failed.futureValue
exception shouldBe a[ConnectionRetryException]
exception.getCause shouldBe a[JMSException]

// connection was not yet initialized before broker stop
connectionFactory.cachedConnection shouldBe null
// connection should be either
// - not yet initialized before broker stop, or
// - closed on broker stop (if preStart came first).
if (connectionFactory.cachedConnection != null) {
connectionFactory.cachedConnection shouldBe 'closed
}

ctx.broker.start(true)
}

"ensure no message loss when stopping a stream" in withServer() { ctx =>
Expand Down

0 comments on commit 92b1192

Please sign in to comment.