From 6e0a4be46e24a2844036d9f710acbb171bc71730 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Sun, 6 Mar 2016 10:38:34 -0800 Subject: [PATCH] APEXCORE-375 - Container killed because of Out of Sequence tuple error. --- .../bufferserver/internal/LogicalNode.java | 24 ++++++----- .../bufferserver/server/Server.java | 42 +++++++++++-------- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index f867d694a5..3953c3a4d3 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -156,18 +156,20 @@ public boolean isReady() */ public void catchUp() { - long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; - logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds)); - if (lBaseSeconds > baseSeconds) { - baseSeconds = lBaseSeconds; - } - logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds)); - int intervalMillis; - - int skippedPayloadTuples = 0; - + caughtup = false; if (isReady()) { logger.debug("catching up {}->{}", upstream, group); + + long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; + logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds)); + if (lBaseSeconds > baseSeconds) { + baseSeconds = lBaseSeconds; + } + logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds)); + int intervalMillis; + + int skippedPayloadTuples = 0; + try { /* * fast forward to catch up with the windowId without consuming @@ -337,8 +339,8 @@ public void boot(EventLoop eventloop) { for (PhysicalNode pn : physicalNodes) { eventloop.disconnect(pn.getClient()); - physicalNodes.clear(); } + physicalNodes.clear(); } @Override diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index a78b136435..89561f3c45 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -219,14 +219,15 @@ private void handleResetRequest(ResetRequestTuple request, final AbstractLengthP * @param connection * @return */ - public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, AbstractLengthPrependerClient connection) + public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, + final AbstractLengthPrependerClient connection) { String identifier = request.getIdentifier(); String type = request.getStreamType(); String upstream_identifier = request.getUpstreamIdentifier(); // Check if there is a logical node of this type, if not create it. - LogicalNode ln; + final LogicalNode ln; if (subscriberGroups.containsKey(type)) { //logger.debug("adding to exiting group = {}", subscriberGroups.get(type)); /* @@ -238,15 +239,23 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra } ln = subscriberGroups.get(type); - ln.boot(eventloop); - ln.addConnection(connection); + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + ln.boot(eventloop); + ln.addConnection(connection); + ln.catchUp(); + } + }); } else { /* * if there is already a datalist registered for the type in which this client is interested, * then get a iterator on the data items of that data list. If the datalist is not registered, * then create one and register it. Hopefully this one would be used by future upstream nodes. */ - DataList dl; + final DataList dl; if (publisherBuffers.containsKey(upstream_identifier)) { dl = publisherBuffers.get(upstream_identifier); //logger.debug("old list = {}", dl); @@ -270,8 +279,16 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra } subscriberGroups.put(type, ln); - ln.addConnection(connection); - dl.addDataListener(ln); + serverHelperExecutor.submit(new Runnable() + { + @Override + public void run() + { + ln.addConnection(connection); + ln.catchUp(); + dl.addDataListener(ln); + } + }); } return ln; @@ -460,16 +477,7 @@ public int readSize() key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); subscriber.registered(key); - final LogicalNode logicalNode = handleSubscriberRequest(subscriberRequest, subscriber); - serverHelperExecutor.submit(new Runnable() - { - @Override - public void run() - { - logicalNode.catchUp(); - } - - }); + handleSubscriberRequest(subscriberRequest, subscriber); break; case PURGE_REQUEST: