Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[latte] Fix BrokerClient topic handling #11

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 132 additions & 59 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,8 @@ import gg.beemo.latte.broker.rpc.RpcResponse
import gg.beemo.latte.logging.Log
import kotlinx.coroutines.*
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

private class TopicMetadata(
val topic: String,
val keys: MutableMap<String, KeyMetadata>,
var connectionListener: TopicListener? = null,
)

private class KeyMetadata(
val topic: TopicMetadata,
val producers: MutableSet<ProducerSubclient<*>>,
val consumers: MutableSet<ConsumerSubclient<*>>,
)

abstract class BrokerClient(
@PublishedApi
Expand Down Expand Up @@ -48,7 +38,7 @@ abstract class BrokerClient(
): ConsumerSubclient<T> {
log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also {
registerConsumer(it)
registerSubclient(it)
}
}

Expand All @@ -70,7 +60,7 @@ abstract class BrokerClient(
): ProducerSubclient<T> {
log.debug("Creating producer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ProducerSubclient(connection, this, topic, key, options, type, isNullable).also {
registerProducer(it)
registerSubclient(it)
}
}

Expand All @@ -93,82 +83,149 @@ abstract class BrokerClient(
)
}

fun destroy(cancelScope: Boolean = true) {
val producers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.producers } }
val consumers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.consumers } }
producers.forEach {
it.destroy()
private fun registerSubclient(subclient: BaseSubclient) {
val topic = subclient.topic
val metadata = topics.computeIfAbsent(topic) {
TopicMetadata(connection, consumerScope, topic)
}
metadata.registerSubclient(subclient)
}

internal fun deregisterSubclient(subclient: BaseSubclient) {
val topic = subclient.topic
topics[topic]?.let {
it.deregisterSubclient(subclient)
if (it.isEmpty) {
it.destroy()
topics.remove(topic)
}
}
consumers.forEach {
it.destroy()
}

fun destroy(cancelScope: Boolean = true) {
log.debug("Destroying BrokerClient of type {} with active topics {}", javaClass.simpleName, topics.keys)
while (topics.isNotEmpty()) {
val topic = topics.keys.first()
topics[topic]?.destroy()
topics.remove(topic)
}
topics.clear()
if (cancelScope) {
consumerScope.cancel()
}
}

private fun registerProducer(producer: ProducerSubclient<*>) {
val metadata = getOrCreateKeyMetadata(producer.topic, producer.key)
metadata.producers.add(producer)
}
internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

private fun registerConsumer(consumer: ConsumerSubclient<*>) {
val metadata = getOrCreateKeyMetadata(consumer.topic, consumer.key)
if (metadata.consumers.isEmpty() && metadata.topic.connectionListener == null) {
// New consumer - create a new connection listener for this topic
val listener = TopicListener { topic, key, value, headers ->
onTopicMessage(topic, key, value, headers)
}
connection.on(consumer.topic, listener)
metadata.topic.connectionListener = listener
internal fun toResponseKey(key: String): String = "$key.response"

}

private class TopicMetadata(
private val connection: BrokerConnection,
private val consumerScope: CoroutineScope,
private val topic: String,
) {

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
metadata.consumers.add(consumer)
}

internal fun deregisterProducer(producer: ProducerSubclient<*>) {
log.debug("Removing producer for key '{}' in topic '{}'", producer.key, producer.topic)
val metadata = getExistingKeyMetadata(producer.topic, producer.key)
metadata?.producers?.remove(producer)
private val log by Log
private val _keys: MutableMap<String, KeyMetadata> = Collections.synchronizedMap(HashMap())
private val isBeingDestroyed = AtomicBoolean(false)
val isEmpty: Boolean
get() = _keys.isEmpty()

@Volatile
private var connectionListener: TopicListener? = null

fun registerSubclient(subclient: BaseSubclient) {
log.debug(
"Adding {} for key '{}' in topic '{}'",
subclient.javaClass.simpleName,
subclient.key,
subclient.topic
)
val metadata = getOrCreateKeyMetadata(subclient.key)
when (subclient) {
is ConsumerSubclient<*> -> {
if (metadata.consumers.isEmpty() && connectionListener == null) {
log.debug("Creating new connection listener for topic '{}'", subclient.topic)
// New consumer - create a new connection listener for this topic
val listener = TopicListener { topic, key, value, headers ->
onTopicMessage(topic, key, value, headers)
}
connection.on(subclient.topic, listener)
connectionListener = listener
}
metadata.consumers.add(subclient)
}

is ProducerSubclient<*> -> {
metadata.producers.add(subclient)
}
}
}

internal fun deregisterConsumer(consumer: ConsumerSubclient<*>) {
log.debug("Removing consumer for key '{}' in topic '{}'", consumer.key, consumer.topic)
val metadata = getExistingKeyMetadata(consumer.topic, consumer.key)
if (metadata?.consumers?.remove(consumer) == true && metadata.consumers.isEmpty()) {
metadata.topic.connectionListener?.let {
connection.off(metadata.topic.topic, it)
metadata.topic.connectionListener = null
fun deregisterSubclient(subclient: BaseSubclient) {
log.debug(
"Removing {} for key '{}' in topic '{}'",
subclient.javaClass.simpleName,
subclient.key,
subclient.topic
)
val metadata = getExistingKeyMetadata(subclient.key)
metadata?.let {
when (subclient) {
is ConsumerSubclient<*> -> it.consumers.remove(subclient)
is ProducerSubclient<*> -> it.producers.remove(subclient)
}
maybeCleanupKeyMetadata(it)
}
}

private fun getOrCreateKeyMetadata(topic: String, key: String): KeyMetadata {
val topicData = topics.computeIfAbsent(topic) {
TopicMetadata(topic, Collections.synchronizedMap(HashMap()))
private fun maybeCleanupKeyMetadata(keyMetadata: KeyMetadata) {
if (keyMetadata.isEmpty) {
_keys.remove(keyMetadata.key)
}
val keyData = topicData.keys.computeIfAbsent(key) {
KeyMetadata(topicData, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet()))
if (this.isEmpty) {
connectionListener?.let {
log.debug("Removing connection listener for topic '{}' after key cleanup", topic)
connection.off(topic, it)
connectionListener = null
}
}
return keyData
}

private fun getExistingKeyMetadata(topic: String, key: String): KeyMetadata? {
return topics[topic]?.keys?.get(key)
private fun getOrCreateKeyMetadata(key: String): KeyMetadata {
return _keys.computeIfAbsent(key) {
KeyMetadata(key)
}
}

internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

internal fun toResponseKey(key: String): String = "$key.response"
private fun getExistingKeyMetadata(key: String): KeyMetadata? {
return _keys[key]
}

private fun onTopicMessage(
topic: String,
key: String,
value: String,
headers: BrokerMessageHeaders,
) {
val metadata = getExistingKeyMetadata(topic, key) ?: return
val metadata = getExistingKeyMetadata(key) ?: return
for (consumer in metadata.consumers) {
consumerScope.launch {
try {
Expand All @@ -180,6 +237,22 @@ abstract class BrokerClient(
}
}

fun destroy() {
if (!isBeingDestroyed.compareAndSet(false, true)) {
return
}
while (_keys.isNotEmpty()) {
val key = _keys.keys.first()
_keys[key]?.destroy()
_keys.remove(key)
}
connectionListener?.let {
log.debug("Removing connection listener for topic '{}' in destroy()", topic)
connection.off(topic, it)
connectionListener = null
}
}

}

@PublishedApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ abstract class BrokerConnection {

abstract suspend fun start()
open fun destroy() {
log.debug("Destroying BrokerConnection")
topicListeners.clear()
}

Expand Down
20 changes: 15 additions & 5 deletions latte/src/main/java/gg/beemo/latte/broker/Subclients.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import gg.beemo.latte.util.MoshiUnitAdapter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean

data class BrokerClientOptions(
val useSafeJsLongs: Boolean = false,
Expand All @@ -23,7 +24,16 @@ abstract class BaseSubclient(
protected val options: BrokerClientOptions,
) {

internal abstract fun destroy()
private val log by Log
private val isBeingDestroyed = AtomicBoolean(false)

internal fun destroy() {
if (isBeingDestroyed.compareAndSet(false, true)) {
doDestroy()
}
}

protected abstract fun doDestroy()

protected fun <T> createMoshiAdapter(type: Class<T>): JsonAdapter<T?> {
val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi
Expand Down Expand Up @@ -62,8 +72,8 @@ class ProducerSubclient<T>(
private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(requestType)

override fun destroy() {
client.deregisterProducer(this)
override fun doDestroy() {
client.deregisterSubclient(this)
}

suspend fun send(
Expand Down Expand Up @@ -128,8 +138,8 @@ class ConsumerSubclient<T>(
private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(incomingType)

override fun destroy() {
client.deregisterConsumer(this)
override fun doDestroy() {
client.deregisterSubclient(this)
}

internal suspend fun onIncomingMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class KafkaConnection(
}

override fun destroy() {
log.debug("Destroying KafkaConnection")
consumer?.close()
consumer = null
producer?.close()
Expand Down
2 changes: 1 addition & 1 deletion latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class RpcClient<RequestT, ResponseT>(

}

override fun destroy() {
override fun doDestroy() {
requestProducer.destroy()
requestConsumer.destroy()
}
Expand Down
2 changes: 1 addition & 1 deletion latte/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Configuration status="INFO" shutdownHook="disable">
<Properties>
<Property name="PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight{%-5level}{FATAL=bg_bright_red, ERROR=bright_red, WARN=bright_yellow, INFO=bright_green, DEBUG=bright_cyan, TRACE=bright_white} [%style{%t}{bright_white}] %style{%logger{36}}{white}: %msg%n%ex</Property>
</Properties>
Expand Down
11 changes: 10 additions & 1 deletion vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import gg.beemo.latte.CommonConfig
import gg.beemo.latte.broker.kafka.KafkaConnection
import gg.beemo.latte.config.Configurator
import gg.beemo.latte.logging.Log
import gg.beemo.latte.logging.log
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.LogManager

object Vanilla {

Expand All @@ -25,7 +27,14 @@ object Vanilla {
)

log.debug("Initializing Kafka Ratelimit client")
RatelimitClient(brokerConnection)
val ratelimitClient = RatelimitClient(brokerConnection)

Runtime.getRuntime().addShutdownHook(Thread({
log.info("Destroying everything")
ratelimitClient.destroy()
brokerConnection.destroy()
LogManager.shutdown(true, true)
}, "Vanilla Shutdown Hook"))

log.debug("Starting Kafka connection")
brokerConnection.start()
Expand Down
Loading