Skip to content

Commit

Permalink
add the block source to decide if triggering the caching
Browse files Browse the repository at this point in the history
  • Loading branch information
yupeng9 committed Aug 18, 2017
1 parent 06a6ce4 commit ed99b7c
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 68 deletions.
Expand Up @@ -14,6 +14,7 @@
import alluxio.client.block.policy.BlockLocationPolicy; import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.options.GetWorkerOptions; import alluxio.client.block.policy.options.GetWorkerOptions;
import alluxio.client.block.stream.BlockInStream; import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockInStream.BlockInStreamSource;
import alluxio.client.block.stream.BlockOutStream; import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
Expand Down Expand Up @@ -133,6 +134,7 @@ public BlockInStream getInStream(long blockId, Protocol.OpenUfsBlockOptions open
blockInfo = masterClientResource.get().getBlockInfo(blockId); blockInfo = masterClientResource.get().getBlockInfo(blockId);
} }


BlockInStreamSource source = BlockInStreamSource.UFS;
if (blockInfo.getLocations().isEmpty() && openUfsBlockOptions == null) { if (blockInfo.getLocations().isEmpty() && openUfsBlockOptions == null) {
throw new NotFoundException("Block " + blockId + " is unavailable in both Alluxio and UFS."); throw new NotFoundException("Block " + blockId + " is unavailable in both Alluxio and UFS.");
} }
Expand All @@ -157,6 +159,7 @@ public BlockInStream getInStream(long blockId, Protocol.OpenUfsBlockOptions open
WorkerNetAddress workerNetAddress = location.getWorkerAddress(); WorkerNetAddress workerNetAddress = location.getWorkerAddress();
if (workerNetAddress.getHost().equals(mLocalHostName)) { if (workerNetAddress.getHost().equals(mLocalHostName)) {
address = workerNetAddress; address = workerNetAddress;
source = BlockInStreamSource.LOCAL;
break; break;
} }
} }
Expand All @@ -165,9 +168,10 @@ public BlockInStream getInStream(long blockId, Protocol.OpenUfsBlockOptions open
// only randomize among locations in the highest tier, or have the master randomize the order. // only randomize among locations in the highest tier, or have the master randomize the order.
List<BlockLocation> locations = blockInfo.getLocations(); List<BlockLocation> locations = blockInfo.getLocations();
address = locations.get(mRandom.nextInt(locations.size())).getWorkerAddress(); address = locations.get(mRandom.nextInt(locations.size())).getWorkerAddress();
source = BlockInStreamSource.REMOTE;
} }
return BlockInStream return BlockInStream.create(mContext, blockId, blockInfo.getLength(), address, source,
.create(mContext, blockId, blockInfo.getLength(), address, openUfsBlockOptions, options); openUfsBlockOptions, options);
} }


/** /**
Expand Down
Expand Up @@ -42,13 +42,18 @@
@NotThreadSafe @NotThreadSafe
public class BlockInStream extends InputStream implements BoundedStream, Seekable, public class BlockInStream extends InputStream implements BoundedStream, Seekable,
PositionedReadable, Locatable { PositionedReadable, Locatable {
/** the source tracking where the block is from. */
public enum BlockInStreamSource {
LOCAL, REMOTE, UFS
}

/** The id of the block or UFS file to which this instream provides access. */ /** The id of the block or UFS file to which this instream provides access. */
private final long mId; private final long mId;
/** The size in bytes of the block. */ /** The size in bytes of the block. */
private final long mLength; private final long mLength;


private final byte[] mSingleByte = new byte[1]; private final byte[] mSingleByte = new byte[1];
private final boolean mLocal; private final BlockInStreamSource mInStreamSource;
private final WorkerNetAddress mAddress; private final WorkerNetAddress mAddress;


/** Current position of the stream, relative to the start of the block. */ /** Current position of the stream, relative to the start of the block. */
Expand All @@ -69,17 +74,19 @@ public class BlockInStream extends InputStream implements BoundedStream, Seekabl
* @param blockId the block ID * @param blockId the block ID
* @param blockSize the block size in bytes * @param blockSize the block size in bytes
* @param address the Alluxio worker address * @param address the Alluxio worker address
* @param blockSource the source location of the block
* @param openUfsBlockOptions the options to open a UFS block, set to null if this is block is * @param openUfsBlockOptions the options to open a UFS block, set to null if this is block is
* not persisted in UFS * not persisted in UFS
* @param options the in stream options * @param options the in stream options
* @return the {@link InputStream} object * @return the {@link InputStream} object
*/ */
public static BlockInStream create(FileSystemContext context, long blockId, long blockSize, public static BlockInStream create(FileSystemContext context, long blockId, long blockSize,
WorkerNetAddress address, Protocol.OpenUfsBlockOptions openUfsBlockOptions, WorkerNetAddress address, BlockInStreamSource blockSource,
InStreamOptions options) throws IOException { Protocol.OpenUfsBlockOptions openUfsBlockOptions, InStreamOptions options)
if (CommonUtils.isLocalHost(address) && Configuration throws IOException {
.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED) && !NettyUtils if (Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED)
.isDomainSocketSupported(address)) { && !NettyUtils.isDomainSocketSupported(address)
&& blockSource == BlockInStreamSource.LOCAL) {
try { try {
return createLocalBlockInStream(context, address, blockId, blockSize, options); return createLocalBlockInStream(context, address, blockId, blockSize, options);
} catch (NotFoundException e) { } catch (NotFoundException e) {
Expand All @@ -93,7 +100,8 @@ public static BlockInStream create(FileSystemContext context, long blockId, long
builder.setOpenUfsBlockOptions(openUfsBlockOptions); builder.setOpenUfsBlockOptions(openUfsBlockOptions);
} }


return createNettyBlockInStream(context, address, builder.buildPartial(), blockSize, options); return createNettyBlockInStream(context, address, blockSource, builder.buildPartial(),
blockSize, options);
} }


/** /**
Expand All @@ -112,44 +120,47 @@ private static BlockInStream createLocalBlockInStream(FileSystemContext context,
long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_READER_PACKET_SIZE_BYTES); long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_READER_PACKET_SIZE_BYTES);
return new BlockInStream( return new BlockInStream(
new LocalFilePacketReader.Factory(context, address, blockId, packetSize, options), address, new LocalFilePacketReader.Factory(context, address, blockId, packetSize, options), address,
blockId, length); BlockInStreamSource.LOCAL, blockId, length);
} }


/** /**
* Creates a {@link BlockInStream} to read from a netty data server. * Creates a {@link BlockInStream} to read from a netty data server.
* *
* @param context the file system context * @param context the file system context
* @param address the address of the netty data server * @param address the address of the netty data server
* @param blockSource the source location of the block
* @param blockSize the block size * @param blockSize the block size
* @param readRequestPartial the partial read request * @param readRequestPartial the partial read request
* @param options the in stream options * @param options the in stream options
* @return the {@link BlockInStream} created * @return the {@link BlockInStream} created
*/ */
private static BlockInStream createNettyBlockInStream(FileSystemContext context, private static BlockInStream createNettyBlockInStream(FileSystemContext context,
WorkerNetAddress address, Protocol.ReadRequest readRequestPartial, long blockSize, WorkerNetAddress address, BlockInStreamSource blockSource,
InStreamOptions options) { Protocol.ReadRequest readRequestPartial, long blockSize, InStreamOptions options) {
long packetSize = long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES); Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES);
PacketReader.Factory factory = new NettyPacketReader.Factory(context, address, PacketReader.Factory factory = new NettyPacketReader.Factory(context, address,
readRequestPartial.toBuilder().setPacketSize(packetSize).buildPartial(), options); readRequestPartial.toBuilder().setPacketSize(packetSize).buildPartial(), options);
return new BlockInStream(factory, address, readRequestPartial.getBlockId(), blockSize); return new BlockInStream(factory, address, blockSource, readRequestPartial.getBlockId(),
blockSize);
} }


/** /**
* Creates an instance of {@link BlockInStream}. * Creates an instance of {@link BlockInStream}.
* *
* @param packetReaderFactory the packet reader factory * @param packetReaderFactory the packet reader factory
* @param address the worker network address * @param address the worker network address
* @param blockSource the source location of the block
* @param id the ID (either block ID or UFS file ID) * @param id the ID (either block ID or UFS file ID)
* @param length the length * @param length the length
*/ */
protected BlockInStream(PacketReader.Factory packetReaderFactory, WorkerNetAddress address, protected BlockInStream(PacketReader.Factory packetReaderFactory, WorkerNetAddress address,
long id, long length) { BlockInStreamSource blockSource, long id, long length) {
mPacketReaderFactory = packetReaderFactory; mPacketReaderFactory = packetReaderFactory;
mId = id; mId = id;
mLength = length; mLength = length;
mAddress = address; mAddress = address;
mLocal = CommonUtils.isLocalHost(mAddress); mInStreamSource = blockSource;
} }


@Override @Override
Expand Down Expand Up @@ -328,6 +339,13 @@ public WorkerNetAddress location() {


@Override @Override
public boolean isLocal() { public boolean isLocal() {
return mLocal; return CommonUtils.isLocalHost(mAddress);
}

/**
* @return the source of the block location
*/
public BlockInStreamSource Source() {
return mInStreamSource;
} }
} }
68 changes: 36 additions & 32 deletions core/client/fs/src/main/java/alluxio/client/file/FileInStream.java
Expand Up @@ -21,6 +21,7 @@
import alluxio.client.PositionedReadable; import alluxio.client.PositionedReadable;
import alluxio.client.block.AlluxioBlockStore; import alluxio.client.block.AlluxioBlockStore;
import alluxio.client.block.stream.BlockInStream; import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockInStream.BlockInStreamSource;
import alluxio.client.block.stream.BlockOutStream; import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions; import alluxio.client.file.options.OutStreamOptions;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class FileInStream extends InputStream implements BoundedStream, Seekable
* Caches the entire block even if only a portion of the block is read. Only valid when * Caches the entire block even if only a portion of the block is read. Only valid when
* mShouldCache is true. * mShouldCache is true.
*/ */
private final boolean mCachePartiallyReadBlockEnabled; private final boolean mCachePartiallyReadBlock;
/** Whether to cache blocks in this file into Alluxio. */ /** Whether to cache blocks in this file into Alluxio. */
private final boolean mShouldCache; private final boolean mShouldCache;


Expand Down Expand Up @@ -131,7 +132,7 @@ protected FileInStream(URIStatus status, InStreamOptions options, FileSystemCont
mContext = context; mContext = context;
mAlluxioStorageType = options.getAlluxioStorageType(); mAlluxioStorageType = options.getAlluxioStorageType();
mShouldCache = mAlluxioStorageType.isStore(); mShouldCache = mAlluxioStorageType.isStore();
mCachePartiallyReadBlockEnabled = options.isCachePartiallyReadBlock(); mCachePartiallyReadBlock = options.isCachePartiallyReadBlock();
mClosed = false; mClosed = false;
if (mShouldCache) { if (mShouldCache) {
Preconditions.checkNotNull(options.getCacheLocationPolicy(), Preconditions.checkNotNull(options.getCacheLocationPolicy(),
Expand Down Expand Up @@ -312,7 +313,7 @@ public void seek(long pos) throws IOException {
* @return if the partially-read block should be cached to the local worker * @return if the partially-read block should be cached to the local worker
*/ */
private boolean shouldCachePartiallyReadBlock() { private boolean shouldCachePartiallyReadBlock() {
return mShouldCache && mCachePartiallyReadBlockEnabled && mCurrentCacheStream != null; return mShouldCache && mCachePartiallyReadBlock;
} }


@Override @Override
Expand Down Expand Up @@ -476,20 +477,17 @@ private void updateStreams() throws IOException {


/** /**
* Updates {@link #mCurrentCacheStream}. When {@code mShouldCache} is true, {@code FileInStream} * Updates {@link #mCurrentCacheStream}. When {@code mShouldCache} is true, {@code FileInStream}
* will create an {@code BlockOutStream} to cache the data read only if * will create an {@code BlockOutStream} to cache the data read only if the file is read from a
* <ol> * remote worker or UFS and we have an available local worker.
* <li>the file is read from under storage, or</li> *
* <li>the file is read from a remote worker and we have an available local worker.</li>
* </ol>
* The following preconditions are checked inside: * The following preconditions are checked inside:
* <ol> * <ol>
* <li>{@link #mCurrentCacheStream} is either done or null.</li> * <li>{@link #mCurrentCacheStream} is either done or null.</li>
* <li>EOF is reached or {@link #mCurrentBlockInStream} must be valid.</li> * <li>EOF is reached or {@link #mCurrentBlockInStream} must be valid.</li>
* </ol> * </ol>
* After this call, {@link #mCurrentCacheStream} is either null or freshly created. * After this call, {@link #mCurrentCacheStream} is either null or freshly created.
* {@link #mCurrentCacheStream} is created only if the block is not cached in a chosen machine * {@link #mCurrentCacheStream} is created only if the block is not cached in a chosen machine and
* and mPos is at the beginning of a block. * mPos is at the beginning of a block. This function is only called by {@link #updateStreams()}.
* This function is only called by {@link #updateStreams()}.
* *
* @param blockId cached result of {@link #getCurrentBlockId()} * @param blockId cached result of {@link #getCurrentBlockId()}
*/ */
Expand All @@ -504,7 +502,9 @@ private void updateCacheStream(long blockId) {
return; return;
} }
Preconditions.checkNotNull(mCurrentBlockInStream, "mCurrentBlockInStream"); Preconditions.checkNotNull(mCurrentBlockInStream, "mCurrentBlockInStream");
if (!mShouldCache || mCurrentBlockInStream.isLocal()) {
// do not create cache stream when the block is in local worker
if (!mShouldCache || mCurrentBlockInStream.Source() == BlockInStreamSource.LOCAL) {
return; return;
} }


Expand Down Expand Up @@ -595,22 +595,24 @@ private void seekInternalWithCachingPartiallyReadBlock(long pos) throws IOExcept
// Precompute this because mPos will be updated several times in this function. // Precompute this because mPos will be updated several times in this function.
final boolean isInCurrentBlock = pos / mBlockSize == mPos / mBlockSize; final boolean isInCurrentBlock = pos / mBlockSize == mPos / mBlockSize;


// Make sure that mCurrentBlockInStream and mCurrentCacheStream is updated. if (!(mPos == 0 && mCurrentBlockInStream == null && !isInCurrentBlock)) {
// mPos is not updated here. // Make sure that mCurrentBlockInStream and mCurrentCacheStream is updated.
updateStreams(); // mPos is not updated here.
updateStreams();


// Cache till pos if seeking forward within the current block. Otherwise cache the whole // Cache till pos if seeking forward within the current block. Otherwise cache the whole
// block. // block.
cacheCurrentBlockToPos(pos > mPos ? pos : Long.MAX_VALUE); cacheCurrentBlockToPos(pos > mPos ? pos : Long.MAX_VALUE);


// Early return if we are at pos already. This happens if we seek forward with caching // Early return if we are at pos already. This happens if we seek forward with caching
// enabled for this block. // enabled for this block.
if (mPos == pos) { if (mPos == pos) {
return; return;
}
// The early return above guarantees that we won't close an incomplete cache stream.
Preconditions.checkState(mCurrentCacheStream == null || mCurrentCacheStream.remaining() == 0);
closeOrCancelCacheStream();
} }
// The early return above guarantees that we won't close an incomplete cache stream.
Preconditions.checkState(mCurrentCacheStream == null || mCurrentCacheStream.remaining() == 0);
closeOrCancelCacheStream();


// If seeks within the current block, directly seeks to pos if we are not yet there. // If seeks within the current block, directly seeks to pos if we are not yet there.
// If seeks outside the current block, seek to the beginning of that block first, then // If seeks outside the current block, seek to the beginning of that block first, then
Expand All @@ -627,7 +629,13 @@ private void seekInternalWithCachingPartiallyReadBlock(long pos) throws IOExcept
} else { } else {
mPos = pos / mBlockSize * mBlockSize; mPos = pos / mBlockSize * mBlockSize;
updateStreams(); updateStreams();
cacheCurrentBlockToPos(pos); if (mCurrentCacheStream != null) {
cacheCurrentBlockToPos(pos);
} else if (mCurrentBlockInStream != null) {
seekInternal(pos);
} else {
Preconditions.checkState(remaining() == 0);
}
} }
} }


Expand All @@ -645,10 +653,6 @@ private void cacheCurrentBlockToPos(long pos) throws IOException {
if (len <= 0) { if (len <= 0) {
return; return;
} }
if (mPos % mBlockSize == 0 && pos - mPos >= mBlockSize) {
closeOrCancelCacheStream();
return;
}


do { do {
// Account for the last read which might be less than mSeekBufferSizeBytes bytes. // Account for the last read which might be less than mSeekBufferSizeBytes bytes.
Expand Down
Expand Up @@ -17,8 +17,10 @@
* A {@link BlockInStream} which reads from the given byte array. * A {@link BlockInStream} which reads from the given byte array.
*/ */
public class TestBlockInStream extends BlockInStream { public class TestBlockInStream extends BlockInStream {
public TestBlockInStream(byte[] mData, long id, long length, boolean shortCircuit) { public TestBlockInStream(byte[] mData, long id, long length, boolean shortCircuit,
super(new Factory(mData, shortCircuit), new WorkerNetAddress().setHost("local"), id, length); BlockInStreamSource source) {
super(new Factory(mData, shortCircuit), new WorkerNetAddress().setHost("local"), source, id,
length);
} }


/** /**
Expand Down

0 comments on commit ed99b7c

Please sign in to comment.