diff --git a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java index 487cd6002782..a6fb27f81407 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java +++ b/core/server/common/src/main/java/alluxio/master/journal/ufs/UfsJournalLogWriter.java @@ -159,6 +159,9 @@ private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedExce } long lastPersistSeq = recoverLastPersistedJournalEntry(); + if (lastPersistSeq == -1) { + throw new RuntimeException("Cannot find any journal entry to recover from."); + } createNewLogFile(lastPersistSeq + 1); if (!mEntriesToFlush.isEmpty()) { @@ -186,6 +189,10 @@ private void maybeRecoverFromUfsFailures() throws IOException, JournalClosedExce } LOG.info("Finished writing unwritten journal entries from {} to {}.", lastPersistSeq + 1, retryEndSeq); + if (retryEndSeq != mNextSequenceNumber - 1) { + throw new RuntimeException("Failed to recover all entries to flush, expecting " + + (mNextSequenceNumber - 1) + " but only found entry " + retryEndSeq); + } } mNeedsRecovery = false; } @@ -224,7 +231,9 @@ private long recoverLastPersistedJournalEntry() throws IOException { } catch (IOException e) { throw e; } - completeLog(currentLog, lastPersistSeq + 1); + if (lastPersistSeq != -1) { // If the current log is an empty file, do not complete with SN: 0 + completeLog(currentLog, lastPersistSeq + 1); + } } // Search for and scan the latest COMPLETE journal and find out the sequence number of the // last persisted journal entry, in case no entry has been found in the INCOMPLETE journal. @@ -235,10 +244,15 @@ private long recoverLastPersistedJournalEntry() throws IOException { // journalFiles[journalFiles.size()-1] is the latest complete journal file. List journalFiles = snapshot.getLogs(); if (!journalFiles.isEmpty()) { - UfsJournalFile journal = journalFiles.get(journalFiles.size() - 1); - lastPersistSeq = journal.getEnd() - 1; - LOG.info("Found last persisted journal entry with seq {} in {}.", - lastPersistSeq, journal.getLocation().toString()); + for (int i = journalFiles.size() - 1; i >= 0; i--) { + UfsJournalFile journal = journalFiles.get(i); + if (!journal.isIncompleteLog()) { // Do not consider incomplete logs (handled above) + lastPersistSeq = journal.getEnd() - 1; + LOG.info("Found last persisted journal entry with seq {} in {}.", + lastPersistSeq, journal.getLocation().toString()); + break; + } + } } } return lastPersistSeq; @@ -328,20 +342,10 @@ public synchronized void flush() throws IOException, JournalClosedException { mEntriesToFlush.clear(); } catch (IOJournalClosedException e) { throw e.toJournalClosedException(); - } catch (IOException e) { + } catch (IOException e) { // On next operation, attempt to recover from a UFS failure + mNeedsRecovery = true; UfsJournalFile currentLog = mJournalOutputStream.currentLog(); - // Try to close and complete the current file. - try { - closeAndCompleteCurrentStream(); - } catch (IOException ioExc) { - // Journal file left in uncompleted state after flush failure. - LOG.error("Journal flush mitigation failure. Flush failure:{}. Mitigation failure:{}", e, - ioExc); - System.exit(-1); - } finally { - mRotateLogForNextWrite = true; - mJournalOutputStream = null; - } + mJournalOutputStream = null; throw new IOException(ExceptionMessage.JOURNAL_FLUSH_FAILURE .getMessageWithUrl(RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, currentLog, e.getMessage()), e); @@ -359,26 +363,6 @@ public synchronized void flush() throws IOException, JournalClosedException { } } - /** - * Used to close the current stream during failure. If close fails, it tries to complete the - * underlying log file regardless. - */ - private void closeAndCompleteCurrentStream() throws IOException { - // Try to close and complete the current file. - try { - mJournalOutputStream.close(); - } catch (IOException ioExc) { - LOG.warn("Failed to close current journal output stream at: {}. Error: {}", - mJournalOutputStream.currentLog().getLocation(), ioExc); - // Couldn't close the output stream. - // Try to complete the file. - completeLog(mJournalOutputStream.currentLog(), mNextSequenceNumber); - } finally { - mRotateLogForNextWrite = true; - mJournalOutputStream = null; - } - } - @Override public synchronized void close() throws IOException { Closer closer = Closer.create(); diff --git a/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java b/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java index 36d2ab5bb3b0..d422c03c720c 100644 --- a/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java +++ b/core/server/master/src/test/java/alluxio/master/journal/ufs/UfsJournalLogWriterTest.java @@ -289,15 +289,18 @@ public void flushFailureCompletesFile() throws Exception { nextSN = writeJournalEntries(writer, nextSN, 1); DataOutputStream badOut = createMockDataOutputStream(writer); Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + nextSN = writeJournalEntries(writer, nextSN, 1); tryFlushAndExpectToFail(writer); - // Verify that current file is completed with current SN. - UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); - Assert.assertNull(snapshot.getCurrentLog(mJournal)); - Assert.assertEquals(2, snapshot.getLogs().size()); - Assert.assertEquals(nextSN, snapshot.getLogs().get(1).getEnd()); - + // Retry the flush, expect it to rotate the log and start a new file + writer.flush(); + // Complete the last log writer.close(); + + UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); + Assert.assertEquals(3, snapshot.getLogs().size()); + Assert.assertEquals(nextSN - 1, snapshot.getLogs().get(2).getStart()); + Assert.assertEquals(nextSN, snapshot.getLogs().get(2).getEnd()); } /** @@ -349,22 +352,58 @@ public void missingJournalEntries() throws Exception { } @Test - public void missingIncompleteJournalFile() throws Exception { + public void recoverWithNoJournalFiles() throws Exception { long startSN = 0x10; UfsJournalLogWriter writer = new UfsJournalLogWriter(mJournal, startSN); + // Put stream into a bad state long nextSN = writeJournalEntries(writer, startSN, 5); DataOutputStream badOut = createMockDataOutputStream(writer); - Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut) - .write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); - tryWriteAndExpectToFail(writer, nextSN); + Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + tryFlushAndExpectToFail(writer); + + // Delete the file + UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); + UfsJournalFile journalFile = snapshot.getLogs().get(0); + File file = new File(journalFile.getLocation().toString()); + file.delete(); + + // Recover should fail since we deleted the file + mThrown.expect(RuntimeException.class); + mThrown.expectMessage("Cannot find any journal entry to recover from."); + writer.write(newEntry(nextSN)); + writer.close(); + } + + @Test + public void recoverMissingJournalFiles() throws Exception { + long startSN = 0x10; + UfsJournalLogWriter writer = new UfsJournalLogWriter(mJournal, startSN); + // Create a file + long nextSN = writeJournalEntries(writer, startSN, 5); + writer.close(); + + // Flush some entries to the next file + writer = new UfsJournalLogWriter(mJournal, nextSN); + nextSN = writeJournalEntries(writer, nextSN, 5); + writer.flush(); + + // Put the stream into a bad state + nextSN = writeJournalEntries(writer, nextSN, 5); + DataOutputStream badOut = createMockDataOutputStream(writer); + Mockito.doThrow(new IOException(INJECTED_IO_ERROR_MESSAGE)).when(badOut).flush(); + tryFlushAndExpectToFail(writer); + + // Delete the current log UfsJournalSnapshot snapshot = UfsJournalSnapshot.getSnapshot(mJournal); UfsJournalFile journalFile = snapshot.getCurrentLog(mJournal); File file = new File(journalFile.getLocation().toString()); file.delete(); + + // Recover should fail since we deleted the current log mThrown.expect(RuntimeException.class); mThrown.expectMessage( ExceptionMessage.JOURNAL_ENTRY_MISSING.getMessageWithUrl( - RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0, 0x10)); + RuntimeConstants.ALLUXIO_DEBUG_DOCS_URL, 0x15, 0x1A)); writer.write(newEntry(nextSN)); writer.close(); }