From aad14e6ce31f5df15042fb8d72aceaa53b051969 Mon Sep 17 00:00:00 2001 From: Adrian Paschkowski Date: Mon, 11 Dec 2023 10:22:30 +0100 Subject: [PATCH] [WIP] Make header class more flexible and extendable --- .../latte/broker/BaseBrokerMessageHeaders.kt | 37 ------- .../gg/beemo/latte/broker/BrokerClient.kt | 8 +- .../gg/beemo/latte/broker/BrokerConnection.kt | 18 ++-- .../gg/beemo/latte/broker/BrokerMessage.kt | 29 +++++- .../latte/broker/BrokerMessageHeaders.kt | 91 ++++++++++++++++++ .../gg/beemo/latte/broker/BrokerSubclients.kt | 96 +++++++++++-------- .../java/gg/beemo/latte/broker/Exceptions.kt | 5 + .../gg/beemo/latte/broker/LocalConnection.kt | 6 +- .../beemo/latte/broker/RpcMessageHeaders.kt | 63 ++++++++++++ .../java/gg/beemo/latte/broker/RpcStatus.kt | 20 ++++ .../latte/broker/kafka/KafkaConnection.kt | 6 +- .../latte/broker/kafka/KafkaMessageHeaders.kt | 14 +-- 12 files changed, 285 insertions(+), 108 deletions(-) delete mode 100644 latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt create mode 100644 latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt create mode 100644 latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt create mode 100644 latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt create mode 100644 latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt diff --git a/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt deleted file mode 100644 index e0098af..0000000 --- a/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt +++ /dev/null @@ -1,37 +0,0 @@ -package gg.beemo.latte.broker - -import java.util.* - -open class BaseBrokerMessageHeaders( - val sourceService: String, - val sourceInstance: String, - val targetServices: Set, - val targetInstances: Set, - val inReplyTo: MessageId?, - messageId: MessageId?, -) { - - val messageId: MessageId = messageId ?: UUID.randomUUID().toString() - - companion object { - - const val HEADER_SOURCE_SERVICE = "source-service" - const val HEADER_SOURCE_INSTANCE = "source-instance" - const val HEADER_TARGET_SERVICES = "target-services" - const val HEADER_TARGET_INSTANCES = "target-instances" - const val HEADER_MESSAGE_ID = "message-id" - const val HEADER_IN_REPLY_TO = "in-reply-to" - - @JvmStatic - protected fun splitToSet(value: String): Set { - return value.split(",").filter { it.isNotEmpty() }.toSet() - } - - @JvmStatic - protected fun joinToString(value: Set): String { - return value.joinToString(",") - } - - } - -} diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index b4a58ed..1e27753 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -30,7 +30,7 @@ abstract class BrokerClient( topic: String, key: String, options: BrokerClientOptions = BrokerClientOptions(), - noinline callback: suspend CoroutineScope.(BrokerMessage) -> Unit, + noinline callback: suspend CoroutineScope.(BaseBrokerMessage) -> Unit, ): ConsumerSubclient { return consumer(topic, key, options, T::class.java, null is T, callback) } @@ -42,7 +42,7 @@ abstract class BrokerClient( options: BrokerClientOptions = BrokerClientOptions(), type: Class, isNullable: Boolean, - callback: suspend CoroutineScope.(BrokerMessage) -> Unit, + callback: suspend CoroutineScope.(BaseBrokerMessage) -> Unit, ): ConsumerSubclient { log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name) return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also { @@ -76,7 +76,7 @@ abstract class BrokerClient( topic: String, key: String, options: BrokerClientOptions = BrokerClientOptions(), - noinline callback: suspend CoroutineScope.(BrokerMessage) -> ResponseT, + noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> Pair, ): RpcClient { return RpcClient( this, @@ -164,7 +164,7 @@ abstract class BrokerClient( topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ) { val metadata = getExistingKeyMetadata(topic, key) ?: return for (consumer in metadata.consumers) { diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt index 1f15ca4..81a915f 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt @@ -4,7 +4,7 @@ import gg.beemo.latte.logging.Log import java.util.* fun interface TopicListener { - fun onMessage(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders) + fun onMessage(topic: String, key: String, value: String, headers: BrokerMessageHeaders) } typealias MessageId = String @@ -28,14 +28,14 @@ abstract class BrokerConnection { topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ): MessageId internal suspend fun send( topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ): MessageId { log.trace( "Sending message {} with key '{}' in topic '{}' with value: {}", @@ -47,12 +47,6 @@ abstract class BrokerConnection { return abstractSend(topic, key, value, headers) } - internal abstract fun createHeaders( - targetServices: Set = emptySet(), - targetInstances: Set = emptySet(), - inReplyTo: MessageId? = null, - ): BaseBrokerMessageHeaders - protected abstract fun createTopic(topic: String) protected abstract fun removeTopic(topic: String) @@ -82,7 +76,7 @@ abstract class BrokerConnection { topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders + headers: BrokerMessageHeaders ) { if ( (headers.targetServices.isNotEmpty() && serviceName !in headers.targetServices) || @@ -104,7 +98,7 @@ abstract class BrokerConnection { topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders + headers: BrokerMessageHeaders ): Boolean { val targetServices = headers.targetServices val targetInstances = headers.targetInstances @@ -129,7 +123,7 @@ abstract class BrokerConnection { ) } - private fun invokeLocalCallbacks(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders) { + private fun invokeLocalCallbacks(topic: String, key: String, value: String, headers: BrokerMessageHeaders) { log.trace( "Dispatching message {} with key '{}' in topic '{}' to local listeners with value: {}", headers.messageId, diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt index f2e7398..4d2aad7 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt @@ -1,14 +1,37 @@ package gg.beemo.latte.broker -data class BrokerMessage( - val client: BrokerClient, +open class BrokerMessage( val topic: String, val key: String, val value: T, - val headers: BaseBrokerMessageHeaders + val headers: H ) { val messageId: String get() = headers.messageId + internal fun toRpcRequestMessage( + updateSender: suspend (ResponseT, RpcStatus) -> Unit, + ): RpcRequestMessage { + return RpcRequestMessage(topic, key, value, headers, updateSender) + } + +} + +typealias BaseBrokerMessage = BrokerMessage +typealias BaseRpcRequestMessage = RpcRequestMessage +typealias RpcResponseMessage = BrokerMessage + +class RpcRequestMessage( + topic: String, + key: String, + value: RequestT, + headers: H, + private val updateSender: suspend (ResponseT, RpcStatus) -> Unit, +) : BrokerMessage(topic, key, value, headers) { + + suspend fun sendUpdate(response: ResponseT, status: RpcStatus) { + updateSender(response, status) + } + } diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt new file mode 100644 index 0000000..1f7aeca --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt @@ -0,0 +1,91 @@ +package gg.beemo.latte.broker + +import java.util.UUID + +open class BrokerMessageHeaders(val headers: Map) { + + val sourceService: String by lazy { + headers.getOrThrow(HEADER_SOURCE_SERVICE) + } + val sourceInstance: String by lazy { + headers.getOrThrow(HEADER_SOURCE_INSTANCE) + } + val targetServices: Set by lazy { + splitToSet(headers.getOrDefault(HEADER_TARGET_SERVICES, "")) + } + val targetInstances: Set by lazy { + splitToSet(headers.getOrDefault(HEADER_TARGET_INSTANCES, "")) + } + val messageId: MessageId by lazy { + headers.getOrThrow(HEADER_MESSAGE_ID) + } + + constructor( + sourceService: String, + sourceInstance: String, + targetServices: Set, + targetInstances: Set, + ) : this( + createHeadersMap( + sourceService, + sourceInstance, + targetServices, + targetInstances, + null, + ) + ) + + constructor( + connection: BrokerConnection, + targetServices: Set, + targetInstances: Set, + ) : this( + connection.serviceName, + connection.instanceId, + targetServices, + targetInstances, + ) + + companion object { + + private const val HEADER_SOURCE_SERVICE = "source-service" + private const val HEADER_SOURCE_INSTANCE = "source-instance" + private const val HEADER_TARGET_SERVICES = "target-services" + private const val HEADER_TARGET_INSTANCES = "target-instances" + private const val HEADER_MESSAGE_ID = "message-id" + + @JvmStatic + protected fun createHeadersMap( + sourceService: String, + sourceInstance: String, + targetServices: Set, + targetInstances: Set, + messageId: MessageId?, + extra: Map = emptyMap(), + ): Map { + return mapOf( + HEADER_SOURCE_SERVICE to sourceService, + HEADER_SOURCE_INSTANCE to sourceInstance, + HEADER_TARGET_SERVICES to joinToString(targetServices), + HEADER_TARGET_INSTANCES to joinToString(targetInstances), + HEADER_MESSAGE_ID to (messageId ?: UUID.randomUUID().toString()), + ) + } + + @JvmStatic + protected fun splitToSet(value: String): Set { + return value.split(",").filter { it.isNotEmpty() }.toSet() + } + + @JvmStatic + protected fun joinToString(value: Set): String { + return value.joinToString(",") + } + + } + +} + +internal fun Map.getOrThrow(key: String): String { + return get(key) ?: throw IllegalArgumentException("Missing broker message header '$key'") +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt index 678dd84..597ab65 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt @@ -24,6 +24,7 @@ data class BrokerClientOptions( val useSafeJsLongs: Boolean = false, ) +// TODO Add error handling, some try-finally to close the producer/consumer even with errors sealed class BaseSubclient( protected val connection: BrokerConnection, protected val client: BrokerClient, @@ -80,30 +81,35 @@ class ProducerSubclient( services: Set = emptySet(), instances: Set = emptySet(), ): MessageId { - return send(data, services, instances, null) + val msg = BrokerMessage( + topic, + key, + data, + BrokerMessageHeaders( + connection, + targetServices = services, + targetInstances = instances, + messageId = null, + ), + ) + return internalSend(msg) } - internal suspend fun send( - data: T, - services: Set, - instances: Set, - inReplyTo: MessageId?, - ): MessageId { + internal suspend fun internalSend(msg: BaseBrokerMessage): MessageId { if (!isNullable) { - requireNotNull(data) { + requireNotNull(msg.value) { "Cannot send null message for non-nullable type with key '$key' in topic '$topic'" } } - val strigifiedData = stringifyOutgoing(data) - val headers = connection.createHeaders(services, instances, inReplyTo) + val strigifiedData = stringifyOutgoing(msg.value) log.trace( "Sending message {} with key '{}' in topic '{}' with value: {}", - headers.messageId, + msg.messageId, key, topic, strigifiedData, ) - return connection.send(topic, key, strigifiedData, headers) + return connection.send(topic, key, strigifiedData, msg.headers) } private fun stringifyOutgoing(data: T?): String { @@ -120,7 +126,7 @@ class ConsumerSubclient( options: BrokerClientOptions, incomingType: Class, private val isNullable: Boolean, - private val callback: suspend CoroutineScope.(BrokerMessage) -> Unit, + private val callback: suspend CoroutineScope.(BaseBrokerMessage) -> Unit, ) : BaseSubclient( connection, client, @@ -138,7 +144,7 @@ class ConsumerSubclient( internal suspend fun onIncomingMessage( value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ) = coroutineScope { val data = parseIncoming(value) if (!isNullable) { @@ -146,7 +152,7 @@ class ConsumerSubclient( "Received null message for non-nullable type with key '$key' in topic '$topic'" } } - val message = BrokerMessage(client, topic, key, data, headers) + val message = BrokerMessage(topic, key, data, headers) log.trace( "Received message {} with key '{}' in topic '{}' with value: {}", headers.messageId, @@ -155,7 +161,7 @@ class ConsumerSubclient( value, ) @Suppress("UNCHECKED_CAST") // Safe due to above null validation - callback(message as BrokerMessage) + callback(message as BaseBrokerMessage) } private fun parseIncoming(json: String): T? { @@ -173,7 +179,7 @@ class RpcClient( requestIsNullable: Boolean, private val responseType: Class, private val responseIsNullable: Boolean, - private val callback: suspend CoroutineScope.(BrokerMessage) -> ResponseT, + private val callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> Pair, ) : BaseSubclient( client.connection, client, @@ -183,13 +189,7 @@ class RpcClient( ) { private val requestProducer = client.producer(topic, key, options, requestType, requestIsNullable) - - private val requestConsumer = client.consumer(topic, key, options, requestType, responseIsNullable) { - val result = try { - callback(it) - } catch (_: IgnoreRpcRequest) { - return@consumer - } + private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg -> val responseProducer = client.producer( client.toResponseTopic(topic), client.toResponseKey(key), @@ -197,13 +197,34 @@ class RpcClient( responseType, responseIsNullable, ) - // Send only to source service/instance that initiated this call - responseProducer.send( - result, - services = setOf(it.headers.sourceService), - instances = setOf(it.headers.sourceInstance), - inReplyTo = it.messageId, - ) + + suspend fun sendResponse(response: ResponseT, status: RpcStatus, isUpdate: Boolean) { + val responseMsg = RpcResponseMessage( + client.toResponseTopic(topic), + client.toResponseKey(key), + response, + RpcMessageHeaders( + connection, + // Send only to source service/instance that initiated this call + targetServices = setOf(msg.headers.sourceService), + targetInstances = setOf(msg.headers.sourceInstance), + inReplyTo = msg.headers.messageId, + status, + isUpdate, + ), + ) + responseProducer.internalSend(responseMsg) + } + + val rpcMessage = msg.toRpcRequestMessage { data, status -> + sendResponse(data, status, true) + } + val (status, response) = try { + callback(rpcMessage) + } catch (_: IgnoreRpcRequest) { + return@consumer + } + sendResponse(response, status, false) responseProducer.destroy() } @@ -212,7 +233,7 @@ class RpcClient( services: Set = emptySet(), instances: Set = emptySet(), timeout: Duration = 10.seconds, - ): BrokerMessage { + ): RpcResponseMessage { return stream(request, services, instances, timeout, 1).single() } @@ -222,7 +243,7 @@ class RpcClient( instances: Set = emptySet(), timeout: Duration = 10.seconds, maxResponses: Int? = null, - ): Flow> { + ): Flow> { require(timeout.isFinite() || maxResponses != null) { "Must specify either a timeout or a max number of responses" } @@ -241,10 +262,11 @@ class RpcClient( responseType, responseIsNullable, ) { - if (it.headers.inReplyTo != messageId.get()) { + val msg = it.toRpcResponseMessage() + if (msg.inReplyTo != messageId.get()) { return@consumer } - send(it) + send(msg) timeoutLatch?.countDown() val count = responseCounter.incrementAndGet() if (maxResponses != null && count >= maxResponses) { @@ -278,7 +300,3 @@ class RpcClient( } } - -sealed class BrokerException(message: String?) : Exception(message) -class RpcRequestTimeout(message: String) : BrokerException(message) -class IgnoreRpcRequest : BrokerException(null) diff --git a/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt b/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt new file mode 100644 index 0000000..92b088b --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt @@ -0,0 +1,5 @@ +package gg.beemo.latte.broker + +sealed class BrokerException(message: String?) : Exception(message) +class RpcRequestTimeout(message: String) : BrokerException(message) +class IgnoreRpcRequest : BrokerException(null) diff --git a/latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt index 2fe31a2..4a070f0 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt @@ -10,7 +10,7 @@ class LocalConnection : BrokerConnection() { topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ): MessageId { if ( shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers) && @@ -34,8 +34,8 @@ class LocalConnection : BrokerConnection() { targetServices: Set, targetInstances: Set, inReplyTo: MessageId? - ): BaseBrokerMessageHeaders { - return BaseBrokerMessageHeaders(serviceName, instanceId, targetServices, targetInstances, inReplyTo, null) + ): BrokerMessageHeaders { + return BrokerMessageHeaders(serviceName, instanceId, targetServices, targetInstances, inReplyTo, null) } override fun createTopic(topic: String) { diff --git a/latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt new file mode 100644 index 0000000..92ab2ce --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt @@ -0,0 +1,63 @@ +package gg.beemo.latte.broker + +class RpcMessageHeaders(headers: Map) : BrokerMessageHeaders(headers) { + + val inReplyTo: MessageId by lazy { + headers.getOrThrow(HEADER_IN_REPLY_TO) + } + val status: RpcStatus by lazy { + RpcStatus(headers.getOrDefault(HEADER_STATUS, "999_999").toInt()) + } + val isUpdate: Boolean by lazy { + headers.getOrDefault(HEADER_IS_UPDATE, "false").toBoolean() + } + + constructor( + sourceService: String, + sourceInstance: String, + targetServices: Set, + targetInstances: Set, + inReplyTo: MessageId, + status: RpcStatus, + isUpdate: Boolean, + ) : this( + createHeadersMap( + sourceService, + sourceInstance, + targetServices, + targetInstances, + null, + mapOf( + HEADER_IN_REPLY_TO to inReplyTo, + HEADER_STATUS to status.code.toString(), + HEADER_IS_UPDATE to isUpdate.toString(), + ) + ) + ) + + constructor( + connection: BrokerConnection, + targetServices: Set, + targetInstances: Set, + inReplyTo: MessageId, + status: RpcStatus, + isUpdate: Boolean, + ) : this( + connection.serviceName, + connection.instanceId, + targetServices, + targetInstances, + inReplyTo, + status, + isUpdate, + ) + + companion object { + + private const val HEADER_IN_REPLY_TO = "rpc-in-reply-to" + private const val HEADER_IS_UPDATE = "rpc-is-update" + private const val HEADER_STATUS = "rpc-response-status" + + } + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt b/latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt new file mode 100644 index 0000000..01840f9 --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt @@ -0,0 +1,20 @@ +package gg.beemo.latte.broker + +class RpcStatus(val code: Int) { + + companion object { + val OK = RpcStatus(0) + val UNKNOWN = RpcStatus(999_999) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is RpcStatus) return false + return code == other.code + } + + override fun hashCode(): Int { + return code + } + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt index 2501455..cedb153 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt @@ -1,6 +1,6 @@ package gg.beemo.latte.broker.kafka -import gg.beemo.latte.broker.BaseBrokerMessageHeaders +import gg.beemo.latte.broker.BrokerMessageHeaders import gg.beemo.latte.broker.BrokerConnection import gg.beemo.latte.broker.MessageId import gg.beemo.latte.logging.Log @@ -49,7 +49,7 @@ class KafkaConnection( topic: String, key: String, value: String, - headers: BaseBrokerMessageHeaders, + headers: BrokerMessageHeaders, ): MessageId { require(headers is KafkaMessageHeaders) { "KafkaConnection requires headers of type KafkaMessageHeaders to be passed, got ${headers.javaClass.name} instead" @@ -102,7 +102,7 @@ class KafkaConnection( targetServices: Set, targetInstances: Set, inReplyTo: MessageId?, - ): BaseBrokerMessageHeaders { + ): BrokerMessageHeaders { return KafkaMessageHeaders(serviceName, instanceId, targetServices, targetInstances, inReplyTo, null) } diff --git a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaMessageHeaders.kt index e4d5aa6..52d8535 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaMessageHeaders.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaMessageHeaders.kt @@ -1,22 +1,24 @@ package gg.beemo.latte.broker.kafka -import gg.beemo.latte.broker.BaseBrokerMessageHeaders +import gg.beemo.latte.broker.BrokerMessageHeaders import gg.beemo.latte.broker.MessageId import org.apache.kafka.common.header.Headers +// TODO Remove these. Instead, the Base headers will have a Map of values, +// which the connections will read from or put all incoming headers into. The important headers +// will then be extracted by the base headers class. +// TODO Create RpcMessageHeaders which extends the base headers. class KafkaMessageHeaders( sourceService: String, sourceInstance: String, targetServices: Set, targetInstances: Set, - inReplyTo: MessageId?, messageId: MessageId?, -) : BaseBrokerMessageHeaders( +) : BrokerMessageHeaders( sourceService, sourceInstance, targetServices, targetInstances, - inReplyTo, messageId, ) { @@ -25,7 +27,6 @@ class KafkaMessageHeaders( headers.getOrThrow(HEADER_SOURCE_INSTANCE), splitToSet(headers.getOrDefault(HEADER_TARGET_SERVICES, "")), splitToSet(headers.getOrDefault(HEADER_TARGET_INSTANCES, "")), - headers.getOrNull(HEADER_IN_REPLY_TO), headers.getOrThrow(HEADER_MESSAGE_ID), ) @@ -34,7 +35,6 @@ class KafkaMessageHeaders( headers.add(HEADER_SOURCE_INSTANCE, sourceInstance.toByteArray()) headers.add(HEADER_TARGET_SERVICES, joinToString(targetServices).toByteArray()) headers.add(HEADER_TARGET_INSTANCES, joinToString(targetInstances).toByteArray()) - inReplyTo?.let { headers.add(HEADER_IN_REPLY_TO, it.toByteArray()) } headers.add(HEADER_MESSAGE_ID, messageId.toByteArray()) } @@ -45,7 +45,7 @@ private fun Headers.getOrNull(key: String): String? { } private fun Headers.getOrThrow(key: String): String { - return getOrNull(key) ?: throw IllegalArgumentException("Missing header '$key'") + return getOrNull(key) ?: throw IllegalArgumentException("Missing broker message header '$key'") } private fun Headers.getOrDefault(key: String, defaultValue: String): String {