Skip to content

Commit

Permalink
feat: a kafka consumer demo
Browse files Browse the repository at this point in the history
  • Loading branch information
cyhii committed Jun 2, 2022
1 parent ee6190d commit 6371b63
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ application {
dependencies {
implementation(platform("io.vertx:vertx-stack-depchain:$vertxVersion"))
implementation("io.vertx:vertx-config")
implementation("io.vertx:vertx-kafka-client")
implementation("io.vertx:vertx-lang-kotlin")
implementation("io.vertx:vertx-lang-kotlin-coroutines")
implementation("org.slf4j:slf4j-api:1.7.35")
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/com/example/starter/MainVerticle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import io.vertx.kotlin.coroutines.CoroutineVerticle
class MainVerticle : CoroutineVerticle() {

override suspend fun start() {
vertx.deployVerticle(MyKafkaConsumer())
}
}
50 changes: 50 additions & 0 deletions src/main/kotlin/com/example/starter/MyKafkaConsumer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.example.starter

import io.vertx.core.json.JsonObject
import io.vertx.kafka.client.consumer.KafkaConsumer
import io.vertx.kafka.client.consumer.KafkaConsumerRecord
import io.vertx.kotlin.coroutines.CoroutineVerticle
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class MyKafkaConsumer : CoroutineVerticle() {

private val log: Logger = LoggerFactory.getLogger(javaClass)

override suspend fun start() {
val kafkaConfig: Map<String, String> = mapOf(
"bootstrap.servers" to "127.0.0.1:9092",
"key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" to "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
"group.id" to "my-consumer",
"auto.offset.reset" to "latest",
"enable.auto.commit" to "true",
)
val consumer: KafkaConsumer<String, JsonObject> = KafkaConsumer.create(vertx, kafkaConfig)

consumer.handler { record ->
consumerHandler(record)
}

consumer.exceptionHandler {
// WARN: uncomment this line below then you will get log storm
// log.warn("failed to consume message : {}", it.message, it)
}

val topics = setOf("my-topic")
consumer.subscribe(topics) {
if (it.succeeded()) {
log.info("succeeded to subscribe topics {}", topics)
} else {
log.error("failed to subscribe topics {}", topics, it.cause())
}
}
}

private fun consumerHandler(record: KafkaConsumerRecord<String, JsonObject>) {
val json = record.value()

log.info("consume message {}", json)
// consume this message
}
}

0 comments on commit 6371b63

Please sign in to comment.