Skip to content

Commit

Permalink
[ALLUXIO-2977] Implement BlockReadRequestContext and other required c…
Browse files Browse the repository at this point in the history
…lasses
  • Loading branch information
apc999 committed Aug 7, 2017
1 parent 5928aa2 commit f065de1
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 133 deletions.
Expand Up @@ -234,7 +234,7 @@ protected boolean acceptMessage(Object object) {
* @param request the block read request * @param request the block read request
* @return an instance of read request based on the request read from channel * @return an instance of read request based on the request read from channel
*/ */
protected abstract T createRequestContext(Protocol.ReadRequest request) throws Exception; protected abstract T createRequestContext(Protocol.ReadRequest request);


/** /**
* Completes the read request. When the request is closed, we should clean up any temporary state * Completes the read request. When the request is closed, we should clean up any temporary state
Expand Down
Expand Up @@ -45,7 +45,6 @@
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;


import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;


/** /**
Expand All @@ -55,7 +54,7 @@
justification = "false positive with superclass generics, " justification = "false positive with superclass generics, "
+ "see more description in https://sourceforge.net/p/findbugs/bugs/1242/") + "see more description in https://sourceforge.net/p/findbugs/bugs/1242/")
@NotThreadSafe @NotThreadSafe
final class BlockReadHandler extends AbstractReadHandler<BlockReadHandler.BlockReadRequest> { final class BlockReadHandler extends AbstractReadHandler<BlockReadRequestContext> {
private static final Logger LOG = LoggerFactory.getLogger(BlockReadHandler.class); private static final Logger LOG = LoggerFactory.getLogger(BlockReadHandler.class);
private static final long UFS_BLOCK_OPEN_TIMEOUT_MS = private static final long UFS_BLOCK_OPEN_TIMEOUT_MS =
Configuration.getMs(PropertyKey.WORKER_UFS_BLOCK_OPEN_TIMEOUT_MS); Configuration.getMs(PropertyKey.WORKER_UFS_BLOCK_OPEN_TIMEOUT_MS);
Expand All @@ -65,103 +64,39 @@ final class BlockReadHandler extends AbstractReadHandler<BlockReadHandler.BlockR
/** The transfer type used by the data server. */ /** The transfer type used by the data server. */
private final FileTransferType mTransferType; private final FileTransferType mTransferType;


/**
* The internal representation of a block read request.
*/
@NotThreadSafe
public static final class BlockReadRequest extends ReadRequest {
private final Protocol.OpenUfsBlockOptions mOpenUfsBlockOptions;
private final boolean mPromote;
private BlockReader mBlockReader;

/**
* Creates an instance of {@link BlockReadRequest}.
*
* @param request the block read request
*/
BlockReadRequest(Protocol.ReadRequest request) throws Exception {
super(request.getBlockId(), request.getOffset(), request.getOffset() + request.getLength(),
request.getPacketSize());

if (request.hasOpenUfsBlockOptions()) {
mOpenUfsBlockOptions = request.getOpenUfsBlockOptions();
} else {
mOpenUfsBlockOptions = null;
}
mPromote = request.getPromote();
// Note that we do not need to seek to offset since the block worker is created at the offset.
}

/**
* @return if the block read type indicate promote in tier storage
*/
public boolean isPromote() {
return mPromote;
}

/**
* @return the option to open UFS block
*/
public Protocol.OpenUfsBlockOptions getOpenUfsBlockOptions() {
return mOpenUfsBlockOptions;
}

/**
* @return true if the block is persisted in UFS
*/
public boolean isPersisted() {
return mOpenUfsBlockOptions != null && mOpenUfsBlockOptions.hasUfsPath();
}

/**
* @return block reader
*/
@Nullable
public BlockReader getBlockReader() {
return mBlockReader;
}

/**
* @param blockReader block reader to set
*/
public void setBlockReader(BlockReader blockReader) {
mBlockReader = blockReader;
}
}

@NotThreadSafe @NotThreadSafe
public final class BlockPacketReader extends PacketReader { public final class BlockPacketReader extends PacketReader {
/** The Block Worker. */ /** The Block Worker. */
private final BlockWorker mWorker; private final BlockWorker mWorker;
/** An object storing the mapping of tier aliases to ordinals. */ /** An object storing the mapping of tier aliases to ordinals. */
private final StorageTierAssoc mStorageTierAssoc = new WorkerStorageTierAssoc(); private final StorageTierAssoc mStorageTierAssoc = new WorkerStorageTierAssoc();


BlockPacketReader(BlockReadRequest request, Channel channel, BlockWorker blockWorker) { BlockPacketReader(BlockReadRequestContext context, Channel channel, BlockWorker blockWorker) {
super(request, channel); super(context, channel);
mWorker = blockWorker; mWorker = blockWorker;
} }


@Override @Override
protected void completeRequest(BlockReadRequest request) throws Exception { protected void completeRequest(BlockReadRequestContext context) throws Exception {
BlockReader reader = request.getBlockReader(); BlockReader reader = context.getBlockReader();
if (reader != null) { if (reader != null) {
try { try {
reader.close(); reader.close();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Failed to close block reader for block {} with error {}.", request.getId(), LOG.warn("Failed to close block reader for block {} with error {}.",
e.getMessage()); context.getRequest().getId(), e.getMessage());
} }
} }
if (!mWorker.unlockBlock(request.getSessionId(), request.getId())) { if (!mWorker.unlockBlock(context.getRequest().getSessionId(), context.getRequest().getId())) {
mWorker.closeUfsBlock(request.getSessionId(), request.getId()); mWorker.closeUfsBlock(context.getRequest().getSessionId(), context.getRequest().getId());
} }
} }


@Override @Override
protected DataBuffer getDataBuffer(BlockReadRequest request, Channel channel, long offset, protected DataBuffer getDataBuffer(BlockReadRequestContext context, Channel channel, long offset,
int len) throws Exception { int len) throws Exception {
openBlock(request, channel); openBlock(context, channel);
BlockReader blockReader = request.getBlockReader(); BlockReader blockReader = context.getBlockReader();
Preconditions.checkState(blockReader != null); Preconditions.checkState(blockReader != null);
if (mTransferType == FileTransferType.TRANSFER if (mTransferType == FileTransferType.TRANSFER
&& (blockReader instanceof LocalFileBlockReader)) { && (blockReader instanceof LocalFileBlockReader)) {
Expand All @@ -186,10 +121,11 @@ protected DataBuffer getDataBuffer(BlockReadRequest request, Channel channel, lo
* @param channel the netty channel * @param channel the netty channel
* @throws Exception if it fails to open the block * @throws Exception if it fails to open the block
*/ */
private void openBlock(BlockReadRequest request, Channel channel) throws Exception { private void openBlock(BlockReadRequestContext context, Channel channel) throws Exception {
if (request.getBlockReader() != null) { if (context.getBlockReader() != null) {
return; return;
} }
BlockReadRequest request = context.getRequest();
int retryInterval = Constants.SECOND_MS; int retryInterval = Constants.SECOND_MS;
RetryPolicy retryPolicy = new TimeoutRetry(UFS_BLOCK_OPEN_TIMEOUT_MS, retryInterval); RetryPolicy retryPolicy = new TimeoutRetry(UFS_BLOCK_OPEN_TIMEOUT_MS, retryInterval);


Expand All @@ -216,8 +152,8 @@ private void openBlock(BlockReadRequest request, Channel channel) throws Excepti
try { try {
BlockReader reader = BlockReader reader =
mWorker.readBlockRemote(request.getSessionId(), request.getId(), lockId); mWorker.readBlockRemote(request.getSessionId(), request.getId(), lockId);
request.setBlockReader(reader); context.setBlockReader(reader);
request.setCounter(MetricsSystem.workerCounter("BytesReadAlluxio")); context.setCounter(MetricsSystem.workerCounter("BytesReadAlluxio"));
mWorker.accessBlock(request.getSessionId(), request.getId()); mWorker.accessBlock(request.getSessionId(), request.getId());
((FileChannel) reader.getChannel()).position(request.getStart()); ((FileChannel) reader.getChannel()).position(request.getStart());
return; return;
Expand All @@ -237,8 +173,8 @@ private void openBlock(BlockReadRequest request, Channel channel) throws Excepti
((UnderFileSystemBlockReader) reader).getUfsMountPointUri(); ((UnderFileSystemBlockReader) reader).getUfsMountPointUri();
String ufsString = MetricsSystem.escape(ufsMountPointUri); String ufsString = MetricsSystem.escape(ufsMountPointUri);
String metricName = String.format("BytesReadUfs-Ufs:%s", ufsString); String metricName = String.format("BytesReadUfs-Ufs:%s", ufsString);
request.setBlockReader(reader); context.setBlockReader(reader);
request.setCounter(MetricsSystem.workerCounter(metricName)); context.setCounter(MetricsSystem.workerCounter(metricName));
return; return;
} catch (Exception e) { } catch (Exception e) {
mWorker.closeUfsBlock(request.getSessionId(), request.getId()); mWorker.closeUfsBlock(request.getSessionId(), request.getId());
Expand Down Expand Up @@ -272,12 +208,12 @@ public BlockReadHandler(ExecutorService executorService, BlockWorker blockWorker
} }


@Override @Override
protected BlockReadRequest createRequestContext(Protocol.ReadRequest request) throws Exception { protected BlockReadRequestContext createRequestContext(Protocol.ReadRequest request) {
return new BlockReadRequest(request); return new BlockReadRequestContext(request);
} }


@Override @Override
protected PacketReader createPacketReader(BlockReadRequest request, Channel channel) { protected PacketReader createPacketReader(BlockReadRequestContext context, Channel channel) {
return new BlockPacketReader(request, channel, mWorker); return new BlockPacketReader(context, channel, mWorker);
} }
} }
@@ -0,0 +1,54 @@
package alluxio.worker.netty;

import alluxio.proto.dataserver.Protocol;

import javax.annotation.concurrent.NotThreadSafe;

/**
* The internal representation of a block read request.
*/
@NotThreadSafe
public final class BlockReadRequest extends ReadRequest {
private final Protocol.OpenUfsBlockOptions mOpenUfsBlockOptions;
private final boolean mPromote;

/**
* Creates an instance of {@link BlockReadRequest}.
*
* @param request the block read request
*/
BlockReadRequest(Protocol.ReadRequest request) {
super(request.getBlockId(), request.getOffset(), request.getOffset() + request.getLength(),
request.getPacketSize());

if (request.hasOpenUfsBlockOptions()) {
mOpenUfsBlockOptions = request.getOpenUfsBlockOptions();
} else {
mOpenUfsBlockOptions = null;
}
mPromote = request.getPromote();
// Note that we do not need to seek to offset since the block worker is created at the offset.
}

/**
* @return if the block read type indicate promote in tier storage
*/
public boolean isPromote() {
return mPromote;
}

/**
* @return the option to open UFS block
*/
public Protocol.OpenUfsBlockOptions getOpenUfsBlockOptions() {
return mOpenUfsBlockOptions;
}

/**
* @return true if the block is persisted in UFS
*/
public boolean isPersisted() {
return mOpenUfsBlockOptions != null && mOpenUfsBlockOptions.hasUfsPath();
}

}
@@ -0,0 +1,34 @@
package alluxio.worker.netty;

import alluxio.proto.dataserver.Protocol;
import alluxio.worker.block.io.BlockReader;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

/**
* Context of {@link BlockReadRequest}.
*/
@NotThreadSafe
public final class BlockReadRequestContext extends ReadRequestContext<BlockReadRequest> {
private BlockReader mBlockReader;

public BlockReadRequestContext(Protocol.ReadRequest request) {
super(new BlockReadRequest(request));
}

/**
* @return block reader
*/
@Nullable
public BlockReader getBlockReader() {
return mBlockReader;
}

/**
* @param blockReader block reader to set
*/
public void setBlockReader(BlockReader blockReader) {
mBlockReader = blockReader;
}
}
@@ -1,21 +1,7 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.netty; package alluxio.worker.netty;


import alluxio.util.IdUtils; import alluxio.util.IdUtils;


import com.codahale.metrics.Counter;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;


/** /**
Expand All @@ -28,7 +14,6 @@ class ReadRequest {
private final long mEnd; private final long mEnd;
private final long mPacketSize; private final long mPacketSize;
private final long mSessionId; private final long mSessionId;
private Counter mCounter;


ReadRequest(long id, long start, long end, long packetSize) { ReadRequest(long id, long start, long end, long packetSize) {
mId = id; mId = id;
Expand Down Expand Up @@ -72,19 +57,4 @@ public long getEnd() {
public long getPacketSize() { public long getPacketSize() {
return mPacketSize; return mPacketSize;
} }

/**
* @return counter
*/
@Nullable
public Counter getCounter() {
return mCounter;
}

/**
* @param counter counter to set
*/
public void setCounter(Counter counter) {
mCounter = counter;
}
} }

0 comments on commit f065de1

Please sign in to comment.