Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into log-collect
Browse files Browse the repository at this point in the history
  • Loading branch information
riversand9 committed Aug 28, 2017
2 parents 9c3cdb5 + 825e48e commit 6226329
Show file tree
Hide file tree
Showing 14 changed files with 332 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,57 +66,21 @@ public class BlockOutStream extends OutputStream implements BoundedStream, Cance
*/
public static BlockOutStream create(FileSystemContext context, long blockId, long blockSize,
WorkerNetAddress address, OutStreamOptions options) throws IOException {
PacketWriter packetWriter;
if (CommonUtils.isLocalHost(address) && Configuration
.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED) && !NettyUtils
.isDomainSocketSupported(address)) {
LOG.info("Creating short circuit output stream for block {} @ {}", blockId, address);
return createLocalBlockOutStream(context, address, blockId, blockSize, options);
packetWriter =
LocalFilePacketWriter.create(context, address, blockId, options);
} else {
Protocol.WriteRequest writeRequestPartial =
Protocol.WriteRequest.newBuilder().setId(blockId).setTier(options.getWriteTier())
.setType(Protocol.RequestType.ALLUXIO_BLOCK).buildPartial();
LOG.info("Creating netty output stream for block {} @ {} from client {}", blockId, address,
NetworkAddressUtils.getClientHostName());
return createNettyBlockOutStream(context, address, blockSize, writeRequestPartial, options);
packetWriter = NettyPacketWriter
.create(context, address, blockId, blockSize, Protocol.RequestType.ALLUXIO_BLOCK,
options);
}
}

/**
* Creates a {@link BlockOutStream} that writes to a local file.
*
* @param context the file system context
* @param address the worker network address
* @param id the ID
* @param length the block or file length
* @param options the out stream options
* @return the {@link BlockOutStream} created
*/
private static BlockOutStream createLocalBlockOutStream(FileSystemContext context,
WorkerNetAddress address, long id, long length, OutStreamOptions options) throws IOException {
long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_WRITER_PACKET_SIZE_BYTES);
PacketWriter packetWriter =
LocalFilePacketWriter.create(context, address, id, packetSize, options);
return new BlockOutStream(packetWriter, length);
}

/**
* Creates a {@link BlockOutStream} that writes to a netty data server.
*
* @param context the file system context
* @param address the netty data server address
* @param length the block or file length
* @param partialRequest details of the write request which are constant for all requests
* @param options the out stream options
* @return the {@link BlockOutStream} created
*/
private static BlockOutStream createNettyBlockOutStream(FileSystemContext context,
WorkerNetAddress address, long length, Protocol.WriteRequest partialRequest,
OutStreamOptions options) throws IOException {
long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
PacketWriter packetWriter =
new NettyPacketWriter(context, address, length, partialRequest, packetSize);
return new BlockOutStream(packetWriter, length);
return new BlockOutStream(packetWriter, blockSize);
}

/**
Expand Down Expand Up @@ -214,7 +178,7 @@ public void close() throws IOException {
try {
updateCurrentPacket(true);
} catch (Throwable t) {
mCloser.rethrow(t);
throw mCloser.rethrow(t);
} finally {
mClosed = true;
mCloser.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,61 @@ public final class LocalFilePacketWriter implements PacketWriter {
private static final long WRITE_TIMEOUT_MS =
Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);

private final FileSystemContext mContext;
private final WorkerNetAddress mAddress;
private final Channel mChannel;
private final LocalFileBlockWriter mWriter;
private final long mBlockId;
private final long mPacketSize;
private final ProtoMessage mCreateRequest;
private final NettyRPCContext mNettyRPCContext;
private final OutStreamOptions mOptions;
private final Closer mCloser = Closer.create();
private final Closer mCloser;

/** The position to write the next byte at. */
private long mPos;
/** The number of bytes reserved on the block worker to hold the block. */
private long mPosReserved;

private boolean mClosed = false;

/**
* Creates an instance of {@link LocalFilePacketWriter}. This requires the block to be locked
* beforehand.
*
* @param context the file system context
* @param address the worker network address
* @param blockId the block ID
* @param packetSize the packet size
* @param options the output stream options
* @return the {@link LocalFilePacketWriter} created
*/
public static LocalFilePacketWriter create(FileSystemContext context, WorkerNetAddress address,
long blockId, long packetSize, OutStreamOptions options) throws IOException {
return new LocalFilePacketWriter(context, address, blockId, packetSize, options);
public static LocalFilePacketWriter create(final FileSystemContext context,
final WorkerNetAddress address,
long blockId, OutStreamOptions options) throws IOException {
long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_WRITER_PACKET_SIZE_BYTES);

Closer closer = Closer.create();
try {
final Channel channel = context.acquireNettyChannel(address);
closer.register(new Closeable() {
@Override
public void close() throws IOException {
context.releaseNettyChannel(address, channel);
}
});

ProtoMessage createRequest = new ProtoMessage(
Protocol.LocalBlockCreateRequest.newBuilder().setBlockId(blockId)
.setTier(options.getWriteTier()).setSpaceToReserve(FILE_BUFFER_BYTES).build());
NettyRPCContext nettyRPCContext =
NettyRPCContext.defaults().setChannel(channel).setTimeout(WRITE_TIMEOUT_MS);
ProtoMessage message = NettyRPC.call(nettyRPCContext, createRequest);
Preconditions.checkState(message.isLocalBlockCreateResponse());
LocalFileBlockWriter writer =
closer.register(new LocalFileBlockWriter(message.asLocalBlockCreateResponse().getPath()));
return new LocalFilePacketWriter(blockId, packetSize, options, channel,
writer, createRequest, nettyRPCContext, closer);
} catch (Exception e) {
throw CommonUtils.closeAndRethrow(closer, e);
}
}

@Override
Expand Down Expand Up @@ -114,7 +138,7 @@ public void cancel() throws IOException {
Protocol.LocalBlockCompleteRequest.newBuilder().setBlockId(mBlockId).setCancel(true)
.build()));
} catch (Exception e) {
mCloser.rethrow(e);
throw mCloser.rethrow(e);
} finally {
mCloser.close();
}
Expand Down Expand Up @@ -144,41 +168,19 @@ public void close() throws IOException {
/**
* Creates an instance of {@link LocalFilePacketWriter}.
*
* @param context the file system context
* @param address the worker network address
* @param blockId the block ID
* @param packetSize the packet size
* @param options the output stream options
*/
private LocalFilePacketWriter(final FileSystemContext context, WorkerNetAddress address,
long blockId, long packetSize, OutStreamOptions options) throws IOException {
mContext = context;
mAddress = address;
mChannel = context.acquireNettyChannel(address);
mCloser.register(new Closeable() {
@Override
public void close() throws IOException {
mContext.releaseNettyChannel(mAddress, mChannel);
}
});
private LocalFilePacketWriter(long blockId, long packetSize, OutStreamOptions options,
Channel channel, LocalFileBlockWriter writer,
ProtoMessage createRequest, NettyRPCContext nettyRPCContext, Closer closer) {
mChannel = channel;
mCloser = closer;
mOptions = options;
Protocol.LocalBlockCreateRequest request =
Protocol.LocalBlockCreateRequest.newBuilder().setBlockId(blockId)
.setTier(options.getWriteTier()).setSpaceToReserve(FILE_BUFFER_BYTES).build();

try {
mCreateRequest = new ProtoMessage(request);
mNettyRPCContext =
NettyRPCContext.defaults().setChannel(mChannel).setTimeout(WRITE_TIMEOUT_MS);

ProtoMessage message = NettyRPC.call(mNettyRPCContext, mCreateRequest);
Preconditions.checkState(message.isLocalBlockCreateResponse());
mWriter = mCloser
.register(new LocalFileBlockWriter(message.asLocalBlockCreateResponse().getPath()));
} catch (Exception e) {
throw CommonUtils.closeAndRethrow(mCloser, e);
}

mWriter = writer;
mCreateRequest = createRequest;
mNettyRPCContext = nettyRPCContext;
mPosReserved += FILE_BUFFER_BYTES;
mBlockId = blockId;
mPacketSize = packetSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CanceledException;
import alluxio.exception.status.DeadlineExceededException;
Expand Down Expand Up @@ -108,39 +109,54 @@ public final class NettyPacketWriter implements PacketWriter {
private Condition mBufferEmptyOrFailed = mLock.newCondition();

/**
* Creates an instance of {@link NettyPacketWriter}.
*
* @param context the file system context
* @param address the data server address
* @param id the block ID or UFS file ID
* @param id the block or UFS ID
* @param length the length of the block or file to write, set to Long.MAX_VALUE if unknown
* @param tier the target tier
* @param type the request type (block or UFS file)
* @param packetSize the packet size
* @param type type of the write request
* @param options the options of the output stream
* @return an instance of {@link NettyPacketWriter}
*/
public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id,
long length, int tier, Protocol.RequestType type, long packetSize) throws IOException {
this(context, address, length, Protocol.WriteRequest.newBuilder().setId(id).setTier(tier)
.setType(type).buildPartial(), packetSize);
public static NettyPacketWriter create(FileSystemContext context, WorkerNetAddress address,
long id, long length, Protocol.RequestType type, OutStreamOptions options)
throws IOException {
long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
Channel nettyChannel = context.acquireNettyChannel(address);
return new NettyPacketWriter(context, address, id, length, packetSize, type, options,
nettyChannel);
}

/**
* Creates an instance of {@link NettyPacketWriter}.
*
* @param context the file system context
* @param address the data server address
* @param id the block or UFS file Id
* @param length the length of the block or file to write, set to Long.MAX_VALUE if unknown
* @param partialRequest details of the write request which are constant for all requests
* @param packetSize the packet size
* @param type type of the write request
* @param options details of the write request which are constant for all requests
* @param channel netty channel
*/
public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long length,
Protocol.WriteRequest partialRequest, long packetSize) throws IOException {
private NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id,
long length, long packetSize, Protocol.RequestType type, OutStreamOptions options,
Channel channel) {
mContext = context;
mAddress = address;
mLength = length;
mPartialRequest = partialRequest;
Protocol.WriteRequest.Builder builder =
Protocol.WriteRequest.newBuilder().setId(id).setTier(options.getWriteTier()).setType(type);
if (type == Protocol.RequestType.UFS_FILE) {
Protocol.CreateUfsFileOptions ufsFileOptions =
Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath())
.setOwner(options.getOwner()).setGroup(options.getGroup())
.setMode(options.getMode().toShort()).setMountId(options.getMountId()).build();
builder.setCreateUfsFileOptions(ufsFileOptions);
}
mPartialRequest = builder.buildPartial();
mPacketSize = packetSize;
mChannel = mContext.acquireNettyChannel(address);
mChannel = channel;
mChannel.pipeline().addLast(new PacketWriteHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

package alluxio.client.block.stream;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.proto.dataserver.Protocol;
Expand All @@ -28,7 +26,7 @@
*/
@NotThreadSafe
public class UnderFileSystemFileOutStream extends BlockOutStream {
private static final int TIER_UNUSED = -1;
private static final int ID_UNUSED = -1;

/**
* Creates an instance of {@link UnderFileSystemFileOutStream} that writes to a UFS file.
Expand All @@ -40,21 +38,12 @@ public class UnderFileSystemFileOutStream extends BlockOutStream {
*/
public static UnderFileSystemFileOutStream create(FileSystemContext context,
WorkerNetAddress address, OutStreamOptions options) throws IOException {
Protocol.CreateUfsFileOptions ufsFileOptions =
Protocol.CreateUfsFileOptions.newBuilder().setUfsPath(options.getUfsPath())
.setOwner(options.getOwner()).setGroup(options.getGroup())
.setMode(options.getMode().toShort()).setMountId(options.getMountId()).build();

long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
return new UnderFileSystemFileOutStream(new NettyPacketWriter(context, address, Long.MAX_VALUE,
Protocol.WriteRequest.newBuilder().setTier(TIER_UNUSED)
.setType(Protocol.RequestType.UFS_FILE).setCreateUfsFileOptions(ufsFileOptions)
.buildPartial(), packetSize));
return new UnderFileSystemFileOutStream(NettyPacketWriter.create(context, address,
ID_UNUSED, Long.MAX_VALUE, Protocol.RequestType.UFS_FILE, options));
}

/**
* Constructs a new {@link BlockOutStream} with only one {@link PacketWriter}.
* Constructs a new {@link UnderFileSystemFileOutStream} with only one {@link PacketWriter}.
*
* @param packetWriter the packet writer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

package alluxio.client.block.stream;

import alluxio.ConfigurationRule;
import alluxio.Constants;
import alluxio.EmbeddedChannels;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
Expand All @@ -31,6 +34,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
Expand Down Expand Up @@ -64,6 +68,11 @@ public final class NettyPacketWriterTest {
private WorkerNetAddress mAddress;
private EmbeddedChannels.EmbeddedEmptyCtorChannel mChannel;

@Rule
public ConfigurationRule mConfigurationRule =
new ConfigurationRule(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES,
String.valueOf(PACKET_SIZE));

@Before
public void before() throws Exception {
mContext = PowerMockito.mock(FileSystemContext.class);
Expand Down Expand Up @@ -165,8 +174,9 @@ public void writeFileManyPackets() throws Exception {
*/
private PacketWriter create(long length) throws Exception {
PacketWriter writer =
new NettyPacketWriter(mContext, mAddress, BLOCK_ID, length, TIER,
Protocol.RequestType.ALLUXIO_BLOCK, PACKET_SIZE);
NettyPacketWriter.create(mContext, mAddress, BLOCK_ID, length,
Protocol.RequestType.ALLUXIO_BLOCK,
OutStreamOptions.defaults().setWriteTier(TIER));
mChannel.finishChannelCreation();
return writer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.io.IOException;
import java.util.Set;

import javax.annotation.Nullable;

/**
* A blob store interface to represent the local storage managing and serving all the blocks in the
* local storage.
*/
interface BlockStore extends SessionCleanable {
public interface BlockStore extends SessionCleanable {

/**
* Locks an existing block and guards subsequent reads on this block.
Expand Down Expand Up @@ -129,6 +131,7 @@ BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
* @param blockId the id of the block
* @return metadata of the block or null if the temp block does not exist
*/
@Nullable
TempBlockMeta getTempBlockMeta(long sessionId, long blockId);

/**
Expand Down

0 comments on commit 6226329

Please sign in to comment.