From 308f1a7b0e91902ae5e6edbb614e7ea5ce975417 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Fri, 18 Nov 2016 19:44:40 -0800 Subject: [PATCH] APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication --- .../bufferserver/internal/LogicalNode.java | 6 +- .../bufferserver/internal/PhysicalNode.java | 25 ++-- .../bufferserver/server/Server.java | 117 +++++++----------- 3 files changed, 54 insertions(+), 94 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index c08cfb9155..ceb94694b9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.util.BitVector; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.EventLoop; +import com.datatorrent.netlet.WriteOnlyClient; /** * LogicalNode represents a logical node in a DAG

@@ -99,7 +99,7 @@ public DataListIterator getIterator() * * @param connection */ - public void addConnection(AbstractLengthPrependerClient connection) + public void addConnection(WriteOnlyClient connection) { PhysicalNode pn = new PhysicalNode(connection); if (!physicalNodes.contains(pn)) { @@ -111,7 +111,7 @@ public void addConnection(AbstractLengthPrependerClient connection) * * @param client */ - public void removeChannel(AbstractLengthPrependerClient client) + public void removeChannel(WriteOnlyClient client) { for (PhysicalNode pn : physicalNodes) { if (pn.getClient() == client) { diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java index 424a51a761..9a3fe374e5 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java @@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory; import com.datatorrent.bufferserver.util.SerializedData; -import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.WriteOnlyClient; /** * PhysicalNode represents one physical subscriber. @@ -32,16 +32,16 @@ */ public class PhysicalNode { - public static final int BUFFER_SIZE = 8 * 1024; private final long starttime; - private final AbstractLengthPrependerClient client; - private final long processedMessageCount; + private final WriteOnlyClient client; + private long processedMessageCount; + private SerializedData blocker; /** * * @param client */ - public PhysicalNode(AbstractLengthPrependerClient client) + public PhysicalNode(WriteOnlyClient client) { this.client = client; starttime = System.currentTimeMillis(); @@ -71,20 +71,11 @@ public long getUptime() * @param d * @throws InterruptedException */ - private SerializedData blocker; - public boolean send(SerializedData d) { - if (d.offset == d.dataOffset) { - if (client.write(d.buffer, d.offset, d.length)) { - return true; - } - } else { - if (client.send(d.buffer, d.offset, d.length)) { - return true; - } + if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) { + return true; } - blocker = d; return false; } @@ -150,7 +141,7 @@ public final int hashCode() /** * @return the channel */ - public AbstractLengthPrependerClient getClient() + public WriteOnlyClient getClient() { return client; } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 12eed5f440..baa4e0b73b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -54,6 +54,7 @@ import com.datatorrent.netlet.DefaultEventLoop; import com.datatorrent.netlet.EventLoop; import com.datatorrent.netlet.Listener.ServerListener; +import com.datatorrent.netlet.WriteOnlyLengthPrependerClient; import com.datatorrent.netlet.util.VarInt; /** @@ -171,7 +172,7 @@ public String toString() private final ConcurrentHashMap publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap(); private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap<>(); + private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -235,15 +236,18 @@ private void handleResetRequest(ResetRequestTuple request, final AbstractLengthP /** * * @param request - * @param connection + * @param key * @return */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, - final AbstractLengthPrependerClient connection) + public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, SelectionKey key) { String identifier = request.getIdentifier(); String type = request.getStreamType(); String upstream_identifier = request.getUpstreamIdentifier(); + final Subscriber subscriber = new Subscriber(type, request.getMask(), request.getPartitions(), request.getBufferSize()); + key.attach(subscriber); + subscriber.registered(key); + subscriber.connected(); // Check if there is a logical node of this type, if not create it. final LogicalNode ln; @@ -252,7 +256,7 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, /* * close previous connection with the same identifier which is guaranteed to be unique. */ - AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection); + ClientListener previous = subscriberChannels.put(identifier, subscriber); if (previous != null) { eventloop.disconnect(previous); } @@ -264,7 +268,7 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, public void run() { ln.boot(eventloop); - ln.addConnection(connection); + ln.addConnection(subscriber); ln.catchUp(); } }); @@ -302,7 +306,7 @@ public void run() @Override public void run() { - ln.addConnection(connection); + ln.addConnection(subscriber); ln.catchUp(); dl.addDataListener(ln); } @@ -312,6 +316,32 @@ public void run() return ln; } + private void teardownSubscriber(Subscriber subscriber) + { + LogicalNode ln = subscriberGroups.get(subscriber.type); + if (ln != null) { + if (subscriberChannels.containsValue(subscriber)) { + final Iterator> i = subscriberChannels.entrySet().iterator(); + while (i.hasNext()) { + if (i.next().getValue() == subscriber) { + i.remove(); + break; + } + } + } + + ln.removeChannel(subscriber); + if (ln.getPhysicalNodeCount() == 0) { + DataList dl = publisherBuffers.get(ln.getUpstream()); + if (dl != null) { + dl.removeDataListener(ln); + } + subscriberGroups.remove(ln.getGroup()); + } + ln.getIterator().close(); + } + } + /** * * @param request @@ -464,43 +494,11 @@ public int readSize() /* * unregister the unidentified client since its job is done! */ - unregistered(key); + unregistered(key.interestOps(0)); ignore = true; logger.info("Received subscriber request: {}", request); - SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request; - AbstractLengthPrependerClient subscriber; - -// /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */ - int bufferSize = subscriberRequest.getBufferSize(); -// if (bufferSize == 0) { -// bufferSize = 16 * 1024; -// } - if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize); - } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize) - { - @Override - public int readSize() - { - if (writeOffset - readOffset < 2) { - return -1; - } - - short s = buffer[readOffset++]; - return s | (buffer[readOffset++] << 8); - } - - }; - } - key.attach(subscriber); - key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriber.registered(key); - - handleSubscriberRequest(subscriberRequest, subscriber); + handleSubscriberRequest((SubscribeRequestTuple)request, key); break; case PURGE_REQUEST: @@ -528,7 +526,7 @@ public int readSize() } - class Subscriber extends AbstractLengthPrependerClient + private class Subscriber extends WriteOnlyLengthPrependerClient { private final String type; private final int mask; @@ -536,18 +534,11 @@ class Subscriber extends AbstractLengthPrependerClient Subscriber(String type, int mask, int[] partitions, int bufferSize) { - super(1024, bufferSize); + super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize); this.type = type; this.mask = mask; this.partitions = partitions; - super.write = false; - } - - @Override - public void onMessage(byte[] buffer, int offset, int size) - { - logger.warn("Received data when no data is expected: {}", - Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); + super.isWriteEnabled = false; } @Override @@ -580,29 +571,7 @@ private void teardown() return; } torndown = true; - - LogicalNode ln = subscriberGroups.get(type); - if (ln != null) { - if (subscriberChannels.containsValue(this)) { - final Iterator> i = subscriberChannels.entrySet().iterator(); - while (i.hasNext()) { - if (i.next().getValue() == this) { - i.remove(); - break; - } - } - } - - ln.removeChannel(this); - if (ln.getPhysicalNodeCount() == 0) { - DataList dl = publisherBuffers.get(ln.getUpstream()); - if (dl != null) { - dl.removeDataListener(ln); - } - subscriberGroups.remove(ln.getGroup()); - } - ln.getIterator().close(); - } + teardownSubscriber(this); } }