diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 646bfe7..95470f6 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -1,5 +1,8 @@ package gg.beemo.latte.broker +import gg.beemo.latte.broker.rpc.BaseRpcRequestMessage +import gg.beemo.latte.broker.rpc.RpcClient +import gg.beemo.latte.broker.rpc.RpcResponse import gg.beemo.latte.logging.Log import kotlinx.coroutines.* import java.util.Collections diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt index aa01e11..5844014 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt @@ -1,5 +1,10 @@ package gg.beemo.latte.broker +import gg.beemo.latte.broker.rpc.RpcMessageHeaders +import gg.beemo.latte.broker.rpc.RpcRequestMessage +import gg.beemo.latte.broker.rpc.RpcResponseMessage +import gg.beemo.latte.broker.rpc.RpcStatus + open class BrokerMessage( val topic: String, val key: String, @@ -24,26 +29,3 @@ open class BrokerMessage( typealias AbstractBrokerMessage = BrokerMessage typealias BaseBrokerMessage = BrokerMessage -typealias BaseRpcRequestMessage = RpcRequestMessage - -class RpcRequestMessage( - topic: String, - key: String, - value: RequestT, - headers: H, - private val updateSender: suspend (RpcStatus, ResponseT) -> Unit, -) : BrokerMessage(topic, key, value, headers) { - - suspend fun sendUpdate(status: RpcStatus, response: ResponseT) { - updateSender(status, response) - } - -} - -class RpcResponseMessage(topic: String, key: String, value: ResponseT, headers: RpcMessageHeaders) : - BrokerMessage(topic, key, value, headers) { - - val status: RpcStatus - get() = headers.status - -} diff --git a/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt b/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt index 33700c4..749cb37 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt @@ -1,5 +1,7 @@ package gg.beemo.latte.broker +import gg.beemo.latte.broker.rpc.RpcStatus + sealed class BrokerException(message: String?) : Exception(message) class RpcRequestTimeout(message: String) : BrokerException(message) class IgnoreRpcRequest : BrokerException("Ignoring RPC request") diff --git a/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt b/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt new file mode 100644 index 0000000..763e180 --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt @@ -0,0 +1,171 @@ +package gg.beemo.latte.broker + +import com.squareup.moshi.JsonAdapter +import com.squareup.moshi.Moshi +import gg.beemo.latte.broker.rpc.RpcMessageHeaders +import gg.beemo.latte.logging.Log +import gg.beemo.latte.util.MoshiInstantAdapter +import gg.beemo.latte.util.MoshiJsLongAdapter +import gg.beemo.latte.util.MoshiUnitAdapter +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import java.time.Instant + +data class BrokerClientOptions( + val useSafeJsLongs: Boolean = false, +) + +abstract class BaseSubclient( + protected val connection: BrokerConnection, + protected val client: BrokerClient, + val topic: String, + val key: String, + protected val options: BrokerClientOptions, +) { + + internal abstract fun destroy() + + protected fun createMoshiAdapter(type: Class): JsonAdapter { + val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi + return mochi.adapter(type).nullSafe() + } + + companion object { + private val baseMoshi: Moshi = Moshi.Builder() + .add(Unit::class.java, MoshiUnitAdapter()) + .add(Instant::class.java, MoshiInstantAdapter()) + .build() + private val safeJsMoshi: Moshi = baseMoshi + .newBuilder() + .add(Long::class.java, MoshiJsLongAdapter()) + .build() + } + +} + +class ProducerSubclient( + connection: BrokerConnection, + client: BrokerClient, + topic: String, + key: String, + options: BrokerClientOptions, + requestType: Class, + private val isNullable: Boolean, +) : BaseSubclient( + connection, + client, + topic, + key, + options, +) { + + private val log by Log + private val adapter: JsonAdapter = createMoshiAdapter(requestType) + + override fun destroy() { + client.deregisterProducer(this) + } + + suspend fun send( + data: T, + services: Set = emptySet(), + instances: Set = emptySet(), + ): MessageId { + val msg = BrokerMessage( + topic, + key, + data, + BrokerMessageHeaders( + connection, + targetServices = services, + targetInstances = instances, + ), + ) + @Suppress("UNCHECKED_CAST") + return internalSend(msg as AbstractBrokerMessage) + } + + internal suspend fun internalSend(msg: AbstractBrokerMessage, bypassNullCheck: Boolean = false): MessageId { + if (!bypassNullCheck && !isNullable) { + requireNotNull(msg.value) { + "Cannot send null message for non-nullable type with key '$key' in topic '$topic'" + } + } + val strigifiedData = stringifyOutgoing(msg.value) + log.trace( + "Sending message {} with key '{}' in topic '{}' with value: {}", + msg.messageId, + key, + topic, + strigifiedData, + ) + return connection.send(topic, key, strigifiedData, msg.headers) + } + + private fun stringifyOutgoing(data: T?): String { + return adapter.toJson(data) + } + +} + +class ConsumerSubclient( + connection: BrokerConnection, + client: BrokerClient, + topic: String, + key: String, + options: BrokerClientOptions, + incomingType: Class, + private val isNullable: Boolean, + private val callback: suspend CoroutineScope.(BaseBrokerMessage) -> Unit, +) : BaseSubclient( + connection, + client, + topic, + key, + options, +) { + + private val log by Log + private val adapter: JsonAdapter = createMoshiAdapter(incomingType) + + override fun destroy() { + client.deregisterConsumer(this) + } + + internal suspend fun onIncomingMessage( + value: String, + headers: BrokerMessageHeaders, + ) = coroutineScope { + val data = parseIncoming(value) + // Disable nullability enforcement for RPC exceptions. The caller has to deal with the unsafe typing now. + if (!isNullable && (headers !is RpcMessageHeaders || !headers.isException)) { + checkNotNull(data) { + "Received null message for non-nullable type with key '$key' in topic '$topic'" + } + } + val message = BrokerMessage(topic, key, data, headers) + log.trace( + "Received message {} with key '{}' in topic '{}' with value: {}", + headers.messageId, + key, + topic, + value, + ) + @Suppress("UNCHECKED_CAST") // Safe due to above null validation + val brokerMessage = message as BaseBrokerMessage + try { + callback(brokerMessage) + } catch (ex: Exception) { + log.error( + "Uncaught consumer callback error while processing message ${headers.messageId} " + + "with key '$key' in topic '$topic'", + ex, + ) + } + } + + private fun parseIncoming(json: String): T? { + return adapter.fromJson(json) + } + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt similarity index 54% rename from latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt rename to latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt index 4fb48c0..845888d 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt @@ -1,185 +1,19 @@ -package gg.beemo.latte.broker +package gg.beemo.latte.broker.rpc -import com.squareup.moshi.JsonAdapter -import com.squareup.moshi.Moshi +import gg.beemo.latte.broker.* import gg.beemo.latte.logging.Log -import gg.beemo.latte.util.MoshiInstantAdapter -import gg.beemo.latte.util.MoshiJsLongAdapter -import gg.beemo.latte.util.MoshiUnitAdapter import gg.beemo.latte.util.SuspendingCountDownLatch import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.single -import java.time.Instant import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -data class BrokerClientOptions( - val useSafeJsLongs: Boolean = false, -) - -sealed class BaseSubclient( - protected val connection: BrokerConnection, - protected val client: BrokerClient, - val topic: String, - val key: String, - protected val options: BrokerClientOptions, -) { - - internal abstract fun destroy() - - protected fun createMoshiAdapter(type: Class): JsonAdapter { - val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi - return mochi.adapter(type).nullSafe() - } - - companion object { - private val baseMoshi: Moshi = Moshi.Builder() - .add(Unit::class.java, MoshiUnitAdapter()) - .add(Instant::class.java, MoshiInstantAdapter()) - .build() - private val safeJsMoshi: Moshi = baseMoshi - .newBuilder() - .add(Long::class.java, MoshiJsLongAdapter()) - .build() - } - -} - -class ProducerSubclient( - connection: BrokerConnection, - client: BrokerClient, - topic: String, - key: String, - options: BrokerClientOptions, - requestType: Class, - private val isNullable: Boolean, -) : BaseSubclient( - connection, - client, - topic, - key, - options, -) { - - private val log by Log - private val adapter: JsonAdapter = createMoshiAdapter(requestType) - - override fun destroy() { - client.deregisterProducer(this) - } - - suspend fun send( - data: T, - services: Set = emptySet(), - instances: Set = emptySet(), - ): MessageId { - val msg = BrokerMessage( - topic, - key, - data, - BrokerMessageHeaders( - connection, - targetServices = services, - targetInstances = instances, - ), - ) - @Suppress("UNCHECKED_CAST") - return internalSend(msg as AbstractBrokerMessage) - } - - internal suspend fun internalSend(msg: AbstractBrokerMessage, bypassNullCheck: Boolean = false): MessageId { - if (!bypassNullCheck && !isNullable) { - requireNotNull(msg.value) { - "Cannot send null message for non-nullable type with key '$key' in topic '$topic'" - } - } - val strigifiedData = stringifyOutgoing(msg.value) - log.trace( - "Sending message {} with key '{}' in topic '{}' with value: {}", - msg.messageId, - key, - topic, - strigifiedData, - ) - return connection.send(topic, key, strigifiedData, msg.headers) - } - - private fun stringifyOutgoing(data: T?): String { - return adapter.toJson(data) - } - -} - -class ConsumerSubclient( - connection: BrokerConnection, - client: BrokerClient, - topic: String, - key: String, - options: BrokerClientOptions, - incomingType: Class, - private val isNullable: Boolean, - private val callback: suspend CoroutineScope.(BaseBrokerMessage) -> Unit, -) : BaseSubclient( - connection, - client, - topic, - key, - options, -) { - - private val log by Log - private val adapter: JsonAdapter = createMoshiAdapter(incomingType) - - override fun destroy() { - client.deregisterConsumer(this) - } - - internal suspend fun onIncomingMessage( - value: String, - headers: BrokerMessageHeaders, - ) = coroutineScope { - val data = parseIncoming(value) - // Disable nullability enforcement for RPC exceptions. The caller has to deal with the unsafe typing now. - if (!isNullable && (headers !is RpcMessageHeaders || !headers.isException)) { - checkNotNull(data) { - "Received null message for non-nullable type with key '$key' in topic '$topic'" - } - } - val message = BrokerMessage(topic, key, data, headers) - log.trace( - "Received message {} with key '{}' in topic '{}' with value: {}", - headers.messageId, - key, - topic, - value, - ) - @Suppress("UNCHECKED_CAST") // Safe due to above null validation - val brokerMessage = message as BaseBrokerMessage - try { - callback(brokerMessage) - } catch (ex: Exception) { - log.error( - "Uncaught consumer callback error while processing message ${headers.messageId} " + - "with key '$key' in topic '$topic'", - ex, - ) - } - } - - private fun parseIncoming(json: String): T? { - return adapter.fromJson(json) - } - -} - -typealias RpcResponse = Pair class RpcClient( client: BrokerClient, diff --git a/latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt similarity index 90% rename from latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt rename to latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt index e103324..4545143 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/RpcMessageHeaders.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt @@ -1,4 +1,9 @@ -package gg.beemo.latte.broker +package gg.beemo.latte.broker.rpc + +import gg.beemo.latte.broker.BrokerConnection +import gg.beemo.latte.broker.BrokerMessageHeaders +import gg.beemo.latte.broker.MessageId +import gg.beemo.latte.broker.getOrThrow class RpcMessageHeaders(headers: Map) : BrokerMessageHeaders(headers) { diff --git a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessages.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessages.kt new file mode 100644 index 0000000..7bbb076 --- /dev/null +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessages.kt @@ -0,0 +1,30 @@ +package gg.beemo.latte.broker.rpc + +import gg.beemo.latte.broker.BrokerMessage +import gg.beemo.latte.broker.BrokerMessageHeaders + +typealias RpcResponse = Pair + +typealias BaseRpcRequestMessage = RpcRequestMessage + +class RpcRequestMessage( + topic: String, + key: String, + value: RequestT, + headers: H, + private val updateSender: suspend (RpcStatus, ResponseT) -> Unit, +) : BrokerMessage(topic, key, value, headers) { + + suspend fun sendUpdate(status: RpcStatus, response: ResponseT) { + updateSender(status, response) + } + +} + +class RpcResponseMessage(topic: String, key: String, value: ResponseT, headers: RpcMessageHeaders) : + BrokerMessage(topic, key, value, headers) { + + val status: RpcStatus + get() = headers.status + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcStatus.kt similarity index 91% rename from latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt rename to latte/src/main/java/gg/beemo/latte/broker/rpc/RpcStatus.kt index 01840f9..9d10128 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/RpcStatus.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcStatus.kt @@ -1,4 +1,4 @@ -package gg.beemo.latte.broker +package gg.beemo.latte.broker.rpc class RpcStatus(val code: Int) { diff --git a/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt b/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt index 1bb6a36..e7e61dd 100644 --- a/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt +++ b/latte/src/test/kotlin/gg/beemo/latte/broker/BrokerClientTest.kt @@ -1,5 +1,6 @@ package gg.beemo.latte.broker +import gg.beemo.latte.broker.rpc.RpcStatus import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test diff --git a/latte/src/test/kotlin/gg/beemo/latte/broker/TestBrokerClient.kt b/latte/src/test/kotlin/gg/beemo/latte/broker/TestBrokerClient.kt index a238f34..2f673a9 100644 --- a/latte/src/test/kotlin/gg/beemo/latte/broker/TestBrokerClient.kt +++ b/latte/src/test/kotlin/gg/beemo/latte/broker/TestBrokerClient.kt @@ -1,6 +1,7 @@ package gg.beemo.latte.broker import com.squareup.moshi.JsonClass +import gg.beemo.latte.broker.rpc.RpcStatus import gg.beemo.latte.logging.Log import kotlinx.coroutines.CoroutineScope import org.junit.jupiter.api.Assertions diff --git a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt b/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt index 1792d98..5ff4104 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt @@ -3,7 +3,7 @@ package gg.beemo.vanilla import gg.beemo.latte.broker.BrokerClient import gg.beemo.latte.broker.BrokerConnection import gg.beemo.latte.broker.IgnoreRpcRequest -import gg.beemo.latte.broker.RpcStatus +import gg.beemo.latte.broker.rpc.RpcStatus import gg.beemo.latte.logging.Log import gg.beemo.latte.ratelimit.SharedRatelimitData import gg.beemo.latte.util.SuspendingRatelimit