Skip to content

Commit

Permalink
[WIP] Start implementing concept
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 1, 2023
1 parent 1655df5 commit a499c00
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 57 deletions.
7 changes: 4 additions & 3 deletions latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ class TestBrokerClient(connection: BrokerConnection) : BrokerClient(connection)
// This is something that needs to be adapted at the connection level as well.
val response = greetingRpc.call(
GreetingRequest(name),
service = BrokerServices.TEA,
instance = "0",
services = setOf(BrokerServices.TEA),
instances = setOf("0"),
timeout = 5.seconds,
)
return response.name
return response.greeting
}

suspend fun enqueueRaidBan(user: RaidUser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package gg.beemo.latte.broker
import java.util.*

open class BaseBrokerMessageHeaders(
val clientId: String,
val sourceCluster: String,
targetClusters: Set<String>?, // // Empty = No target restriction
requestId: String?,
val clientId: String?,
val sourceService: String,
val sourceInstance: String?,
val targetServices: Set<String>,
val targetInstances: Set<String>,
val inReplyTo: String? = null,
) {

val targetClusters: Set<String> = targetClusters ?: emptySet()
val requestId: String = requestId ?: UUID.randomUUID().toString()
val messageId = UUID.randomUUID().toString()

companion object {

Expand Down
182 changes: 139 additions & 43 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.Moshi
import gg.beemo.latte.logging.log
import gg.beemo.latte.util.SuspendingCountDownLatch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

fun interface BrokerEventListener<T : Any> {
suspend fun onMessage(msg: BrokerMessage<T>)
Expand All @@ -20,81 +22,175 @@ fun interface BrokerMessageListener<T : Any> {
suspend fun onMessage(clusterId: String, msg: BrokerMessage<T>)
}

private class TopicMetadata(
val topic: String,
val keys: MutableMap<String, TopicKeyClient<out Any>>
)

class RpcClient<RequestType: Any, ResponseType: Any>

class TopicKeyClient<T : Any>(
private val connection: BrokerConnection,
private val client: BrokerClient,
sealed class BaseClient<OutgoingT : Any, IncomingT : Any>(
protected val connection: BrokerConnection,
protected val client: BrokerClient,
val topic: String,
val key: String,
val type: Class<T>,
protected val outgoingType: Class<OutgoingT>?,
protected val incomingType: Class<IncomingT>?,
callback: (suspend CoroutineScope.(IncomingT) -> OutgoingT)?,
) {

// TODO Should this class still exist?

companion object {
// TODO Investigate if it's a good idea to add a global custom type adapter
// to serialize large Longs as Strings for easier JS compatibility.
// https://github.com/square/moshi#custom-type-adapters
@JvmStatic
protected val moshi: Moshi = Moshi.Builder().build()
}

}


class ProducerClient<OutgoingT : Any>(
connection: BrokerConnection,
client: BrokerClient,
topic: String,
key: String,
outgoingType: Class<OutgoingT>,
) : BaseClient<OutgoingT, Unit>(
connection,
client,
topic,
key,
outgoingType,
null,
null,
) {

private val adapter: JsonAdapter<T> = moshi.adapter(type).nullSafe()
private val listeners: MutableList<BrokerEventListener<T>> = CopyOnWriteArrayList()
private val adapter: JsonAdapter<OutgoingT?> = moshi.adapter(outgoingType).nullSafe()

suspend fun send(
topic: String,
key: String,
obj: T?,
headers: BaseBrokerMessageHeaders = this.connection.createHeaders(),
): String {
return connection.send(topic, key, stringify(obj), headers)
data: OutgoingT,
services: Set<String> = emptySet(),
instances: Set<String> = emptySet(),
) {
val strigifiedData = stringifyOutgoing(data)
val headers = connection.createHeaders(services, instances)
connection.send(topic, key, strigifiedData, headers)
}

private fun stringifyOutgoing(data: OutgoingT?): String {
return adapter.toJson(data)
}

private fun parse(json: String): T? {
}

class ConsumerClient<IncomingT : Any>(
connection: BrokerConnection,
client: BrokerClient,
topic: String,
key: String,
incomingType: Class<IncomingT>,
callback: suspend CoroutineScope.(IncomingT) -> Unit,
) : BaseClient<Unit, IncomingT>(
connection,
client,
topic,
key,
null,
incomingType,
callback,
) {

private val adapter: JsonAdapter<IncomingT?> = moshi.adapter(incomingType).nullSafe()

private fun parseIncoming(json: String): IncomingT? {
return adapter.fromJson(json)
}

private fun stringify(obj: T?): String {
return adapter.toJson(obj)
}


class RpcClient<OutgoingT : Any, IncomingT : Any>(
connection: BrokerConnection,
client: BrokerClient,
topic: String,
key: String,
outgoingType: Class<OutgoingT>,
incomingType: Class<IncomingT>,
callback: suspend CoroutineScope.(IncomingT) -> OutgoingT,
) : BaseClient<OutgoingT, IncomingT>(
connection,
client,
topic,
key,
outgoingType,
incomingType,
callback,
) {

private val producer = ProducerClient(connection, client, topic, key, outgoingType)
private val consumer = ConsumerClient(connection, client, topic, key, incomingType) {
TODO()
}

companion object {
// TODO Investigate if it's a good idea to add a global custom type adapter
// to serialize large Longs as Strings for easier JS compatibility.
// https://github.com/square/moshi#custom-type-adapters
private val moshi = Moshi.Builder().build()
suspend fun call(
request: OutgoingT,
services: Set<String> = emptySet(),
instances: Set<String> = emptySet(),
timeout: Duration = 10.seconds,
): IncomingT {
producer.send(request, services, instances)
TODO()
}

}


private class TopicMetadata(
val topic: String,
val keys: MutableMap<String, TopicKeyClient<out Any>>
)

abstract class BrokerClient(
protected val connection: BrokerConnection,
@PublishedApi
internal val connection: BrokerConnection,
) {

private val topics: MutableMap<String, TopicMetadata> = Collections.synchronizedMap(HashMap())
inline fun <reified T : Any> consumer(
topic: String,
key: String,
noinline callback: suspend CoroutineScope.(T) -> Unit,
): ConsumerClient<T> {
return ConsumerClient(connection, this, topic, key, T::class.java, callback)
}

init {
log.debug("Initializing Broker Client with topics '{}' for objects of type {}", topics, type.name)
connection.on(topics, ::onTopicMessage)
inline fun <reified T : Any> producer(
topic: String,
key: String,
): ProducerClient<T> {
return ProducerClient(connection, this, topic, key, T::class.java)
}

protected suspend inline fun <reified T : Any> sendRpcRequest(
inline fun <reified IncomingT : Any, reified OutgoingT : Any> rpc(
topic: String,
key: String,
obj: T?,
timeout: Duration = Duration.INFINITE,
): BrokerMessage<T> {
return sendRpcRequest(topic, key, T::class.java, obj, timeout)
noinline callback: suspend CoroutineScope.(IncomingT) -> OutgoingT,
): RpcClient<IncomingT, OutgoingT> {
return RpcClient(connection, this, topic, key, IncomingT::class.java, OutgoingT::class.java, callback)
}

protected suspend fun <T : Any> sendRpcRequest(
internal suspend fun send(
topic: String,
key: String,
type: Class<T>,
obj: T?,
timeout: Duration = Duration.INFINITE,
): BrokerMessage<T> {
TODO()
data: String,
headers: BaseBrokerMessageHeaders = this.connection.createHeaders(),
): String {
return connection.send(topic, key, data, headers)
}

private val topics: MutableMap<String, TopicMetadata> = Collections.synchronizedMap(HashMap())

init {
log.debug("Initializing Broker Client with topics '{}' for objects of type {}", topics, type.name)
connection.on(topics, ::onTopicMessage)
}

// TODO Rename this to something RPC-related
protected suspend fun <T : Any> sendClusterRequest(
topic: String,
key: String,
Expand Down
5 changes: 3 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ abstract class BrokerConnection {
): String

abstract fun createHeaders(
targetClusters: Set<String>? = null,
requestId: String? = null
targetServices: Set<String> = emptySet(),
targetInstances: Set<String> = emptySet(),
inReplyTo: String? = null,
): BaseBrokerMessageHeaders

open fun on(topic: String, cb: TopicListener) {
Expand Down
3 changes: 0 additions & 3 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ data class BrokerMessage<T : Any>(
val headers: BaseBrokerMessageHeaders
) {

val clusterId: String
get() = headers.sourceCluster

suspend fun respond(data: T?) {
client.respond(this, data)
}
Expand Down

0 comments on commit a499c00

Please sign in to comment.