Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14273; Close file before atomic move #14354

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 19 additions & 12 deletions raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
Expand Up @@ -64,6 +64,7 @@ public class FileBasedStateStore implements QuorumStateStore {
private final File stateFile;

static final String DATA_VERSION = "data_version";
static final short HIGHEST_SUPPORTED_VERSION = 0;

public FileBasedStateStore(final File stateFile) {
this.stateFile = stateFile;
Expand Down Expand Up @@ -144,21 +145,27 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat

log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());

try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
final BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
short version = state.highestSupportedVersion();

ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
jsonState.set(DATA_VERSION, new ShortNode(version));
writer.write(jsonState.toString());
writer.flush();
fileOutputStream.getFD().sync();
try {
try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
final BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
)
) {
ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, HIGHEST_SUPPORTED_VERSION);
jsonState.set(DATA_VERSION, new ShortNode(HIGHEST_SUPPORTED_VERSION));
writer.write(jsonState.toString());
writer.flush();
fileOutputStream.getFD().sync();
}
Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error while writing the Quorum status from the file %s",
stateFile.getAbsolutePath()), e);
String.format(
"Error while writing the Quorum status from the file %s",
stateFile.getAbsolutePath()
),
e
);
} finally {
// cleanup the temp file when the write finishes (either success or fail).
deleteFileIfExists(temp);
Expand Down
Expand Up @@ -19,7 +19,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;

Expand Down Expand Up @@ -107,6 +109,20 @@ public void testCantReadVersionQuorumState() throws IOException {
assertCantReadQuorumStateVersion(jsonString);
}

@Test
public void testSupportedVersion() {
// If the next few checks fail, please check that they are compatible with previous releases of KRaft

// Check that FileBasedStateStore supports the latest version
assertEquals(FileBasedStateStore.HIGHEST_SUPPORTED_VERSION, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
// Check that the supported versions haven't changed
assertEquals(0, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, QuorumStateData.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) QuorumStateData.SCHEMA_0.get(6).def.type;
assertEquals(0, taggedFields.numFields());
}

public void assertCantReadQuorumStateVersion(String jsonString) throws IOException {
final File stateFile = TestUtils.tempFile();
stateStore = new FileBasedStateStore(stateFile);
Expand Down