Skip to content

Commit

Permalink
feat: cloudopt-next-kafka support suspend
Browse files Browse the repository at this point in the history
  • Loading branch information
T-baby committed Sep 2, 2021
1 parent ce82cae commit 5103191
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord
*/
interface KafkaListener {

fun listener(record: KafkaConsumerRecord<String, Any>)
suspend fun listener(record: KafkaConsumerRecord<String, Any>)

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.vertx.kafka.client.producer.KafkaProducer
import net.cloudopt.next.core.Classer
import net.cloudopt.next.core.Plugin
import net.cloudopt.next.core.Worker
import net.cloudopt.next.core.Worker.global
import net.cloudopt.next.web.NextServer
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
Expand Down Expand Up @@ -52,7 +53,7 @@ class KafkaPlugin : Plugin {
Classer.scanPackageByAnnotation(NextServer.packageName, true, AutoKafka::class)
.forEach { clazz ->
clazz.findAnnotation<AutoKafka>()?.value?.split(",")?.forEach { topic ->
var set = KafkaManager.kafkaList[topic] ?: mutableSetOf()
val set = KafkaManager.kafkaList[topic] ?: mutableSetOf()
set.add(clazz)
KafkaManager.kafkaList[topic] = set
}
Expand All @@ -66,10 +67,12 @@ class KafkaPlugin : Plugin {
KafkaManager.logger.error("[KAFKA] Registered topic listener was error:${KafkaManager.kafkaList.keys}")
}
}?.handler { record ->
if (record.topic().isNotBlank() && KafkaManager.kafkaList[record.topic()]?.size ?: 0 > 0) {
if (record.topic().isNotBlank() && (KafkaManager.kafkaList[record.topic()]?.size ?: 0) > 0) {
KafkaManager.kafkaList[record.topic()]?.forEach { clazz ->
var obj = clazz.createInstance() as KafkaListener
obj.listener(record as KafkaConsumerRecord<String, Any>)
val obj = clazz.createInstance() as KafkaListener
global{
obj.listener(record as KafkaConsumerRecord<String, Any>)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ package net.cloudopt.next.kafka.test

import net.cloudopt.next.kafka.KafkaManager
import net.cloudopt.next.web.Resource
import net.cloudopt.next.web.route.API
import net.cloudopt.next.web.route.GET
import net.cloudopt.next.web.annotation.API
import net.cloudopt.next.web.annotation.GET


/*
* @author: Cloudopt
* @Time: 2018/1/26
* @Description: Test Controller
*/
@API("/")
class IndexController : Resource() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ import net.cloudopt.next.kafka.KafkaPlugin
import net.cloudopt.next.web.NextServer


/*
* @author: Cloudopt
* @Time: 2018/2/6
* @Description: Test Case
*/

fun main(args: Array<String>) {
NextServer.addPlugin(KafkaPlugin())
NextServer.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,10 @@ import net.cloudopt.next.kafka.AutoKafka
import net.cloudopt.next.kafka.KafkaListener


/*
* @author: Cloudopt
* @Time: 2018/2/6
* @Description: Test Case
*/
@AutoKafka("test-topic")
class TestKafka : KafkaListener {

override fun listener(record: KafkaConsumerRecord<String, Any>) {
override suspend fun listener(record: KafkaConsumerRecord<String, Any>) {
println("this is kafka.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ package net.cloudopt.next.kafka.test
import org.apache.kafka.streams.processor.AbstractProcessor


/*
* @author: Cloudopt
* @Time: 2018/2/6
* @Description: Test Case
*/
class TestStreams : AbstractProcessor<String, String>() {

override fun process(key: String?, value: String?) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"packageName": "net.cloudopt.next.kafka.test",
"port": 2333,
"kafka": {
"streams": "true",
"streams": "false",
"bootstrap.servers": "PLAINTEXT://127.0.0.1:9092",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
Expand Down

0 comments on commit 5103191

Please sign in to comment.