Skip to content

Commit

Permalink
Fix bugs and add more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Nov 27, 2017
1 parent fa61234 commit 2d32f6d
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 35 deletions.
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -159,11 +160,12 @@ public BlockInStream getInStream(long blockId, Protocol.OpenUfsBlockOptions open
List<TieredIdentity> locations = blockInfo.getLocations().stream()
.map(location -> location.getWorkerAddress().getTieredIdentity())
.collect(Collectors.toList());
Collections.shuffle(locations);
Optional<TieredIdentity> nearest = mTieredIdentity.nearest(locations);
if (nearest.isPresent()) {
address = blockInfo.getLocations().stream()
.map(BlockLocation::getWorkerAddress)
.filter(a -> a.getTieredIdentity() == nearest.get())
.filter(a -> a.getTieredIdentity().equals(nearest.get()))
.findFirst().get();
if (mTieredIdentity.getTier(0).getTierName().equals(Constants.LOCALITY_NODE)
&& mTieredIdentity.topTiersMatch(nearest.get())) {
Expand Down
Expand Up @@ -44,18 +44,20 @@
public class BlockInStream extends InputStream implements BoundedStream, Seekable,
PositionedReadable {
private static final Logger LOG = LoggerFactory.getLogger(BlockInStream.class);

/** the source tracking where the block is from. */
public enum BlockInStreamSource {
LOCAL, REMOTE, UFS
}

private final WorkerNetAddress mAddress;
private final BlockInStreamSource mInStreamSource;
/** The id of the block or UFS file to which this instream provides access. */
private final long mId;
/** The size in bytes of the block. */
private final long mLength;

private final byte[] mSingleByte = new byte[1];
private final BlockInStreamSource mInStreamSource;

/** Current position of the stream, relative to the start of the block. */
private long mPos = 0;
Expand Down Expand Up @@ -125,7 +127,7 @@ private static BlockInStream createLocalBlockInStream(FileSystemContext context,
long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_READER_PACKET_SIZE_BYTES);
return new BlockInStream(
new LocalFilePacketReader.Factory(context, address, blockId, packetSize, options),
BlockInStreamSource.LOCAL, blockId, length);
address, BlockInStreamSource.LOCAL, blockId, length);
}

/**
Expand All @@ -146,23 +148,26 @@ private static BlockInStream createNettyBlockInStream(FileSystemContext context,
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES);
PacketReader.Factory factory = new NettyPacketReader.Factory(context, address,
readRequestPartial.toBuilder().setPacketSize(packetSize).buildPartial(), options);
return new BlockInStream(factory, blockSource, readRequestPartial.getBlockId(), blockSize);
return new BlockInStream(factory, address, blockSource, readRequestPartial.getBlockId(),
blockSize);
}

/**
* Creates an instance of {@link BlockInStream}.
*
* @param packetReaderFactory the packet reader factory
* @param address the address of the netty data server
* @param blockSource the source location of the block
* @param id the ID (either block ID or UFS file ID)
* @param length the length
*/
protected BlockInStream(PacketReader.Factory packetReaderFactory, BlockInStreamSource blockSource,
long id, long length) {
protected BlockInStream(PacketReader.Factory packetReaderFactory, WorkerNetAddress address,
BlockInStreamSource blockSource, long id, long length) {
mPacketReaderFactory = packetReaderFactory;
mAddress = address;
mInStreamSource = blockSource;
mId = id;
mLength = length;
mInStreamSource = blockSource;
}

@Override
Expand Down Expand Up @@ -342,6 +347,13 @@ private void checkIfClosed() {
Preconditions.checkState(!mClosed, PreconditionMessage.ERR_CLOSED_BLOCK_IN_STREAM);
}

/**
* @return the address of the data server
*/
public WorkerNetAddress getAddress() {
return mAddress;
}

/**
* @return the source of the block location
*/
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class BlockOutStream extends OutputStream implements BoundedStream, Cance
private final Closer mCloser;
/** Length of the stream. If unknown, set to Long.MAX_VALUE. */
private final long mLength;
private final WorkerNetAddress mAddress;
private ByteBuf mCurrentPacket = null;

private final List<PacketWriter> mPacketWriters;
Expand All @@ -62,18 +63,20 @@ public static BlockOutStream create(FileSystemContext context, long blockId, lon
WorkerNetAddress address, OutStreamOptions options) throws IOException {
PacketWriter packetWriter =
PacketWriter.Factory.create(context, blockId, blockSize, address, options);
return new BlockOutStream(packetWriter, blockSize);
return new BlockOutStream(packetWriter, blockSize, address);
}

/**
* Constructs a new {@link BlockOutStream} with only one {@link PacketWriter}.
*
* @param packetWriter the packet writer
* @param length the length of the stream
* @param address the Alluxio worker address
*/
protected BlockOutStream(PacketWriter packetWriter, long length) {
protected BlockOutStream(PacketWriter packetWriter, long length, WorkerNetAddress address) {
mCloser = Closer.create();
mLength = length;
mAddress = address;
mPacketWriters = new ArrayList<>(1);
mPacketWriters.add(packetWriter);
mCloser.register(packetWriter);
Expand Down Expand Up @@ -170,6 +173,13 @@ public void close() throws IOException {
}
}

/**
* @return the worker address for this stream
*/
public WorkerNetAddress getAddress() {
return mAddress;
}

/**
* Updates the current packet.
*
Expand Down
Expand Up @@ -39,15 +39,15 @@ public class UnderFileSystemFileOutStream extends BlockOutStream {
public static UnderFileSystemFileOutStream create(FileSystemContext context,
WorkerNetAddress address, OutStreamOptions options) throws IOException {
return new UnderFileSystemFileOutStream(NettyPacketWriter.create(context, address,
ID_UNUSED, Long.MAX_VALUE, Protocol.RequestType.UFS_FILE, options));
ID_UNUSED, Long.MAX_VALUE, Protocol.RequestType.UFS_FILE, options), address);
}

/**
* Constructs a new {@link UnderFileSystemFileOutStream} with only one {@link PacketWriter}.
*
* @param packetWriter the packet writer
*/
protected UnderFileSystemFileOutStream(PacketWriter packetWriter) {
super(packetWriter, Long.MAX_VALUE);
protected UnderFileSystemFileOutStream(PacketWriter packetWriter, WorkerNetAddress address) {
super(packetWriter, Long.MAX_VALUE, address);
}
}
Expand Up @@ -83,7 +83,7 @@ public WorkerNetAddress getWorkerForNextBlock(Iterable<BlockWorkerInfo> workerIn
}
// Map back to the worker with the nearest tiered identity.
return candidateWorkers.stream()
.filter(worker -> worker.getNetAddress().getTieredIdentity() == nearest.get())
.filter(worker -> worker.getNetAddress().getTieredIdentity().equals(nearest.get()))
.map(worker -> worker.getNetAddress())
.findFirst().orElse(null);
}
Expand Down
Expand Up @@ -12,9 +12,12 @@
package alluxio.client.block;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

import alluxio.client.WriteType;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.client.file.policy.FileWriteLocationPolicy;
import alluxio.client.netty.NettyRPC;
Expand All @@ -25,12 +28,17 @@
import alluxio.network.TieredIdentityFactory;
import alluxio.network.protocol.RPCMessageDecoder;
import alluxio.proto.dataserver.Protocol;
import alluxio.proto.dataserver.Protocol.OpenUfsBlockOptions;
import alluxio.resource.DummyCloseableResource;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
Expand All @@ -46,8 +54,10 @@

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -111,18 +121,18 @@ public void before() throws Exception {
mPipeline = PowerMockito.mock(ChannelPipeline.class);

mContext = PowerMockito.mock(FileSystemContext.class);
Mockito.when(mContext.acquireBlockMasterClientResource())
when(mContext.acquireBlockMasterClientResource())
.thenReturn(new DummyCloseableResource<>(mMasterClient));
mLocalAddr = new WorkerNetAddress().setHost(NetworkAddressUtils.getLocalHostName());

mBlockStore = new AlluxioBlockStore(mContext,
TieredIdentityFactory.fromString("node=" + WORKER_HOSTNAME_LOCAL));

Mockito.when(mContext.acquireNettyChannel(Mockito.any(WorkerNetAddress.class)))
when(mContext.acquireNettyChannel(Mockito.any(WorkerNetAddress.class)))
.thenReturn(mChannel);
Mockito.when(mChannel.pipeline()).thenReturn(mPipeline);
Mockito.when(mPipeline.last()).thenReturn(new RPCMessageDecoder());
Mockito.when(mPipeline.addLast(Mockito.any(ChannelHandler.class))).thenReturn(mPipeline);
when(mChannel.pipeline()).thenReturn(mPipeline);
when(mPipeline.last()).thenReturn(new RPCMessageDecoder());
when(mPipeline.addLast(Mockito.any(ChannelHandler.class))).thenReturn(mPipeline);
}

@Test
Expand Down Expand Up @@ -170,24 +180,82 @@ public void getOutStreamLocal() throws Exception {
ProtoMessage message = new ProtoMessage(
Protocol.LocalBlockCreateResponse.newBuilder().setPath(file.getAbsolutePath()).build());
PowerMockito.mockStatic(NettyRPC.class);
Mockito.when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class)))
when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class)))
.thenReturn(message);

OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH)
.setLocationPolicy(new MockFileWriteLocationPolicy(
Lists.newArrayList(WORKER_NET_ADDRESS_LOCAL)))
.setWriteType(WriteType.MUST_CACHE);
OutputStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options);
assertEquals(alluxio.client.block.stream.BlockOutStream.class, stream.getClass());
BlockOutStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options);
assertEquals(WORKER_NET_ADDRESS_LOCAL, stream.getAddress());
}

@Test
public void getOutStreamRemote() throws Exception {
WorkerNetAddress worker1 = new WorkerNetAddress().setHost("worker1");
WorkerNetAddress worker2 = new WorkerNetAddress().setHost("worker2");
OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH)
.setLocationPolicy(new MockFileWriteLocationPolicy(
Lists.newArrayList(WORKER_NET_ADDRESS_REMOTE)))
.setLocationPolicy(new MockFileWriteLocationPolicy(Arrays.asList(worker1, worker2)))
.setWriteType(WriteType.MUST_CACHE);
OutputStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options);
assertEquals(alluxio.client.block.stream.BlockOutStream.class, stream.getClass());
BlockOutStream stream1 = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options);
assertEquals(worker1, stream1.getAddress());
BlockOutStream stream2 = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options);
assertEquals(worker2, stream2.getAddress());
}

@Test
public void getInStreamUfs() throws Exception {
WorkerNetAddress worker1 = new WorkerNetAddress().setHost("worker1");
WorkerNetAddress worker2 = new WorkerNetAddress().setHost("worker2");
InStreamOptions options = InStreamOptions.defaults()
.setCacheLocationPolicy(new MockFileWriteLocationPolicy(Arrays.asList(worker1, worker2)));
when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn(new BlockInfo());
when(mMasterClient.getWorkerInfoList()).thenReturn(
Arrays.asList(new WorkerInfo().setAddress(worker1), new WorkerInfo().setAddress(worker2)));

// Location policy chooses worker1 first.
assertEquals(worker1, mBlockStore
.getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), options).getAddress());
// Location policy chooses worker2 second.
assertEquals(worker2, mBlockStore
.getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), options).getAddress());
}

@Test
public void getInStreamLocal() throws Exception {
WorkerNetAddress remote = new WorkerNetAddress().setHost("remote");
WorkerNetAddress local = new WorkerNetAddress().setHost(WORKER_HOSTNAME_LOCAL);

// Mock away Netty usage.
ProtoMessage message = new ProtoMessage(
Protocol.LocalBlockOpenResponse.newBuilder().setPath("/tmp").build());
PowerMockito.mockStatic(NettyRPC.class);
when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class)))
.thenReturn(message);

when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn(
new BlockInfo().setLocations(Arrays.asList(new BlockLocation().setWorkerAddress(remote),
new BlockLocation().setWorkerAddress(local))));
assertEquals(local, mBlockStore
.getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), InStreamOptions.defaults())
.getAddress());
}

@Test
public void getInStreamRemote() throws Exception {
WorkerNetAddress remote1 = new WorkerNetAddress().setHost("remote1");
WorkerNetAddress remote2 = new WorkerNetAddress().setHost("remote2");

when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn(
new BlockInfo().setLocations(Arrays.asList(new BlockLocation().setWorkerAddress(remote1),
new BlockLocation().setWorkerAddress(remote2))));
// We should sometimes get remote1 and sometimes get remote2.
Set<WorkerNetAddress> results = new HashSet<>();
for (int i = 0; i < 40; i++) {
results.add(mBlockStore.getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(),
InStreamOptions.defaults()).getAddress());
}
assertEquals(Sets.newHashSet(remote1, remote2), results);
}
}
Expand Up @@ -26,7 +26,7 @@ public class BlockOutStreamTest {
@Test
public void packetWriteException() throws Exception {
PacketWriter writer = new FailingTestPacketWriter(ByteBuffer.allocate(PACKET_SIZE));
BlockOutStream bos = new BlockOutStream(writer, PACKET_SIZE);
BlockOutStream bos = new BlockOutStream(writer, PACKET_SIZE, null);
try {
bos.write(new byte[PACKET_SIZE]);
Assert.fail("Expected write to throw an exception.");
Expand Down
Expand Up @@ -11,6 +11,8 @@

package alluxio.client.block.stream;

import alluxio.wire.WorkerNetAddress;

import java.io.IOException;

/**
Expand All @@ -24,7 +26,7 @@ public class TestBlockInStream extends BlockInStream {

public TestBlockInStream(byte[] data, long id, long length, boolean shortCircuit,
BlockInStreamSource source) {
super(new Factory(data, shortCircuit), source, id, length);
super(new Factory(data, shortCircuit), new WorkerNetAddress(), source, id, length);
mBytesRead = 0;
}

Expand Down
Expand Up @@ -30,7 +30,7 @@ public class TestBlockOutStream extends BlockOutStream {
* @param blockSize the block size
*/
public TestBlockOutStream(ByteBuffer data, long blockSize) {
super(new TestPacketWriter(data), blockSize);
super(new TestPacketWriter(data), blockSize, null);
mData = data;
mClosed = false;
mCanceled = false;
Expand Down
Expand Up @@ -29,7 +29,7 @@ public class TestUnderFileSystemFileOutStream extends UnderFileSystemFileOutStre
* @param data the data to test
*/
public TestUnderFileSystemFileOutStream(ByteBuffer data) {
super(new TestPacketWriter(data));
super(new TestPacketWriter(data), null);
mData = data;
mClosed = false;
mCanceled = false;
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/main/java/alluxio/PropertyKey.java
Expand Up @@ -2412,7 +2412,7 @@ private Name() {} // prevent instantiation
@ThreadSafe
public enum Template {
LOCALITY_TIER("alluxio.locality.%s", "alluxio\\.locality\\.(\\w+)"),
LOCALITY_TIER_STRICT("alluxio.locality.%s.wait", "alluxio\\.locality\\.(\\w+)\\.strict"),
LOCALITY_TIER_STRICT("alluxio.locality.%s.strict", "alluxio\\.locality\\.(\\w+)\\.strict"),
MASTER_JOURNAL_UFS_OPTION("alluxio.master.journal.ufs.option",
"alluxio\\.master\\.journal\\.ufs\\.option"),
MASTER_JOURNAL_UFS_OPTION_PROPERTY("alluxio.master.journal.ufs.option.%s",
Expand Down

0 comments on commit 2d32f6d

Please sign in to comment.