Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Build stage
FROM maven:3.9.11-ibm-semeru-17-noble AS build

WORKDIR /app
COPY pom.xml .
COPY package.json .
COPY src ./src
COPY ui ./ui

# Build the JAR (this will run frontend-maven-plugin too)
RUN mvn clean package -DskipTests

# Runtime stage
FROM ibm-semeru-runtimes:open-17-jdk

WORKDIR /app
COPY kafka.properties kafka.properties
COPY --from=build /app/target/demo-all.jar app.jar

EXPOSE 8080

ENTRYPOINT ["java", "-jar", "app.jar"]
67 changes: 45 additions & 22 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,29 +1,52 @@
version: '2'
services:

zookeeper:
image: strimzi/zookeeper
command: [
"sh", "-c",
"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
broker:
image: apache/kafka:4.0.0
container_name: broker
hostname: broker
ports:
- "2181:2181"
- "9092:9092"
environment:
LOG_DIR: /tmp/logs
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
KAFKA_CONFIG_DIR: /var/lib/kafka-config
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
healthcheck:
test: ["CMD", "/opt/kafka/bin/kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"]
interval: 5s
timeout: 10s
retries: 3
start_period: 3s

create-topics:
image: apache/kafka:4.0.0
container_name: create-topics
depends_on:
broker:
condition: service_healthy
entrypoint: ["/bin/sh", "-c"]
command: |
"
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic __consumer_offsets --bootstrap-server broker:9092 --partitions 1 --replication-factor 1
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic demo --bootstrap-server broker:9092 --partitions 1 --replication-factor 1
"
restart: "no"

kafka:
image: strimzi/kafka
command: [
"sh", "-c",
"bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]

app:
build: .
container_name: demo-app
depends_on:
- zookeeper
broker:
condition: service_healthy
create-topics:
condition: service_completed_successfully
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- "8080:8080"
restart: always
2 changes: 1 addition & 1 deletion kafka.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
bootstrap.servers=localhost:9092
bootstrap.servers=broker:9092
## Optional topic configuration - otherwise default value will be chosen
# topic=

Expand Down
83 changes: 60 additions & 23 deletions src/main/java/kafka/vertx/demo/PeriodicProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.TimeoutStream;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaProducer;
Expand All @@ -16,54 +15,84 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;

public class PeriodicProducer extends AbstractVerticle {

private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class);
private String customMessage;
private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();

private KafkaProducer<String, String> kafkaProducer;
private static final long TIMER_NOT_SET = -1L;
private long timerId = TIMER_NOT_SET;
private String customMessage = "Hello World";

@Override
public void start(Promise<Void> startPromise) {
String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH);
String propertiesPath = System.getProperty(
Main.PROPERTIES_PATH_ENV_NAME,
Main.DEFAULT_PROPERTIES_PATH
);

Main.loadKafkaConfig(vertx, propertiesPath)
.onSuccess(config -> {
HashMap<String, String> props = config.mapTo(HashMap.class);
setup(props);
setup(config);
startPromise.complete();
})
.onFailure(startPromise::fail);
}

private void setup(HashMap<String, String> props) {
// Don't retry and only wait 10 secs for partition info as this is a demo app
private void setup(JsonObject config) {
// Convert JsonObject config -> Map<String,String>
Map<String, String> props = config.getMap()
.entrySet()
.stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> String.valueOf(e.getValue())
));

props.put(ProducerConfig.RETRIES_CONFIG, "0");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, props);

kafkaProducer.exceptionHandler(err -> logger.debug("Kafka error: {}", err));
kafkaProducer = KafkaProducer.create(vertx, props);
kafkaProducer.exceptionHandler(err -> logger.error("Kafka producer error", err));

TimeoutStream timerStream = vertx.periodicStream(2000);
timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY)));
timerStream.pause();
vertx.eventBus()
.<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS,
msg -> handleCommand(props, msg));

vertx.eventBus().<JsonObject>consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message));
logger.info("🚀 PeriodicProducer started");
}

private void handleCommand(TimeoutStream timerStream, Message<JsonObject> message) {
private void handleCommand(Map<String, String> props, Message<JsonObject> message) {
String command = message.body().getString(WebSocketServer.ACTION, "none");
if (WebSocketServer.START_ACTION.equals(command)) {
logger.info("Producing Kafka records");
customMessage = message.body().getString("custom", "Hello World");
timerStream.resume();
} else if (WebSocketServer.STOP_ACTION.equals(command)) {
logger.info("Stopping producing Kafka records");
timerStream.pause();
switch (command) {
case WebSocketServer.START_ACTION:
customMessage = message.body().getString("custom", "Hello World");
if (timerId == TIMER_NOT_SET) {
timerId = vertx.setPeriodic(PRODUCE_INTERVAL_MS,
id -> produceKafkaRecord(props.get(Main.TOPIC_KEY)));
logger.info("Producing Kafka records with message template: {}", customMessage);
}
break;

case WebSocketServer.STOP_ACTION:
if (timerId != TIMER_NOT_SET) {
vertx.cancelTimer(timerId);
timerId = TIMER_NOT_SET;
logger.info("Stopped producing Kafka records");
}
break;

default:
logger.warn("Unknown command received: {}", command);
}
}

private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, String topic) {
private void produceKafkaRecord(String topic) {
String payload = customMessage;
KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(topic, payload);
logger.debug("Producing record to topic {} with payload {}", topic, payload);
Expand All @@ -84,4 +113,12 @@ private void produceKafkaRecord(KafkaProducer<String, String> kafkaProducer, Str
vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, new JsonObject().put("status", "ERROR"));
});
}

@Override
public void stop() {
if (kafkaProducer != null) {
kafkaProducer.close()
.onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will ar.cause()) be null is success ??

logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : "failure", ar.cause());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that won't get called

}
}
}
26 changes: 13 additions & 13 deletions src/main/java/kafka/vertx/demo/WebSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.templ.thymeleaf.ThymeleafTemplateEngine;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Set;

public class WebSocketServer extends AbstractVerticle {

Expand Down Expand Up @@ -70,7 +68,7 @@ private Future<HttpServer> createRouterAndStartServer(JsonObject config) {
JsonObject props = new JsonObject();

String topic = config.getString("topic");

props.put("topic", topic);
props.put("producerPath", PRODUCE_PATH);
props.put("consumerPath", CONSUME_PATH);
Expand All @@ -94,7 +92,7 @@ private Future<HttpServer> startWebSocket(Router router) {
return vertx.createHttpServer(new HttpServerOptions().setRegisterWebSocketWriteHandlers(true))
.requestHandler(router)
.webSocketHandler(this::handleWebSocket)
.listen(8080)
.listen(8080, "0.0.0.0")
.onSuccess(ok -> logger.info("🚀 WebSocketServer started"))
.onFailure(err -> logger.error("❌ WebSocketServer failed to listen", err));
}
Expand Down Expand Up @@ -140,11 +138,8 @@ private void handleProduceSocket(ServerWebSocket webSocket) {

private void handleConsumeSocket(ServerWebSocket webSocket) {
KafkaConsumer<String, JsonObject> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig);

kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add it back for better error visibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you are right


String topic = kafkaConfig.get(Main.TOPIC_KEY);
TopicPartition topicPartition = new TopicPartition().setTopic(topic);

kafkaConsumer.handler(record -> {
JsonObject payload = new JsonObject()
Expand All @@ -157,16 +152,21 @@ private void handleConsumeSocket(ServerWebSocket webSocket) {
vertx.eventBus().send(webSocket.textHandlerID(), payload.encode());
});

kafkaConsumer.subscribe(topic)
.onSuccess(v -> {
logger.info("Subscribed to {}", topic);
})
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));

webSocket.handler(buffer -> {
String action = buffer.toJsonObject().getString(ACTION, "none");

if (START_ACTION.equals(action)) {
kafkaConsumer.subscription()
.compose(sub -> (sub.size() > 0) ? kafkaConsumer.resume(topicPartition) : kafkaConsumer.subscribe(topic))
.onSuccess(ok -> logger.info("Subscribed to {}", topic))
.onFailure(err -> logger.error("Could not subscribe to {}", topic, err));
kafkaConsumer.resume();
logger.info("Consumer resumed");
} else if (STOP_ACTION.equals(action)) {
kafkaConsumer.pause(topicPartition)
.onFailure(err -> logger.error("Cannot pause consumer", err));
kafkaConsumer.pause();
logger.info("Consumer paused");
}
});

Expand Down