KAFKA-14752: Kafka examples improvements - consumer changes #13514

Merged · 7 commits · May 2, 2023
165 changes: 107 additions & 58 deletions examples/src/main/java/kafka/examples/Consumer.java
@@ -21,97 +21,146 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import static java.util.Collections.singleton;

/**
 * A simple consumer thread that subscribes to a topic, fetches new records and prints them.
 * The thread does not stop until all records are fetched or an exception is raised.
 */
public class Consumer extends Thread implements ConsumerRebalanceListener {
    private final String bootstrapServers;
    private final String topic;
    private final String groupId;
    private final Optional<String> instanceId;
    private final boolean readCommitted;
    private final int numRecords;
    private final CountDownLatch latch;
    private volatile boolean closed;
    private int remainingRecords;

    public Consumer(String threadName,
                    String bootstrapServers,
                    String topic,
                    String groupId,
                    Optional<String> instanceId,
                    boolean readCommitted,
                    int numRecords,
                    CountDownLatch latch) {
        super(threadName);
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
        this.groupId = groupId;
        this.instanceId = instanceId;
        this.readCommitted = readCommitted;
        this.numRecords = numRecords;
        this.remainingRecords = numRecords;
        this.latch = latch;
    }

    @Override
    public void run() {
        // the consumer instance is NOT thread safe
        try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) {
            // subscribes to a list of topics to get dynamically assigned partitions
            // this class implements the rebalance listener that we pass here to be notified of such events
            consumer.subscribe(singleton(topic), this);
            Utils.printOut("Subscribed to %s", topic);
            while (!closed && remainingRecords > 0) {
                try {
                    // if required, poll updates partition assignment and invokes the configured rebalance listener
                    // then tries to fetch records sequentially using the last committed offset or auto.offset.reset policy
                    // returns immediately if there are records or times out returning an empty record set
                    // the next poll must be called within max.poll.interval.ms to avoid a group rebalance
                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<Integer, String> record : records) {
                        Utils.maybePrintRecord(numRecords, record);
                    }
                    remainingRecords -= records.count();
                } catch (AuthorizationException | UnsupportedVersionException
                         | RecordDeserializationException e) {
                    // we can't recover from these exceptions
                    Utils.printErr(e.getMessage());
                    shutdown();
                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                    // invalid or no offset found without auto.offset.reset
                    Utils.printOut("Invalid or no offset found, using latest");
                    consumer.seekToEnd(e.partitions());
                    consumer.commitSync();
                } catch (KafkaException e) {
                    // log the exception and try to continue
                    Utils.printErr(e.getMessage());
                }
            }
        } catch (Throwable e) {
            Utils.printOut("Unhandled exception");
            e.printStackTrace();
        }
        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
        shutdown();
    }

    public void shutdown() {
        if (!closed) {
            closed = true;
            latch.countDown();
        }
    }

    public KafkaConsumer<Integer, String> createKafkaConsumer() {
        Properties props = new Properties();
        // bootstrap server config is required for the consumer to connect to the brokers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // client id is not required, but it's good to track the source of requests beyond just ip/port
        // by allowing a logical application name to be included in server-side request logging
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // consumer group id is required when we use subscribe(topics) for group management
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // sets static membership to improve availability (e.g. rolling restart)
        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
        // disables auto commit when EOS is enabled, because offsets are committed with the transaction
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
        // key and value are just byte arrays, so we need to set appropriate deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (readCommitted) {
            // skips ongoing and aborted transactions
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        }
        // sets the reset offset policy in case of invalid or no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Utils.printOut("Revoked partitions: %s", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Utils.printOut("Assigned partitions: %s", partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Utils.printOut("Lost partitions: %s", partitions);
    }
}
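For reference, a minimal sketch of how the reworked thread is driven, following the same pattern as the demo hunks below (the class name, broker address and topic here are placeholders, not part of the PR):

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConsumerSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Consumer consumerThread = new Consumer(
            "consumer", "localhost:9092", "my-topic", "my-group",
            Optional.empty(), false, 100, latch);
        consumerThread.start();
        // shutdown() counts the latch down, either here on timeout or
        // from the thread itself once all 100 records have been fetched
        if (!latch.await(5, TimeUnit.MINUTES)) {
            consumerThread.shutdown();
        }
    }
}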
examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -70,8 +70,10 @@ public ExactlyOnceMessageProcessor(final String inputTopic,
        // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
        // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
        boolean readCommitted = true;
        consumer = new Consumer(
            "processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT,
            inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
            .createKafkaConsumer();
        this.latch = latch;
    }

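Side note on the read_committed path, not part of this diff: auto commit is disabled because, under exactly-once semantics, the transactional producer commits the consumed offsets together with the produced records. A minimal sketch of that pattern, assuming a transactional producer and the consumer created above (the helper method and output topic are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

// illustrative helper: process one batch and commit its offsets within the transaction
static void processBatch(KafkaProducer<Integer, String> producer,
                         KafkaConsumer<Integer, String> consumer,
                         ConsumerRecords<Integer, String> records,
                         String outputTopic) {
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    records.forEach(record -> {
        producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
        // the committed offset must point to the next record to consume
        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    });
    // offsets become visible atomically with the produced records
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}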
examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -29,7 +29,8 @@ public static void main(String[] args) throws InterruptedException {
        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch);
        producerThread.start();

        Consumer consumerThread = new Consumer(
            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT,
            KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch);
        consumerThread.start();

        if (!latch.await(5, TimeUnit.MINUTES)) {
examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -114,7 +114,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionException {
        CountDownLatch consumeLatch = new CountDownLatch(1);

        /* Stage 4: consume all processed messages to verify exactly once */
        Consumer consumerThread = new Consumer(
            "consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT,
            OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
        consumerThread.start();

        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
106 changes: 106 additions & 0 deletions examples/src/main/java/kafka/examples/Utils.java
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.examples;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.lang.String.format;

public class Utils {
    private Utils() {
    }

    public static void printHelp(String message, Object... args) {
        System.out.println(format(message, args));
    }

    public static void printOut(String message, Object... args) {
        System.out.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
    }

    public static void printErr(String message, Object... args) {
        System.err.printf("%s - %s%n", Thread.currentThread().getName(), format(message, args));
    }

    public static void maybePrintRecord(long numRecords, ConsumerRecord<Integer, String> record) {
        maybePrintRecord(numRecords, record.key(), record.value(), record.topic(), record.partition(), record.offset());
    }

    public static void maybePrintRecord(long numRecords, int key, String value, RecordMetadata metadata) {
        maybePrintRecord(numRecords, key, value, metadata.topic(), metadata.partition(), metadata.offset());
    }

    private static void maybePrintRecord(long numRecords, int key, String value, String topic, int partition, long offset) {
        // we only print 10 records when there are 20 or more to send
        if (key % Math.max(1, numRecords / 10) == 0) {
            printOut("Sample: record(%d, %s), partition(%s-%d), offset(%d)", key, value, topic, partition, offset);
        }
    }
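For example, with numRecords = 10000 the divisor is 1000, so only records whose sequential key is a multiple of 1000 are printed, roughly ten sample lines per run.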

    public static void recreateTopics(String bootstrapServers, int numPartitions, String... topicNames) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        try (Admin admin = Admin.create(props)) {
            // delete topics if present
            try {
                admin.deleteTopics(Arrays.asList(topicNames)).all().get();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                    throw e;
                }
                printErr("Topics deletion error: %s", e.getCause());
            }
            printOut("Deleted topics: %s", Arrays.toString(topicNames));
            // create topics in a retry loop
            while (true) {
                // use default RF to avoid NOT_ENOUGH_REPLICAS error with minISR > 1
                short replicationFactor = -1;
                List<NewTopic> newTopics = Arrays.stream(topicNames)
                    .map(name -> new NewTopic(name, numPartitions, replicationFactor))
                    .collect(Collectors.toList());
                try {
                    admin.createTopics(newTopics).all().get();
                    printOut("Created topics: %s", Arrays.toString(topicNames));
                    break;
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e;
                    }
                    printOut("Waiting for topics metadata cleanup");
                    TimeUnit.MILLISECONDS.sleep(1_000);
                }
            }
        } catch (Throwable e) {
            throw new RuntimeException("Topics creation error", e);
        }
    }
}
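A quick usage sketch for the topic helper (the broker address and topic names are placeholders):

// deletes the topics if they exist, then recreates them with one partition
// each and the cluster default replication factor
Utils.recreateTopics("localhost:9092", 1, "input-topic", "output-topic");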