Skip to content

Commit

Permalink
Implement PR feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
k-raina committed May 16, 2024
1 parent 96b1cd1 commit 920437b
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions examples/src/main/java/kafka/examples/TransactionalClientDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -14,9 +13,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.AbstractMap;


import static java.time.Duration.ofSeconds;
Expand All @@ -25,6 +21,16 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.*;

/**
* This class demonstrates a transactional Kafka client application that consumes messages from an input topic,
* processes them to generate word count statistics, and produces the results to an output topic.
* It utilizes Kafka's transactional capabilities to ensure exactly-once processing semantics.
*
* The application continuously polls for records from the input topic, processes them, and commits the offsets
* in a transactional manner. In case of exceptions or errors, it handles them appropriately, either aborting the
* transaction and resetting to the last committed positions, or restarting the application.
*
*/
public class TransactionalClientDemo {

private static final String CONSUMER_GROUP_ID = "my-group-id";
Expand All @@ -45,15 +51,14 @@ public static void main(String[] args) {
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));

// Process records to generate word count map
Map<String, Integer> wordCountMap = records.records(new TopicPartition(INPUT_TOPIC, 0))
.stream()
.flatMap(record -> Stream.of(record.value().split(" ")))
.map(word -> new AbstractMap.SimpleEntry<>(word, 1))
.collect(Collectors.toMap(
AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue,
(v1, v2) -> v1 + v2
));
Map<String, Integer> wordCountMap = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
String[] words = record.value().split(" ");
for (String word : words) {
wordCountMap.merge(word, 1, Integer::sum);
}
}

// Begin transaction
producer.beginTransaction();
Expand All @@ -76,14 +81,14 @@ public static void main(String[] args) {
// Commit transaction
producer.commitTransaction();
} catch (AbortableTransactionException e) {
// Abortable Exception: Handle Kafka exception by aborting transaction. Abortable Exception should never be thrown.
// Abortable Exception: Handle Kafka exception by aborting transaction. AbortTransaction path never throws abortable exception.
producer.abortTransaction();
resetToLastCommittedPositions(consumer);
}
} catch (InvalidConfiguationTransactionException e) {
// Fatal Error: The error is bubbled up to the application layer. The application can decide what to do
closeAll();
throw InvalidConfiguationTransactionException;
throw e;
} catch (KafkaException e) {
// Application Recoverable: The application must restart
closeAll();
Expand Down

0 comments on commit 920437b

Please sign in to comment.