Skip to content

Commit

Permalink
Add changes from Luke
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
  • Loading branch information
fvaleri committed May 9, 2023
1 parent a941308 commit c189a64
Showing 1 changed file with 17 additions and 16 deletions.
Expand Up @@ -69,16 +69,16 @@ public ExactlyOnceMessageProcessor(String threadName,
this.bootstrapServers = bootstrapServers;
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.transactionalId = threadName;
this.transactionalId = "tid-" + threadName;
// It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
final int transactionTimeoutMs = 10000;
int transactionTimeoutMs = 10_000;
// A unique transactional.id must be provided in order to properly use EOS.
producer = new Producer(
"processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
.createKafkaProducer();
// Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
this.groupInstanceId = threadName;
this.groupInstanceId = "giid-" + threadName;
boolean readCommitted = true;
consumer = new Consumer(
"processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
Expand All @@ -90,18 +90,14 @@ public ExactlyOnceMessageProcessor(String threadName,
public void run() {
int processedRecords = 0;
long remainingRecords = Long.MAX_VALUE;
try {
// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
KafkaProducer<Integer, String> producer =
new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();

// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;
KafkaConsumer<Integer, String> consumer = new Consumer(
"processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
.createKafkaConsumer();

// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;
try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
// called first and once to fence zombies and abort any pending transaction
producer.initTransactions();

Expand Down Expand Up @@ -132,8 +128,8 @@ public void run() {
}
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
Utils.printErr(e.getMessage());
// we can't recover from these exceptions
Utils.printErr(e.getMessage());
shutdown();
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
// invalid or no offset found without auto.reset.policy
Expand Down Expand Up @@ -168,6 +164,11 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}

@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
Utils.printOut("Lost partitions: %s", partitions);
}

public void shutdown() {
if (!closed) {
closed = true;
Expand Down

0 comments on commit c189a64

Please sign in to comment.