Skip to content

Commit

Permalink
Give constant -1 different names.
Browse files Browse the repository at this point in the history
  • Loading branch information
Cheng Chang committed Oct 29, 2017
1 parent 2e00df7 commit 54a4b0c
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions core/client/fs/src/main/java/alluxio/client/file/FileInStream.java
Expand Up @@ -61,6 +61,9 @@ public class FileInStream extends InputStream


private static final boolean PASSIVE_CACHE_ENABLED = private static final boolean PASSIVE_CACHE_ENABLED =
Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED); Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED);
private static final int UNINITIALIZED_BLOCK_INDEX = -1;
private static final int EOF_DATA = -1;
private static final int EOF_BLOCK_ID = -1;


/** The instream options. */ /** The instream options. */
private final InStreamOptions mInStreamOptions; private final InStreamOptions mInStreamOptions;
Expand Down Expand Up @@ -105,7 +108,7 @@ public class FileInStream extends InputStream
* current block stream yet. When the file stream has reached EOF, it is set to the length of * current block stream yet. When the file stream has reached EOF, it is set to the length of
* {@link #mBlockIds}. * {@link #mBlockIds}.
*/ */
private int mBlockIdIndex = -1; private int mBlockIndex = UNINITIALIZED_BLOCK_INDEX;


/** The read buffer in file seek. This is used in {@link #readCurrentBlockToEnd()}. */ /** The read buffer in file seek. This is used in {@link #readCurrentBlockToEnd()}. */
private final byte[] mSeekBuffer; private final byte[] mSeekBuffer;
Expand Down Expand Up @@ -196,15 +199,15 @@ public int read(byte[] b, int off, int len) throws IOException {


private int readInternal() throws IOException { private int readInternal() throws IOException {
if (remainingInternal() <= 0) { if (remainingInternal() <= 0) {
return -1; return EOF_BLOCK_ID;
} }
updateStreams(); updateStreams();
Preconditions.checkState(mCurrentBlockInStream != null, PreconditionMessage.ERR_UNEXPECTED_EOF); Preconditions.checkState(mCurrentBlockInStream != null, PreconditionMessage.ERR_UNEXPECTED_EOF);


int data = mCurrentBlockInStream.read(); int data = mCurrentBlockInStream.read();
if (data == -1) { if (data == EOF_DATA) {
// The underlying stream is done. // The underlying stream is done.
return -1; return EOF_DATA;
} }


mPos++; mPos++;
Expand All @@ -225,7 +228,7 @@ private int readInternal(byte[] b, int off, int len) throws IOException {
if (len == 0) { if (len == 0) {
return 0; return 0;
} else if (remainingInternal() <= 0) { } else if (remainingInternal() <= 0) {
return -1; return EOF_DATA;
} }


int currentOffset = off; int currentOffset = off;
Expand Down Expand Up @@ -258,7 +261,7 @@ private int readInternal(byte[] b, int off, int len) throws IOException {


if (bytesLeftToRead == len && mCurrentBlockInStream.remaining() == 0) { if (bytesLeftToRead == len && mCurrentBlockInStream.remaining() == 0) {
// Nothing was read, and the underlying stream is done. // Nothing was read, and the underlying stream is done.
return -1; return EOF_DATA;
} }


return len - bytesLeftToRead; return len - bytesLeftToRead;
Expand All @@ -271,7 +274,7 @@ public int positionedRead(long pos, byte[] b, int off, int len) throws IOExcepti


private int positionedReadInternal(long pos, byte[] b, int off, int len) throws IOException { private int positionedReadInternal(long pos, byte[] b, int off, int len) throws IOException {
if (pos < 0 || pos >= mFileLength) { if (pos < 0 || pos >= mFileLength) {
return -1; return EOF_DATA;
} }


// If partial read cache is enabled, we fall back to the normal read. // If partial read cache is enabled, we fall back to the normal read.
Expand Down Expand Up @@ -387,7 +390,7 @@ protected long getBlockSize(long pos) {
* @return true if the block stream should be updated * @return true if the block stream should be updated
*/ */
protected boolean shouldUpdateStreams() { protected boolean shouldUpdateStreams() {
if (mBlockIdIndex == mBlockIds.size()) { if (mBlockIndex == mBlockIds.size()) {
// EOF. // EOF.
return false; return false;
} }
Expand Down Expand Up @@ -437,15 +440,15 @@ private void closeOrCancelCacheStream() {
} }


/** /**
* @return the current block id based on mBlockIdIndex, -1 if at the end of the file * @return the current block id based on mBlockIndex, -1 if at the end of the file
*/ */
private long getCurrentBlockId() { private long getCurrentBlockId() {
if (remainingInternal() <= 0) { if (remainingInternal() <= 0) {
return -1; return EOF_BLOCK_ID;
} }
Preconditions.checkState(mBlockIdIndex < mBlockIds.size(), Preconditions.checkState(mBlockIndex < mBlockIds.size(),
PreconditionMessage.ERR_BLOCK_INDEX.toString(), mBlockIdIndex, mPos, mBlockIds.size() - 1); PreconditionMessage.ERR_BLOCK_INDEX.toString(), mBlockIndex, mPos, mBlockIds.size() - 1);
return mBlockIds.get(mBlockIdIndex); return mBlockIds.get(mBlockIndex);
} }


/** /**
Expand Down Expand Up @@ -481,30 +484,34 @@ private void handleCacheStreamException(IOException e) {


/** /**
* Checks whether the current block stream has remaining bytes, if not, advance to the next block * Checks whether the current block stream has remaining bytes, if not, advance to the next block
* stream, otherwise, this is a no-op. This should only be used in streaming read APIs. * stream, otherwise, this is a no-op.
* For APIs like {@link #seek(long)}, {@link #skip(long)}, the block stream should be updated *
* according to the current read position, use {@link #updateStreamsBasedOnPosition()} instead. * This should only be used in streaming read APIs. For APIs like {@link #seek(long)},
* {@link #skip(long)}, the block stream should be updated according to the current read position,
* use {@link #updateStreamsBasedOnPosition()} instead.
*/ */
private void updateStreams() throws IOException { private void updateStreams() throws IOException {
assureCacheStreamInSync(); assureCacheStreamInSync();
if (shouldUpdateStreams()) { if (shouldUpdateStreams()) {
mBlockIdIndex++; mBlockIndex++;
updateStreamsInternal(); updateStreamsInternal();
} }
} }


/** /**
* Computes the current block stream ID based on the current read position, if the stream ID is * Computes the current block stream ID based on the current read position, if the stream ID is
* different from the current block stream ID, then updates the stream. * different from the current block stream ID, or the stream ID is the same as the current
* block stream ID but the current stream has no remaining bytes, then updates the stream.
*
* Comparing to {@link #updateStreams()}, this method has overhead for computing the * Comparing to {@link #updateStreams()}, this method has overhead for computing the
* block stream ID, only use this method for non-streaming read APIs like {@link #seek(long)} * block stream ID, only use this method for non-streaming read APIs like {@link #seek(long)}
* and {@link #skip(long)}. * and {@link #skip(long)}.
*/ */
private void updateStreamsBasedOnPosition() throws IOException { private void updateStreamsBasedOnPosition() throws IOException {
assureCacheStreamInSync(); assureCacheStreamInSync();
int blockIdIndex = (int) (mPos / mBlockSize); int blockIndex = (int) (mPos / mBlockSize);
if (blockIdIndex != mBlockIdIndex || shouldUpdateStreams()) { if (blockIndex != mBlockIndex || shouldUpdateStreams()) {
mBlockIdIndex = blockIdIndex; mBlockIndex = blockIndex;
updateStreamsInternal(); updateStreamsInternal();
} }
} }
Expand Down Expand Up @@ -589,8 +596,7 @@ private void updateBlockInStream(long blockId) throws IOException {
mCurrentBlockInStream = null; mCurrentBlockInStream = null;
} }


// blockId = -1 if mPos = EOF. if (blockId == EOF_BLOCK_ID) {
if (blockId < 0) {
return; return;
} }
mCurrentBlockInStream = getBlockInStream(blockId); mCurrentBlockInStream = getBlockInStream(blockId);
Expand Down

0 comments on commit 54a4b0c

Please sign in to comment.