Skip to content

Commit

Permalink
[WIP] Plan out initial concept
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 1, 2023
1 parent 3480052 commit 1655df5
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 62 deletions.
74 changes: 74 additions & 0 deletions latte/src/main/java/gg/beemo/latte/TestBrokerClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package gg.beemo.latte

import gg.beemo.latte.broker.BrokerClient
import gg.beemo.latte.broker.BrokerConnection
import kotlinx.coroutines.delay
import kotlin.time.Duration.Companion.seconds

// TODO This would be in the CommonConfig or similar.
object BrokerServices {
const val TEA = "tea"
}

class Tea {
companion object {
val cluster: Cluster = Cluster()
}

class Cluster {
suspend fun restartShard(id: ShardId) {}
}
}

class ShardId
class RaidUser
data class ShardRestartRequest(val shardId: ShardId)
data class GreetingRequest(val name: String)
data class GreetingResponse(val greeting: String)

// ------------------------------

class TestBrokerClient(connection: BrokerConnection) : BrokerClient(connection) {

init {
consumer<ShardRestartRequest>(
topic = "cluster.shard",
key = "restart",
) { req: ShardRestartRequest ->
Tea.cluster.restartShard(req.shardId)
}
}

private val raidBanQueue = producer<RaidUser>(
topic = "raid.bans",
key = "ban.enqueue",
)

private val greetingRpc = rpc<GreetingRequest, GreetingResponse>(
topic = "rpc.greetings",
key = "greeting.requests",
) { req: GreetingRequest ->
delay(2.seconds)
return@rpc GreetingResponse("Hello, ${req.name}")
}

suspend fun createGreeting(name: String): String {
// TODO Have to specify what cluster to send to.
// Must also support external clusters such as milk... which aren't called clusters.
// Maybe I should rename it to "service", so you specify the "target service".
// Perhaps also the "target instance"? Because Tea as a whole would be a service.
// So for a cluster-specific request, you send to service=tea, instance=0.
// This is something that needs to be adapted at the connection level as well.
val response = greetingRpc.call(
GreetingRequest(name),
service = BrokerServices.TEA,
instance = "0",
)
return response.name
}

suspend fun enqueueRaidBan(user: RaidUser) {
raidBanQueue.send(user)
}

}
145 changes: 85 additions & 60 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import gg.beemo.latte.util.SuspendingCountDownLatch
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.Collections
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration

Expand All @@ -19,65 +20,93 @@ fun interface BrokerMessageListener<T : Any> {
suspend fun onMessage(clusterId: String, msg: BrokerMessage<T>)
}

private class TopicData<T : Any>(
private class TopicMetadata(
val topic: String,
val keys: MutableMap<String, TopicKeyClient<out Any>>
)

class RpcClient<RequestType: Any, ResponseType: Any>

class TopicKeyClient<T : Any>(
private val connection: BrokerConnection,
private val client: BrokerClient,
val topic: String,
val key: String,
val type: Class<T>,
val adapter: JsonAdapter<T>,
val listeners: MutableMap<String, BrokerEventListener<T>>,
) {

fun parse(json: String): T? {
private val adapter: JsonAdapter<T> = moshi.adapter(type).nullSafe()
private val listeners: MutableList<BrokerEventListener<T>> = CopyOnWriteArrayList()

suspend fun send(
topic: String,
key: String,
obj: T?,
headers: BaseBrokerMessageHeaders = this.connection.createHeaders(),
): String {
return connection.send(topic, key, stringify(obj), headers)
}

private fun parse(json: String): T? {
return adapter.fromJson(json)
}

fun stringify(obj: T?): String {
private fun stringify(obj: T?): String {
return adapter.toJson(obj)
}

}

abstract class BrokerClient(
protected val connection: BrokerConnection,
// TODO initialize topics when registering event listeners instead?
initialTopics: Map<String, Class<Any>> = emptyMap(),
) {

companion object {
// TODO Investigate if it's a good idea to add a global custom type adapter
// to serialize large Longs as Strings for easier JS compatibility.
// https://github.com/square/moshi#custom-type-adapters
private val moshi = Moshi.Builder().build()
}

private val topics: MutableMap<String, TopicData<*>> = Collections.synchronizedMap(HashMap())
}

abstract class BrokerClient(
protected val connection: BrokerConnection,
) {

private val topics: MutableMap<String, TopicMetadata> = Collections.synchronizedMap(HashMap())

init {
log.debug("Initializing Broker Client with topics '{}' for objects of type {}", topics, type.name)
connection.on(topics, ::onTopicMessage)
}

protected suspend fun <T : Any> send(
protected suspend inline fun <reified T : Any> sendRpcRequest(
topic: String,
key: String,
obj: T?,
headers: BaseBrokerMessageHeaders = this.connection.createHeaders(),
): String {
val topicData = getTopicData(topic, obj)
requireNotNull(topicData) { "Attempting to send to unregistered topic $topic" }
return connection.send(topic, key, topicData.stringify(obj), headers)
timeout: Duration = Duration.INFINITE,
): BrokerMessage<T> {
return sendRpcRequest(topic, key, T::class.java, obj, timeout)
}

protected suspend fun <T : Any> sendRpcRequest(
topic: String,
key: String,
type: Class<T>,
obj: T?,
timeout: Duration = Duration.INFINITE,
): BrokerMessage<T> {
TODO()
}

// TODO Rename this to something RPC-related
protected suspend fun <T : Any> sendClusterRequest(
topic: String,
key: String,
obj: T?,
timeout: Duration = Duration.ZERO,
timeout: Duration = Duration.INFINITE,
targetClusters: Set<String> = emptySet(),
expectedResponses: Int? = null,
messageCallback: BrokerMessageListener<T>? = null,
): Pair<Map<String, BrokerMessage<T>>, Boolean> {
val responseKey = key.toResponseKey()

val responses = mutableMapOf<String, BrokerMessage<T>>()
val responses = ConcurrentHashMap<String, BrokerMessage<T>>()
val latch = SuspendingCountDownLatch(expectedResponses ?: targetClusters.size)
val requestId: AtomicReference<String> = AtomicReference("")

Expand All @@ -99,17 +128,13 @@ abstract class BrokerClient(
}

on(topic, responseKey, cb)
var timeoutReached = false
val timeoutReached: Boolean
try {
val headers = this.connection.createHeaders(targetClusters)
requestId.set(headers.requestId)
send(topic, key, obj, headers)

if (timeout <= Duration.ZERO) {
latch.await()
} else {
timeoutReached = !latch.await(timeout)
}
timeoutReached = !latch.await(timeout)
} finally {
off(topic, responseKey, cb)
}
Expand All @@ -122,27 +147,11 @@ abstract class BrokerClient(
}

protected fun <T : Any> on(topic: String, key: String, type: Class<T>, cb: BrokerEventListener<T>) {
val topicData = topics.computeIfAbsent(topic) {
val adapter = moshi.adapter(type).nullSafe()
TopicData(type, adapter, Collections.synchronizedMap(HashMap()))
}
require(topicData.type == type) {
"Topic '$topic' is already registered with type ${topicData.type.name}, " +
"attempting to re-register it with type ${type.name}"
}
@Suppress("UNCHECKED_CAST") // Safe because of the above verification
(topicData as TopicData<T>).listeners[key] = cb
getTopicKeyMetadata(topic, key, type).listeners.add(cb)
}

protected fun <T : Any> off(topic: String, key: String, cb: BrokerEventListener<T>) {
topics.computeIfPresent(topic) { _, topicData ->
topicData.listeners.remove(key, cb)
if (topicData.listeners.isEmpty()) {
null
} else {
topicData
}
}
getTopicKeyMetadata<T>(topic, key)?.listeners?.remove(cb)
}

internal suspend fun <T : Any> respond(
Expand All @@ -154,37 +163,53 @@ abstract class BrokerClient(
msg.headers.requestId,
)
send(
msg.topic,
msg.key.toResponseKey(),
data,
newHeaders,
)
}

private suspend fun onTopicMessage(key: String, value: String, headers: BaseBrokerMessageHeaders) = coroutineScope {
val obj = parse(value)
val msg = BrokerMessage(this@BrokerClient, key, obj, headers)
val listeners = keyListeners[key]
for (listener in listeners ?: return@coroutineScope) {
private suspend fun <T : Any> onTopicMessage(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
) = coroutineScope {
val metadata = getTopicKeyMetadata<T>(topic, key)
requireNotNull(metadata) { "Can't find metadata for key '$key' in topic '$topic'" }
val obj = metadata.parse(value)
val msg = BrokerMessage(this@BrokerClient, topic, key, obj, headers)
for (listener in metadata.listeners) {
launch {
try {
listener.onMessage(msg)
} catch (t: Throwable) {
log.error("Uncaught error in BrokerClient listener", t)
log.error("Uncaught error in BrokerClient listener for key '$key' in topic '$topic'", t)
}
}
}
}

private fun <T:Any> getTopicData(topic: String, obj: T?): TopicData<T>? {
private fun <T : Any> getTopicKeyMetadata(topic: String, key: String): TopicKeyClient<T>? {
val topicData = topics[topic] ?: return null
if (obj != null) {
require(topicData.type == obj::class.java) {
"Topic '$topic' is already registered with type ${topicData.type.name}, " +
"attempting to send with type ${obj::class.java.name}"
}
return topicData.keys[key]
}

private fun <T : Any> getTopicKeyMetadata(topic: String, key: String, type: Class<T>): TopicKeyClient<T> {
val topicData = topics.computeIfAbsent(topic) {
TopicMetadata(topic, Collections.synchronizedMap(HashMap()))
}
val keyData = topicData.keys.computeIfAbsent(key) {
val adapter = moshi.adapter(type).nullSafe()
TopicKeyClient(topic, key, type, adapter, CopyOnWriteArrayList())
}
require(keyData.type == type) {
"Key '$key' in topic '$topic' is already registered with type ${keyData.type.name}, " +
"attempting to process it with type ${type.name} instead"
}
@Suppress("UNCHECKED_CAST") // Safe because of the above verification
return topicData as TopicData<T>
return keyData as TopicKeyClient<T>
}

}
Expand Down
4 changes: 2 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.*
import java.util.concurrent.CopyOnWriteArraySet

fun interface TopicListener {
suspend fun onMessage(key: String, value: String, headers: BaseBrokerMessageHeaders)
suspend fun onMessage(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders)
}

abstract class BrokerConnection {
Expand Down Expand Up @@ -98,7 +98,7 @@ abstract class BrokerConnection {
val listeners = topicListeners[topic]
for (listener in listeners ?: return@launch) {
try {
listener.onMessage(key, value, headers)
listener.onMessage(topic, key, value, headers)
} catch (t: Throwable) {
log.error("Uncaught error in BrokerConnection listener", t)
}
Expand Down
4 changes: 4 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package gg.beemo.latte.broker

data class BrokerMessage<T : Any>(
val client: BrokerClient,
val topic: String,
val key: String,
val value: T?,
val headers: BaseBrokerMessageHeaders
) {

val clusterId: String
get() = headers.sourceCluster

suspend fun respond(data: T?) {
client.respond(this, data)
}
Expand Down

0 comments on commit 1655df5

Please sign in to comment.