Skip to content

Commit

Permalink
AMQP: Fixes handling null replyTo header in AmqpReplyToSinkStage (#2371)
Browse files Browse the repository at this point in the history
  • Loading branch information
Grandys committed Aug 6, 2020
1 parent a1fda3b commit f25228f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings
override def onPush(): Unit = {
val elem = grab(in)

val replyTo = elem.properties.map(_.getReplyTo)
val replyTo = elem.properties.flatMap(properties => Option(properties.getReplyTo))

if (replyTo.isDefined) {
channel.basicPublish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client.AuthenticationFailureException

import scala.concurrent.Await
Expand Down Expand Up @@ -146,6 +147,21 @@ class AmqpConnectorsSpec extends AmqpSpec {

caught.getCause.getMessage should equal("Reply-to header was not set")

val propertiesWithNullReplyTo = new BasicProperties()
.builder()
.appId("appId")
.build()
val outgoingMessageWithEmptyReplyTo = outgoingMessage
.withProperties(propertiesWithNullReplyTo)

Source
.single(outgoingMessageWithEmptyReplyTo)
.toMat(AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider).withFailIfReplyToMissing(false)))(
Keep.right
)
.run()
.futureValue shouldBe Done

}

"publish from one source and consume elements with multiple sinks" in assertAllStagesStopped {
Expand Down

0 comments on commit f25228f

Please sign in to comment.