From d28c0ddd9d259855c1d9ba623bfb970342bc40c4 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Fri, 2 Dec 2016 19:08:49 -0800 Subject: [PATCH 1/2] APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers --- .../bufferserver/server/Server.java | 259 ++++++++---------- 1 file changed, 109 insertions(+), 150 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 12eed5f440..e720248718 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -113,14 +113,27 @@ public synchronized void registered(SelectionKey key) @Override public void unregistered(SelectionKey key) { - serverHelperExecutor.shutdown(); - storageHelperExecutor.shutdown(); - try { - serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ex) { - logger.debug("Executor Termination", ex); + for (LogicalNode ln : subscriberGroups.values()) { + ln.boot(eventloop); } - logger.info("Server stopped listening at {}", address); + /* + * There may be unregister tasks scheduled to run on the event loop that use serverHelperExecutor. + */ + eventloop.submit(new Runnable() + { + @Override + public void run() + { + serverHelperExecutor.shutdown(); + storageHelperExecutor.shutdown(); + try { + serverHelperExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ex) { + logger.debug("Executor Termination", ex); + } + logger.info("Server stopped listening at {}", address); + } + }); } public synchronized InetSocketAddress run(EventLoop eventloop) @@ -165,13 +178,12 @@ public static void main(String[] args) throws Exception @Override public String toString() { - return identity; + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + address + "}"; } private final ConcurrentHashMap publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1); private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap(); private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -235,81 +247,70 @@ private void handleResetRequest(ResetRequestTuple request, final AbstractLengthP /** * * @param request - * @param connection - * @return + * @param key */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, - final AbstractLengthPrependerClient connection) + public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) { - String identifier = request.getIdentifier(); - String type = request.getStreamType(); - String upstream_identifier = request.getUpstreamIdentifier(); - - // Check if there is a logical node of this type, if not create it. - final LogicalNode ln; - if (subscriberGroups.containsKey(type)) { - //logger.debug("adding to exiting group = {}", subscriberGroups.get(type)); - /* - * close previous connection with the same identifier which is guaranteed to be unique. - */ - AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection); - if (previous != null) { - eventloop.disconnect(previous); - } - - ln = subscriberGroups.get(type); + try { serverHelperExecutor.submit(new Runnable() { @Override public void run() { - ln.boot(eventloop); - ln.addConnection(connection); - ln.catchUp(); - } - }); - } else { - /* - * if there is already a datalist registered for the type in which this client is interested, - * then get a iterator on the data items of that data list. If the datalist is not registered, - * then create one and register it. Hopefully this one would be used by future upstream nodes. - */ - final DataList dl; - if (publisherBuffers.containsKey(upstream_identifier)) { - dl = publisherBuffers.get(upstream_identifier); - //logger.debug("old list = {}", dl); - } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? - new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : - new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); - publisherBuffers.put(upstream_identifier, dl); - //logger.debug("new list = {}", dl); - } + final String upstream_identifier = request.getUpstreamIdentifier(); + + /* + * if there is already a datalist registered for the type in which this client is interested, + * then get a iterator on the data items of that data list. If the datalist is not registered, + * then create one and register it. Hopefully this one would be used by future upstream nodes. + */ + DataList dl = publisherBuffers.get(upstream_identifier); + if (dl == null) { + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : + new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); + DataList odl = publisherBuffers.putIfAbsent(upstream_identifier, dl); + if (odl != null) { + dl = odl; + } + } - long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); - ln = new LogicalNode(identifier, upstream_identifier, type, dl.newIterator(skipWindowId), skipWindowId); + final String identifier = request.getIdentifier(); + final String type = request.getStreamType(); + final long skipWindowId = (long)request.getBaseSeconds() << 32 | request.getWindowId(); + final LogicalNode ln = new LogicalNode(identifier, upstream_identifier, type, dl + .newIterator(skipWindowId), skipWindowId); - int mask = request.getMask(); - if (mask != 0) { - for (Integer bs : request.getPartitions()) { - ln.addPartition(bs, mask); - } - } + int mask = request.getMask(); + if (mask != 0) { + for (Integer bs : request.getPartitions()) { + ln.addPartition(bs, mask); + } + } + final LogicalNode oln = subscriberGroups.put(type, ln); + if (oln != null) { + oln.boot(eventloop); + } + AbstractLengthPrependerClient subscriber = new Subscriber(ln, request.getBufferSize()); + + subscriber.registered(key); + key.attach(subscriber); + key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriberGroups.put(type, ln); - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - ln.addConnection(connection); ln.catchUp(); dl.addDataListener(ln); } }); + } catch (RejectedExecutionException e) { + logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e); + if (key.isValid()) { + try { + key.channel().close(); + } catch (IOException ioe) { + logger.error("Failed to close channel {}", key.channel(), ioe); + } + } } - - return ln; } /** @@ -322,9 +323,9 @@ public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLeng { String identifier = request.getIdentifier(); - DataList dl; + DataList dl = publisherBuffers.get(identifier); - if (publisherBuffers.containsKey(identifier)) { + if (dl != null) { /* * close previous connection with the same identifier which is guaranteed to be unique. */ @@ -333,7 +334,6 @@ public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLeng eventloop.disconnect(previous); } - dl = publisherBuffers.get(identifier); try { dl.rewind(request.getBaseSeconds(), request.getWindowId()); } catch (IOException ie) { @@ -343,7 +343,10 @@ public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLeng dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks); - publisherBuffers.put(identifier, dl); + DataList odl = publisherBuffers.putIfAbsent(identifier, dl); + if (odl != null) { + dl = odl; + } } dl.setSecondaryStorage(storage, storageHelperExecutor); @@ -468,39 +471,7 @@ public int readSize() ignore = true; logger.info("Received subscriber request: {}", request); - SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request; - AbstractLengthPrependerClient subscriber; - -// /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */ - int bufferSize = subscriberRequest.getBufferSize(); -// if (bufferSize == 0) { -// bufferSize = 16 * 1024; -// } - if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize); - } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), - subscriberRequest.getPartitions(), bufferSize) - { - @Override - public int readSize() - { - if (writeOffset - readOffset < 2) { - return -1; - } - - short s = buffer[readOffset++]; - return s | (buffer[readOffset++] << 8); - } - - }; - } - key.attach(subscriber); - key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); - subscriber.registered(key); - - handleSubscriberRequest(subscriberRequest, subscriber); + handleSubscriberRequest((SubscribeRequestTuple)request, key); break; case PURGE_REQUEST: @@ -530,16 +501,13 @@ public int readSize() class Subscriber extends AbstractLengthPrependerClient { - private final String type; - private final int mask; - private final int[] partitions; + private LogicalNode ln; - Subscriber(String type, int mask, int[] partitions, int bufferSize) + Subscriber(LogicalNode ln, int bufferSize) { super(1024, bufferSize); - this.type = type; - this.mask = mask; - this.partitions = partitions; + this.ln = ln; + ln.addConnection(this); super.write = false; } @@ -553,58 +521,49 @@ public void onMessage(byte[] buffer, int offset, int size) @Override public void unregistered(final SelectionKey key) { - super.unregistered(key); - teardown(); - } - - @Override - public void handleException(Exception cce, EventLoop el) - { - teardown(); - super.handleException(cce, el); - } + try { + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + teardown(); + } - @Override - public String toString() - { - return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + - ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; + @Override + public String toString() + { + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + + " teardown " + Subscriber.this; + } + }); + } catch (Exception e) { + logger.error("{}", this, e); + } + super.unregistered(key); } - private volatile boolean torndown; - private void teardown() { - //logger.debug("Teardown is being called {}", torndown, new Exception()); - if (torndown) { - return; - } - torndown = true; - - LogicalNode ln = subscriberGroups.get(type); if (ln != null) { - if (subscriberChannels.containsValue(this)) { - final Iterator> i = subscriberChannels.entrySet().iterator(); - while (i.hasNext()) { - if (i.next().getValue() == this) { - i.remove(); - break; - } - } - } - - ln.removeChannel(this); + ln.removeChannel(Subscriber.this); if (ln.getPhysicalNodeCount() == 0) { DataList dl = publisherBuffers.get(ln.getUpstream()); if (dl != null) { dl.removeDataListener(ln); } - subscriberGroups.remove(ln.getGroup()); + subscriberGroups.remove(ln.getGroup(), ln); + ln.getIterator().close(); + ln = null; } - ln.getIterator().close(); } } + @Override + public String toString() + { + return "Server.Subscriber{" + "ln=" + ln + "}"; + } } /** From d1646e42bdf5594ef34070594733a7ca10123a3f Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Tue, 6 Dec 2016 14:09:03 -0800 Subject: [PATCH 2/2] APEXCORE-583 - Buffer Server LogicalNode should not be reused by Subscribers --- .../bufferserver/server/Server.java | 98 +++++++++++-------- 1 file changed, 55 insertions(+), 43 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index e720248718..af5514326c 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -249,7 +249,7 @@ private void handleResetRequest(ResetRequestTuple request, final AbstractLengthP * @param request * @param key */ - public void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) + private void handleSubscriberRequest(final SubscribeRequestTuple request, final SelectionKey key) { try { serverHelperExecutor.submit(new Runnable() @@ -259,11 +259,11 @@ public void run() { final String upstream_identifier = request.getUpstreamIdentifier(); - /* - * if there is already a datalist registered for the type in which this client is interested, - * then get a iterator on the data items of that data list. If the datalist is not registered, - * then create one and register it. Hopefully this one would be used by future upstream nodes. - */ + /* + * if there is already a datalist registered for the type in which this client is interested, + * then get a iterator on the data items of that data list. If the datalist is not registered, + * then create one and register it. Hopefully this one would be used by future upstream nodes. + */ DataList dl = publisherBuffers.get(upstream_identifier); if (dl == null) { dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? @@ -302,7 +302,7 @@ public void run() } }); } catch (RejectedExecutionException e) { - logger.error("Received subscriber request {} after server {} termination. Disconnecting {}", request, this, key.channel(), e); + logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", request, this, key.channel(), e); if (key.isValid()) { try { key.channel().close(); @@ -313,6 +313,52 @@ public void run() } } + private void handleSubscriberTeardown(final SelectionKey key) + { + try { + final Subscriber subscriber = (Subscriber)key.attachment(); + if (subscriber != null) { + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + try { + final LogicalNode ln = subscriber.ln; + if (ln != null) { + ln.removeChannel(subscriber); + if (ln.getPhysicalNodeCount() == 0) { + DataList dl = publisherBuffers.get(ln.getUpstream()); + if (dl != null) { + logger.info("Removing ln {} from dl {}", ln, dl); + dl.removeDataListener(ln); + } + subscriberGroups.remove(ln.getGroup(), ln); + ln.getIterator().close(); + } + subscriber.ln = null; + } + } catch (Throwable t) { + logger.error("Buffer server {} failed to tear down subscriber {}.", Server.this, subscriber, t); + } + } + + @Override + public String toString() + { + return subscriber + " teardown task."; + } + }); + } else { + logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); + } + } catch (ClassCastException e) { + logger.error("Selection key {} has unexpected attachment {}.", key, key.attachment()); + } catch (RejectedExecutionException e) { + logger.error("Subscriber {} teardown after server {} termination.", key.attachment(), this, e); + } + } + /** * * @param request @@ -521,48 +567,14 @@ public void onMessage(byte[] buffer, int offset, int size) @Override public void unregistered(final SelectionKey key) { - try { - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - teardown(); - } - - @Override - public String toString() - { - return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + - " teardown " + Subscriber.this; - } - }); - } catch (Exception e) { - logger.error("{}", this, e); - } + handleSubscriberTeardown(key); super.unregistered(key); } - private void teardown() - { - if (ln != null) { - ln.removeChannel(Subscriber.this); - if (ln.getPhysicalNodeCount() == 0) { - DataList dl = publisherBuffers.get(ln.getUpstream()); - if (dl != null) { - dl.removeDataListener(ln); - } - subscriberGroups.remove(ln.getGroup(), ln); - ln.getIterator().close(); - ln = null; - } - } - } - @Override public String toString() { - return "Server.Subscriber{" + "ln=" + ln + "}"; + return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + ln + "}"; } }