Skip to content

Commit

Permalink
Generalize Kafka code into BrokerClient/BrokerConnection (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Sep 12, 2023
1 parent 873e17f commit 27ddd03
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 237 deletions.
3 changes: 2 additions & 1 deletion latte/src/main/java/gg/beemo/latte/CommonConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gg.beemo.latte

object CommonConfig {

const val VANILLA_CLUSTER_ID = -1
const val INVALID_CLUSTER_ID = Integer.MIN_VALUE.toString()
const val VANILLA_CLUSTER_ID = "-1"

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package gg.beemo.latte.broker

import java.util.*

open class BaseBrokerMessageHeaders(
val clientId: String,
val sourceCluster: String,
targetClusters: Set<String>?, // // Empty = No target restriction
requestId: String?,
) {

val targetClusters: Set<String> = targetClusters ?: emptySet()
val requestId: String = requestId ?: UUID.randomUUID().toString()

companion object {

const val HEADER_CLIENT_ID = "client-id"
const val HEADER_REQUEST_ID = "request-id"
const val HEADER_SOURCE_CLUSTER = "source-cluster"
const val HEADER_TARGET_CLUSTERS = "target-clusters"

}

}
Original file line number Diff line number Diff line change
@@ -1,49 +1,48 @@
package gg.beemo.latte.kafka
package gg.beemo.latte.broker

import com.squareup.moshi.Moshi
import gg.beemo.latte.logging.log
import gg.beemo.latte.util.SuspendingCountDownLatch
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.Collections
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration

fun interface KafkaEventListener<T : Any> {
suspend fun onMessage(msg: KafkaMessage<T>)
fun interface BrokerEventListener<T : Any> {
suspend fun onMessage(msg: BrokerMessage<T>)
}

fun interface KafkaMessageListener<T : Any> {
suspend fun onMessage(clusterId: Int, msg: KafkaMessage<T>)
fun interface BrokerMessageListener<T : Any> {
suspend fun onMessage(clusterId: String, msg: BrokerMessage<T>)
}

open class KafkaClient<T : Any>(
private val connection: KafkaConnection,
open class BrokerClient<T : Any>(
private val connection: BrokerConnection,
type: Class<T>,
private val topicName: String
) {

val currentClusterId: Int
get() = connection.currentClusterId
val clusterId: String
get() = connection.clusterId

val clientId: String
get() = connection.clientId

private val moshi = Moshi.Builder().build()
private val adapter = moshi.adapter(type).nullSafe()
private val keyListeners = Collections.synchronizedMap(HashMap<String, MutableSet<KafkaEventListener<T>>>())
private val keyListeners = Collections.synchronizedMap(HashMap<String, MutableSet<BrokerEventListener<T>>>())

init {
log.debug("Initializing Kafka Client with topic '$topicName' for objects of type ${type.name}")
connection.on(topicName, ::onTopicRecord)
log.debug("Initializing Broker Client with topic '$topicName' for objects of type ${type.name}")
connection.on(topicName, ::onTopicMessage)
}

protected suspend fun send(
key: String,
obj: T?,
headers: KafkaRecordHeaders = KafkaRecordHeaders(clientId, currentClusterId),
headers: BaseBrokerMessageHeaders = this.connection.createHeaders(),
blocking: Boolean = true,
): String {
return connection.send(topicName, key, stringify(obj), headers, blocking)
Expand All @@ -53,18 +52,18 @@ open class KafkaClient<T : Any>(
key: String,
obj: T?,
timeout: Duration = Duration.ZERO,
targetClusters: Set<Int> = emptySet(),
targetClusters: Set<String> = emptySet(),
expectedResponses: Int? = null,
blocking: Boolean = true,
messageCallback: KafkaMessageListener<T>? = null,
): Pair<Map<Int, KafkaMessage<T>>, Boolean> {
messageCallback: BrokerMessageListener<T>? = null,
): Pair<Map<String, BrokerMessage<T>>, Boolean> {
val responseKey = key.toResponseKey()

val responses = mutableMapOf<Int, KafkaMessage<T>>()
val responses = mutableMapOf<String, BrokerMessage<T>>()
val latch = SuspendingCountDownLatch(expectedResponses ?: targetClusters.size)
val requestId: AtomicReference<String> = AtomicReference("")

val cb = KafkaEventListener { msg ->
val cb = BrokerEventListener { msg ->
coroutineScope {
if (msg.headers.requestId != requestId.get()) {
return@coroutineScope
Expand All @@ -84,18 +83,9 @@ open class KafkaClient<T : Any>(
on(responseKey, cb)
var timeoutReached = false
try {
requestId.set(
send(
key,
obj,
KafkaRecordHeaders(
clientId,
currentClusterId,
targetClusters,
),
blocking,
)
)
val headers = this.connection.createHeaders(targetClusters)
requestId.set(headers.requestId)
send(key, obj, headers, blocking)

if (timeout <= Duration.ZERO) {
latch.await()
Expand All @@ -109,14 +99,14 @@ open class KafkaClient<T : Any>(
return Pair(responses, timeoutReached)
}

protected fun on(key: String, cb: KafkaEventListener<T>) {
protected fun on(key: String, cb: BrokerEventListener<T>) {
val listeners = keyListeners.computeIfAbsent(key) {
CopyOnWriteArraySet()
}
listeners.add(cb)
}

protected fun off(key: String, cb: KafkaEventListener<T>) {
protected fun off(key: String, cb: BrokerEventListener<T>) {
keyListeners.computeIfPresent(key) { _, listeners ->
listeners.remove(cb)
if (listeners.size == 0) {
Expand All @@ -128,15 +118,13 @@ open class KafkaClient<T : Any>(
}

internal suspend fun respond(
msg: KafkaMessage<T>,
msg: BrokerMessage<T>,
data: T?,
blocking: Boolean = true,
) {
val newHeaders = KafkaRecordHeaders(
clientId = clientId,
sourceCluster = currentClusterId,
targetClusters = setOf(msg.headers.sourceCluster),
requestId = msg.headers.requestId,
val newHeaders = this.connection.createHeaders(
setOf(msg.headers.sourceCluster),
msg.headers.requestId,
)
send(
msg.key.toResponseKey(),
Expand All @@ -154,16 +142,16 @@ open class KafkaClient<T : Any>(
return adapter.toJson(obj)
}

private suspend fun onTopicRecord(key: String, value: String, headers: KafkaRecordHeaders) = coroutineScope {
private suspend fun onTopicMessage(key: String, value: String, headers: BaseBrokerMessageHeaders) = coroutineScope {
val obj = parse(value)
val msg = KafkaMessage(this@KafkaClient, key, obj, headers)
val msg = BrokerMessage(this@BrokerClient, key, obj, headers)
val listeners = keyListeners[key]
for (listener in listeners ?: return@coroutineScope) {
launch(Dispatchers.Default) {
launch {
try {
listener.onMessage(msg)
} catch (t: Throwable) {
log.error("Uncaught error in KafkaClient listener", t)
log.error("Uncaught error in BrokerClient listener", t)
}
}
}
Expand Down
110 changes: 110 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package gg.beemo.latte.broker

import gg.beemo.latte.logging.log
import kotlinx.coroutines.*
import java.util.*
import java.util.concurrent.CopyOnWriteArraySet

fun interface TopicListener {
suspend fun onMessage(key: String, value: String, headers: BaseBrokerMessageHeaders)
}

abstract class BrokerConnection {

abstract val clientId: String
abstract val clusterId: String

protected val topicListeners: MutableMap<String, MutableSet<TopicListener>> = Collections.synchronizedMap(HashMap())

private val eventDispatcherScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private val eventDispatcherErrorHandler = CoroutineExceptionHandler { _, t ->
log.error("Uncaught error in BrokerConnection event handler", t)
}

abstract fun start()
open fun destroy() {
topicListeners.clear()
eventDispatcherScope.cancel()
}

abstract suspend fun send(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
blocking: Boolean = true,
): String

abstract fun createHeaders(
targetClusters: Set<String>? = null,
requestId: String? = null
): BaseBrokerMessageHeaders

open fun on(topic: String, cb: TopicListener) {
topicListeners.computeIfAbsent(topic) { CopyOnWriteArraySet() }.add(cb)
}

open fun off(topic: String, cb: TopicListener) {
topicListeners.computeIfPresent(topic) { _, listeners ->
listeners.remove(cb)
if (listeners.size == 0) {
null
} else {
listeners
}
}
}

protected fun shouldDispatchExternallyAfterShortCircuit(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders
): Boolean {
val targets = headers.targetClusters
val isLocalClusterInTargets = clusterId in targets

// If the message is meant for ourselves (amongst other clusters),
// immediately dispatch it to the listeners.
if (targets.isEmpty() || isLocalClusterInTargets) {
invokeCallbacks(topic, key, value, headers)
}

val isLocalClusterTheOnlyTarget = targets.size == 1 && isLocalClusterInTargets
// Return whether implementers should dispatch this message to external services
return targets.isEmpty() || !isLocalClusterTheOnlyTarget
}

protected fun handleIncomingMessage(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders
) {
if (headers.targetClusters.isNotEmpty() && clusterId !in headers.targetClusters) {
// If there is a target cluster restriction and this message wasn't meant for us,
// discard it immediately without notifying any listeners.
return
}
if (headers.sourceCluster == clusterId) {
// If this message was sent by ourselves, discard it too, as we already dispatch events
// to our listeners in `send()` to avoid the round trip through an external service.
return
}
invokeCallbacks(topic, key, value, headers)
}

private fun invokeCallbacks(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders) {
eventDispatcherScope.launch(eventDispatcherErrorHandler) {
val listeners = topicListeners[topic]
for (listener in listeners ?: return@launch) {
try {
listener.onMessage(key, value, headers)
} catch (t: Throwable) {
log.error("Uncaught error in BrokerConnection listener", t)
}
}
}
}

}
14 changes: 14 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gg.beemo.latte.broker

data class BrokerMessage<T : Any>(
val client: BrokerClient<T>,
val key: String,
val value: T?,
val headers: BaseBrokerMessageHeaders
) {

suspend fun respond(data: T?, blocking: Boolean = true) {
client.respond(this, data, blocking)
}

}
31 changes: 31 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/LocalConnection.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package gg.beemo.latte.broker

class LocalConnection(
override val clusterId: String = "local-cluster-id",
) : BrokerConnection() {

override val clientId = "local-client-id"

override suspend fun send(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
blocking: Boolean,
): String {
if (shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers) && headers.targetClusters.isNotEmpty()) {
throw IllegalArgumentException("Attempting to send message to other cluster(s) ${headers.targetClusters} in a LocalConnection")
}

return headers.requestId
}

override fun start() {
// Nothing to start :)
}

override fun createHeaders(targetClusters: Set<String>?, requestId: String?): BaseBrokerMessageHeaders {
return BaseBrokerMessageHeaders(this.clientId, this.clusterId, targetClusters, requestId)
}

}

0 comments on commit 27ddd03

Please sign in to comment.