diff --git a/latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt b/latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt index fb453b3..634a46d 100644 --- a/latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt @@ -61,10 +61,11 @@ class TestBrokerClient(connection: BrokerConnection) : BrokerClient(connection) // This is something that needs to be adapted at the connection level as well. val response = greetingRpc.call( GreetingRequest(name), - service = BrokerServices.TEA, - instance = "0", + services = setOf(BrokerServices.TEA), + instances = setOf("0"), + timeout = 5.seconds, ) - return response.name + return response.greeting } suspend fun enqueueRaidBan(user: RaidUser) { diff --git a/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt index 40331e3..c3d3811 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BaseBrokerMessageHeaders.kt @@ -3,14 +3,15 @@ package gg.beemo.latte.broker import java.util.* open class BaseBrokerMessageHeaders( - val clientId: String, - val sourceCluster: String, - targetClusters: Set?, // // Empty = No target restriction - requestId: String?, + val clientId: String?, + val sourceService: String, + val sourceInstance: String?, + val targetServices: Set, + val targetInstances: Set, + val inReplyTo: String? = null, ) { - val targetClusters: Set = targetClusters ?: emptySet() - val requestId: String = requestId ?: UUID.randomUUID().toString() + val messageId = UUID.randomUUID().toString() companion object { diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 756cbe1..efff5ed 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -4,6 +4,7 @@ import com.squareup.moshi.JsonAdapter import com.squareup.moshi.Moshi import gg.beemo.latte.logging.log import gg.beemo.latte.util.SuspendingCountDownLatch +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import java.util.Collections @@ -11,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.AtomicReference import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds fun interface BrokerEventListener { suspend fun onMessage(msg: BrokerMessage) @@ -20,81 +22,175 @@ fun interface BrokerMessageListener { suspend fun onMessage(clusterId: String, msg: BrokerMessage) } -private class TopicMetadata( - val topic: String, - val keys: MutableMap> -) - -class RpcClient -class TopicKeyClient( - private val connection: BrokerConnection, - private val client: BrokerClient, +sealed class BaseClient( + protected val connection: BrokerConnection, + protected val client: BrokerClient, val topic: String, val key: String, - val type: Class, + protected val outgoingType: Class?, + protected val incomingType: Class?, + callback: (suspend CoroutineScope.(IncomingT) -> OutgoingT)?, +) { + + // TODO Should this class still exist? + + companion object { + // TODO Investigate if it's a good idea to add a global custom type adapter + // to serialize large Longs as Strings for easier JS compatibility. + // https://github.com/square/moshi#custom-type-adapters + @JvmStatic + protected val moshi: Moshi = Moshi.Builder().build() + } + +} + + +class ProducerClient( + connection: BrokerConnection, + client: BrokerClient, + topic: String, + key: String, + outgoingType: Class, +) : BaseClient( + connection, + client, + topic, + key, + outgoingType, + null, + null, ) { - private val adapter: JsonAdapter = moshi.adapter(type).nullSafe() - private val listeners: MutableList> = CopyOnWriteArrayList() + private val adapter: JsonAdapter = moshi.adapter(outgoingType).nullSafe() suspend fun send( - topic: String, - key: String, - obj: T?, - headers: BaseBrokerMessageHeaders = this.connection.createHeaders(), - ): String { - return connection.send(topic, key, stringify(obj), headers) + data: OutgoingT, + services: Set = emptySet(), + instances: Set = emptySet(), + ) { + val strigifiedData = stringifyOutgoing(data) + val headers = connection.createHeaders(services, instances) + connection.send(topic, key, strigifiedData, headers) + } + + private fun stringifyOutgoing(data: OutgoingT?): String { + return adapter.toJson(data) } - private fun parse(json: String): T? { +} + +class ConsumerClient( + connection: BrokerConnection, + client: BrokerClient, + topic: String, + key: String, + incomingType: Class, + callback: suspend CoroutineScope.(IncomingT) -> Unit, +) : BaseClient( + connection, + client, + topic, + key, + null, + incomingType, + callback, +) { + + private val adapter: JsonAdapter = moshi.adapter(incomingType).nullSafe() + + private fun parseIncoming(json: String): IncomingT? { return adapter.fromJson(json) } - private fun stringify(obj: T?): String { - return adapter.toJson(obj) +} + + +class RpcClient( + connection: BrokerConnection, + client: BrokerClient, + topic: String, + key: String, + outgoingType: Class, + incomingType: Class, + callback: suspend CoroutineScope.(IncomingT) -> OutgoingT, +) : BaseClient( + connection, + client, + topic, + key, + outgoingType, + incomingType, + callback, +) { + + private val producer = ProducerClient(connection, client, topic, key, outgoingType) + private val consumer = ConsumerClient(connection, client, topic, key, incomingType) { + TODO() } - companion object { - // TODO Investigate if it's a good idea to add a global custom type adapter - // to serialize large Longs as Strings for easier JS compatibility. - // https://github.com/square/moshi#custom-type-adapters - private val moshi = Moshi.Builder().build() + suspend fun call( + request: OutgoingT, + services: Set = emptySet(), + instances: Set = emptySet(), + timeout: Duration = 10.seconds, + ): IncomingT { + producer.send(request, services, instances) + TODO() } } + +private class TopicMetadata( + val topic: String, + val keys: MutableMap> +) + abstract class BrokerClient( - protected val connection: BrokerConnection, + @PublishedApi + internal val connection: BrokerConnection, ) { - private val topics: MutableMap = Collections.synchronizedMap(HashMap()) + inline fun consumer( + topic: String, + key: String, + noinline callback: suspend CoroutineScope.(T) -> Unit, + ): ConsumerClient { + return ConsumerClient(connection, this, topic, key, T::class.java, callback) + } - init { - log.debug("Initializing Broker Client with topics '{}' for objects of type {}", topics, type.name) - connection.on(topics, ::onTopicMessage) + inline fun producer( + topic: String, + key: String, + ): ProducerClient { + return ProducerClient(connection, this, topic, key, T::class.java) } - protected suspend inline fun sendRpcRequest( + inline fun rpc( topic: String, key: String, - obj: T?, - timeout: Duration = Duration.INFINITE, - ): BrokerMessage { - return sendRpcRequest(topic, key, T::class.java, obj, timeout) + noinline callback: suspend CoroutineScope.(IncomingT) -> OutgoingT, + ): RpcClient { + return RpcClient(connection, this, topic, key, IncomingT::class.java, OutgoingT::class.java, callback) } - protected suspend fun sendRpcRequest( + internal suspend fun send( topic: String, key: String, - type: Class, - obj: T?, - timeout: Duration = Duration.INFINITE, - ): BrokerMessage { - TODO() + data: String, + headers: BaseBrokerMessageHeaders = this.connection.createHeaders(), + ): String { + return connection.send(topic, key, data, headers) + } + + private val topics: MutableMap = Collections.synchronizedMap(HashMap()) + + init { + log.debug("Initializing Broker Client with topics '{}' for objects of type {}", topics, type.name) + connection.on(topics, ::onTopicMessage) } - // TODO Rename this to something RPC-related protected suspend fun sendClusterRequest( topic: String, key: String, diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt index b99a04e..55f3681 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt @@ -35,8 +35,9 @@ abstract class BrokerConnection { ): String abstract fun createHeaders( - targetClusters: Set? = null, - requestId: String? = null + targetServices: Set = emptySet(), + targetInstances: Set = emptySet(), + inReplyTo: String? = null, ): BaseBrokerMessageHeaders open fun on(topic: String, cb: TopicListener) { diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt index 3941747..5e34adc 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt @@ -8,9 +8,6 @@ data class BrokerMessage( val headers: BaseBrokerMessageHeaders ) { - val clusterId: String - get() = headers.sourceCluster - suspend fun respond(data: T?) { client.respond(this, data) }