From e69b2e8a5cdb325837120cded91f09699e9d52c1 Mon Sep 17 00:00:00 2001 From: Adrian Paschkowski Date: Mon, 4 Dec 2023 18:18:09 +0100 Subject: [PATCH] Only change RPC response topic with supported topic hot swap --- .../main/java/gg/beemo/latte/broker/BrokerClient.kt | 6 ++++-- .../main/java/gg/beemo/latte/broker/BrokerMessage.kt | 7 ++++++- .../java/gg/beemo/latte/broker/BrokerSubclients.kt | 10 +++++----- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 25cc041..b4a58ed 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -155,8 +155,10 @@ abstract class BrokerClient( return topics[topic]?.keys?.get(key) } - internal fun createResponseTopic(topic: String): String = "$topic.responses" - internal fun createResponseKey(key: String): String = "$key.response" + internal fun toResponseTopic(topic: String): String = + if (connection.supportsTopicHotSwap) "$topic.responses" else topic + + internal fun toResponseKey(key: String): String = "$key.response" private fun onTopicMessage( topic: String, diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt index 05ffb65..f2e7398 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt @@ -6,4 +6,9 @@ data class BrokerMessage( val key: String, val value: T, val headers: BaseBrokerMessageHeaders -) +) { + + val messageId: String + get() = headers.messageId + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt index 15d2f4f..86a0248 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt @@ -186,8 +186,8 @@ class RpcClient( private val requestConsumer = client.consumer(topic, key, options, requestType, responseIsNullable) { val result = callback(it) val responseProducer = client.producer( - client.createResponseTopic(topic), - client.createResponseKey(key), + client.toResponseTopic(topic), + client.toResponseKey(key), options, responseType, responseIsNullable, @@ -197,7 +197,7 @@ class RpcClient( result, services = setOf(it.headers.sourceService), instances = setOf(it.headers.sourceInstance), - inReplyTo = it.headers.messageId, + inReplyTo = it.messageId, ) responseProducer.destroy() } @@ -230,8 +230,8 @@ class RpcClient( val messageId = AtomicReference(null) val responseConsumer = client.consumer( - client.createResponseTopic(topic), - client.createResponseKey(key), + client.toResponseTopic(topic), + client.toResponseKey(key), options, responseType, responseIsNullable,