From be4e9cb1f41cb5299832ada1f7f3f93542da2d89 Mon Sep 17 00:00:00 2001 From: Calvin Jia Date: Wed, 14 Aug 2019 11:38:19 -0700 Subject: [PATCH] Fix bugs related to the journal recovering from a UFS failure Fixes #9722 Fixes several edge cases in journal recovery. 1. If we fail to flush to the journal, do not try to recover; instead, flag the stream as needsRecovery and handle the recovery on the next operation 2. When recovering, make sure there is a previous journal entry 3. When recovering, create a new log file starting with the latest committed journal entry (instead of our current one) 4. When recovering, fail if the latest journal entry recovered from our buffer is not equal to our expected next sequence number - 1 5. When recovering, do not consider incomplete logs when inferring the last persisted sequence number from the file name (it will be INTEGER_MIN) pr-link: Alluxio/alluxio#9723 change-id: cid-63623a4e96d91f9619605dd9cad1b7da7679ac9c --- .../journal/ufs/UfsJournalLogWriter.java | 60 +++++++----------- .../journal/ufs/UfsJournalLogWriterTest.java | 61 +++++++++++++++---- 2 files changed, 72 insertions(+), 49 deletions(-) diff --git a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java index 487cd6002782..a6fb27f81407 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java +++ b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java @@ -159,6 +159,9 @@ private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedExce } long lastPersistSeq = recoverLastPersistedJournalEntry(); + if (lastPersistSeq == -1) { + throw new RuntimeException("Cannot find any journal entry to recover from."); + } createNewLogFile(lastPersistSeq + 1); if (!mEntriesToFlush.isEmpty()) { @@ -186,6 +189,10 @@ private void maybeRecoverFromUfsFailures() throws IOException, 
JournalClosedExce } LOG.info("Finished writing unwritten journal entries from {} to {}.", lastPersistSeq + 1, retryEndSeq); + if (retryEndSeq != mNextSequenceNumber - 1) { + throw new RuntimeException("Failed to recover all entries to flush, expecting " + + (mNextSequenceNumber - 1) + " but only found entry " + retryEndSeq); + } } mNeedsRecovery = false; } @@ -224,7 +231,9 @@ private long recoverLastPersistedJournalEntry() throws IOException { } catch (IOException e) { throw e; } - completeLog(currentLog, lastPersistSeq + 1); + if (lastPersistSeq != -1) { // If the current log is an empty file, do not complete with SN: 0 + completeLog(currentLog, lastPersistSeq + 1); + } } // Search for and scan the latest COMPLETE journal and find out the sequence number of the // last persisted journal entry, in case no entry has been found in the INCOMPLETE journal. @@ -235,10 +244,15 @@ private long recoverLastPersistedJournalEntry() throws IOException { // journalFiles[journalFiles.size()-1] is the latest complete journal file. 
List journalFiles = snapshot.getLogs(); if (!journalFiles.isEmpty()) { - UfsJournalFile journal = journalFiles.get(journalFiles.size() - 1); - lastPersistSeq = journal.getEnd() - 1; - LOG.info("Found last persisted journal entry with seq {} in {}.", - lastPersistSeq, journal.getLocation().toString()); + for (int i = journalFiles.size() - 1; i >= 0; i--) { + UfsJournalFile journal = journalFiles.get(i); + if (!journal.isIncompleteLog()) { // Do not consider incomplete logs (handled above) + lastPersistSeq = journal.getEnd() - 1; + LOG.info("Found last persisted journal entry with seq {} in {}.", + lastPersistSeq, journal.getLocation().toString()); + break; + } + } } } return lastPersistSeq; @@ -328,20 +342,10 @@ public synchronized void flush() throws IOException, JournalClosedException { mEntriesToFlush.clear(); } catch (IOJournalClosedException e) { throw e.toJournalClosedException(); - } catch (IOException e) { + } catch (IOException e) { // On next operation, attempt to recover from a UFS failure + mNeedsRecovery = true; UfsJournalFile currentLog = mJournalOutputStream.currentLog(); - // Try to close and complete the current file. - try { - closeAndCompleteCurrentStream(); - } catch (IOException ioExc) { - // Journal file left in uncompleted state after flush failure. - LOG.error("Journal flush mitigation failure. Flush failure:{}. Mitigation failure:{}", e, - ioExc); - System.exit(-1); - } finally { - mRotateLogForNextWrite = true; - mJournalOutputStream = null; - } + mJournalOutputStream = null; throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE .getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, currentLog, e.getMessage()), e); @@ -359,26 +363,6 @@ public synchronized void flush() throws IOException, JournalClosedException { } } - /** - * Used to close the current stream during failure. If close fails, it tries to complete the - * underlying log file regardless. 
- */ - private void closeAndCompleteCurrentStream() throws IOException { - // Try to close and complete the current file. - try { - mJournalOutputStream.close(); - } catch (IOException ioExc) { - LOG.warn("Failed to close current journal output stream at: {}. Error: {}", - mJournalOutputStream.currentLog().getLocation(), ioExc); - // Couldn't close the output stream. - // Try to complete the file. - completeLog(mJournalOutputStream.currentLog(), mNextSequenceNumber); - } finally { - mRotateLogForNextWrite = true; - mJournalOutputStream = null; - } - } - @Override public synchronized void close() throws IOException { Closer closer = Closer.create(); diff --git a/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java b/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java index 36d2ab5bb3b0..d422c03c720c 100644 --- a/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java +++ b/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java @@ -289,15 +289,18 @@ public void flushFailureCompletesFile() throws Exception { nextSN = writeJournalEntries(writer, nextSN, 1); DataOutputStream badOut = createMockDataOutputStream(writer); Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + nextSN = writeJournalEntries(writer, nextSN, 1); tryFlushAndExpectToFail(writer); - // Verify that current file is completed with current SN. 
- UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); - Assert.assertNull(snapshot.getCurrentLog(mJournal)); - Assert.assertEquals(2, snapshot.getLogs().size()); - Assert.assertEquals(nextSN, snapshot.getLogs().get(1).getEnd()); - + // Retry the flush, expect it to rotate the log and start a new file + writer.flush(); + // Complete the last log writer.close(); + + UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); + Assert.assertEquals(3, snapshot.getLogs().size()); + Assert.assertEquals(nextSN - 1, snapshot.getLogs().get(2).getStart()); + Assert.assertEquals(nextSN, snapshot.getLogs().get(2).getEnd()); } /** @@ -349,22 +352,58 @@ public void missingJournalEntries() throws Exception { } @Test - public void missingIncompleteJournalFile() throws Exception { + public void recoverWithNoJournalFiles() throws Exception { long startSN = 0x10; UfsJournalLogWriter writer = new UfsJournalLogWriter(mJournal, startSN); + // Put stream into a bad state long nextSN = writeJournalEntries(writer, startSN, 5); DataOutputStream badOut = createMockDataOutputStream(writer); - Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut) - .write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); - tryWriteAndExpectToFail(writer, nextSN); + Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + tryFlushAndExpectToFail(writer); + + // Delete the file + UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); + UfsJournalFile journalFile = snapshot.getLogs().get(0); + File file = new File(journalFile.getLocation().toString()); + file.delete(); + + // Recover should fail since we deleted the file + mThrown.expect(RuntimeException.class); + mThrown.expectMessage("Cannot find any journal entry to recover from."); + writer.write(newEntry(nextSN)); + writer.close(); + } + + @Test + public void recoverMissingJournalFiles() throws Exception { + long startSN = 0x10; + UfsJournalLogWriter 
writer = new UfsJournalLogWriter(mJournal, startSN); + // Create a file + long nextSN = writeJournalEntries(writer, startSN, 5); + writer.close(); + + // Flush some entries to the next file + writer = new UfsJournalLogWriter(mJournal, nextSN); + nextSN = writeJournalEntries(writer, nextSN, 5); + writer.flush(); + + // Put the stream into a bad state + nextSN = writeJournalEntries(writer, nextSN, 5); + DataOutputStream badOut = createMockDataOutputStream(writer); + Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + tryFlushAndExpectToFail(writer); + + // Delete the current log UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); UfsJournalFile journalFile = snapshot.getCurrentLog(mJournal); File file = new File(journalFile.getLocation().toString()); file.delete(); + + // Recover should fail since we deleted the current log mThrown.expect(RuntimeException.class); mThrown.expectMessage( ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl( - RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0, 0x10)); + RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0x15, 0x1A)); writer.write(newEntry(nextSN)); writer.close(); }