From 7ce2a3b615a8d6833800b3e9286c88547ed39201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 15 May 2018 15:13:09 +0200 Subject: [PATCH] STORM-3073: Uncap pendingEmits for bolt executors, and prevent LoadSpout from overflowing pendingEmits in spout executors --- .../main/java/org/apache/storm/loadgen/LoadSpout.java | 11 +++++++++-- .../src/jvm/org/apache/storm/executor/Executor.java | 2 +- .../apache/storm/executor/spout/SpoutExecutor.java | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java index 95ade594dcd..ef454c59298 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java @@ -18,11 +18,13 @@ package org.apache.storm.loadgen; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.stream.Collectors; import org.apache.storm.metrics.hdrhistogram.HistogramMetric; import org.apache.storm.spout.SpoutOutputCollector; @@ -71,6 +73,7 @@ public void done() { //This is an attempt to give all of the streams an equal opportunity to emit something. private long nextStreamCounter = 0; private final int numStreams; + private final Queue replays = new ArrayDeque<>(); /** * Create a simple load spout with just a set rate per second on the default stream. @@ -99,6 +102,11 @@ public void open(Map conf, TopologyContext context, SpoutOutputC @Override public void nextTuple() { + if (!replays.isEmpty()) { + SentWithTime swt = replays.poll(); + collector.emit(swt.streamName, swt.keyValue, swt); + return; + } int size = numStreams; for (int tries = 0; tries < size; tries++) { int index = Math.abs((int) (nextStreamCounter++ % size)); @@ -131,7 +139,6 @@ public void ack(Object id) { @Override public void fail(Object id) { - SentWithTime swt = (SentWithTime)id; - collector.emit(swt.streamName, swt.keyValue, swt); + replays.add((SentWithTime)id); } } diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index 57b61e981ff..44e0212ae72 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -108,7 +108,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer { protected final Boolean hasEventLoggers; protected final boolean ackingEnabled; protected final ErrorReportingMetrics errorReportingMetrics; - protected final MpscChunkedArrayQueue pendingEmits = new MpscChunkedArrayQueue<>(1024); + protected final MpscChunkedArrayQueue pendingEmits = new MpscChunkedArrayQueue<>(1024, (int)Math.pow(2, 30)); private final AddressedTuple flushTuple; protected ExecutorTransfer executorTransfer; protected ArrayList idToTask; diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index 9750264ba2d..ff1d05eaab8 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -378,5 +378,6 @@ public int getSpoutRecvqCheckSkipCount() { public long getThreadId() { return threadId; - } + } + }