Skip to content

Commit

Permalink
Finish up headers redesign
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 17, 2023
1 parent aad14e6 commit 03323c1
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 92 deletions.
5 changes: 5 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ open class BrokerMessage<T, H : BrokerMessageHeaders>(
return RpcRequestMessage(topic, key, value, headers, updateSender)
}

internal fun toRpcResponseMessage(): RpcResponseMessage<T> {
return RpcResponseMessage(topic, key, value, RpcMessageHeaders(headers))
}

}

typealias AbstractBrokerMessage<T> = BrokerMessage<T, out BrokerMessageHeaders>
typealias BaseBrokerMessage<T> = BrokerMessage<T, BrokerMessageHeaders>
typealias BaseRpcRequestMessage<RequestT, ResponseT> = RpcRequestMessage<RequestT, ResponseT, BrokerMessageHeaders>
typealias RpcResponseMessage<T> = BrokerMessage<T, RpcMessageHeaders>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
messageId: MessageId?,
extra: Map<String, String> = emptyMap(),
): Map<String, String> {
return mapOf(
HEADER_SOURCE_SERVICE to sourceService,
HEADER_SOURCE_INSTANCE to sourceInstance,
HEADER_TARGET_SERVICES to joinToString(targetServices),
HEADER_TARGET_INSTANCES to joinToString(targetInstances),
HEADER_MESSAGE_ID to (messageId ?: UUID.randomUUID().toString()),
)
val headers = HashMap<String, String>()
headers[HEADER_SOURCE_SERVICE] = sourceService
headers[HEADER_SOURCE_INSTANCE] = sourceInstance
headers[HEADER_TARGET_SERVICES] = joinToString(targetServices)
headers[HEADER_TARGET_INSTANCES] = joinToString(targetInstances)
headers[HEADER_MESSAGE_ID] = messageId ?: UUID.randomUUID().toString()
headers.putAll(extra)
return headers
}

@JvmStatic
Expand Down
5 changes: 2 additions & 3 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,12 @@ class ProducerSubclient<T>(
connection,
targetServices = services,
targetInstances = instances,
messageId = null,
),
)
return internalSend(msg)
}

internal suspend fun internalSend(msg: BaseBrokerMessage<T>): MessageId {
internal suspend fun internalSend(msg: AbstractBrokerMessage<T>): MessageId {
if (!isNullable) {
requireNotNull(msg.value) {
"Cannot send null message for non-nullable type with key '$key' in topic '$topic'"
Expand Down Expand Up @@ -263,7 +262,7 @@ class RpcClient<RequestT, ResponseT>(
responseIsNullable,
) {
val msg = it.toRpcResponseMessage()
if (msg.inReplyTo != messageId.get()) {
if (msg.headers.inReplyTo != messageId.get()) {
return@consumer
}
send(msg)
Expand Down
8 changes: 0 additions & 8 deletions latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ class LocalConnection : BrokerConnection() {
// Nothing to start :)
}

override fun createHeaders(
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId?
): BrokerMessageHeaders {
return BrokerMessageHeaders(serviceName, instanceId, targetServices, targetInstances, inReplyTo, null)
}

override fun createTopic(topic: String) {
// noop
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
headers.getOrDefault(HEADER_IS_UPDATE, "false").toBoolean()
}

constructor(base: BrokerMessageHeaders) : this(base.headers)

constructor(
sourceService: String,
sourceInstance: String,
Expand All @@ -27,7 +29,7 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
targetServices,
targetInstances,
null,
mapOf(
extra = mapOf(
HEADER_IN_REPLY_TO to inReplyTo,
HEADER_STATUS to status.code.toString(),
HEADER_IS_UPDATE to isUpdate.toString(),
Expand Down
29 changes: 11 additions & 18 deletions latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ class KafkaConnection(
value: String,
headers: BrokerMessageHeaders,
): MessageId {
require(headers is KafkaMessageHeaders) {
"KafkaConnection requires headers of type KafkaMessageHeaders to be passed, got ${headers.javaClass.name} instead"
}

if (shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers)) {

val producer = this.producer
checkNotNull(producer) { "Producer is not initialized" }
val record = ProducerRecord(topic, key, value)
headers.applyTo(record.headers())
record.headers().apply {
headers.headers.forEach { (key, value) ->
add(key, value.toByteArray())
}
}

// Asynchronously enqueue message
producer.send(record) { metadata: RecordMetadata, ex: Exception? ->
Expand Down Expand Up @@ -98,14 +98,6 @@ class KafkaConnection(
super.destroy()
}

override fun createHeaders(
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId?,
): BrokerMessageHeaders {
return KafkaMessageHeaders(serviceName, instanceId, targetServices, targetInstances, inReplyTo, null)
}

override fun createTopic(topic: String) {
checkRunningTopicsModification(topic)
}
Expand Down Expand Up @@ -217,6 +209,12 @@ class KafkaConnection(
log.debug("Consumer has been created")
}

private fun handleIncomingRecord(topic: String, record: Record<String, String>) {
val headersMap = record.headers().associate { it.key() to String(it.value()) }
val headers = BrokerMessageHeaders(headersMap)
dispatchIncomingMessage(topic, record.key(), record.value(), headers)
}

private fun createConnectionProperties(): Properties = Properties().apply {
// Server(s) to connect to
this[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = kafkaHostsString
Expand All @@ -231,9 +229,4 @@ class KafkaConnection(
this[CommonClientConfigs.RETRIES_CONFIG] = 10
}

private fun handleIncomingRecord(topic: String, record: Record<String, String>) {
val headers = KafkaMessageHeaders(record.headers())
dispatchIncomingMessage(topic, record.key(), record.value(), headers)
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ class BrokerClientTest {
fun `test greeting RPC`() = withTestClient { client ->
val response = client.greetingRpc.call(GreetingRequest("Beemo"))
Assertions.assertEquals("Hello, Beemo", response.value.greeting)
Assertions.assertEquals(RpcStatus.OK, response.headers.status)
}

@Test
fun `test null RPC`() = withTestClient { client ->
val response = client.nullRpc.call(null)
Assertions.assertNull(response.value)
Assertions.assertEquals(RpcStatus(1337), response.headers.status)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class TestBrokerClient(
key = "greeting.requests",
) {
log.info("greetingRpc received request: ${it.value}")
return@rpc GreetingResponse("Hello, ${it.value.name}")
return@rpc RpcStatus.OK to GreetingResponse("Hello, ${it.value.name}")
}

val nullRpc = rpc<Unit?, Unit?>(
Expand All @@ -33,7 +33,7 @@ class TestBrokerClient(
) {
log.info("nullRpc received request: ${it.value}")
Assertions.assertNull(it.value)
return@rpc null
return@rpc RpcStatus(1337) to null
}

val safeLongProducer = producer<Long>(
Expand Down

0 comments on commit 03323c1

Please sign in to comment.