diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java b/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java index 506124e999d8..2e55c2bb1db8 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileInStream.java @@ -61,6 +61,9 @@ public class FileInStream extends InputStream private static final boolean PASSIVE_CACHE_ENABLED = Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED); + private static final int UNINITIALIZED_BLOCK_INDEX = -1; + private static final int EOF_DATA = -1; + private static final int EOF_BLOCK_ID = -1; /** The instream options. */ private final InStreamOptions mInStreamOptions; @@ -105,7 +108,7 @@ public class FileInStream extends InputStream * current block stream yet. When the file stream has reached EOF, it is set to the length of * {@link #mBlockIds}. */ - private int mBlockIdIndex = -1; + private int mBlockIndex = UNINITIALIZED_BLOCK_INDEX; /** The read buffer in file seek. This is used in {@link #readCurrentBlockToEnd()}. */ private final byte[] mSeekBuffer; @@ -196,15 +199,15 @@ public int read(byte[] b, int off, int len) throws IOException { private int readInternal() throws IOException { if (remainingInternal() <= 0) { - return -1; + return EOF_BLOCK_ID; } updateStreams(); Preconditions.checkState(mCurrentBlockInStream != null, PreconditionMessage.ERR_UNEXPECTED_EOF); int data = mCurrentBlockInStream.read(); - if (data == -1) { + if (data == EOF_DATA) { // The underlying stream is done. - return -1; + return EOF_DATA; } mPos++; @@ -225,7 +228,7 @@ private int readInternal(byte[] b, int off, int len) throws IOException { if (len == 0) { return 0; } else if (remainingInternal() <= 0) { - return -1; + return EOF_DATA; } int currentOffset = off; @@ -258,7 +261,7 @@ private int readInternal(byte[] b, int off, int len) throws IOException { if (bytesLeftToRead == len && mCurrentBlockInStream.remaining() == 0) { // Nothing was read, and the underlying stream is done. - return -1; + return EOF_DATA; } return len - bytesLeftToRead; @@ -271,7 +274,7 @@ public int positionedRead(long pos, byte[] b, int off, int len) throws IOExcepti private int positionedReadInternal(long pos, byte[] b, int off, int len) throws IOException { if (pos < 0 || pos >= mFileLength) { - return -1; + return EOF_DATA; } // If partial read cache is enabled, we fall back to the normal read. @@ -387,7 +390,7 @@ protected long getBlockSize(long pos) { * @return true if the block stream should be updated */ protected boolean shouldUpdateStreams() { - if (mBlockIdIndex == mBlockIds.size()) { + if (mBlockIndex == mBlockIds.size()) { // EOF. return false; } @@ -437,15 +440,15 @@ private void closeOrCancelCacheStream() { } /** - * @return the current block id based on mBlockIdIndex, -1 if at the end of the file + * @return the current block id based on mBlockIndex, -1 if at the end of the file */ private long getCurrentBlockId() { if (remainingInternal() <= 0) { - return -1; + return EOF_BLOCK_ID; } - Preconditions.checkState(mBlockIdIndex < mBlockIds.size(), - PreconditionMessage.ERR_BLOCK_INDEX.toString(), mBlockIdIndex, mPos, mBlockIds.size() - 1); - return mBlockIds.get(mBlockIdIndex); + Preconditions.checkState(mBlockIndex < mBlockIds.size(), + PreconditionMessage.ERR_BLOCK_INDEX.toString(), mBlockIndex, mPos, mBlockIds.size() - 1); + return mBlockIds.get(mBlockIndex); } /** @@ -481,30 +484,34 @@ private void handleCacheStreamException(IOException e) { /** * Checks whether the current block stream has remaining bytes, if not, advance to the next block - * stream, otherwise, this is a no-op. This should only be used in streaming read APIs. - * For APIs like {@link #seek(long)}, {@link #skip(long)}, the block stream should be updated - * according to the current read position, use {@link #updateStreamsBasedOnPosition()} instead. + * stream, otherwise, this is a no-op. + * + * This should only be used in streaming read APIs. For APIs like {@link #seek(long)}, + * {@link #skip(long)}, the block stream should be updated according to the current read position, + * use {@link #updateStreamsBasedOnPosition()} instead. */ private void updateStreams() throws IOException { assureCacheStreamInSync(); if (shouldUpdateStreams()) { - mBlockIdIndex++; + mBlockIndex++; updateStreamsInternal(); } } /** * Computes the current block stream ID based on the current read position, if the stream ID is - * different from the current block stream ID, then updates the stream. + * different from the current block stream ID, or the stream ID is the same as the current + * block stream ID but the current stream has no remaining bytes, then updates the stream. + * * Comparing to {@link #updateStreams()}, this method has overhead for computing the * block stream ID, only use this method for non-streaming read APIs like {@link #seek(long)} * and {@link #skip(long)}. */ private void updateStreamsBasedOnPosition() throws IOException { assureCacheStreamInSync(); - int blockIdIndex = (int) (mPos / mBlockSize); - if (blockIdIndex != mBlockIdIndex || shouldUpdateStreams()) { - mBlockIdIndex = blockIdIndex; + int blockIndex = (int) (mPos / mBlockSize); + if (blockIndex != mBlockIndex || shouldUpdateStreams()) { + mBlockIndex = blockIndex; updateStreamsInternal(); } } @@ -589,8 +596,7 @@ private void updateBlockInStream(long blockId) throws IOException { mCurrentBlockInStream = null; } - // blockId = -1 if mPos = EOF. - if (blockId < 0) { + if (blockId == EOF_BLOCK_ID) { return; } mCurrentBlockInStream = getBlockInStream(blockId);