Skip to content

Commit

Permalink
Use WorkerNetAddress instead of socket address
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed May 4, 2017
1 parent e32ec5a commit 5b64bd2
Show file tree
Hide file tree
Showing 19 changed files with 63 additions and 110 deletions.
4 changes: 3 additions & 1 deletion core/client/fs/src/main/java/alluxio/client/Locatable.java
Expand Up @@ -11,6 +11,8 @@

package alluxio.client;

import alluxio.wire.WorkerNetAddress;

import java.net.InetSocketAddress;

/**
Expand All @@ -20,7 +22,7 @@ public interface Locatable {
/**
* @return the network location
*/
InetSocketAddress location();
WorkerNetAddress location();

/**
* @return true if it is local
Expand Down
Expand Up @@ -69,11 +69,6 @@ public static BlockWorkerClient create(BlockWorkerThriftClientPool clientPool,
*/
void cancelBlock(final long blockId);

/**
* @return the address of the worker's data server
*/
InetSocketAddress getDataServerAddress();

/**
* @return the ID of the session
*/
Expand Down
Expand Up @@ -74,8 +74,6 @@ public final class RetryHandlingBlockWorkerClient
private static final AtomicInteger NUM_ACTIVE_SESSIONS = new AtomicInteger(0);

private final Long mSessionId;
// This is the address of the data server on the worker.
private final InetSocketAddress mWorkerDataServerAddress;
private final WorkerNetAddress mWorkerNetAddress;
private final InetSocketAddress mRpcAddress;

Expand Down Expand Up @@ -107,7 +105,6 @@ private RetryHandlingBlockWorkerClient(
mClientHeartbeatPool = clientHeartbeatPool;
mWorkerNetAddress = Preconditions.checkNotNull(workerNetAddress, "workerNetAddress");
mRpcAddress = NetworkAddressUtils.getRpcPortSocketAddress(workerNetAddress);
mWorkerDataServerAddress = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress);
mSessionId = sessionId;
}

Expand Down Expand Up @@ -202,11 +199,6 @@ public Void call(BlockWorkerClientService.Client client) throws TException {
});
}

@Override
public InetSocketAddress getDataServerAddress() {
return mWorkerDataServerAddress;
}

@Override
public long getSessionId() {
Preconditions.checkNotNull(mSessionId, "Session ID is accessed when it is not supported");
Expand Down
Expand Up @@ -32,13 +32,11 @@

import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import io.netty.channel.unix.DomainSocketAddress;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

import javax.annotation.concurrent.NotThreadSafe;

Expand Down Expand Up @@ -112,14 +110,8 @@ public static BlockInStream createNettyBlockInStream(long blockId, long blockSiz
closer.register(context.createBlockWorkerClient(workerNetAddress));
LockBlockResource lockBlockResource =
closer.register(blockWorkerClient.lockBlock(blockId, LockBlockOptions.defaults()));
SocketAddress address;
if (NettyUtils.isDomainSocketSupported(workerNetAddress)) {
address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath());
} else {
address = blockWorkerClient.getDataServerAddress();
}
PacketInStream inStream = closer.register(PacketInStream
.createNettyPacketInStream(context, address, blockId,
.createNettyPacketInStream(context, workerNetAddress, blockId,
lockBlockResource.getResult().getLockId(), blockWorkerClient.getSessionId(),
blockSize, false, Protocol.RequestType.ALLUXIO_BLOCK, options));
blockWorkerClient.accessBlock(blockId);
Expand Down Expand Up @@ -168,32 +160,25 @@ public static BlockInStream createUfsBlockInStream(FileSystemContext context, St
LockBlockResult lockBlockResult =
closer.register(blockWorkerClient.lockUfsBlock(blockId, lockBlockOptions)).getResult();
PacketInStream inStream;
SocketAddress address;
if (NettyUtils.isDomainSocketSupported(workerNetAddress)) {
address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath());
} else {
address = blockWorkerClient.getDataServerAddress();
}
if (lockBlockResult.getLockBlockStatus().blockInAlluxio()) {
boolean local = blockWorkerClient.getDataServerAddress().getHostName()
.equals(NetworkAddressUtils.getClientHostName());
boolean local = workerNetAddress.getHost().equals(NetworkAddressUtils.getClientHostName());
if (local && Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED)
&& !NettyUtils.isDomainSocketSupported(workerNetAddress)) {
inStream = closer.register(PacketInStream
.createLocalPacketInStream(
lockBlockResult.getBlockPath(), blockId, blockSize, options));
} else {
inStream = closer.register(PacketInStream
.createNettyPacketInStream(context, address, blockId, lockBlockResult.getLockId(),
blockWorkerClient.getSessionId(), blockSize, false,
.createNettyPacketInStream(context, workerNetAddress, blockId,
lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize, false,
Protocol.RequestType.ALLUXIO_BLOCK, options));
}
blockWorkerClient.accessBlock(blockId);
} else {
Preconditions.checkState(lockBlockResult.getLockBlockStatus().ufsTokenAcquired());
inStream = closer.register(PacketInStream
.createNettyPacketInStream(context, address, blockId, lockBlockResult.getLockId(),
blockWorkerClient.getSessionId(), blockSize,
.createNettyPacketInStream(context, workerNetAddress, blockId,
lockBlockResult.getLockId(), blockWorkerClient.getSessionId(), blockSize,
!options.getAlluxioStorageType().isStore(), Protocol.RequestType.UFS_BLOCK,
options));
}
Expand Down Expand Up @@ -225,8 +210,8 @@ public int positionedRead(long pos, byte[] b, int off, int len) {
}

@Override
public InetSocketAddress location() {
return mBlockWorkerClient.getDataServerAddress();
public WorkerNetAddress location() {
return mBlockWorkerClient.getWorkerNetAddress();
}

@Override
Expand Down Expand Up @@ -258,7 +243,7 @@ protected BlockInStream(PacketInStream inputStream, BlockWorkerClient blockWorke

mCloser = closer;
mCloser.register(mInputStream);
mLocal = blockWorkerClient.getDataServerAddress().getHostName()
mLocal = blockWorkerClient.getWorkerNetAddress().getHost()
.equals(NetworkAddressUtils.getClientHostName());
}
}
Expand Up @@ -84,16 +84,9 @@ public static BlockOutStream createNettyBlockOutStream(long blockId, long blockS
Closer closer = Closer.create();
try {
BlockWorkerClient client = closer.register(context.createBlockWorkerClient(workerNetAddress));

SocketAddress address;
if (NettyUtils.isDomainSocketSupported(workerNetAddress)) {
address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath());
} else {
address = client.getDataServerAddress();
}
PacketOutStream outStream = PacketOutStream
.createNettyPacketOutStream(context, address, client.getSessionId(), blockId, blockSize,
Protocol.RequestType.ALLUXIO_BLOCK, options);
.createNettyPacketOutStream(context, workerNetAddress, client.getSessionId(), blockId,
blockSize, Protocol.RequestType.ALLUXIO_BLOCK, options);
closer.register(outStream);
return new BlockOutStream(outStream, blockId, blockSize, client, options);
} catch (RuntimeException e) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -77,7 +78,7 @@ public final class NettyPacketReader implements PacketReader {
private final FileSystemContext mContext;
private final Channel mChannel;
private final Protocol.RequestType mRequestType;
private final SocketAddress mAddress;
private final WorkerNetAddress mAddress;
private final long mId;
private final long mStart;
private final long mBytesToRead;
Expand Down Expand Up @@ -125,7 +126,7 @@ public final class NettyPacketReader implements PacketReader {
* @param type the request type (block or UFS file)
* @param packetSize the packet size
*/
private NettyPacketReader(FileSystemContext context, SocketAddress address, long id,
private NettyPacketReader(FileSystemContext context, WorkerNetAddress address, long id,
long offset, long len, long lockId, long sessionId, boolean noCache,
Protocol.RequestType type, long packetSize) {
Preconditions.checkArgument(offset >= 0 && len > 0 && packetSize > 0);
Expand Down Expand Up @@ -345,7 +346,7 @@ private boolean acceptMessage(Object msg) {
*/
public static class Factory implements PacketReader.Factory {
private final FileSystemContext mContext;
private final SocketAddress mAddress;
private final WorkerNetAddress mAddress;
private final long mId;
private final long mLockId;
private final long mSessionId;
Expand All @@ -365,7 +366,7 @@ public static class Factory implements PacketReader.Factory {
* @param type the request type
* @param packetSize the packet size
*/
public Factory(FileSystemContext context, SocketAddress address, long id, long lockId,
public Factory(FileSystemContext context, WorkerNetAddress address, long id, long lockId,
long sessionId, boolean noCache, Protocol.RequestType type, long packetSize) {
mContext = context;
mAddress = address;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import alluxio.resource.LockResource;
import alluxio.util.CommonUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -72,7 +73,7 @@ public final class NettyPacketWriter implements PacketWriter {

private final FileSystemContext mContext;
private final Channel mChannel;
private final SocketAddress mAddress;
private final WorkerNetAddress mAddress;
private final long mLength;
private final Protocol.WriteRequest mPartialRequest;
private final long mPacketSize;
Expand Down Expand Up @@ -116,7 +117,7 @@ public final class NettyPacketWriter implements PacketWriter {
* @param type the request type (block or UFS file)
* @param packetSize the packet size
*/
public NettyPacketWriter(FileSystemContext context, final SocketAddress address, long id,
public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long id,
long length, long sessionId, int tier, Protocol.RequestType type, long packetSize) {
this(context, address, length, Protocol.WriteRequest.newBuilder().setId(id)
.setSessionId(sessionId).setTier(tier).setType(type).buildPartial(), packetSize);
Expand All @@ -131,7 +132,7 @@ public NettyPacketWriter(FileSystemContext context, final SocketAddress address,
* @param partialRequest details of the write request which are constant for all requests
* @param packetSize the packet size
*/
public NettyPacketWriter(FileSystemContext context, final SocketAddress address, long
public NettyPacketWriter(FileSystemContext context, final WorkerNetAddress address, long
length, Protocol.WriteRequest partialRequest, long packetSize) {
mContext = context;
mAddress = address;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.io.BufferUtils;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -86,7 +87,7 @@ public static PacketInStream createLocalPacketInStream(
* @return the {@link PacketInStream} created
*/
public static PacketInStream createNettyPacketInStream(FileSystemContext context,
SocketAddress address, long id, long lockId, long sessionId, long length,
WorkerNetAddress address, long id, long lockId, long sessionId, long length,
boolean noCache, Protocol.RequestType type, InStreamOptions options) {
long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_READER_PACKET_SIZE_BYTES);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import alluxio.exception.status.AlluxioStatusException;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerNetAddress;

import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
Expand Down Expand Up @@ -79,7 +80,7 @@ public static PacketOutStream createLocalPacketOutStream(BlockWorkerClient clien
* @return the {@link PacketOutStream} created
*/
public static PacketOutStream createNettyPacketOutStream(FileSystemContext context,
SocketAddress address, long sessionId, long id, long length,
WorkerNetAddress address, long sessionId, long id, long length,
Protocol.RequestType type, OutStreamOptions options) {
long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
Expand All @@ -99,7 +100,7 @@ public static PacketOutStream createNettyPacketOutStream(FileSystemContext conte
* @return the {@link PacketOutStream} created
*/
public static PacketOutStream createNettyPacketOutStream(FileSystemContext context,
SocketAddress address, long length, Protocol.WriteRequest partialRequest,
WorkerNetAddress address, long length, Protocol.WriteRequest partialRequest,
OutStreamOptions options) {
long packetSize =
Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
Expand Down
Expand Up @@ -14,6 +14,7 @@
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.proto.dataserver.Protocol;
import alluxio.wire.WorkerNetAddress;

import java.io.FilterOutputStream;
import java.io.OutputStream;
Expand All @@ -35,7 +36,7 @@ public final class UnderFileSystemFileOutStream extends FilterOutputStream {
* @param options the options to construct this stream with
* @return a new {@link UnderFileSystemFileOutStream}
*/
public static OutputStream create(FileSystemContext context, SocketAddress address,
public static OutputStream create(FileSystemContext context, WorkerNetAddress address,
OutStreamOptions options) {
return new UnderFileSystemFileOutStream(context, address, options);
}
Expand All @@ -49,7 +50,7 @@ public static OutputStream create(FileSystemContext context, SocketAddress addre
* @param address the data server address
* @param options the out stream options
*/
public UnderFileSystemFileOutStream(FileSystemContext context, SocketAddress address,
public UnderFileSystemFileOutStream(FileSystemContext context, WorkerNetAddress address,
OutStreamOptions options) {
super(PacketOutStream.createNettyPacketOutStream(context, address, Long.MAX_VALUE,
Protocol.WriteRequest.newBuilder().setSessionId(-1).setTier(TIER_UNUSED)
Expand Down
Expand Up @@ -27,20 +27,16 @@
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.WorkerNetAddress;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import io.netty.channel.unix.DomainSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketAddress;
import java.util.LinkedList;
import java.util.List;

Expand Down Expand Up @@ -106,14 +102,8 @@ public FileOutStream(AlluxioURI path, OutStreamOptions options, FileSystemContex
try {
WorkerNetAddress workerNetAddress = // not storing data to Alluxio, so block size is 0
options.getLocationPolicy().getWorkerForNextBlock(mBlockStore.getWorkerInfoList(), 0);
SocketAddress address;
if (NettyUtils.isDomainSocketSupported(workerNetAddress)) {
address = new DomainSocketAddress(workerNetAddress.getDomainSocketPath());
} else {
address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress);
}
mUnderStorageOutputStream =
mCloser.register(UnderFileSystemFileOutStream.create(mContext, address, mOptions));
mUnderStorageOutputStream = mCloser
.register(UnderFileSystemFileOutStream.create(mContext, workerNetAddress, mOptions));
} catch (AlluxioStatusException e) {
CommonUtils.closeQuietly(mCloser);
throw e.toIOException();
Expand Down
Expand Up @@ -292,10 +292,11 @@ public BlockWorkerClient createBlockWorkerClient(WorkerNetAddress address,
* available in the pool, it tries to create a new one. And an exception is thrown if it fails to
* create a new one.
*
* @param address the network address of the channel
* @param workerNetAddress the network address of the channel
* @return the acquired netty channel
*/
public Channel acquireNettyChannel(final SocketAddress address) {
public Channel acquireNettyChannel(final WorkerNetAddress workerNetAddress) {
SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress);
if (!mNettyChannelPools.containsKey(address)) {
Bootstrap bs = NettyClient.createClientBootstrap(address);
bs.remoteAddress(address);
Expand All @@ -317,10 +318,11 @@ public Channel acquireNettyChannel(final SocketAddress address) {
/**
* Releases a netty channel to the channel pools.
*
* @param address the address of the channel
* @param workerNetAddress the address of the channel
* @param channel the channel to release
*/
public void releaseNettyChannel(SocketAddress address, Channel channel) {
public void releaseNettyChannel(WorkerNetAddress workerNetAddress, Channel channel) {
SocketAddress address = NetworkAddressUtils.getDataPortSocketAddress(workerNetAddress);
Preconditions.checkArgument(mNettyChannelPools.containsKey(address));
mNettyChannelPools.get(address).release(channel);
}
Expand Down

0 comments on commit 5b64bd2

Please sign in to comment.