Skip to content

Commit

Permalink
Make EntryOutputStream#flush really flush data to HDFS
Browse files Browse the repository at this point in the history
Rename variables
  • Loading branch information
cc committed Oct 8, 2015
1 parent 2e96792 commit 924e811
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions servers/src/main/java/tachyon/master/journal/JournalWriter.java
Expand Up @@ -17,6 +17,7 @@

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -154,11 +155,11 @@ public void close() throws IOException {
* @return the output stream for the current log file
* @throws IOException
*/
private DataOutputStream openCurrentLog() throws IOException {
private OutputStream openCurrentLog() throws IOException {
String currentLogFile = mJournal.getCurrentLogFilePath();
DataOutputStream dos = new DataOutputStream(mUfs.create(currentLogFile));
OutputStream os = mUfs.create(currentLogFile);
LOG.info("Opened current log file: " + currentLogFile);
return dos;
return os;
}

/**
Expand Down Expand Up @@ -290,11 +291,15 @@ public synchronized void flush() throws IOException {
* handles rotating full log files, and creating the next log file.
*/
private class EntryOutputStream implements JournalOutputStream {
private DataOutputStream mOutputStream;
/** The direct output stream created by {@link UnderFileSystem} */
private OutputStream mRawOutputStream;
/** The output stream that wraps around {@link #mRawOutputStream} */
private DataOutputStream mDataOutputStream;
private boolean mIsClosed = false;

EntryOutputStream(DataOutputStream outputStream) {
mOutputStream = outputStream;
EntryOutputStream(OutputStream outputStream) {
mRawOutputStream = outputStream;
mDataOutputStream = new DataOutputStream(outputStream);
}

@Override
Expand All @@ -303,17 +308,17 @@ public synchronized void writeEntry(JournalEntry entry) throws IOException {
throw new IOException(ExceptionMessage.JOURNAL_WRITE_AFTER_CLOSE.getMessage());
}
mJournal.getJournalFormatter().serialize(
new SerializableJournalEntry(mNextEntrySequenceNumber ++, entry), mOutputStream);
new SerializableJournalEntry(mNextEntrySequenceNumber ++, entry), mDataOutputStream);
}

@Override
public synchronized void close() throws IOException {
if (mIsClosed) {
return;
}
if (mOutputStream != null) {
if (mDataOutputStream != null) {
// Close the current log file.
mOutputStream.close();
mDataOutputStream.close();
}

mIsClosed = true;
Expand All @@ -324,16 +329,22 @@ public synchronized void flush() throws IOException {
if (mIsClosed) {
return;
}
mOutputStream.flush();
if (mOutputStream instanceof FSDataOutputStream) {
((FSDataOutputStream) mOutputStream).sync();
mDataOutputStream.flush();
if (mRawOutputStream instanceof FSDataOutputStream) {
// The output stream directly created by {@link UnderFileSystem} may be
// {@link FSDataOutputStream}, which means the under filesystem is HDFS, but
// {@link DataOutputStream#flush} won't flush the data to HDFS, so we need to call
// {@link FSDataOutputStream#sync} to actually flush data to HDFS.
((FSDataOutputStream) mRawOutputStream).sync();
}
if (mOutputStream.size() > mMaxLogSize) {
LOG.info("Rotating log file. size: " + mOutputStream.size() + " maxSize: " + mMaxLogSize);
if (mDataOutputStream.size() > mMaxLogSize) {
LOG.info("Rotating log file. size: " + mDataOutputStream.size() + " maxSize: "
+ mMaxLogSize);
// rotate the current log.
mOutputStream.close();
mDataOutputStream.close();
completeCurrentLog();
mOutputStream = openCurrentLog();
mRawOutputStream = openCurrentLog();
mDataOutputStream = new DataOutputStream(mRawOutputStream);
}
}
}
Expand Down

0 comments on commit 924e811

Please sign in to comment.