diff --git a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java index 980a41337d9f..16cdc86406fe 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java +++ b/core/client/fs/src/main/java/alluxio/client/block/AlluxioBlockStore.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -159,11 +160,12 @@ public BlockInStream getInStream(long blockId, Protocol.OpenUfsBlockOptions open List locations = blockInfo.getLocations().stream() .map(location -> location.getWorkerAddress().getTieredIdentity()) .collect(Collectors.toList()); + Collections.shuffle(locations); Optional nearest = mTieredIdentity.nearest(locations); if (nearest.isPresent()) { address = blockInfo.getLocations().stream() .map(BlockLocation::getWorkerAddress) - .filter(a -> a.getTieredIdentity() == nearest.get()) + .filter(a -> a.getTieredIdentity().equals(nearest.get())) .findFirst().get(); if (mTieredIdentity.getTier(0).getTierName().equals(Constants.LOCALITY_NODE) && mTieredIdentity.topTiersMatch(nearest.get())) { diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java index d47f982d8d74..c768269d3b65 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java @@ -44,18 +44,20 @@ public class BlockInStream extends InputStream implements BoundedStream, Seekable, PositionedReadable { private static final Logger LOG = LoggerFactory.getLogger(BlockInStream.class); + /** the source tracking where the block is from. */ public enum BlockInStreamSource { LOCAL, REMOTE, UFS } + private final WorkerNetAddress mAddress; + private final BlockInStreamSource mInStreamSource; /** The id of the block or UFS file to which this instream provides access. */ private final long mId; /** The size in bytes of the block. */ private final long mLength; private final byte[] mSingleByte = new byte[1]; - private final BlockInStreamSource mInStreamSource; /** Current position of the stream, relative to the start of the block. */ private long mPos = 0; @@ -125,7 +127,7 @@ private static BlockInStream createLocalBlockInStream(FileSystemContext context, long packetSize = Configuration.getBytes(PropertyKey.USER_LOCAL_READER_PACKET_SIZE_BYTES); return new BlockInStream( new LocalFilePacketReader.Factory(context, address, blockId, packetSize, options), - BlockInStreamSource.LOCAL, blockId, length); + address, BlockInStreamSource.LOCAL, blockId, length); } /** @@ -146,23 +148,26 @@ private static BlockInStream createNettyBlockInStream(FileSystemContext context, Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES); PacketReader.Factory factory = new NettyPacketReader.Factory(context, address, readRequestPartial.toBuilder().setPacketSize(packetSize).buildPartial(), options); - return new BlockInStream(factory, blockSource, readRequestPartial.getBlockId(), blockSize); + return new BlockInStream(factory, address, blockSource, readRequestPartial.getBlockId(), + blockSize); } /** * Creates an instance of {@link BlockInStream}. * * @param packetReaderFactory the packet reader factory + * @param address the address of the netty data server * @param blockSource the source location of the block * @param id the ID (either block ID or UFS file ID) * @param length the length */ - protected BlockInStream(PacketReader.Factory packetReaderFactory, BlockInStreamSource blockSource, - long id, long length) { + protected BlockInStream(PacketReader.Factory packetReaderFactory, WorkerNetAddress address, + BlockInStreamSource blockSource, long id, long length) { mPacketReaderFactory = packetReaderFactory; + mAddress = address; + mInStreamSource = blockSource; mId = id; mLength = length; - mInStreamSource = blockSource; } @Override @@ -342,6 +347,13 @@ private void checkIfClosed() { Preconditions.checkState(!mClosed, PreconditionMessage.ERR_CLOSED_BLOCK_IN_STREAM); } + /** + * @return the address of the data server + */ + public WorkerNetAddress getAddress() { + return mAddress; + } + /** * @return the source of the block location */ diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java index 26a380de053b..70108e63125b 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java @@ -43,6 +43,7 @@ public class BlockOutStream extends OutputStream implements BoundedStream, Cance private final Closer mCloser; /** Length of the stream. If unknown, set to Long.MAX_VALUE. */ private final long mLength; + private final WorkerNetAddress mAddress; private ByteBuf mCurrentPacket = null; private final List mPacketWriters; @@ -62,7 +63,7 @@ public static BlockOutStream create(FileSystemContext context, long blockId, lon WorkerNetAddress address, OutStreamOptions options) throws IOException { PacketWriter packetWriter = PacketWriter.Factory.create(context, blockId, blockSize, address, options); - return new BlockOutStream(packetWriter, blockSize); + return new BlockOutStream(packetWriter, blockSize, address); } /** @@ -70,10 +71,12 @@ public static BlockOutStream create(FileSystemContext context, long blockId, lon * * @param packetWriter the packet writer * @param length the length of the stream + * @param address the Alluxio worker address */ - protected BlockOutStream(PacketWriter packetWriter, long length) { + protected BlockOutStream(PacketWriter packetWriter, long length, WorkerNetAddress address) { mCloser = Closer.create(); mLength = length; + mAddress = address; mPacketWriters = new ArrayList<>(1); mPacketWriters.add(packetWriter); mCloser.register(packetWriter); @@ -170,6 +173,13 @@ public void close() throws IOException { } } + /** + * @return the worker address for this stream + */ + public WorkerNetAddress getAddress() { + return mAddress; + } + /** * Updates the current packet. * diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java index f319f872f280..6d99dc74af04 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java @@ -39,7 +39,7 @@ public class UnderFileSystemFileOutStream extends BlockOutStream { public static UnderFileSystemFileOutStream create(FileSystemContext context, WorkerNetAddress address, OutStreamOptions options) throws IOException { return new UnderFileSystemFileOutStream(NettyPacketWriter.create(context, address, - ID_UNUSED, Long.MAX_VALUE, Protocol.RequestType.UFS_FILE, options)); + ID_UNUSED, Long.MAX_VALUE, Protocol.RequestType.UFS_FILE, options), address); } /** @@ -47,7 +47,7 @@ public static UnderFileSystemFileOutStream create(FileSystemContext context, * * @param packetWriter the packet writer */ - protected UnderFileSystemFileOutStream(PacketWriter packetWriter) { - super(packetWriter, Long.MAX_VALUE); + protected UnderFileSystemFileOutStream(PacketWriter packetWriter, WorkerNetAddress address) { + super(packetWriter, Long.MAX_VALUE, address); } } diff --git a/core/client/fs/src/main/java/alluxio/client/file/policy/LocalFirstPolicy.java b/core/client/fs/src/main/java/alluxio/client/file/policy/LocalFirstPolicy.java index 4ca959fb78a2..1d2c812d2ed1 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/policy/LocalFirstPolicy.java +++ b/core/client/fs/src/main/java/alluxio/client/file/policy/LocalFirstPolicy.java @@ -83,7 +83,7 @@ public WorkerNetAddress getWorkerForNextBlock(Iterable workerIn } // Map back to the worker with the nearest tiered identity. return candidateWorkers.stream() - .filter(worker -> worker.getNetAddress().getTieredIdentity() == nearest.get()) + .filter(worker -> worker.getNetAddress().getTieredIdentity().equals(nearest.get())) .map(worker -> worker.getNetAddress()) .findFirst().orElse(null); } diff --git a/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java b/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java index 9d20305cb299..8a2211607fe1 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java @@ -12,9 +12,12 @@ package alluxio.client.block; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; import alluxio.client.WriteType; +import alluxio.client.block.stream.BlockOutStream; import alluxio.client.file.FileSystemContext; +import alluxio.client.file.options.InStreamOptions; import alluxio.client.file.options.OutStreamOptions; import alluxio.client.file.policy.FileWriteLocationPolicy; import alluxio.client.netty.NettyRPC; @@ -25,12 +28,17 @@ import alluxio.network.TieredIdentityFactory; import alluxio.network.protocol.RPCMessageDecoder; import alluxio.proto.dataserver.Protocol; +import alluxio.proto.dataserver.Protocol.OpenUfsBlockOptions; import alluxio.resource.DummyCloseableResource; import alluxio.util.network.NetworkAddressUtils; import alluxio.util.proto.ProtoMessage; +import alluxio.wire.BlockInfo; +import alluxio.wire.BlockLocation; +import alluxio.wire.WorkerInfo; import alluxio.wire.WorkerNetAddress; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPipeline; @@ -46,8 +54,10 @@ import java.io.File; import java.io.IOException; -import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import javax.annotation.concurrent.ThreadSafe; @@ -111,18 +121,18 @@ public void before() throws Exception { mPipeline = PowerMockito.mock(ChannelPipeline.class); mContext = PowerMockito.mock(FileSystemContext.class); - Mockito.when(mContext.acquireBlockMasterClientResource()) + when(mContext.acquireBlockMasterClientResource()) .thenReturn(new DummyCloseableResource<>(mMasterClient)); mLocalAddr = new WorkerNetAddress().setHost(NetworkAddressUtils.getLocalHostName()); mBlockStore = new AlluxioBlockStore(mContext, TieredIdentityFactory.fromString("node=" + WORKER_HOSTNAME_LOCAL)); - Mockito.when(mContext.acquireNettyChannel(Mockito.any(WorkerNetAddress.class))) + when(mContext.acquireNettyChannel(Mockito.any(WorkerNetAddress.class))) .thenReturn(mChannel); - Mockito.when(mChannel.pipeline()).thenReturn(mPipeline); - Mockito.when(mPipeline.last()).thenReturn(new RPCMessageDecoder()); - Mockito.when(mPipeline.addLast(Mockito.any(ChannelHandler.class))).thenReturn(mPipeline); + when(mChannel.pipeline()).thenReturn(mPipeline); + when(mPipeline.last()).thenReturn(new RPCMessageDecoder()); + when(mPipeline.addLast(Mockito.any(ChannelHandler.class))).thenReturn(mPipeline); } @Test @@ -170,24 +180,82 @@ public void getOutStreamLocal() throws Exception { ProtoMessage message = new ProtoMessage( Protocol.LocalBlockCreateResponse.newBuilder().setPath(file.getAbsolutePath()).build()); PowerMockito.mockStatic(NettyRPC.class); - Mockito.when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class))) + when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class))) .thenReturn(message); OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH) .setLocationPolicy(new MockFileWriteLocationPolicy( Lists.newArrayList(WORKER_NET_ADDRESS_LOCAL))) .setWriteType(WriteType.MUST_CACHE); - OutputStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options); - assertEquals(alluxio.client.block.stream.BlockOutStream.class, stream.getClass()); + BlockOutStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options); + assertEquals(WORKER_NET_ADDRESS_LOCAL, stream.getAddress()); } @Test public void getOutStreamRemote() throws Exception { + WorkerNetAddress worker1 = new WorkerNetAddress().setHost("worker1"); + WorkerNetAddress worker2 = new WorkerNetAddress().setHost("worker2"); OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH) - .setLocationPolicy(new MockFileWriteLocationPolicy( - Lists.newArrayList(WORKER_NET_ADDRESS_REMOTE))) + .setLocationPolicy(new MockFileWriteLocationPolicy(Arrays.asList(worker1, worker2))) .setWriteType(WriteType.MUST_CACHE); - OutputStream stream = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options); - assertEquals(alluxio.client.block.stream.BlockOutStream.class, stream.getClass()); + BlockOutStream stream1 = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options); + assertEquals(worker1, stream1.getAddress()); + BlockOutStream stream2 = mBlockStore.getOutStream(BLOCK_ID, BLOCK_LENGTH, options); + assertEquals(worker2, stream2.getAddress()); + } + + @Test + public void getInStreamUfs() throws Exception { + WorkerNetAddress worker1 = new WorkerNetAddress().setHost("worker1"); + WorkerNetAddress worker2 = new WorkerNetAddress().setHost("worker2"); + InStreamOptions options = InStreamOptions.defaults() + .setCacheLocationPolicy(new MockFileWriteLocationPolicy(Arrays.asList(worker1, worker2))); + when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn(new BlockInfo()); + when(mMasterClient.getWorkerInfoList()).thenReturn( + Arrays.asList(new WorkerInfo().setAddress(worker1), new WorkerInfo().setAddress(worker2))); + + // Location policy chooses worker1 first. + assertEquals(worker1, mBlockStore + .getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), options).getAddress()); + // Location policy chooses worker2 second. + assertEquals(worker2, mBlockStore + .getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), options).getAddress()); + } + + @Test + public void getInStreamLocal() throws Exception { + WorkerNetAddress remote = new WorkerNetAddress().setHost("remote"); + WorkerNetAddress local = new WorkerNetAddress().setHost(WORKER_HOSTNAME_LOCAL); + + // Mock away Netty usage. + ProtoMessage message = new ProtoMessage( + Protocol.LocalBlockOpenResponse.newBuilder().setPath("/tmp").build()); + PowerMockito.mockStatic(NettyRPC.class); + when(NettyRPC.call(Mockito.any(NettyRPCContext.class), Mockito.any(ProtoMessage.class))) + .thenReturn(message); + + when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn( + new BlockInfo().setLocations(Arrays.asList(new BlockLocation().setWorkerAddress(remote), + new BlockLocation().setWorkerAddress(local)))); + assertEquals(local, mBlockStore + .getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), InStreamOptions.defaults()) + .getAddress()); + } + + @Test + public void getInStreamRemote() throws Exception { + WorkerNetAddress remote1 = new WorkerNetAddress().setHost("remote1"); + WorkerNetAddress remote2 = new WorkerNetAddress().setHost("remote2"); + + when(mMasterClient.getBlockInfo(BLOCK_ID)).thenReturn( + new BlockInfo().setLocations(Arrays.asList(new BlockLocation().setWorkerAddress(remote1), + new BlockLocation().setWorkerAddress(remote2)))); + // We should sometimes get remote1 and sometimes get remote2. + Set results = new HashSet<>(); + for (int i = 0; i < 40; i++) { + results.add(mBlockStore.getInStream(BLOCK_ID, OpenUfsBlockOptions.getDefaultInstance(), + InStreamOptions.defaults()).getAddress()); + } + assertEquals(Sets.newHashSet(remote1, remote2), results); } } diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/BlockOutStreamTest.java b/core/client/fs/src/test/java/alluxio/client/block/stream/BlockOutStreamTest.java index 3f0748a9b9ec..6af17f7a1fff 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/BlockOutStreamTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/BlockOutStreamTest.java @@ -26,7 +26,7 @@ public class BlockOutStreamTest { @Test public void packetWriteException() throws Exception { PacketWriter writer = new FailingTestPacketWriter(ByteBuffer.allocate(PACKET_SIZE)); - BlockOutStream bos = new BlockOutStream(writer, PACKET_SIZE); + BlockOutStream bos = new BlockOutStream(writer, PACKET_SIZE, null); try { bos.write(new byte[PACKET_SIZE]); Assert.fail("Expected write to throw an exception."); diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockInStream.java b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockInStream.java index dd29cf804a0f..27bec91a981d 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockInStream.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockInStream.java @@ -11,6 +11,8 @@ package alluxio.client.block.stream; +import alluxio.wire.WorkerNetAddress; + import java.io.IOException; /** @@ -24,7 +26,7 @@ public class TestBlockInStream extends BlockInStream { public TestBlockInStream(byte[] data, long id, long length, boolean shortCircuit, BlockInStreamSource source) { - super(new Factory(data, shortCircuit), source, id, length); + super(new Factory(data, shortCircuit), new WorkerNetAddress(), source, id, length); mBytesRead = 0; } diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockOutStream.java b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockOutStream.java index 9ea2203fb7ea..fa569e03a14c 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockOutStream.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockOutStream.java @@ -30,7 +30,7 @@ public class TestBlockOutStream extends BlockOutStream { * @param blockSize the block size */ public TestBlockOutStream(ByteBuffer data, long blockSize) { - super(new TestPacketWriter(data), blockSize); + super(new TestPacketWriter(data), blockSize, null); mData = data; mClosed = false; mCanceled = false; diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/TestUnderFileSystemFileOutStream.java b/core/client/fs/src/test/java/alluxio/client/block/stream/TestUnderFileSystemFileOutStream.java index 4b0776cb7c5d..134807b1f399 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/TestUnderFileSystemFileOutStream.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/TestUnderFileSystemFileOutStream.java @@ -29,7 +29,7 @@ public class TestUnderFileSystemFileOutStream extends UnderFileSystemFileOutStre * @param data the data to test */ public TestUnderFileSystemFileOutStream(ByteBuffer data) { - super(new TestPacketWriter(data)); + super(new TestPacketWriter(data), null); mData = data; mClosed = false; mCanceled = false; diff --git a/core/common/src/main/java/alluxio/PropertyKey.java b/core/common/src/main/java/alluxio/PropertyKey.java index 1eeea8a5e315..f1f49fc9f660 100644 --- a/core/common/src/main/java/alluxio/PropertyKey.java +++ b/core/common/src/main/java/alluxio/PropertyKey.java @@ -2412,7 +2412,7 @@ private Name() {} // prevent instantiation @ThreadSafe public enum Template { LOCALITY_TIER("alluxio.locality.%s", "alluxio\\.locality\\.(\\w+)"), - LOCALITY_TIER_STRICT("alluxio.locality.%s.wait", "alluxio\\.locality\\.(\\w+)\\.strict"), + LOCALITY_TIER_STRICT("alluxio.locality.%s.strict", "alluxio\\.locality\\.(\\w+)\\.strict"), MASTER_JOURNAL_UFS_OPTION("alluxio.master.journal.ufs.option", "alluxio\\.master\\.journal\\.ufs\\.option"), MASTER_JOURNAL_UFS_OPTION_PROPERTY("alluxio.master.journal.ufs.option.%s", diff --git a/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java b/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java index 6c4f560c4ff7..23ebfb0b9271 100644 --- a/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java +++ b/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Arrays; -import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** @@ -102,9 +101,11 @@ public String getDomainSocketPath() { /** * @return the tiered identity */ - @Nullable public TieredIdentity getTieredIdentity() { - return mTieredIdentity; + if (mTieredIdentity != null) { + return mTieredIdentity; + } + return new TieredIdentity(Arrays.asList(new LocalityTier(Constants.LOCALITY_NODE, mHost))); } /** diff --git a/core/common/src/test/java/alluxio/PropertyKeyTest.java b/core/common/src/test/java/alluxio/PropertyKeyTest.java index 37fcc65b6a50..be98a35b99c5 100644 --- a/core/common/src/test/java/alluxio/PropertyKeyTest.java +++ b/core/common/src/test/java/alluxio/PropertyKeyTest.java @@ -12,9 +12,10 @@ package alluxio; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import alluxio.PropertyKey.Template; import alluxio.exception.ExceptionMessage; import org.junit.Assert; @@ -228,4 +229,15 @@ public void templateMatches() throws Exception { "alluxio.master.mount.table.alluxio")); } + @Test + public void localityTemplates() throws Exception { + assertTrue(PropertyKey.isValid("alluxio.locality.node")); + assertTrue(PropertyKey.isValid("alluxio.locality.node.strict")); + assertTrue(PropertyKey.isValid("alluxio.locality.custom")); + assertTrue(PropertyKey.isValid("alluxio.locality.custom.strict")); + + assertEquals("alluxio.locality.custom", Template.LOCALITY_TIER.format("custom").toString()); + assertEquals("alluxio.locality.custom.strict", + Template.LOCALITY_TIER_STRICT.format("custom").toString()); + } }