Skip to content

Commit

Permalink
Choose HDFS read API based on Client buffer size
Browse files Browse the repository at this point in the history
This PR is a follow up to #10763.

When the client buffer size is large ( > 2MB) and reads are guaranteed
to be somewhat sequential, the `pread` API to HDFS is not as efficient
as simple `read`. We introduce a heuristic to choose which API to use.

pr-link: #10845
change-id: cid-3301fd510474a9c676017bbec75ade215a3eab85
  • Loading branch information
madanadit committed Feb 12, 2020
1 parent 997d57b commit 4caad25
Show file tree
Hide file tree
Showing 21 changed files with 397 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
ReadRequest.newBuilder().setBlockId(blockId).setPromote(readType.isPromote());
// Add UFS fallback options
builder.setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId));
builder.setPositionShort(options.getPositionShort());
AlluxioConfiguration alluxioConf = context.getClusterConf();
boolean shortCircuit = alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
boolean shortCircuitPreferred =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ private int positionedReadInternal(long pos, byte[] b, int off, int len) throws
return -1;
}

if (len < mContext.getPathConf(new AlluxioURI(mStatus.getPath()))
.getBytes(PropertyKey.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD)) {
mOptions.setPositionShort(true);
}
int lenCopy = len;
CountingRetry retry = new CountingRetry(mBlockWorkerClientReadRetry);
IOException lastException = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class InStreamOptions {
private final URIStatus mStatus;
private final OpenFilePOptions mProtoOptions;
private BlockLocationPolicy mUfsReadLocationPolicy;
private boolean mPositionShort;

/**
* Creates with the default {@link OpenFilePOptions}.
Expand Down Expand Up @@ -74,6 +75,7 @@ public InStreamOptions(URIStatus status, OpenFilePOptions options,
mProtoOptions = openOptions;
mUfsReadLocationPolicy = BlockLocationPolicy.Factory.create(
alluxioConf.get(PropertyKey.USER_UFS_BLOCK_READ_LOCATION_POLICY), alluxioConf);
mPositionShort = false;
}

/**
Expand All @@ -94,6 +96,15 @@ public void setUfsReadLocationPolicy(BlockLocationPolicy ufsReadLocationPolicy)
mUfsReadLocationPolicy = Preconditions.checkNotNull(ufsReadLocationPolicy);
}

/**
* Sets whether the operation is positioned read to a small buffer.
*
* @param positionShort whether the operation is positioned read to a small buffer
*/
public void setPositionShort(boolean positionShort) {
mPositionShort = positionShort;
}

/**
* @return the {@link BlockLocationPolicy} associated with the instream
*/
Expand All @@ -108,6 +119,13 @@ public URIStatus getStatus() {
return mStatus;
}

/**
* @return true, if the operation is using positioned read to a small buffer size
*/
public boolean getPositionShort() {
return mPositionShort;
}

/**
* @param blockId id of the block
* @return the block info associated with the block id, note that this will be a cached copy
Expand Down Expand Up @@ -160,14 +178,16 @@ public boolean equals(Object o) {
}
InStreamOptions that = (InStreamOptions) o;
return Objects.equal(mStatus, that.mStatus)
&& Objects.equal(mProtoOptions, that.mProtoOptions);
&& Objects.equal(mProtoOptions, that.mProtoOptions)
&& Objects.equal(mPositionShort, that.mPositionShort);
}

@Override
public int hashCode() {
return Objects.hashCode(
mStatus,
mProtoOptions
mProtoOptions,
mPositionShort
);
}

Expand All @@ -176,6 +196,7 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("URIStatus", mStatus)
.add("OpenFileOptions", mProtoOptions)
.add("PositionShort", mPositionShort)
.toString();
}
}
13 changes: 13 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2920,6 +2920,17 @@ public String toString() {
+ "before this file is persisted.")
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_FILE_SEQUENTIAL_PREAD_THRESHOLD =
new Builder(Name.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD)
.setDefaultValue("2MB")
.setDescription("An upper bound on the client buffer size for positioned read to hint "
+ "at the sequential nature of reads. For reads with a buffer size greater than this "
+ "threshold, the read op is treated to be sequential and the worker may handle the "
+ "read differently. For instance, cold reads from the HDFS ufs may use a different "
+ "HDFS client API.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_BLOCK_SIZE_BYTES_DEFAULT =
new Builder(Name.USER_BLOCK_SIZE_BYTES_DEFAULT)
.setDefaultValue("64MB")
Expand Down Expand Up @@ -4510,6 +4521,8 @@ public static final class Name {
public static final String USER_FILE_REPLICATION_MIN = "alluxio.user.file.replication.min";
public static final String USER_FILE_REPLICATION_DURABLE =
"alluxio.user.file.replication.durable";
public static final String USER_FILE_SEQUENTIAL_PREAD_THRESHOLD =
"alluxio.user.file.sequential.pread.threshold";
public static final String USER_FILE_UFS_TIER_ENABLED = "alluxio.user.file.ufs.tier.enabled";
public static final String USER_FILE_WAITCOMPLETED_POLL_MS =
"alluxio.user.file.waitcompleted.poll";
Expand Down
25 changes: 23 additions & 2 deletions core/common/src/main/java/alluxio/underfs/options/OpenOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public final class OpenOptions {

private long mLength;

private boolean mPositionShort;

/**
* If true, attempt to recover after failed opened attempts. Extra effort may be required in
* order to recover from a failed open.
Expand All @@ -49,6 +51,7 @@ private OpenOptions() {
mOffset = 0;
mLength = Long.MAX_VALUE;
mRecoverFailedOpen = false;
mPositionShort = false;
}

/**
Expand All @@ -72,6 +75,13 @@ public boolean getRecoverFailedOpen() {
return mRecoverFailedOpen;
}

/**
* @return true, if the operation is using positioned read to a small buffer size
*/
public boolean getPositionShort() {
return mPositionShort;
}

/**
* Sets the offset from the start of a file to be opened for reading.
*
Expand Down Expand Up @@ -101,6 +111,15 @@ public OpenOptions setRecoverFailedOpen(boolean recover) {
return this;
}

/**
* @param positionShort whether the operation is positioned read to a small buffer
* @return the updated option object
*/
public OpenOptions setPositionShort(boolean positionShort) {
mPositionShort = positionShort;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -112,12 +131,13 @@ public boolean equals(Object o) {
OpenOptions that = (OpenOptions) o;
return Objects.equal(mOffset, that.mOffset)
&& Objects.equal(mLength, that.mLength)
&& Objects.equal(mRecoverFailedOpen, that.mRecoverFailedOpen);
&& Objects.equal(mRecoverFailedOpen, that.mRecoverFailedOpen)
&& Objects.equal(mPositionShort, that.mPositionShort);
}

@Override
public int hashCode() {
return Objects.hashCode(mOffset, mLength, mRecoverFailedOpen);
return Objects.hashCode(mOffset, mLength, mRecoverFailedOpen, mPositionShort);
}

@Override
Expand All @@ -126,6 +146,7 @@ public String toString() {
.add("offset", mOffset)
.add("length", mLength)
.add("recoverFailedOpen", mRecoverFailedOpen)
.add("positionShort", mPositionShort)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private boolean cacheBlockFromUfs(long blockId, long blockSize,
return true;
}
try (BlockReader reader = mBlockWorker
.readUfsBlock(Sessions.ASYNC_CACHE_UFS_SESSION_ID, blockId, 0)) {
.readUfsBlock(Sessions.ASYNC_CACHE_UFS_SESSION_ID, blockId, 0, false)) {
// Read the entire block, caching to block store will be handled internally in UFS block store
// Note that, we read from UFS with a smaller buffer to avoid high pressure on heap
// memory when concurrent async requests are received and thus trigger GC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,11 @@ BlockReader readBlockRemote(long sessionId, long blockId, long lockId)
* @param sessionId the client session ID
* @param blockId the ID of the UFS block to read
* @param offset the offset within the block
* @param positionShort whether the operation is using positioned read to a small buffer size
* @return the block reader instance
* @throws BlockDoesNotExistException if the block does not exist in the UFS block store
*/
BlockReader readUfsBlock(long sessionId, long blockId, long offset)
BlockReader readUfsBlock(long sessionId, long blockId, long offset, boolean positionShort)
throws BlockDoesNotExistException, IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,9 @@ public BlockReader readBlockRemote(long sessionId, long blockId, long lockId)
}

@Override
public BlockReader readUfsBlock(long sessionId, long blockId, long offset)
public BlockReader readUfsBlock(long sessionId, long blockId, long offset, boolean positionShort)
throws BlockDoesNotExistException, IOException {
return mUnderFileSystemBlockStore.getBlockReader(sessionId, blockId, offset);
return mUnderFileSystemBlockStore.getBlockReader(sessionId, blockId, offset, positionShort);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public InputStream acquire(UnderFileSystem ufs, String path, long fileId, OpenOp
inputStream = mUnderFileInputStreamCache.get(nextId, () -> {
SeekableUnderFileInputStream ufsStream
= (SeekableUnderFileInputStream) ufs.openExistingFile(path,
OpenOptions.defaults().setOffset(openOptions.getOffset()));
OpenOptions.defaults()
.setPositionShort(openOptions.getPositionShort())
.setOffset(openOptions.getOffset()));
LOG.debug("Created the under file input stream resource of {}", newId);
return new CachedSeekableInputStream(ufsStream, newId, fileId, path);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class UnderFileSystemBlockReader implements BlockReader {
private final UfsInputStreamManager mUfsInstreamManager;
/** The ufs client resource. */
private CloseableResource<UnderFileSystem> mUfsResource;
private boolean mIsPositionShort;

/**
* The position of mUnderFileSystemInputStream (if not null) is blockStart + mInStreamPos.
Expand All @@ -93,13 +94,15 @@ public final class UnderFileSystemBlockReader implements BlockReader {
* @param localBlockStore the Local block store
* @param ufsManager the manager of ufs
* @param ufsInstreamManager the manager of ufs instreams
* @param positionShort whether the client op is a positioned read to a small buffer
* @return the block reader
*/
public static UnderFileSystemBlockReader create(UnderFileSystemBlockMeta blockMeta, long offset,
BlockStore localBlockStore, UfsManager ufsManager, UfsInputStreamManager ufsInstreamManager)
throws IOException {
boolean positionShort, BlockStore localBlockStore, UfsManager ufsManager,
UfsInputStreamManager ufsInstreamManager) throws IOException {
UnderFileSystemBlockReader ufsBlockReader =
new UnderFileSystemBlockReader(blockMeta, localBlockStore, ufsManager, ufsInstreamManager);
new UnderFileSystemBlockReader(blockMeta, positionShort, localBlockStore, ufsManager,
ufsInstreamManager);
ufsBlockReader.init(offset);
return ufsBlockReader;
}
Expand All @@ -110,10 +113,12 @@ public static UnderFileSystemBlockReader create(UnderFileSystemBlockMeta blockMe
* @param blockMeta the block meta
* @param localBlockStore the Local block store
* @param ufsManager the manager of ufs
* @param positionShort whether the client op is a positioned read to a small buffer
* @param ufsInstreamManager the manager of ufs instreams
*/
private UnderFileSystemBlockReader(UnderFileSystemBlockMeta blockMeta, BlockStore localBlockStore,
UfsManager ufsManager, UfsInputStreamManager ufsInstreamManager) throws IOException {
private UnderFileSystemBlockReader(UnderFileSystemBlockMeta blockMeta, boolean positionShort,
BlockStore localBlockStore, UfsManager ufsManager, UfsInputStreamManager ufsInstreamManager)
throws IOException {
mInitialBlockSize = ServerConfiguration.getBytes(PropertyKey.WORKER_FILE_BUFFER_SIZE);
mBlockMeta = blockMeta;
mLocalBlockStore = localBlockStore;
Expand All @@ -123,6 +128,7 @@ private UnderFileSystemBlockReader(UnderFileSystemBlockMeta blockMeta, BlockStor
UfsManager.UfsClient ufsClient = mUfsManager.get(mBlockMeta.getMountId());
mUfsResource = ufsClient.acquireUfsResource();
mUfsMountPointUri = ufsClient.getUfsMountPointUri();
mIsPositionShort = positionShort;
}

/**
Expand Down Expand Up @@ -302,7 +308,9 @@ private void updateUnderFileSystemInputStream(long offset) throws IOException {
UnderFileSystem ufs = mUfsResource.get();
mUnderFileSystemInputStream = mUfsInstreamManager.acquire(ufs,
mBlockMeta.getUnderFileSystemPath(), IdUtils.fileIdFromBlockId(mBlockMeta.getBlockId()),
OpenOptions.defaults().setOffset(mBlockMeta.getOffset() + offset));
OpenOptions.defaults()
.setOffset(mBlockMeta.getOffset() + offset)
.setPositionShort(mIsPositionShort));
mInStreamPos = offset;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,13 @@ public void cleanupSession(long sessionId) {
* @param sessionId the client session ID that requested this read
* @param blockId the ID of the block to read
* @param offset the read offset within the block (NOT the file)
* @param positionShort whether the client op is a positioned read to a small buffer
* @return the block reader instance
* @throws BlockDoesNotExistException if the UFS block does not exist in the
* {@link UnderFileSystemBlockStore}
*/
public BlockReader getBlockReader(final long sessionId, long blockId, long offset)
throws BlockDoesNotExistException, IOException {
public BlockReader getBlockReader(final long sessionId, long blockId, long offset,
boolean positionShort) throws BlockDoesNotExistException, IOException {
final BlockInfo blockInfo;
try (LockResource lr = new LockResource(mLock)) {
blockInfo = getBlockInfo(sessionId, blockId);
Expand All @@ -239,8 +240,8 @@ public BlockReader getBlockReader(final long sessionId, long blockId, long offse
}
}
BlockReader reader =
UnderFileSystemBlockReader.create(blockInfo.getMeta(), offset, mLocalBlockStore,
mUfsManager, mUfsInstreamManager);
UnderFileSystemBlockReader.create(blockInfo.getMeta(), offset, positionShort,
mLocalBlockStore, mUfsManager, mUfsInstreamManager);
blockInfo.setBlockReader(reader);
return reader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ private void openBlock(BlockReadRequestContext context, StreamObserver<ReadRespo
if (mWorker.openUfsBlock(request.getSessionId(), request.getId(),
Protocol.OpenUfsBlockOptions.parseFrom(openUfsBlockOptions.toByteString()))) {
BlockReader reader =
mWorker.readUfsBlock(request.getSessionId(), request.getId(), request.getStart());
mWorker.readUfsBlock(request.getSessionId(), request.getId(), request.getStart(),
request.isPositionShort());
AlluxioURI ufsMountPointUri =
((UnderFileSystemBlockReader) reader).getUfsMountPointUri();
String ufsString = MetricsSystem.escape(ufsMountPointUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public final class BlockReadRequest extends ReadRequest {
private final Protocol.OpenUfsBlockOptions mOpenUfsBlockOptions;
private final boolean mPromote;
private final boolean mPositionShort;

/**
* Creates an instance of {@link BlockReadRequest}.
Expand All @@ -40,6 +41,7 @@ public final class BlockReadRequest extends ReadRequest {
mOpenUfsBlockOptions = null;
}
mPromote = request.getPromote();
mPositionShort = request.getPositionShort();
// Note that we do not need to seek to offset since the block worker is created at the offset.
}

Expand All @@ -50,6 +52,13 @@ public boolean isPromote() {
return mPromote;
}

/**
* @return if this is a positioned read to a small buffer
*/
public boolean isPositionShort() {
return mPositionShort;
}

/**
* @return the option to open UFS block
*/
Expand All @@ -74,6 +83,7 @@ public String toString() {
.add("promote", mPromote)
.add("sessionId", getSessionId())
.add("start", getStart())
.add("positionShort", isPositionShort())
.toString();
}
}
Loading

0 comments on commit 4caad25

Please sign in to comment.