From b7954a39efebb921a49d9cf9f806e83164a4ea16 Mon Sep 17 00:00:00 2001 From: Adrian Paschkowski Date: Sun, 17 Dec 2023 20:18:32 +0100 Subject: [PATCH] tweaks --- latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt | 5 ++--- .../src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt | 4 +++- .../src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt | 3 +++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 1e27753..8b41ef7 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -12,7 +12,6 @@ private class TopicMetadata( private class KeyMetadata( val topic: TopicMetadata, - val key: String, val producers: MutableSet>, val consumers: MutableSet>, ) @@ -76,7 +75,7 @@ abstract class BrokerClient( topic: String, key: String, options: BrokerClientOptions = BrokerClientOptions(), - noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> Pair, + noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> RpcResponse, ): RpcClient { return RpcClient( this, @@ -146,7 +145,7 @@ abstract class BrokerClient( TopicMetadata(topic, Collections.synchronizedMap(HashMap())) } val keyData = topicData.keys.computeIfAbsent(key) { - KeyMetadata(topicData, key, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet())) + KeyMetadata(topicData, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet())) } return keyData } diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt index e5c068c..7893eeb 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt @@ -169,6 +169,8 @@ class ConsumerSubclient( } +typealias RpcResponse = Pair + class RpcClient( client: BrokerClient, topic: String, @@ -178,7 +180,7 @@ class RpcClient( requestIsNullable: Boolean, private val responseType: Class, private val responseIsNullable: Boolean, - private val callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> Pair, + private val callback: suspend CoroutineScope.(BaseRpcRequestMessage) -> RpcResponse, ) : BaseSubclient( client.connection, client, diff --git a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt b/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt index 5613b37..1792d98 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt @@ -3,6 +3,7 @@ package gg.beemo.vanilla import gg.beemo.latte.broker.BrokerClient import gg.beemo.latte.broker.BrokerConnection import gg.beemo.latte.broker.IgnoreRpcRequest +import gg.beemo.latte.broker.RpcStatus import gg.beemo.latte.logging.Log import gg.beemo.latte.ratelimit.SharedRatelimitData import gg.beemo.latte.util.SuspendingRatelimit @@ -46,6 +47,8 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) { provider.getClientRatelimit(clientId).requestQuota() log.debug("Granted {} quota request for service {}", type, service) + + return@rpc RpcStatus.OK to Unit } }