Skip to content

Commit

Permalink
Fix bugs related to the journal recovering from a UFS failure
Browse files Browse the repository at this point in the history
Fixes #9722

Fixes several edge cases in journal recovery.

1. If we fail to flush to the journal, do not try to recover, instead
flag the stream as needsRecovery and handle the recovery on next
operation
2. When recovering, make sure there is a previous journal entry
3. When recovering, create a new log file starting with the latest
committed journal entry (instead of our current one)
4. When recovering, fail if the latest journal entry recovered from our
buffer is not equal to our expected next sequence number - 1
5. When recovering, do not consider incomplete logs when inferring the
last persisted sequence number from the file name (it will be
INTEGER_MIN)

pr-link: #9723
change-id: cid-63623a4e96d91f9619605dd9cad1b7da7679ac9c
  • Loading branch information
calvinjia authored and alluxio-bot committed Aug 14, 2019
1 parent 38cc5b5 commit be4e9cb
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 49 deletions.
Expand Up @@ -159,6 +159,9 @@ private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedExce
}

long lastPersistSeq = recoverLastPersistedJournalEntry();
if (lastPersistSeq == -1) {
throw new RuntimeException("Cannot find any journal entry to recover from.");
}

createNewLogFile(lastPersistSeq + 1);
if (!mEntriesToFlush.isEmpty()) {
Expand Down Expand Up @@ -186,6 +189,10 @@ private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedExce
}
LOG.info("Finished writing unwritten journal entries from {} to {}.",
lastPersistSeq + 1, retryEndSeq);
if (retryEndSeq != mNextSequenceNumber - 1) {
throw new RuntimeException("Failed to recover all entries to flush, expecting "
+ (mNextSequenceNumber - 1) + " but only found entry " + retryEndSeq);
}
}
mNeedsRecovery = false;
}
Expand Down Expand Up @@ -224,7 +231,9 @@ private long recoverLastPersistedJournalEntry() throws IOException {
} catch (IOException e) {
throw e;
}
completeLog(currentLog, lastPersistSeq + 1);
if (lastPersistSeq != -1) { // If the current log is an empty file, do not complete with SN: 0
completeLog(currentLog, lastPersistSeq + 1);
}
}
// Search for and scan the latest COMPLETE journal and find out the sequence number of the
// last persisted journal entry, in case no entry has been found in the INCOMPLETE journal.
Expand All @@ -235,10 +244,15 @@ private long recoverLastPersistedJournalEntry() throws IOException {
// journalFiles[journalFiles.size()-1] is the latest complete journal file.
List<UfsJournalFile> journalFiles = snapshot.getLogs();
if (!journalFiles.isEmpty()) {
UfsJournalFile journal = journalFiles.get(journalFiles.size() - 1);
lastPersistSeq = journal.getEnd() - 1;
LOG.info("Found last persisted journal entry with seq {} in {}.",
lastPersistSeq, journal.getLocation().toString());
for (int i = journalFiles.size() - 1; i >= 0; i--) {
UfsJournalFile journal = journalFiles.get(i);
if (!journal.isIncompleteLog()) { // Do not consider incomplete logs (handled above)
lastPersistSeq = journal.getEnd() - 1;
LOG.info("Found last persisted journal entry with seq {} in {}.",
lastPersistSeq, journal.getLocation().toString());
break;
}
}
}
}
return lastPersistSeq;
Expand Down Expand Up @@ -328,20 +342,10 @@ public synchronized void flush() throws IOException, JournalClosedException {
mEntriesToFlush.clear();
} catch (IOJournalClosedException e) {
throw e.toJournalClosedException();
} catch (IOException e) {
} catch (IOException e) { // On next operation, attempt to recover from a UFS failure
mNeedsRecovery = true;
UfsJournalFile currentLog = mJournalOutputStream.currentLog();
// Try to close and complete the current file.
try {
closeAndCompleteCurrentStream();
} catch (IOException ioExc) {
// Journal file left in uncompleted state after flush failure.
LOG.error("Journal flush mitigation failure. Flush failure:{}. Mitigation failure:{}", e,
ioExc);
System.exit(-1);
} finally {
mRotateLogForNextWrite = true;
mJournalOutputStream = null;
}
mJournalOutputStream = null;
throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE
.getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL,
currentLog, e.getMessage()), e);
Expand All @@ -359,26 +363,6 @@ public synchronized void flush() throws IOException, JournalClosedException {
}
}

/**
* Used to close the current stream during failure. If close fails, it tries to complete the
* underlying log file regardless.
*/
private void closeAndCompleteCurrentStream() throws IOException {
// Try to close and complete the current file.
try {
mJournalOutputStream.close();
} catch (IOException ioExc) {
LOG.warn("Failed to close current journal output stream at: {}. Error: {}",
mJournalOutputStream.currentLog().getLocation(), ioExc);
// Couldn't close the output stream.
// Try to complete the file.
completeLog(mJournalOutputStream.currentLog(), mNextSequenceNumber);
} finally {
mRotateLogForNextWrite = true;
mJournalOutputStream = null;
}
}

@Override
public synchronized void close() throws IOException {
Closer closer = Closer.create();
Expand Down
Expand Up @@ -289,15 +289,18 @@ public void flushFailureCompletesFile() throws Exception {
nextSN = writeJournalEntries(writer, nextSN, 1);
DataOutputStream badOut = createMockDataOutputStream(writer);
Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush();
nextSN = writeJournalEntries(writer, nextSN, 1);
tryFlushAndExpectToFail(writer);

// Verify that current file is completed with current SN.
UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal);
Assert.assertNull(snapshot.getCurrentLog(mJournal));
Assert.assertEquals(2, snapshot.getLogs().size());
Assert.assertEquals(nextSN, snapshot.getLogs().get(1).getEnd());

// Retry the flush, expect it to rotate the log and start a new file
writer.flush();
// Complete the last log
writer.close();

UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal);
Assert.assertEquals(3, snapshot.getLogs().size());
Assert.assertEquals(nextSN - 1, snapshot.getLogs().get(2).getStart());
Assert.assertEquals(nextSN, snapshot.getLogs().get(2).getEnd());
}

/**
Expand Down Expand Up @@ -349,22 +352,58 @@ public void missingJournalEntries() throws Exception {
}

@Test
public void missingIncompleteJournalFile() throws Exception {
public void recoverWithNoJournalFiles() throws Exception {
long startSN = 0x10;
UfsJournalLogWriter writer = new UfsJournalLogWriter(mJournal, startSN);
// Put stream into a bad state
long nextSN = writeJournalEntries(writer, startSN, 5);
DataOutputStream badOut = createMockDataOutputStream(writer);
Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut)
.write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt());
tryWriteAndExpectToFail(writer, nextSN);
Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush();
tryFlushAndExpectToFail(writer);

// Delete the file
UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal);
UfsJournalFile journalFile = snapshot.getLogs().get(0);
File file = new File(journalFile.getLocation().toString());
file.delete();

// Recover should fail since we deleted the file
mThrown.expect(RuntimeException.class);
mThrown.expectMessage("Cannot find any journal entry to recover from.");
writer.write(newEntry(nextSN));
writer.close();
}

@Test
public void recoverMissingJournalFiles() throws Exception {
long startSN = 0x10;
UfsJournalLogWriter writer = new UfsJournalLogWriter(mJournal, startSN);
// Create a file
long nextSN = writeJournalEntries(writer, startSN, 5);
writer.close();

// Flush some entries to the next file
writer = new UfsJournalLogWriter(mJournal, nextSN);
nextSN = writeJournalEntries(writer, nextSN, 5);
writer.flush();

// Put the stream into a bad state
nextSN = writeJournalEntries(writer, nextSN, 5);
DataOutputStream badOut = createMockDataOutputStream(writer);
Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush();
tryFlushAndExpectToFail(writer);

// Delete the current log
UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal);
UfsJournalFile journalFile = snapshot.getCurrentLog(mJournal);
File file = new File(journalFile.getLocation().toString());
file.delete();

// Recover should fail since we deleted the current log
mThrown.expect(RuntimeException.class);
mThrown.expectMessage(
ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl(
RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0, 0x10));
RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0x15, 0x1A));
writer.write(newEntry(nextSN));
writer.close();
}
Expand Down

0 comments on commit be4e9cb

Please sign in to comment.