From 1a52831f2fa7b6012f85775f91fe17aaff8d65b5 Mon Sep 17 00:00:00 2001 From: Dave Troiano Date: Mon, 13 Oct 2025 10:42:51 -0400 Subject: [PATCH] feat: Queues for Kafka tutorial --- build.gradle | 1 + .../kafka/build.gradle | 6 +- .../kafka/settings.gradle | 4 +- .../developer/MovieRatingJoinerTest.java | 6 +- .../developer/MovieRatingJoinerTest.java | 6 +- queues-for-kafka/.gitignore | 3 + queues-for-kafka/README.md | 329 ++++++++++++++++++ queues-for-kafka/build.gradle | 53 +++ queues-for-kafka/docker-compose.yml | 27 ++ queues-for-kafka/settings.gradle | 3 + .../io/confluent/developer/ConsumerApp.java | 159 +++++++++ .../developer/ConsumerAppArgParser.java | 180 ++++++++++ .../confluent/developer/ConsumerThread.java | 124 +++++++ .../io/confluent/developer/ProducerApp.java | 67 ++++ .../developer/ProducerAppArgParser.java | 102 ++++++ .../src/main/resources/cloud.properties | 4 + .../src/main/resources/local.properties | 1 + .../src/main/resources/logback.xml | 11 + .../main/resources/simplelogger.properties | 2 + settings.gradle | 1 + 20 files changed, 1077 insertions(+), 12 deletions(-) create mode 100644 queues-for-kafka/.gitignore create mode 100644 queues-for-kafka/README.md create mode 100644 queues-for-kafka/build.gradle create mode 100644 queues-for-kafka/docker-compose.yml create mode 100644 queues-for-kafka/settings.gradle create mode 100644 queues-for-kafka/src/main/java/io/confluent/developer/ConsumerApp.java create mode 100644 queues-for-kafka/src/main/java/io/confluent/developer/ConsumerAppArgParser.java create mode 100644 queues-for-kafka/src/main/java/io/confluent/developer/ConsumerThread.java create mode 100644 queues-for-kafka/src/main/java/io/confluent/developer/ProducerApp.java create mode 100644 queues-for-kafka/src/main/java/io/confluent/developer/ProducerAppArgParser.java create mode 100644 queues-for-kafka/src/main/resources/cloud.properties create mode 100644 queues-for-kafka/src/main/resources/local.properties create mode 100644 queues-for-kafka/src/main/resources/logback.xml create mode 100644 queues-for-kafka/src/main/resources/simplelogger.properties diff --git a/build.gradle b/build.gradle index 34da10be..07b931a1 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,7 @@ subprojects { } tasks.withType(Test) { + maxParallelForks = 1 testLogging { outputs.upToDateWhen { false } events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR" diff --git a/confluent-parallel-consumer-application/kafka/build.gradle b/confluent-parallel-consumer-application/kafka/build.gradle index 2741d61c..c921624f 100644 --- a/confluent-parallel-consumer-application/kafka/build.gradle +++ b/confluent-parallel-consumer-application/kafka/build.gradle @@ -29,9 +29,9 @@ repositories { dependencies { - implementation project(':common') implementation "org.slf4j:slf4j-simple:2.0.7" - implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4" + implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.3" + implementation "org.apache.kafka:kafka-clients:3.9.1" implementation "org.apache.commons:commons-lang3:3.12.0" implementation "me.tongfei:progressbar:0.9.3" implementation 'org.awaitility:awaitility:4.2.0' @@ -41,7 +41,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2' testImplementation 'org.hamcrest:hamcrest:2.2' testImplementation 'org.awaitility:awaitility:4.2.0' - testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer + 
testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.3:tests" // for LongPollingMockConsumer testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2' } diff --git a/confluent-parallel-consumer-application/kafka/settings.gradle b/confluent-parallel-consumer-application/kafka/settings.gradle index f118aff1..82b90ee0 100644 --- a/confluent-parallel-consumer-application/kafka/settings.gradle +++ b/confluent-parallel-consumer-application/kafka/settings.gradle @@ -6,6 +6,4 @@ * This project uses @Incubating APIs which are subject to change. */ -rootProject.name = 'parallel-consumer' -include ':common' -project(':common').projectDir = file('../../common') \ No newline at end of file +rootProject.name = 'parallel-consumer' \ No newline at end of file diff --git a/joining-stream-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java b/joining-stream-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java index d351a54c..a65a4f2d 100644 --- a/joining-stream-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java +++ b/joining-stream-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java @@ -12,9 +12,9 @@ class MovieRatingJoinerTest { void testApply() { RatedMovie actualRatedMovie; - Movie treeOfLife = new Movie(354, "Tree of Life", 2011); - Rating rating = new Rating(354, 9.8); - RatedMovie expectedRatedMovie = new RatedMovie(354, "Tree of Life", 2011, 9.8); + Movie treeOfLife = new Movie("354", "Tree of Life", 2011); + Rating rating = new Rating("354", 9.8); + RatedMovie expectedRatedMovie = new RatedMovie("354", "Tree of Life", 2011, 9.8); MovieRatingJoiner joiner = new MovieRatingJoiner(); actualRatedMovie = joiner.apply(rating, treeOfLife); diff --git a/joining-table-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java b/joining-table-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java index d351a54c..a65a4f2d 100644 --- a/joining-table-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java +++ b/joining-table-table/kstreams/src/test/java/io/confluent/developer/MovieRatingJoinerTest.java @@ -12,9 +12,9 @@ class MovieRatingJoinerTest { void testApply() { RatedMovie actualRatedMovie; - Movie treeOfLife = new Movie(354, "Tree of Life", 2011); - Rating rating = new Rating(354, 9.8); - RatedMovie expectedRatedMovie = new RatedMovie(354, "Tree of Life", 2011, 9.8); + Movie treeOfLife = new Movie("354", "Tree of Life", 2011); + Rating rating = new Rating("354", 9.8); + RatedMovie expectedRatedMovie = new RatedMovie("354", "Tree of Life", 2011, 9.8); MovieRatingJoiner joiner = new MovieRatingJoiner(); actualRatedMovie = joiner.apply(rating, treeOfLife); diff --git a/queues-for-kafka/.gitignore b/queues-for-kafka/.gitignore new file mode 100644 index 00000000..833a06c9 --- /dev/null +++ b/queues-for-kafka/.gitignore @@ -0,0 +1,3 @@ +gradle/ +gradlew +gradlew.bat diff --git a/queues-for-kafka/README.md b/queues-for-kafka/README.md new file mode 100644 index 00000000..cfe0e4a0 --- /dev/null +++ b/queues-for-kafka/README.md @@ -0,0 +1,329 @@ + + + +# How to scale Kafka consumption throughput with share consumers (KIP-932: Queues for Kafka) + +This tutorial demonstrates how to produce a high volume of messages to Kafka, and then compare consumption throughput when using both regular consumers and share consumers. 
The steps in this tutorial outline how to set up a cluster for share consumers, run the provided producer / consumer applications, and compare performance results between classic Kafka consumer instances and share consumers. For a deeper look at the application source code, refer to the `Code explanation` section at the bottom. + +The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the `Docker instructions` section at the bottom. + +## Prerequisites + +- A [Confluent Cloud](https://confluent.cloud/signup) account +- The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) installed on your machine +- [Apache Kafka 4.1](https://kafka.apache.org/downloads) for its command-line tools +- Clone the `confluentinc/tutorials` repository and navigate into its top-level directory: + ```shell + git clone git@github.com:confluentinc/tutorials.git + cd tutorials + ``` + +## Create Confluent Cloud resources + +First, create a Dedicated 1-CKU cluster in Confluent Cloud by following the instructions [here](https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters). + +Since Queues for Kafka is currently a Closed Preview feature, you'll need to open a support request to enable the feature on your cluster. In the [Confluent Support Portal](https://support.confluent.io/), open a ticket requesting that Queues for Kafka be enabled for your cluster. Provide the cluster ID in your request, which you can find in the [Confluent Cloud Console](https://confluent.cloud/) by navigating to `Cluster Settings` from your Dedicated cluster overview page. + +## Confluent CLI setup + +Run the following series of commands to log in and set the active Confluent Cloud environment and cluster. + +```shell +confluent login --prompt --save +confluent environment list +confluent environment use +confluent kafka cluster list +confluent kafka cluster use +``` + +## Generate Confluent Cloud credentials + +Generate a Kafka API key by substituting the cluster ID from the previous command: + +```shell +confluent api-key create --resource +``` + +Copy the API key into the file `queues-for-kafka/src/main/resources/cloud.properties` where you see the `` placeholder, and copy the API secret where you see the `` placeholder. + +Run this command to get your cluster's bootstrap servers endpoint: + +```shell +confluent kafka cluster describe +``` + +Copy the endpoint (of the form `pkc-...confluent.cloud:9092`) into the same `cloud.properties` file where you see the `` placeholder. Do not copy the leading `SASL_SSL://`. + +## Create topic + +Create a 6-partition topic called `strings` that we will use to test consumption throughput. + +```shell +confluent kafka topic create strings --partitions 6 +``` + +## Compile and run the producer application + +Compile the application from the top-level `tutorials` repository directory: + +```shell +./gradlew queues-for-kafka:shadowJar +``` + +Navigate into the application's home directory: + +```shell +cd queues-for-kafka +``` + +Run the producer application, passing the `cloud.properties` Kafka client configuration file that you populated with your Dedicated cluster's bootstrap servers endpoint and credentials: + +```shell +java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ProducerApp \ + --properties-file ./src/main/resources/cloud.properties +``` + +## Run consumer applications + +In a separate shell, run the regular `KafkaConsumer`-based application. 
This will run 16 concurrent consumers. Only 6 will actively consume since a partition can only be assigned to one consumer instance. It will simulate a 500-millisecond workload and report throughput after consuming 1,000 events. + +```shell +java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ConsumerApp \ + --properties-file ./src/main/resources/cloud.properties \ + --consumer-type consumer \ + --num-consumers 16 \ + --wait-ms 500 \ + --total-events 1000 +``` + +The app will exit once 1,000 events have been consumed, which should take around a minute and a half. You will see a log message like this reporting the duration: + +```plaintext +Completed consuming 1000 messages in 89.61 seconds. +``` + +Next, run the consumer application using share consumers. + +First, alter the `share-consumer-group` to begin consuming from the earliest offset: + +```shell +/bin/kafka-configs.sh --bootstrap-server \ + --group share-consumer-group --alter --add-config 'share.auto.offset.reset=earliest' \ + --command-config ./src/main/resources/cloud.properties +``` + +Run the consumer app again using the same number of threads and simulated event processing time, except this time pass the `share_consumer` consumer type: + +```shell +java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ConsumerApp \ + --properties-file ./src/main/resources/cloud.properties \ + --consumer-type share_consumer \ + --num-consumers 16 \ + --wait-ms 500 \ + --total-events 1000 +``` + +This time, the app should take closer to 30 seconds to complete, given that consumption scales to all 16 threads. You will see a log message like this reporting the duration: + +```plaintext +Completed consuming 1000 messages in 31.42 seconds. +``` + +## Other suggested experiments + +Try different application configurations to see how consumption throughput is impacted. For example, vary `--num-consumers` and `--wait-ms` to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput? + +## Clean up + +When you are finished, delete the Confluent Cloud resources created for this tutorial. For example, if you are using an isolated environment, delete it by first getting the environment ID in the form `env-123456`: + +```shell +confluent environment list +``` + +Delete the environment, including all resources created for this tutorial: + +```shell +confluent environment delete +``` + +
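
## Understanding the timing difference

As a rough sanity check on the timings above (and a way to predict results for the suggested experiments), the expected duration is approximately `total events × wait time ÷ active workers`: classic consumers are capped at 6 active workers by the 6 partitions (1,000 × 0.5 s ÷ 6 ≈ 83 s), while share consumers keep all 16 threads busy (1,000 × 0.5 s ÷ 16 ≈ 31 s). The sketch below is a hypothetical back-of-envelope helper, not part of the tutorial's applications; the class name is made up and the model ignores poll latency, rebalancing, and startup overhead.

```java
public class ThroughputEstimate {

    // Back-of-envelope model only: ignores poll latency, rebalancing, and startup time.
    static double estimateSeconds(int totalEvents, int waitMs, int numThreads, int partitions, boolean shareConsumers) {
        // Classic consumer groups are capped at one active consumer per partition;
        // share consumers can all fetch records from any partition.
        int activeWorkers = shareConsumers ? numThreads : Math.min(numThreads, partitions);
        return totalEvents * (waitMs / 1000.0) / activeWorkers;
    }

    public static void main(String[] args) {
        System.out.printf("classic consumers: ~%.0f s%n", estimateSeconds(1000, 500, 16, 6, false)); // ~83 s
        System.out.printf("share consumers:   ~%.0f s%n", estimateSeconds(1000, 500, 16, 6, true));  // ~31 s
    }
}
```

These estimates line up with the measured 89.61 s and 31.42 s runs reported above, with the extra seconds coming from startup and polling overhead.
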
+ Docker instructions + + ## Prerequisites + + * Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/) + * [Docker Compose](https://docs.docker.com/compose/install/). Ensure that the command `docker compose version` succeeds. + * Clone the `confluentinc/tutorials` repository and navigate into its top-level directory: + ```shell + git clone git@github.com:confluentinc/tutorials.git + cd tutorials + ``` + + ## Start Kafka in Docker + + Start Apache Kafka 4.1 with the following command: + + ```shell + docker compose -f ./queues-for-kafka/docker-compose.yml up -d + ``` + + ## Enable share consumption + + Open a shell in the broker container: + + ```shell + docker exec --workdir /opt/kafka/bin/ -it broker /bin/bash + ``` + + Enable share consumers: + + ```shell + ./kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1 + ``` + + Alter the `share-consumer-group` share group to begin consuming from the earliest offset: + + ```shell + ./kafka-configs.sh --bootstrap-server localhost:9092 \ + --group share-consumer-group --alter \ + --add-config 'share.auto.offset.reset=earliest' + ``` + + ## Create topics + + In the broker container, create a topic called `strings` with 6 partitions: + + ```shell + ./kafka-topics.sh --bootstrap-server localhost:9092 --create \ + --partitions 6 --topic strings + ``` + + Enter `Ctrl+D` to exit the container shell. + + ## Compile and run the producer application + + On your local machine, compile the app: + + ```shell + ./gradlew queues-for-kafka:shadowJar + ``` + + Navigate into the application's home directory: + + ```shell + cd queues-for-kafka + ``` + + Run the producer application, passing the `local.properties` Kafka client configuration file that points to the broker's bootstrap servers endpoint at `localhost:9092`: + + ```shell + java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ProducerApp \ + --properties-file ./src/main/resources/local.properties + ``` + + ## Run consumer applications + + In a separate shell, run the regular `KafkaConsumer`-based application. This will run 16 concurrent consumers. Only 6 will actively consume since a partition can only be assigned to one consumer instance. It will simulate a 500-millisecond workload and report throughput after consuming 1,000 events. + + ```shell + java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ConsumerApp \ + --properties-file ./src/main/resources/local.properties \ + --consumer-type consumer \ + --num-consumers 16 \ + --wait-ms 500 \ + --total-events 1000 + ``` + + The app will exit once 1,000 events have been consumed, which should take around a minute and a half. You will see a log message like this reporting the duration: + + ```plaintext + Completed consuming 1000 messages in 89.61 seconds. + ``` + + Next, run the consumer app again using the same number of threads and simulated event processing time, except this time pass the `share_consumer` consumer type: + + ```shell + java -cp ./build/libs/kafka-consumer-comparison-app-0.0.1.jar \ + io.confluent.developer.ConsumerApp \ + --properties-file ./src/main/resources/local.properties \ + --consumer-type share_consumer \ + --num-consumers 16 \ + --wait-ms 500 \ + --total-events 1000 + ``` + + This time, the app should take closer to 30 seconds to complete, given that consumption scales to all 16 threads. 
You will see a log message like this reporting the duration: + + ```plaintext + Completed consuming 1000 messages in 31.42 seconds. + ``` + + ## Other suggested experiments + + Try different application configurations to see how consumption throughput is impacted. For example, vary `--num-consumers` and `--wait-ms` to see how throughput scales with more workers and different per-event wait times. Also try a different number of topic partitions. How does it impact consumption throughput? + + ## Clean up + + From your local machine, stop the broker container: + + ```shell + docker compose -f ./queues-for-kafka/docker-compose.yml down + ``` + +
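
If you want to adapt this comparison to your own application, the only client configuration differences between the two modes in `ConsumerApp` are the group ID, the acknowledgement mode, and how offset reset is handled. The following sketch approximates how those properties are assembled; the class name is made up and it assumes the local broker at `localhost:9092`, but the individual settings mirror what the tutorial's `ConsumerApp.getKafkaProperties` sets.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPropsSketch {

    // Approximates the properties ConsumerApp builds for each consumer type.
    static Properties consumerProps(boolean shareConsumer) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumes the local Docker broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        if (shareConsumer) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "share-consumer-group");
            props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // acknowledge records one by one
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        } else {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
            // share groups use the broker-side share.auto.offset.reset setting instead
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println("classic consumer: " + consumerProps(false));
        System.out.println("share consumer:   " + consumerProps(true));
    }
}
```
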
+ +
+## Code explanation
+
+This section summarizes the key application source files under `src/main/java/io/confluent/developer`.
+
+- **`ProducerApp.java`**: Standalone producer that sends a high volume of string messages to the `strings` topic.
+  - Parses CLI options via `ProducerAppArgParser` to locate the Kafka client properties file.
+  - Builds a `KafkaProducer` with `StringSerializer` for keys and values and `acks=all` for durability.
+  - Produces 1,000,000 messages (key and value are the stringified index), logs progress every 10,000, and throttles briefly to keep the producer running longer and avoid overwhelming the broker.
+  - The producer flushes after every send so that records aren't written in large batches. This ensures that multiple share consumers can actively consume from a given partition.
+
+- **`ConsumerApp.java`**: Orchestrates multi-threaded consumption to compare throughput of regular `KafkaConsumer` instances versus `KafkaShareConsumer` instances.
+  - Parses CLI options via `ConsumerAppArgParser`:
+    - `--consumer-type` selects `consumer` (regular) or `share_consumer` (share consumer).
+    - `--num-consumers` controls the number of consumer worker threads.
+    - `--wait-ms` simulates per-record processing time (sleep per event).
+    - `--total-events` stops after consuming the specified number of events across all workers.
+  - Builds consumer properties common to both implementations (`StringDeserializer` for keys and values), plus the appropriate group settings:
+    - For regular consumers: sets `group.id=consumer-group` and `auto.offset.reset=earliest`.
+    - For share consumers: sets `group.id=share-consumer-group`, `share.acknowledgement.mode=explicit`, and `max.poll.records=100`.
+  - Creates an `ExecutorService` and launches N `ConsumerThread` workers with an event handler that:
+    - Sleeps for `--wait-ms` to simulate work.
+    - Atomically counts records across all workers, logs progress periodically, and records total elapsed time when `--total-events` is reached.
+  - Adds a shutdown hook to close all consumers and the executor service cleanly.
+
+- **`ConsumerThread.java`**: A runnable worker used by `ConsumerApp` that encapsulates the consumption loop for either consumer type.
+  - Two constructors: one for a `KafkaConsumer` and one for a `KafkaShareConsumer`. Each subscribes the consumer to the `strings` topic.
+  - In `run()`, polls for events and invokes the provided `EventHandler` per record.
+  - Share consumer path: after handling a record, explicitly acknowledges it with `AcknowledgeType.ACCEPT`, or `REJECT` on error.
+  - Regular consumer path: handles records without explicit acknowledgements (normal consumer semantics).
+  - Cleanly closes the underlying consumers and exits once the desired number of events is consumed.
+
+- **`ConsumerAppArgParser.java`**: Command-line parsing and validation for the consumer app using Apache Commons CLI.
+  - Options: `--properties-file`, `--consumer-type`, `--num-consumers`, `--wait-ms`, and `--total-events`.
+  - Validates ranges (e.g., consumers 1–16, wait 1–5000 ms, events 1–1,000,000).
+
+- **`ProducerAppArgParser.java`**: Minimal command-line parsing for the producer app.
+  - Option: `--properties-file` to locate the Kafka client configuration.
+
+### Notes
+
+- All examples use the topic `strings`. Adjust the topic name in the source if needed.
+- Kafka client configuration is provided via the properties files in `src/main/resources` (`cloud.properties` or `local.properties`).
+
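
To make the share-consumer path described above concrete, here is a stripped-down sketch of the poll/acknowledge loop pattern that `ConsumerThread` follows. It is an illustration rather than the tutorial's exact code: the class name is made up, it assumes the local broker at `localhost:9092`, and the event counting, sleep-based work simulation, and shutdown handling are omitted.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ShareConsumerLoopSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumes the local Docker broker
        props.put("group.id", "share-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("share.acknowledgement.mode", "explicit"); // acknowledge every record individually

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("strings"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // "process" the record; the tutorial app sleeps here to simulate work
                        System.out.printf("partition %d, offset %d, value %s%n",
                                record.partition(), record.offset(), record.value());
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    } catch (Exception e) {
                        consumer.acknowledge(record, AcknowledgeType.REJECT); // mark as not deliverable again
                    }
                }
                // acknowledgements are committed to the broker on the next poll (or on close)
            }
        }
    }
}
```
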
diff --git a/queues-for-kafka/build.gradle b/queues-for-kafka/build.gradle new file mode 100644 index 00000000..5c632e04 --- /dev/null +++ b/queues-for-kafka/build.gradle @@ -0,0 +1,53 @@ +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0" + } +} + +plugins { + id "java" + id "idea" + id "eclipse" +} + +sourceCompatibility = "17" +targetCompatibility = "17" +version = "0.0.1" + +repositories { + mavenCentral() + + maven { + url "https://packages.confluent.io/maven" + } +} + +apply plugin: "com.github.johnrengelman.shadow" + +dependencies { + implementation group: 'org.slf4j', name: 'slf4j-simple', version: '2.0.17' + implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.17' + implementation group: 'commons-cli', name: 'commons-cli', version: '1.10.0' + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '4.1.0' + implementation group: 'org.apache.kafka', name: 'kafka-group-coordinator', version: '4.1.0' + + implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.42' + annotationProcessor group: 'org.projectlombok', name: 'lombok', version: '1.18.42' +} + +jar { + manifest { + attributes( + "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "), + "Main-Class": "io.confluent.developer.ConsumerApp" + ) + } +} + +shadowJar { + archiveBaseName = "kafka-consumer-comparison-app" + archiveClassifier = '' +} diff --git a/queues-for-kafka/docker-compose.yml b/queues-for-kafka/docker-compose.yml new file mode 100644 index 00000000..b1126a84 --- /dev/null +++ b/queues-for-kafka/docker-compose.yml @@ -0,0 +1,27 @@ +services: + broker: + image: apache/kafka:4.1.0 + hostname: broker + container_name: broker + ports: + - 9092:9092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_UNSTABLE_API_VERSIONS_ENABLE: true + KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: classic,consumer,share + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk diff --git a/queues-for-kafka/settings.gradle b/queues-for-kafka/settings.gradle new file mode 100644 index 00000000..5c88c337 --- /dev/null +++ b/queues-for-kafka/settings.gradle @@ -0,0 +1,3 @@ +rootProject.name = 'kafka-consumer-comparison-app' +include ':common' +project(':common').projectDir = file('../../common') diff --git a/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerApp.java b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerApp.java new file mode 100644 index 00000000..bb5e940b --- /dev/null +++ b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerApp.java @@ -0,0 +1,159 @@ +package io.confluent.developer; + +import 
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.confluent.developer.ConsumerAppArgParser.streamFromFile; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +public class ConsumerApp { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class); + + private static Properties getKafkaProperties(String kafkaProperties, ConsumerAppArgParser.ConsumerType consumerType) { + Properties properties = new Properties(); + try (InputStream inputStream = streamFromFile(kafkaProperties)) { + properties.load(inputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName()); + if (consumerType.equals(ConsumerAppArgParser.ConsumerType.SHARE_CONSUMER)) { + properties.put(SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); + properties.put(MAX_POLL_RECORDS_CONFIG, "100"); + properties.put(GROUP_ID_CONFIG, "share-consumer-group"); + } else { + properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); // only applies to KafkaConsumer + properties.put(GROUP_ID_CONFIG, "consumer-group"); + } + + return properties; + } + + + public static void main(final String[] args) { + ConsumerAppArgParser cmdArgs = ConsumerAppArgParser.parseOptions(args); + final String topic = "strings"; + + final ConsumerAppArgParser.ConsumerType consumerType = cmdArgs.getConsumerType(); + final int numConsumerThreads = cmdArgs.getNumConsumers(); + final int processingTimeMs = cmdArgs.getSleepMs(); + final int eventsToConsume = cmdArgs.getTotalEvents(); + + AtomicBoolean printedCompletion = new AtomicBoolean(false); + AtomicInteger eventCounter = new AtomicInteger(0); + + LOGGER.info("starting {} consumers", numConsumerThreads); + + ExecutorService executorService = newFixedThreadPool(1 + numConsumerThreads); + List> consumerThreads = new ArrayList<>(); + List consumerThreadFutures = new ArrayList<>(); + long startTime = System.nanoTime(); + long eventsPerSec = Math.round(1000.0 / processingTimeMs * numConsumerThreads); + + for (int i = 0; i < numConsumerThreads; i++) { + final String consumerId = "consumer-" + i; + + ConsumerThread consumer; + ConsumerThread.EventHandler eventHandler = (ConsumerThread.EventHandler) (key, value, partition, offset) -> { + try 
{ + Thread.sleep(processingTimeMs); + int currentCount = eventCounter.incrementAndGet(); + + // log progress about once every 5 seconds + if (currentCount % (5 * eventsPerSec) == 0) { + LOGGER.info("Consumed {} of {} events so far", currentCount, eventsToConsume); + } + + if (currentCount >= eventsToConsume) { + long endTime = System.nanoTime(); + double elapsedTimeSeconds = (double) (endTime - startTime) / 1_000_000_000; + if (printedCompletion.getAndSet(true) == false) { + LOGGER.info("Completed consuming {} messages in {} seconds.", eventsToConsume, elapsedTimeSeconds); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + if (consumerType.equals(ConsumerAppArgParser.ConsumerType.SHARE_CONSUMER)) { + KafkaShareConsumer shareConsumer = new KafkaShareConsumer<>(getKafkaProperties(cmdArgs.getKafkaProperties(), consumerType)); + consumer = new ConsumerThread<>( + consumerId, + shareConsumer, + eventHandler, + eventCounter, + eventsToConsume, + topic); + consumerThreads.add(consumer); + consumerThreadFutures.add(executorService.submit(consumer)); + } else { + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(getKafkaProperties(cmdArgs.getKafkaProperties(), consumerType)); + consumer = new ConsumerThread<>( + consumerId, + kafkaConsumer, + eventHandler, + eventCounter, + eventsToConsume, + topic); + consumerThreads.add(consumer); + consumerThreadFutures.add(executorService.submit(consumer)); + } + } + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + LOGGER.info("Shutdown signal received"); + + for (ConsumerThread consumerThread : consumerThreads) { + consumerThread.shutdown(); + } + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + })); + + for (Future consumerThreadFuture : consumerThreadFutures) { + try { + consumerThreadFuture.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + LOGGER.info("Consumers completed"); + System.exit(0); + } + +} diff --git a/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerAppArgParser.java b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerAppArgParser.java new file mode 100644 index 00000000..53cfea18 --- /dev/null +++ b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerAppArgParser.java @@ -0,0 +1,180 @@ +package io.confluent.developer; + +import lombok.Getter; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.help.HelpFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +@Getter +public class ConsumerAppArgParser { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerAppArgParser.class); + + private final String kafkaProperties; + private final int numConsumers; + private final int sleepMs; + private final int totalEvents; + private final ConsumerType consumerType; + + enum ConsumerType { + CONSUMER, SHARE_CONSUMER + } + + ConsumerAppArgParser(String kafkaProperties, 
ConsumerType consumerType, int numConsumers, int sleepMs, int totalEvents) { + this.kafkaProperties = kafkaProperties; + this.consumerType = consumerType; + this.numConsumers = numConsumers; + this.sleepMs = sleepMs; + this.totalEvents = totalEvents; + } + + public static ConsumerAppArgParser parseOptions(String[] args) { + Options options = new Options(); + + options.addOption(Option.builder("p") + .longOpt("properties-file") + .hasArg() + .argName("Kafka properties file") + .desc("Path to Kafka properties file") + .type(String.class) + .get()); + + options.addOption(Option.builder("t") + .longOpt("consumer-type") + .hasArg() + .argName("Consumer Type") + .desc("Kafka consumer type (CONSUMER or SHARE_CONSUMER)") + .type(String.class) + .get()); + + options.addOption(Option.builder("n") + .longOpt("num-consumers") + .hasArg() + .argName("Number of Consumers") + .desc("Number of consumers to start (default: 5, max: 16)") + .type(Number.class) + .get()); + + options.addOption(Option.builder("e") + .longOpt("total-events") + .hasArg() + .argName("Number of Events") + .desc("Number of events to consume (default: 1000, max: 1000000)") + .type(Number.class) + .get()); + + options.addOption(Option.builder("w") + .longOpt("wait-ms") + .hasArg() + .argName("Sleep Time per Event in ms") + .desc("Sleep Time per Event in ms (default: 50, max: 5000)") + .type(Number.class) + .get()); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = HelpFormatter.builder().get(); + CommandLine cmd = null; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.err.println("Error parsing command line arguments: " + e.getMessage()); + try { + formatter.printHelp("consumer-app", "Kafka Consumer testing app", options, "", true); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + System.exit(1); + } + + String kafkaProperties = cmd.getOptionValue("properties-file", "kafka.properties"); + + boolean helpRequested = cmd.hasOption("help") || cmd.hasOption("h"); + if (helpRequested) { + printHelp(); + System.exit(0); + } + + final int numConsumerArg = Integer.parseInt(cmd.getOptionValue("num-consumers", "5")); + if (numConsumerArg < 1 || numConsumerArg > 16) { + throw new IllegalArgumentException("Number of consumers must be between 1 and 16 (inclusive)"); + } + + final int sleepMsArg = Integer.parseInt(cmd.getOptionValue("wait-ms", "50")); + if (sleepMsArg < 1 || sleepMsArg > 5000) { + throw new IllegalArgumentException("Per-event sleep time must be between 1 and 5000 ms (inclusive)"); + } + + final int totalEventsArg = Integer.parseInt(cmd.getOptionValue("total-events", "1000")); + if (totalEventsArg < 1 || totalEventsArg > 1000000) { + throw new IllegalArgumentException("Number of events to consume must be between 1 and 1,000,000 (inclusive)"); + } + + final ConsumerType consumerType; + try { + consumerType = ConsumerType.valueOf(cmd.getOptionValue("consumer-type").toUpperCase()); + } catch (IllegalArgumentException | NullPointerException e) { + throw new IllegalArgumentException("Consumer type must be consumer or share_consumer"); + } + + return new ConsumerAppArgParser(kafkaProperties, consumerType, numConsumerArg, sleepMsArg, totalEventsArg); + } + + public static InputStream streamFromFile(String path) throws IOException { + LOGGER.info("Loading configuration from file: {}", path); + + // First try loading from classpath + InputStream is = ConsumerApp.class.getClassLoader().getResourceAsStream(path); + if (is != null) { + LOGGER.info("Found 
configuration file in classpath: {}", path); + return is; + } + + // If not found in classpath, try the filesystem + File file = new File(path); + if (file.exists()) { + LOGGER.info("Found configuration file at: {}", file.getAbsolutePath()); + return new FileInputStream(file); + } + + throw new IOException("Configuration file not found in classpath or at path: " + path); + } + + private static void printHelp() { + System.out.println("Kafka Consumer App - Command Line Arguments"); + System.out.println("--------------------------------------------"); + System.out.println("This application demonstrates performance differences between "); + System.out.println("Kafka consumers and share consumers."); + System.out.println(); + System.out.println("Required Arguments:"); + System.out.println(" -t, --consumer-type Type of consumer (consumer or share_consumer)"); + System.out.println(); + System.out.println("Optional Arguments:"); + System.out.println(" -p, --kafka-properties Kafka properties file"); + System.out.println(" (default: kafka.properties)"); + System.out.println(" -n, --num-consumers Number of consumers to start"); + System.out.println(" (default: 5, max: 16)"); + System.out.println(" -w, --wait-ms Number of ms to sleep when processing each event"); + System.out.println(" (default: 50, max: 5000)"); + System.out.println(" -e, --total-events Number of events to consume before exiting"); + System.out.println(" (default: 1,000, max: 1,000,000)"); + System.out.println(" -h, --help Display this help message"); + System.out.println(); + System.out.println("Example:"); + System.out.println(" java -jar kafka-consumer-comparison-app.jar \\"); + System.out.println(" --consumer-type share_consumer \\"); + System.out.println(" --num-consumers 16"); + } + +} diff --git a/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerThread.java b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerThread.java new file mode 100644 index 00000000..a5d9ab9e --- /dev/null +++ b/queues-for-kafka/src/main/java/io/confluent/developer/ConsumerThread.java @@ -0,0 +1,124 @@ +package io.confluent.developer; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConsumerThread implements Runnable, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerThread.class); + + private final String consumerId; + private final KafkaShareConsumer shareConsumer; + private final KafkaConsumer consumer; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicInteger eventCounter; + private final int eventsToConsume; + private final EventHandler eventHandler; + + public ConsumerThread(String consumerId, KafkaShareConsumer shareConsumer, + EventHandler eventHandler, + AtomicInteger eventCounter, + int eventsToConsume, + String...topics) { + this.consumerId = consumerId; + this.shareConsumer = shareConsumer; + this.consumer = null; + this.shareConsumer.subscribe(Arrays.asList(topics)); + this.eventHandler = eventHandler; + 
this.eventCounter = eventCounter; + this.eventsToConsume = eventsToConsume; + } + + public ConsumerThread(String consumerId, KafkaConsumer consumer, + EventHandler eventHandler, + AtomicInteger eventCounter, + int eventsToConsume, + String...topics) { + this.consumerId = consumerId; + this.consumer = consumer; + this.shareConsumer = null; + this.consumer.subscribe(Arrays.asList(topics)); + this.eventHandler = eventHandler; + this.eventCounter = eventCounter; + this.eventsToConsume = eventsToConsume; + } + + @Override + public void run() { + try { + while (!closed.get()) { + if (eventCounter.get() >= eventsToConsume) { + break; + } + if (shareConsumer != null) { + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); + + for (ConsumerRecord record : records) { + try { + if (eventCounter.get() >= eventsToConsume) { + break; + } + eventHandler.handleEvent(record.key(), record.value(), record.partition(), record.offset()); + shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT); + } catch (Exception e) { + LOGGER.error("consumer {}: error handling event: {}", consumerId, e.getMessage(), e); + shareConsumer.acknowledge(record, AcknowledgeType.REJECT); + } + } + } else { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + + for (ConsumerRecord record : records) { + try { + if (eventCounter.get() >= eventsToConsume) { + break; + } + eventHandler.handleEvent(record.key(), record.value(), record.partition(), record.offset()); + } catch (Exception e) { + LOGGER.error("consumer {}: error handling event: {}", consumerId, e.getMessage(), e); + } + } + } + } + } catch (WakeupException e) { + // Ignore if closing + if (!closed.get()) { + throw e; + } + } finally { + if (consumer != null) { + consumer.close(); + } else { + shareConsumer.close(); + } + LOGGER.info("consumer {} closed", consumerId); + } + } + + + public void shutdown() { + closed.set(true); + if (shareConsumer != null) { + shareConsumer.wakeup(); + } + } + + @Override + public void close() { + shutdown(); + } + + public interface EventHandler { + void handleEvent(Key key, Value value, int partition, long offset); + } +} diff --git a/queues-for-kafka/src/main/java/io/confluent/developer/ProducerApp.java b/queues-for-kafka/src/main/java/io/confluent/developer/ProducerApp.java new file mode 100644 index 00000000..7e44e8c3 --- /dev/null +++ b/queues-for-kafka/src/main/java/io/confluent/developer/ProducerApp.java @@ -0,0 +1,67 @@ +package io.confluent.developer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import static io.confluent.developer.ConsumerAppArgParser.streamFromFile; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +public class ProducerApp { + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerApp.class); + + private static Properties getKafkaProperties(String kafkaProperties) { + Properties properties = new Properties(); + try (InputStream inputStream = streamFromFile(kafkaProperties)) { + properties.load(inputStream); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + properties.put(ACKS_CONFIG, "all"); + + return properties; + } + + public static void main(final String[] args) { + ProducerAppArgParser cmdArgs = ProducerAppArgParser.parseOptions(args); + Properties props = getKafkaProperties(cmdArgs.getKafkaProperties()); + final String topic = "strings"; + + try (final Producer producer = new KafkaProducer<>(props)) { + final int numMessages = 1000000; + for (Integer i = 0; i < numMessages; i++) { + String message = i.toString(); + final int finalI = i; + producer.send( + new ProducerRecord<>(topic, message, message), + (event, ex) -> { + if (ex != null) + ex.printStackTrace(); + else + if (finalI % 10000 == 0) + LOGGER.info("Produced {} of {} events so far", finalI, numMessages); + }); + producer.flush(); + if (i % 1000 == 0) { + Thread.sleep(100); + } + } + System.out.printf("%s events were produced to topic %s%n", numMessages, topic); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } +} diff --git a/queues-for-kafka/src/main/java/io/confluent/developer/ProducerAppArgParser.java b/queues-for-kafka/src/main/java/io/confluent/developer/ProducerAppArgParser.java new file mode 100644 index 00000000..07329481 --- /dev/null +++ b/queues-for-kafka/src/main/java/io/confluent/developer/ProducerAppArgParser.java @@ -0,0 +1,102 @@ +package io.confluent.developer; + +import lombok.Getter; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.help.HelpFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; + +@Getter +public class ProducerAppArgParser { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerAppArgParser.class); + + private final String kafkaProperties; + + ProducerAppArgParser(String kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + public static ProducerAppArgParser parseOptions(String[] args) { + Options options = new Options(); + + options.addOption(Option.builder("p") + .longOpt("properties-file") + .hasArg() + .argName("Kafka properties file") + .desc("Path to Kafka properties file") + .type(String.class) + .get()); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = HelpFormatter.builder().get(); + CommandLine cmd = null; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.err.println("Error parsing command line arguments: " + e.getMessage()); + try { + formatter.printHelp("producer-app", "Kafka producer app", options, "", true); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + System.exit(1); + } + + String kafkaProperties = cmd.getOptionValue("properties-file", "kafka.properties"); + + boolean helpRequested = cmd.hasOption("help") || cmd.hasOption("h"); + if (helpRequested) { + printHelp(); + System.exit(0); + } + + return new ProducerAppArgParser(kafkaProperties); + } + + public static InputStream streamFromFile(String path) throws IOException { + LOGGER.info("Loading configuration from 
file: {}", path); + + // First try loading from classpath + InputStream is = ProducerAppArgParser.class.getClassLoader().getResourceAsStream(path); + if (is != null) { + LOGGER.info("Found configuration file in classpath: {}", path); + return is; + } + + // If not found in classpath, try the filesystem + File file = new File(path); + if (file.exists()) { + LOGGER.info("Found configuration file at: {}", file.getAbsolutePath()); + return new FileInputStream(file); + } + + throw new IOException("Configuration file not found in classpath or at path: " + path); + } + + private static void printHelp() { + System.out.println("Kafka Producer App - Command Line Arguments"); + System.out.println("--------------------------------------------"); + System.out.println("This application produces a bunch of strings to a Kafka topic."); + System.out.println(); + System.out.println("Optional Arguments:"); + System.out.println(" -p, --kafka-properties Kafka properties file"); + System.out.println(" (default: kafka.properties)"); + System.out.println(" -h, --help Display this help message"); + System.out.println(); + System.out.println("Example:"); + System.out.println(" java -jar kafka-consumer-comparison-app.jar"); + } + +} diff --git a/queues-for-kafka/src/main/resources/cloud.properties b/queues-for-kafka/src/main/resources/cloud.properties new file mode 100644 index 00000000..f3cff624 --- /dev/null +++ b/queues-for-kafka/src/main/resources/cloud.properties @@ -0,0 +1,4 @@ +bootstrap.servers= +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='' password=''; +security.protocol=SASL_SSL +sasl.mechanism=PLAIN diff --git a/queues-for-kafka/src/main/resources/local.properties b/queues-for-kafka/src/main/resources/local.properties new file mode 100644 index 00000000..1f98d27d --- /dev/null +++ b/queues-for-kafka/src/main/resources/local.properties @@ -0,0 +1 @@ +bootstrap.servers=localhost:9092 diff --git a/queues-for-kafka/src/main/resources/logback.xml b/queues-for-kafka/src/main/resources/logback.xml new file mode 100644 index 00000000..b6b9be1d --- /dev/null +++ b/queues-for-kafka/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/queues-for-kafka/src/main/resources/simplelogger.properties b/queues-for-kafka/src/main/resources/simplelogger.properties new file mode 100644 index 00000000..44847358 --- /dev/null +++ b/queues-for-kafka/src/main/resources/simplelogger.properties @@ -0,0 +1,2 @@ +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd'T'HH:mm:ss.SSS \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index d00cfab8..3c0faf35 100644 --- a/settings.gradle +++ b/settings.gradle @@ -54,6 +54,7 @@ include 'naming-changelog-repartition-topics:kstreams' include 'over-aggregations:flink_table_api_java' include 'over-aggregations:flinksql' include 'pattern-matching:flinksql' +include 'queues-for-kafka' include 'reordering-streams:kstreams' include 'schedule-ktable-ttl:kstreams' include 'schedule-ktable-ttl-aggregate:kstreams'