From 33812f65712f9965d7e6140d5da638fafb24cc8e Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Thu, 8 Jun 2017 14:08:48 -0700 Subject: [PATCH] APEXCORE-745 Buffer server may stop processing tuples when backpressure is enabled --- .../bufferserver/internal/DataList.java | 38 ++++++++++++++----- .../bufferserver/internal/LogicalNode.java | 2 +- .../bufferserver/server/Server.java | 1 - 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 5813b56d22..69efc045a6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -32,6 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.common.util.ToStringStyle; +import org.apache.commons.lang.builder.ToStringBuilder; + import com.datatorrent.bufferserver.packet.BeginWindowTuple; import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.packet.ResetWindowTuple; @@ -55,6 +58,8 @@ */ public class DataList { + private static final Logger logger = LoggerFactory.getLogger(DataList.class); + private final int MAX_COUNT_OF_INMEM_BLOCKS; protected final String identifier; private final int blockSize; @@ -291,7 +296,12 @@ public void flush(final int writeOffset) public void notifyListeners() { - listenersNotifier.moreDataAvailable(); + try { + listenersNotifier.moreDataAvailable(); + } catch (RuntimeException e) { + logger.warn("{}", listenersNotifier, e); + } + logger.debug("{} notified", listenersNotifier); } public void setAutoFlushExecutor(final ExecutorService es) @@ -359,6 +369,7 @@ public void addDataListener(DataListener dl) set.add(dl); } + listenersNotifier.run(); } public void removeDataListener(DataListener dl) @@ -536,7 +547,7 @@ public Status getStatus() @Override public String toString() { - return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}'; + return new ToStringBuilder(this, ToStringStyle.DEFAULT).append("identifier", identifier).toString(); } /** @@ -1123,7 +1134,7 @@ private void moreDataAvailable() final Future future = this.future; if (future == null || future.isDone() || future.isCancelled()) { // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue - this.future = autoFlushExecutor.submit(listenersNotifier); + this.future = autoFlushExecutor.submit(this); } else { synchronized (this) { if (this.future == null) { @@ -1143,7 +1154,7 @@ private boolean addedData() try { doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false); } catch (RuntimeException e) { - logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + logger.warn("{} removing {} due to exception", this, dl, e); removeDataListener(dl); break; } @@ -1159,7 +1170,7 @@ private boolean checkIfListenersHaveDataToSendOnly() return true; } } catch (RuntimeException e) { - logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e); + logger.warn("{} removing {} due to exception", this, dl, e); removeDataListener(dl); return checkIfListenersHaveDataToSendOnly(); } @@ -1170,6 +1181,7 @@ private boolean checkIfListenersHaveDataToSendOnly() @Override public void run() { + logger.debug("{} entered run", this); try { if (addedData() || checkIfListenersHaveDataToSendOnly()) { future = autoFlushExecutor.submit(this); @@ -1183,11 +1195,19 @@ public void run() } } } - } catch (Exception e) { - logger.error("{}", DataList.this, e); + } catch (RuntimeException e) { + logger.warn("{}", this, e); + } finally { + logger.debug("{} exiting run", this); } } - } - private static final Logger logger = LoggerFactory.getLogger(DataList.class); + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.DEFAULT).append(DataList.this) + .append("future", future == null ? null : future.getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(future))) + .append("isMoreDataAvailable", isMoreDataAvailable).toString(); + } + } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index b06e60a1b8..3e8846d1e9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -151,7 +151,7 @@ public boolean isReady() /** * */ - public void catchUp() + private void catchUp() { caughtup = false; if (isReady()) { diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 857e51e0d2..c5700f2690 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -648,7 +648,6 @@ public void run() { final DataList dl = publisherBuffers.get(ln.getUpstream()); if (dl != null) { - ln.catchUp(); dl.addDataListener(ln); } else { logger.error("Disconnecting {} with no matching data list.", this);