Skip to content

Commit

Permalink
Send block info from client. (#6793)
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Jan 19, 2018
1 parent 90ede12 commit 628f25f
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,9 @@ private void closeBlockInStream(BlockInStream stream) throws IOException {
}
try {
// Construct the async cache request
long blockLength = mOptions.getBlockInfo(blockId).getLength();
Protocol.AsyncCacheRequest request =
Protocol.AsyncCacheRequest.newBuilder().setBlockId(blockId)
Protocol.AsyncCacheRequest.newBuilder().setBlockId(blockId).setLength(blockLength)
.setOpenUfsBlockOptions(mOptions.getOpenUfsBlockOptions(blockId))
.setSourceHost(dataSource.getHost()).setSourcePort(dataSource.getDataPort())
.build();
Expand Down
147 changes: 114 additions & 33 deletions core/protobuf/src/main/java/alluxio/proto/dataserver/Protocol.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/protobuf/src/proto/dataserver/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ message ReadRequest {
}

// Options for caching a block asynchronously
// next available id: 5
// next available id: 6
message AsyncCacheRequest {
optional int64 block_id = 1;
// TODO(calvin): source host and port should be replace with WorkerNetAddress
optional string source_host = 2;
optional int32 source_port = 3;
optional OpenUfsBlockOptions open_ufs_block_options = 4;
optional int64 length = 5;
}

// Options to open a UFS block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ public AsyncCacheRequestManager(ExecutorService service, BlockWorker blockWorker
*/
public void submitRequest(Protocol.AsyncCacheRequest request) {
long blockId = request.getBlockId();
long blockLength = request.getLength();
if (mPendingRequests.putIfAbsent(blockId, request) != null) {
// This block is already planned.
return;
}
try {
mAsyncCacheExecutor.submit(() -> {
Protocol.OpenUfsBlockOptions openUfsBlockOptions = request.getOpenUfsBlockOptions();
long blockSize = openUfsBlockOptions.getBlockSize();
boolean isSourceLocal = mLocalWorkerHostname.equals(request.getSourceHost());
// Depends on the request, cache the target block from different sources
boolean result;
if (isSourceLocal) {
result = cacheBlockFromUfs(blockId, blockSize, openUfsBlockOptions);
result = cacheBlockFromUfs(blockId, blockLength, openUfsBlockOptions);
} else {
InetSocketAddress sourceAddress =
new InetSocketAddress(request.getSourceHost(), request.getSourcePort());
result =
cacheBlockFromRemoteWorker(blockId, blockSize, sourceAddress, openUfsBlockOptions);
cacheBlockFromRemoteWorker(blockId, blockLength, sourceAddress, openUfsBlockOptions);
}
LOG.debug("Result of async caching block {}: {}", blockId, result);
mPendingRequests.remove(blockId);
Expand Down

0 comments on commit 628f25f

Please sign in to comment.