[NO ISSUE][STO] Tolerate Corrupted System Checkpoint Files
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Do not delete corrupted checkpoint files until a valid
  checkpoint is persisted. This ensures that a forged
  checkpoint, which forces recovery to start from the
  beginning, is used until a valid checkpoint is found
  (see the sketch after this list).
- Attempt to read the latest checkpoint file right after
  writing it and before attempting to clean up invalid and
  old checkpoint files.
- Use on-disk files to determine the next checkpoint id to
  account for existing corrupted checkpoints.
- Maintain two older checkpoint files in addition to the
  latest one.
- Catch all exceptions on checkpointing failures since
  checkpointing is a best-effort (try) operation.
- Add a test scenario for the new checkpoint cleanup behavior.
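
The following is an illustrative sketch only, not the patched AsterixDB code: all class, method, and file names are hypothetical. It shows the ordering described above (derive the next id from the files on disk, persist, immediately read the new file back, and only then delete corrupted and old checkpoint files).

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, simplified checkpoint manager; not the actual AsterixDB classes.
public class CheckpointSketch {
    private static final String PREFIX = "checkpoint_";
    private final Path dir;
    private final int olderToKeep; // older checkpoints kept in addition to the latest

    public CheckpointSketch(Path dir, int olderToKeep) {
        this.dir = dir;
        this.olderToKeep = olderToKeep;
    }

    // Derive the next id from the files on disk so corrupted files still advance it.
    long nextCheckpointId() throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(CheckpointSketch::isCheckpoint)
                    .mapToLong(CheckpointSketch::idOf).max().orElse(-1) + 1;
        }
    }

    // Persist, then immediately read the new file back; clean up only if that read succeeds.
    void checkpoint(byte[] payload) throws IOException {
        Path latest = dir.resolve(PREFIX + nextCheckpointId());
        Files.write(latest, payload, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        readAndValidate(latest); // throws if the freshly written checkpoint is unreadable
        cleanup();
    }

    // Stand-in for real deserialization/validation of a checkpoint file.
    private void readAndValidate(Path p) throws IOException {
        if (Files.readAllBytes(p).length == 0) {
            throw new IOException("corrupted checkpoint: " + p);
        }
    }

    // Corrupted files are deleted only here, after a checkpoint has passed read-back;
    // then all but the latest plus olderToKeep valid checkpoints are removed.
    private void cleanup() throws IOException {
        List<Path> valid = new ArrayList<>();
        List<Path> ordered;
        try (Stream<Path> files = Files.list(dir)) {
            ordered = files.filter(CheckpointSketch::isCheckpoint)
                    .sorted(Comparator.comparingLong(CheckpointSketch::idOf))
                    .collect(Collectors.toList());
        }
        for (Path p : ordered) {
            try {
                readAndValidate(p);
                valid.add(p);
            } catch (IOException e) {
                Files.deleteIfExists(p);
            }
        }
        for (int i = 0; i < valid.size() - olderToKeep - 1; i++) {
            Files.deleteIfExists(valid.get(i));
        }
    }

    private static boolean isCheckpoint(Path p) {
        return p.getFileName().toString().startsWith(PREFIX);
    }

    private static long idOf(Path p) {
        return Long.parseLong(p.getFileName().toString().substring(PREFIX.length()));
    }
}

In this sketch, a successful checkpoint() call is the only point at which corrupted files disappear, which is the behavior the new testCorruptedCheckpointFiles assertions below exercise.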

Change-Id: Iea689f5a644351491d9748273bb2158e8179f54d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3496
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
mhubail committed Jul 19, 2019
1 parent 0561d10 commit 272a180
Showing 8 changed files with 39 additions and 24 deletions.
@@ -306,8 +306,8 @@ public void testCorruptedCheckpointFiles() {
Checkpoint cpAfterCorruption = checkpointManager.getLatest();
// Make sure the valid checkpoint was returned
Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
// Make sure the corrupted checkpoint file was deleted
Assert.assertFalse(corruptedCheckpoint.exists());
// Make sure the corrupted checkpoint file was not deleted
Assert.assertTrue(corruptedCheckpoint.exists());
// Corrupt the valid checkpoint by replacing its content
final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
File validCheckpointFile = validCheckpointPath.toFile();
@@ -321,6 +321,13 @@ public void testCorruptedCheckpointFiles() {
// Make sure the forged checkpoint recovery will start from the first available log
final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
// Another call should still return the forged checkpoint, and the corrupted file should still exist
forgedCheckpoint = checkpointManager.getLatest();
Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
Assert.assertTrue(corruptedCheckpoint.exists());
// Do a successful checkpoint and ensure the corrupted file is now deleted
checkpointManager.doSharpCheckpoint();
Assert.assertFalse(corruptedCheckpoint.exists());
} finally {
nc.deInit();
}
@@ -54,4 +54,5 @@ messaging.frame.count=512
txn.log.partitionsize=2MB
txn.log.buffer.pagesize=128KB
txn.log.checkpoint.pollfrequency=2147483647
txn.log.checkpoint.history=0
storage.max.active.writable.datasets=50
@@ -42,7 +42,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
"txn\.log\.checkpoint\.history" : 0,
"txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
@@ -42,7 +42,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
"txn\.log\.checkpoint\.history" : 0,
"txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
@@ -42,7 +42,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
"txn\.log\.checkpoint\.history" : 0,
"txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
@@ -52,7 +52,7 @@ public enum Option implements IOption {
INTEGER,
120,
"The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be written"),
TXN_LOG_CHECKPOINT_HISTORY(INTEGER, 0, "The number of checkpoints to keep in the transaction log"),
TXN_LOG_CHECKPOINT_HISTORY(INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
TXN_LOCK_ESCALATIONTHRESHOLD(
INTEGER,
1000,
@@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -100,7 +101,7 @@ public Checkpoint getLatest() {
if (checkpointFiles.isEmpty()) {
return null;
}
final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, false);
if (orderedCheckpoints.isEmpty()) {
/*
* If all checkpoint files are corrupted, we have no option but to try to perform recovery.
@@ -136,8 +137,7 @@ public void dumpState(OutputStream os) throws IOException {
}

public Path getCheckpointPath(long checkpointId) {
return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+ Long.toString(checkpointId));
return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + checkpointId);
}

protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
@@ -173,7 +173,8 @@ private void persist(Checkpoint checkpoint) throws HyracksDataException {
// Write checkpoint file to disk
try {
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
Files.write(path, bytes);
Files.write(path, bytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
readCheckpoint(path);
} catch (IOException e) {
LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
throw HyracksDataException.create(e);
@@ -200,16 +201,14 @@ private List<File> getCheckpointFiles() {
return Arrays.asList(checkpoints);
}

private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
private List<Checkpoint> getOrderedValidCheckpoints(List<File> checkpoints, boolean deleteCorrupted) {
List<Checkpoint> checkpointObjectList = new ArrayList<>();
for (File file : checkpoints) {
try {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Reading checkpoint file: " + file.getAbsolutePath());
}
final JsonNode jsonNode =
OBJECT_MAPPER.readValue(Files.readAllBytes(Paths.get(file.getAbsolutePath())), JsonNode.class);
Checkpoint cp = (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
Checkpoint cp = readCheckpoint(Paths.get(file.getAbsolutePath()));
checkpointObjectList.add(cp);
} catch (ClosedByInterruptException e) {
Thread.currentThread().interrupt();
@@ -222,9 +221,8 @@ private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
}
file.delete();
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
if (deleteCorrupted && file.delete()) {
LOGGER.warn("Deleted corrupted checkpoint file: {}", file::getAbsolutePath);
}
}
}
@@ -234,7 +232,7 @@ private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {

private void cleanup() {
final List<File> checkpointFiles = getCheckpointFiles();
final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, true);
final int deleteCount = orderedCheckpoints.size() - historyToKeep;
for (int i = 0; i < deleteCount; i++) {
final Checkpoint checkpoint = orderedCheckpoints.get(i);
@@ -247,11 +245,20 @@ private long getNextCheckpointId() {
}

private long getNextCheckpointId() {
final Checkpoint latest = getLatest();
if (latest == null) {
final List<File> checkpointFiles = getCheckpointFiles();
if (checkpointFiles.isEmpty()) {
return FIRST_CHECKPOINT_ID;
}
return latest.getId() + 1;
long maxOnDiskId = -1;
for (File checkpointFile : checkpointFiles) {
long fileId = Long.parseLong(checkpointFile.getName().substring(CHECKPOINT_FILENAME_PREFIX.length()));
maxOnDiskId = Math.max(maxOnDiskId, fileId);
}
return maxOnDiskId + 1;
}

}
private Checkpoint readCheckpoint(Path checkpointPath) throws IOException {
final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(checkpointPath), JsonNode.class);
return (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
}
}
@@ -94,8 +94,8 @@ public void run() {
if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
} catch (HyracksDataException e) {
LOGGER.log(Level.ERROR, "Error during checkpoint", e);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "checkpoint attempt failed", e);
}
}
}
