Skip to content

Commit

Permalink
Merge branch '7.2.x' into 7.3.x by rayokota
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentSemaphore committed Jun 27, 2024
2 parents c7a36ec + 0829a75 commit f49b53a
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
Expand Down Expand Up @@ -143,6 +144,9 @@ public void init() throws StoreInitializationException {
new KafkaStoreReaderThread<>(this.bootstrapBrokers, topic, groupId,
this.storeUpdateHandler, serializer, this.localStore,
this.producer, this.noopKey, this.initialized, this.config);
// checkpoint could be updated once the reader thread starts. This could result in a
// race condition where schemas after the checkpoint during startup would be double counted.
final Map<TopicPartition, Long> checkpoints = new HashMap<>(kafkaTopicReader.checkpoints());
this.kafkaTopicReader.start();

try {
Expand All @@ -156,7 +160,7 @@ public void init() throws StoreInitializationException {
throw new StoreInitializationException("Illegal state while initializing store. Store "
+ "was already initialized");
}
this.storeUpdateHandler.cacheInitialized(new HashMap<>(kafkaTopicReader.checkpoints()));
this.storeUpdateHandler.cacheInitialized(checkpoints);
initLatch.countDown();
}

Expand Down

0 comments on commit f49b53a

Please sign in to comment.