diff --git a/core/client/fs/src/main/java/alluxio/client/Locatable.java b/core/client/fs/src/main/java/alluxio/client/Locatable.java index 586f02985a48..aa47bf97fe6d 100644 --- a/core/client/fs/src/main/java/alluxio/client/Locatable.java +++ b/core/client/fs/src/main/java/alluxio/client/Locatable.java @@ -11,6 +11,8 @@ package alluxio.client; +import alluxio.wire.WorkerNetAddress; + import java.net.InetSocketAddress; /** @@ -20,7 +22,7 @@ public interface Locatable { /** * @return the network location */ - InetSocketAddress location(); + WorkerNetAddress location(); /** * @return true if it is local diff --git a/core/client/fs/src/main/java/alluxio/client/block/BlockWorkerClient.java b/core/client/fs/src/main/java/alluxio/client/block/BlockWorkerClient.java index cd41911b2988..7d6ce8671c7b 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/BlockWorkerClient.java +++ b/core/client/fs/src/main/java/alluxio/client/block/BlockWorkerClient.java @@ -69,11 +69,6 @@ public static BlockWorkerClient create(BlockWorkerThriftClientPool clientPool, */ void cancelBlock(final long blockId); - /** - * @return the address of the worker's data server - */ - InetSocketAddress getDataServerAddress(); - /** * @return the ID of the session */ diff --git a/core/client/fs/src/main/java/alluxio/client/block/RetryHandlingBlockWorkerClient.java b/core/client/fs/src/main/java/alluxio/client/block/RetryHandlingBlockWorkerClient.java index a3fc2e86ca26..39b0e1a6114e 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/RetryHandlingBlockWorkerClient.java +++ b/core/client/fs/src/main/java/alluxio/client/block/RetryHandlingBlockWorkerClient.java @@ -74,8 +74,6 @@ public final class RetryHandlingBlockWorkerClient private static final AtomicInteger NUM_ACTIVE_SESSIONS = new AtomicInteger(0); private final Long mSessionId; - // This is the address of the data server on the worker. - private final InetSocketAddress mWorkerDataServerAddress; private final WorkerNetAddress mWorkerNetAddress; private final InetSocketAddress mRpcAddress; @@ -107,7 +105,6 @@ private RetryHandlingBlockWorkerClient( mClientHeartbeatPool = clientHeartbeatPool; mWorkerNetAddress = Preconditions.checkNotNull(workerNetAddress, "workerNetAddress"); mRpcAddress = NetworkAddressUtils.getRpcPortSocketAddress(workerNetAddress); - mWorkerDataServerAddress = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress); mSessionId = sessionId; } @@ -202,11 +199,6 @@ public Void call(BlockWorkerClientService.Client client) throws TException { }); } - @Override - public InetSocketAddress getDataServerAddress() { - return mWorkerDataServerAddress; - } - @Override public long getSessionId() { Preconditions.checkNotNull(mSessionId, "Session ID is accessed when it is not supported"); diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java index cbda59cf43f3..0034984c83cb 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java @@ -32,13 +32,11 @@ import com.google.common.base.Preconditions; import com.google.common.io.Closer; -import io.netty.channel.unix.DomainSocketAddress; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; -import java.net.SocketAddress; import javax.annotation.concurrent.NotThreadSafe; @@ -112,14 +110,8 @@ public static BlockInStream createNettyBlockInStream(long blockId, long blockSiz closer.register(context.createBlockWorkerClient(workerNetAddress)); LockBlockResource lockBlockResource = closer.register(blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults())); - SocketAddress address; - if (NettyUtils.isDomainSocketSupported(workerNetAddress)) { - address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath()); - } else { - address = blockWorkerClient.getDataServerAddress(); - } PacketInStream inStream = closer.register(PacketInStream - .createNettyPacketInStream(context, address, blockId, + .createNettyPacketInStream(context, workerNetAddress, blockId, lockBlockResource.getResult().getLockId(), blockWorkerClient.getSessionId(), blockSize, false, Protocol.RequestType.ALLUXIO_BLOCK, options)); blockWorkerClient.accessBlock(blockId); @@ -168,15 +160,8 @@ public static BlockInStream createUfsBlockInStream(FileSystemContext context, St LockBlockResult lockBlockResult = closer.register(blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions)).getResult(); PacketInStream inStream; - SocketAddress address; - if (NettyUtils.isDomainSocketSupported(workerNetAddress)) { - address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath()); - } else { - address = blockWorkerClient.getDataServerAddress(); - } if (lockBlockResult.getLockBlockStatus().blockInAlluxio()) { - boolean local = blockWorkerClient.getDataServerAddress().getHostName() - .equals(NetworkAddressUtils.getClientHostName()); + boolean local = workerNetAddress.getHost().equals(NetworkAddressUtils.getClientHostName()); if (local && Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED) && !NettyUtils.isDomainSocketSupported(workerNetAddress)) { inStream = closer.register(PacketInStream @@ -184,16 +169,16 @@ public static BlockInStream createUfsBlockInStream(FileSystemContext context, St lockBlockResult.getBlockPath(), blockId, blockSize, options)); } else { inStream = closer.register(PacketInStream - .createNettyPacketInStream(context, address, blockId, lockBlockResult.getLockId(), - blockWorkerClient.getSessionId(), blockSize, false, + .createNettyPacketInStream(context, workerNetAddress, blockId, + lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize, false, Protocol.RequestType.ALLUXIO_BLOCK, options)); } blockWorkerClient.accessBlock(blockId); } else { Preconditions.checkState(lockBlockResult.getLockBlockStatus().ufsTokenAcquired()); inStream = closer.register(PacketInStream - .createNettyPacketInStream(context, address, blockId, lockBlockResult.getLockId(), - blockWorkerClient.getSessionId(), blockSize, + .createNettyPacketInStream(context, workerNetAddress, blockId, + lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize, !options.getAlluxioStorageType().isStore(), Protocol.RequestType.UFS_BLOCK, options)); } @@ -225,8 +210,8 @@ public int positionedRead(long pos, byte[] b, int off, int len) { } @Override - public InetSocketAddress location() { - return mBlockWorkerClient.getDataServerAddress(); + public WorkerNetAddress location() { + return mBlockWorkerClient.getWorkerNetAddress(); } @Override @@ -258,7 +243,7 @@ protected BlockInStream(PacketInStream inputStream, BlockWorkerClient blockWorke mCloser = closer; mCloser.register(mInputStream); - mLocal = blockWorkerClient.getDataServerAddress().getHostName() + mLocal = blockWorkerClient.getWorkerNetAddress().getHost() .equals(NetworkAddressUtils.getClientHostName()); } } diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java index 640f7c13071e..128b1898e062 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockOutStream.java @@ -84,16 +84,9 @@ public static BlockOutStream createNettyBlockOutStream(long blockId, long blockS Closer closer = Closer.create(); try { BlockWorkerClient client = closer.register(context.createBlockWorkerClient(workerNetAddress)); - - SocketAddress address; - if (NettyUtils.isDomainSocketSupported(workerNetAddress)) { - address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath()); - } else { - address = client.getDataServerAddress(); - } PacketOutStream outStream = PacketOutStream - .createNettyPacketOutStream(context, address, client.getSessionId(), blockId, blockSize, - Protocol.RequestType.ALLUXIO_BLOCK, options); + .createNettyPacketOutStream(context, workerNetAddress, client.getSessionId(), blockId, + blockSize, Protocol.RequestType.ALLUXIO_BLOCK, options); closer.register(outStream); return new BlockOutStream(outStream, blockId, blockSize, client, options); } catch (RuntimeException e) { diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketReader.java b/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketReader.java index b4ba890d6aff..d5a34887fcc8 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketReader.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketReader.java @@ -24,6 +24,7 @@ import alluxio.util.CommonUtils; import alluxio.util.network.NettyUtils; import alluxio.util.proto.ProtoMessage; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; @@ -77,7 +78,7 @@ public final class NettyPacketReader implements PacketReader { private final FileSystemContext mContext; private final Channel mChannel; private final Protocol.RequestType mRequestType; - private final SocketAddress mAddress; + private final WorkerNetAddress mAddress; private final long mId; private final long mStart; private final long mBytesToRead; @@ -125,7 +126,7 @@ public final class NettyPacketReader implements PacketReader { * @param type the request type (block or UFS file) * @param packetSize the packet size */ - private NettyPacketReader(FileSystemContext context, SocketAddress address, long id, + private NettyPacketReader(FileSystemContext context, WorkerNetAddress address, long id, long offset, long len, long lockId, long sessionId, boolean noCache, Protocol.RequestType type, long packetSize) { Preconditions.checkArgument(offset >= 0 && len > 0 && packetSize > 0); @@ -345,7 +346,7 @@ private boolean acceptMessage(Object msg) { */ public static class Factory implements PacketReader.Factory { private final FileSystemContext mContext; - private final SocketAddress mAddress; + private final WorkerNetAddress mAddress; private final long mId; private final long mLockId; private final long mSessionId; @@ -365,7 +366,7 @@ public static class Factory implements PacketReader.Factory { * @param type the request type * @param packetSize the packet size */ - public Factory(FileSystemContext context, SocketAddress address, long id, long lockId, + public Factory(FileSystemContext context, WorkerNetAddress address, long id, long lockId, long sessionId, boolean noCache, Protocol.RequestType type, long packetSize) { mContext = context; mAddress = address; diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java b/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java index 301e7962b379..f060806b2632 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/NettyPacketWriter.java @@ -25,6 +25,7 @@ import alluxio.resource.LockResource; import alluxio.util.CommonUtils; import alluxio.util.proto.ProtoMessage; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; @@ -72,7 +73,7 @@ public final class NettyPacketWriter implements PacketWriter { private final FileSystemContext mContext; private final Channel mChannel; - private final SocketAddress mAddress; + private final WorkerNetAddress mAddress; private final long mLength; private final Protocol.WriteRequest mPartialRequest; private final long mPacketSize; @@ -116,7 +117,7 @@ public final class NettyPacketWriter implements PacketWriter { * @param type the request type (block or UFS file) * @param packetSize the packet size */ - public NettyPacketWriter(FileSystemContext context, final SocketAddress address, long id, + public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id, long length, long sessionId, int tier, Protocol.RequestType type, long packetSize) { this(context, address, length, Protocol.WriteRequest.newBuilder().setId(id) .setSessionId(sessionId).setTier(tier).setType(type).buildPartial(), packetSize); @@ -131,7 +132,7 @@ public NettyPacketWriter(FileSystemContext context, final SocketAddress address, * @param partialRequest details of the write request which are constant for all requests * @param packetSize the packet size */ - public NettyPacketWriter(FileSystemContext context, final SocketAddress address, long + public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long length, Protocol.WriteRequest partialRequest, long packetSize) { mContext = context; mAddress = address; diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/PacketInStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/PacketInStream.java index 86d6ab6b38f3..1dfc42ad86e4 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/PacketInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/PacketInStream.java @@ -22,6 +22,7 @@ import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.proto.dataserver.Protocol; import alluxio.util.io.BufferUtils; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Preconditions; @@ -86,7 +87,7 @@ public static PacketInStream createLocalPacketInStream( * @return the {@link PacketInStream} created */ public static PacketInStream createNettyPacketInStream(FileSystemContext context, - SocketAddress address, long id, long lockId, long sessionId, long length, + WorkerNetAddress address, long id, long lockId, long sessionId, long length, boolean noCache, Protocol.RequestType type, InStreamOptions options) { long packetSize = Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES); diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/PacketOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/PacketOutStream.java index b8249d319cd5..3add99e84555 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/PacketOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/PacketOutStream.java @@ -22,6 +22,7 @@ import alluxio.exception.status.AlluxioStatusException; import alluxio.proto.dataserver.Protocol; import alluxio.util.CommonUtils; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Preconditions; import com.google.common.io.Closer; @@ -79,7 +80,7 @@ public static PacketOutStream createLocalPacketOutStream(BlockWorkerClient clien * @return the {@link PacketOutStream} created */ public static PacketOutStream createNettyPacketOutStream(FileSystemContext context, - SocketAddress address, long sessionId, long id, long length, + WorkerNetAddress address, long sessionId, long id, long length, Protocol.RequestType type, OutStreamOptions options) { long packetSize = Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES); @@ -99,7 +100,7 @@ public static PacketOutStream createNettyPacketOutStream(FileSystemContext conte * @return the {@link PacketOutStream} created */ public static PacketOutStream createNettyPacketOutStream(FileSystemContext context, - SocketAddress address, long length, Protocol.WriteRequest partialRequest, + WorkerNetAddress address, long length, Protocol.WriteRequest partialRequest, OutStreamOptions options) { long packetSize = Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES); diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java index 67d767450e5c..c4661cad8cf5 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/UnderFileSystemFileOutStream.java @@ -14,6 +14,7 @@ import alluxio.client.file.FileSystemContext; import alluxio.client.file.options.OutStreamOptions; import alluxio.proto.dataserver.Protocol; +import alluxio.wire.WorkerNetAddress; import java.io.FilterOutputStream; import java.io.OutputStream; @@ -35,7 +36,7 @@ public final class UnderFileSystemFileOutStream extends FilterOutputStream { * @param options the options to construct this stream with * @return a new {@link UnderFileSystemFileOutStream} */ - public static OutputStream create(FileSystemContext context, SocketAddress address, + public static OutputStream create(FileSystemContext context, WorkerNetAddress address, OutStreamOptions options) { return new UnderFileSystemFileOutStream(context, address, options); } @@ -49,7 +50,7 @@ public static OutputStream create(FileSystemContext context, SocketAddress addre * @param address the data server address * @param options the out stream options */ - public UnderFileSystemFileOutStream(FileSystemContext context, SocketAddress address, + public UnderFileSystemFileOutStream(FileSystemContext context, WorkerNetAddress address, OutStreamOptions options) { super(PacketOutStream.createNettyPacketOutStream(context, address, Long.MAX_VALUE, Protocol.WriteRequest.newBuilder().setSessionId(-1).setTier(TIER_UNUSED) diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileOutStream.java b/core/client/fs/src/main/java/alluxio/client/file/FileOutStream.java index 2c25bd91dfc2..a99cff6dd893 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileOutStream.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileOutStream.java @@ -27,20 +27,16 @@ import alluxio.metrics.MetricsSystem; import alluxio.resource.CloseableResource; import alluxio.util.CommonUtils; -import alluxio.util.network.NettyUtils; -import alluxio.util.network.NetworkAddressUtils; import alluxio.wire.WorkerNetAddress; import com.codahale.metrics.Counter; import com.google.common.base.Preconditions; import com.google.common.io.Closer; -import io.netty.channel.unix.DomainSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; -import java.net.SocketAddress; import java.util.LinkedList; import java.util.List; @@ -106,14 +102,8 @@ public FileOutStream(AlluxioURI path, OutStreamOptions options, FileSystemContex try { WorkerNetAddress workerNetAddress = // not storing data to Alluxio, so block size is 0 options.getLocationPolicy().getWorkerForNextBlock(mBlockStore.getWorkerInfoList(), 0); - SocketAddress address; - if (NettyUtils.isDomainSocketSupported(workerNetAddress)) { - address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath()); - } else { - address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress); - } - mUnderStorageOutputStream = - mCloser.register(UnderFileSystemFileOutStream.create(mContext, address, mOptions)); + mUnderStorageOutputStream = mCloser + .register(UnderFileSystemFileOutStream.create(mContext, workerNetAddress, mOptions)); } catch (AlluxioStatusException e) { CommonUtils.closeQuietly(mCloser); throw e.toIOException(); diff --git a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java index 5f87b1cdd21e..adb2818b51c2 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java +++ b/core/client/fs/src/main/java/alluxio/client/file/FileSystemContext.java @@ -292,10 +292,11 @@ public BlockWorkerClient createBlockWorkerClient(WorkerNetAddress address, * available in the pool, it tries to create a new one. And an exception is thrown if it fails to * create a new one. * - * @param address the network address of the channel + * @param workerNetAddress the network address of the channel * @return the acquired netty channel */ - public Channel acquireNettyChannel(final SocketAddress address) { + public Channel acquireNettyChannel(final WorkerNetAddress workerNetAddress) { + SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress); if (!mNettyChannelPools.containsKey(address)) { Bootstrap bs = NettyClient.createClientBootstrap(address); bs.remoteAddress(address); @@ -317,10 +318,11 @@ public Channel acquireNettyChannel(final SocketAddress address) { /** * Releases a netty channel to the channel pools. * - * @param address the address of the channel + * @param workerNetAddress the address of the channel * @param channel the channel to release */ - public void releaseNettyChannel(SocketAddress address, Channel channel) { + public void releaseNettyChannel(WorkerNetAddress workerNetAddress, Channel channel) { + SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress); Preconditions.checkArgument(mNettyChannelPools.containsKey(address)); mNettyChannelPools.get(address).release(channel); } diff --git a/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java b/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java index c69886cc3ca6..0a07371d364b 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/AlluxioBlockStoreTest.java @@ -47,7 +47,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.Arrays; import java.util.List; @@ -109,7 +108,6 @@ public WorkerNetAddress getWorkerForNextBlock(Iterable workerIn private AlluxioBlockStore mBlockStore; private Channel mChannel; private ChannelPipeline mPipeline; - private InetSocketAddress mLocalAddr; private FileSystemContext mContext; @Before @@ -123,15 +121,13 @@ public void before() throws Exception { // Mock block store context to return our mock clients Mockito.when(mContext.createBlockWorkerClient(Mockito.any(WorkerNetAddress.class))) .thenReturn(mBlockWorkerClient); - mLocalAddr = new InetSocketAddress(NetworkAddressUtils.getLocalHostName(), 0); - Mockito.when(mBlockWorkerClient.getDataServerAddress()).thenReturn(mLocalAddr); Mockito.when(mContext.acquireBlockMasterClientResource()) .thenReturn(new DummyCloseableResource<>(mMasterClient)); mBlockStore = new AlluxioBlockStore(mContext, WORKER_HOSTNAME_LOCAL); - Mockito.when(mContext.acquireNettyChannel(Mockito.any(InetSocketAddress.class))) + Mockito.when(mContext.acquireNettyChannel(Mockito.any(WorkerNetAddress.class))) .thenReturn(mChannel); Mockito.when(mChannel.pipeline()).thenReturn(mPipeline); Mockito.when(mPipeline.last()).thenReturn(new RPCMessageDecoder()); diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/BlockInStreamTest.java b/core/client/fs/src/test/java/alluxio/client/block/stream/BlockInStreamTest.java index 8a2cf6c9098f..addb31cb4109 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/BlockInStreamTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/BlockInStreamTest.java @@ -35,7 +35,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.Closeable; -import java.net.InetSocketAddress; /** * Unit tests for {@link BlockInStream}. @@ -62,18 +61,14 @@ public void readFromLocal() throws Exception { .thenReturn(lockResource); // Set the data server hostname to match the client hostname. - when(blockWorkerClient.getDataServerAddress()) - .thenReturn(InetSocketAddress.createUnresolved(clientHostname, 0)); BlockInStream stream = BlockInStream.createUfsBlockInStream(context, "ufsPath", blockId, 100, - 0, 0, new WorkerNetAddress(), InStreamOptions.defaults()); + 0, 0, new WorkerNetAddress().setHost(clientHostname), InStreamOptions.defaults()); // The client hostname matches the worker hostname, so the stream should go to a local file. Assert.assertTrue(stream.isShortCircuit()); // Set the data server hostname to not match the client hostname. - when(blockWorkerClient.getDataServerAddress()) - .thenReturn(InetSocketAddress.createUnresolved("remotehost", 0)); stream = BlockInStream.createUfsBlockInStream(context, "ufsPath", blockId, 100, - 0, 0, new WorkerNetAddress(), InStreamOptions.defaults()); + 0, 0, new WorkerNetAddress().setHost("remotehost"), InStreamOptions.defaults()); // The client hostname matches the worker hostname, so the stream should go to a local file. Assert.assertFalse(stream.isShortCircuit()); } diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketReaderTest.java b/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketReaderTest.java index f68916eff34f..4b6e5db30709 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketReaderTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketReaderTest.java @@ -21,6 +21,7 @@ import alluxio.util.CommonUtils; import alluxio.util.WaitForOptions; import alluxio.util.io.BufferUtils; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Function; import io.netty.buffer.ByteBuf; @@ -36,7 +37,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -55,14 +55,14 @@ public final class NettyPacketReaderTest { private static final long LOCK_ID = 3L; private FileSystemContext mContext; - private InetSocketAddress mAddress; + private WorkerNetAddress mAddress; private EmbeddedChannels.EmbeddedEmptyCtorChannel mChannel; private NettyPacketReader.Factory mFactory; @Before public void before() throws Exception { mContext = PowerMockito.mock(FileSystemContext.class); - mAddress = Mockito.mock(InetSocketAddress.class); + mAddress = Mockito.mock(WorkerNetAddress.class); mFactory = new NettyPacketReader.Factory(mContext, mAddress, BLOCK_ID, LOCK_ID, SESSION_ID, false, Protocol.RequestType.ALLUXIO_BLOCK, PACKET_SIZE); diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketWriterTest.java b/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketWriterTest.java index 4083f892ea87..42c74aa92dc9 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketWriterTest.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/NettyPacketWriterTest.java @@ -22,6 +22,7 @@ import alluxio.util.ThreadFactoryUtils; import alluxio.util.WaitForOptions; import alluxio.util.io.BufferUtils; +import alluxio.wire.WorkerNetAddress; import com.google.common.base.Function; import io.netty.buffer.ByteBuf; @@ -62,13 +63,13 @@ public final class NettyPacketWriterTest { private static final int TIER = 0; private FileSystemContext mContext; - private InetSocketAddress mAddress; + private WorkerNetAddress mAddress; private EmbeddedChannels.EmbeddedEmptyCtorChannel mChannel; @Before public void before() throws Exception { mContext = PowerMockito.mock(FileSystemContext.class); - mAddress = Mockito.mock(InetSocketAddress.class); + mAddress = Mockito.mock(WorkerNetAddress.class); mChannel = new EmbeddedChannels.EmbeddedEmptyCtorChannel(); PowerMockito.when(mContext.acquireNettyChannel(mAddress)).thenReturn(mChannel); diff --git a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockWorkerClient.java b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockWorkerClient.java index 580d71203c97..5d9701be3058 100644 --- a/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockWorkerClient.java +++ b/core/client/fs/src/test/java/alluxio/client/block/stream/TestBlockWorkerClient.java @@ -17,15 +17,10 @@ import alluxio.retry.RetryPolicy; import alluxio.wire.WorkerNetAddress; -import java.net.InetSocketAddress; - /** * A mock {@link BlockWorkerClient} which returns local host for the data server address. */ public class TestBlockWorkerClient implements BlockWorkerClient { - private static final String HOSTNAME = "localhost"; - private static final int PORT = 29998; - @Override public void accessBlock(long blockId) {} @@ -35,11 +30,6 @@ public void cacheBlock(long blockId) {} @Override public void cancelBlock(long blockId) {} - @Override - public InetSocketAddress getDataServerAddress() { - return new InetSocketAddress(HOSTNAME, PORT); - } - @Override public long getSessionId() { return 0; diff --git a/core/client/fs/src/test/java/alluxio/client/file/FileOutStreamTest.java b/core/client/fs/src/test/java/alluxio/client/file/FileOutStreamTest.java index 56c572d71f56..a065617b6b46 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/FileOutStreamTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/FileOutStreamTest.java @@ -59,7 +59,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -154,7 +153,7 @@ public void flush() { PowerMockito.mockStatic(UnderFileSystemFileOutStream.class); PowerMockito.when( UnderFileSystemFileOutStream.create(any(FileSystemContext.class), - any(InetSocketAddress.class), any(OutStreamOptions.class))).thenReturn( + any(WorkerNetAddress.class), any(OutStreamOptions.class))).thenReturn( mUnderStorageOutputStream); OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH) diff --git a/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java b/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java index 76921ec0d672..bcdeb22cc102 100644 --- a/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java +++ b/core/common/src/main/java/alluxio/util/network/NetworkAddressUtils.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import io.netty.channel.unix.DomainSocketAddress; import org.apache.thrift.transport.TServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import java.net.NetworkInterface; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; @@ -572,15 +574,21 @@ public static InetSocketAddress getRpcPortSocketAddress(WorkerNetAddress netAddr } /** - * Extracts dataPort InetSocketAddress from Alluxio representation of network address. + * Extracts dataPort socket address from Alluxio representation of network address. * * @param netAddress the input network address representation - * @return InetSocketAddress + * @return the socket address */ - public static InetSocketAddress getDataPortSocketAddress(WorkerNetAddress netAddress) { - String host = netAddress.getHost(); - int port = netAddress.getDataPort(); - return new InetSocketAddress(host, port); + public static SocketAddress getDataPortSocketAddress(WorkerNetAddress netAddress) { + SocketAddress address; + if (NettyUtils.isDomainSocketSupported(netAddress)) { + address = new DomainSocketAddress(netAddress.getDomainSocketPath()); + } else { + String host = netAddress.getHost(); + int port = netAddress.getDataPort(); + address = new InetSocketAddress(host, port); + } + return address; } /**