Skip to content

Commit

Permalink
Update PacketOutStream
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Dec 22, 2016
1 parent 510abfc commit 488a12f
Showing 1 changed file with 17 additions and 12 deletions.
Expand Up @@ -17,6 +17,7 @@
import alluxio.client.file.FileSystemContext; import alluxio.client.file.FileSystemContext;
import alluxio.exception.PreconditionMessage; import alluxio.exception.PreconditionMessage;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.util.network.NetworkAddressUtils;


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -74,34 +75,38 @@ public static PacketOutStream createLocalPacketOutStream(BlockWorkerClient clien
public static PacketOutStream createNettyPacketOutStream(FileSystemContext context, public static PacketOutStream createNettyPacketOutStream(FileSystemContext context,
InetSocketAddress address, long sessionId, long id, long length, Protocol.RequestType type) InetSocketAddress address, long sessionId, long id, long length, Protocol.RequestType type)
throws IOException { throws IOException {
List<InetSocketAddress> addresses = new ArrayList<>(); NettyPacketWriter packetWriter =
addresses.add(address); new NettyPacketWriter(context, address, id, length, sessionId, type);
List<Long> sessionIds = new ArrayList<>(); return new PacketOutStream(packetWriter, length);
sessionIds.add(sessionId);
return createNettyPacketOutStream(context, addresses, sessionIds, id, length, type);
} }


/** /**
* Creates a {@link PacketOutStream} that writes to a list of netty data servers. * Creates a {@link PacketOutStream} that writes to a list of locations.
* *
* @param context the file system context * @param context the file system context
* @param addresses the netty data server addresses * @param clients the netty data server addresses
* @param sessionIds the session IDs for all the data servers * @param sessionIds the session IDs for all the data servers
* @param id the ID (block ID or UFS file ID) * @param id the ID (block ID or UFS file ID)
* @param length the block or file length * @param length the block or file length
* @param type the request type (either block write or UFS file write) * @param type the request type (either block write or UFS file write)
* @return the {@link PacketOutStream} created * @return the {@link PacketOutStream} created
* @throws IOException if it fails to create the object * @throws IOException if it fails to create the object
*/ */
public static PacketOutStream createNettyPacketOutStream(FileSystemContext context, public static PacketOutStream createReplicatedPacketOutStream(FileSystemContext context,
List<InetSocketAddress> addresses, List<Long> sessionIds, long id, long length, List<BlockWorkerClient> clients, List<Long> sessionIds, long id, long length,
Protocol.RequestType type) throws IOException { Protocol.RequestType type) throws IOException {
Preconditions.checkArgument(addresses.size() == sessionIds.size()); Preconditions.checkArgument(clients.size() == sessionIds.size());
String localHost = NetworkAddressUtils.getLocalHostName();


Iterator<Long> iterator = sessionIds.iterator(); Iterator<Long> iterator = sessionIds.iterator();
List<PacketWriter> packetWriters = new ArrayList<>(); List<PacketWriter> packetWriters = new ArrayList<>();
for (InetSocketAddress address : addresses) { for (BlockWorkerClient client: clients) {
packetWriters.add(new NettyPacketWriter(context, address, id, length, iterator.next(), type)); if (client.getWorkerNetAddress().getHost().equals(localHost)) {
packetWriters.add(LocalFilePacketWriter.create(client, id));
} else {
packetWriters.add(new NettyPacketWriter(context, client.getDataServerAddress(), id, length,
iterator.next(), type));
}
} }
return new PacketOutStream(packetWriters, length); return new PacketOutStream(packetWriters, length);
} }
Expand Down

0 comments on commit 488a12f

Please sign in to comment.