From 73d062f4e3693b26adfa0b18eace2c3c1d81570a Mon Sep 17 00:00:00 2001 From: Janindu Pathirana Date: Mon, 3 Mar 2025 01:46:48 +0530 Subject: [PATCH] KAFKA-18168: Adding checkpointing for GlobalKTable during restoration and closing (#18752) Previously, no checkpoint file was created when a global state store finished restoring or was closed. To address this, GlobalStateUpdateTask.flushState() is now called in GlobalStateUpdateTask.initialize() and in GlobalStateUpdateTask.close() (when the state store is not being wiped). This flushes the state and writes a checkpoint file, thereby avoiding the need to completely restore the entire state. Reviewers: Alieh Saeedi, Matthias J. Sax --- .../internals/GlobalStateUpdateTask.java | 4 +++ .../internals/GlobalStateTaskTest.java | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 12a6beedbcd98..ea0359225de43 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -96,6 +96,7 @@ public Map initialize() { } initTopology(); processorContext.initialize(); + flushState(); lastFlush = time.milliseconds(); return stateMgr.changelogOffsets(); } @@ -138,6 +139,9 @@ public void flushState() { } public void close(final boolean wipeStateStore) throws IOException { + if (!wipeStateStore) { + flushState(); + } stateMgr.close(); if (wipeStateStore) { try { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 44839f5ddb3be..d839644c70313 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -255,6 +255,10 @@ public void shouldNotCheckpointIfNotReceivedEnoughRecords() { globalStateTask.initialize(); globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9000L, "foo".getBytes(), "foo".getBytes())); time.sleep(flushInterval); // flush interval elapsed + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -270,6 +274,10 @@ public void shouldNotCheckpointWhenFlushIntervalHasNotLapsed() { globalStateTask.update(record(topic1, 1, currentOffsetT1 + 10000L, "foo".getBytes(), "foo".getBytes())); time.sleep(flushInterval / 2); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -289,6 +297,10 @@ public void shouldCheckpointIfReceivedEnoughRecordsAndFlushIntervalHasElapsed() // 10000 records received since last flush => do not flush globalStateTask.update(record(topic1, 1, currentOffsetT1 + 9999L, "foo".getBytes(), "foo".getBytes())); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + globalStateTask.maybeCheckpoint(); assertEquals(offsets, stateMgr.changelogOffsets()); @@ -334,4 +346,25 @@ public void shouldWipeGlobalStateDirectory() throws Exception { globalStateTask.close(true); assertFalse(stateMgr.baseDir().exists()); } + + @Test + public void shouldCheckpointDuringInitialization() { + globalStateTask.initialize(); + + assertTrue(stateMgr.checkpointWritten); + assertTrue(stateMgr.flushed); + } + + @Test + public void shouldCheckpointDuringClose() throws Exception { + globalStateTask.initialize(); + + stateMgr.checkpointWritten = false; + stateMgr.flushed = false; + + globalStateTask.close(false); + + assertTrue(stateMgr.checkpointWritten); + assertTrue(stateMgr.flushed); + } }