From 5e7b41fb4f446aaf705c1af70764814bedfe6c20 Mon Sep 17 00:00:00 2001 From: "basti.lj" Date: Wed, 30 Aug 2017 10:45:17 +0800 Subject: [PATCH 1/2] jstorm-runner: Fix the bug that max waiting time is missing on local mode --- .../java/org/apache/beam/runners/jstorm/JStormRunnerResult.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 4b1850e3b37a..4bc0e3538a6d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -74,6 +74,7 @@ private static class LocalJStormPipelineResult extends JStormRunnerResult { long localModeExecuteTimeSecs) { super(topologyName, config); this.localCluster = checkNotNull(localCluster, "localCluster"); + this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; } @Override From 99c06fb3436a20fc77e6d4810d4bfdebd9f821de Mon Sep 17 00:00:00 2001 From: "basti.lj" Date: Wed, 30 Aug 2017 10:45:45 +0800 Subject: [PATCH 2/2] jstorm-runner: Fix incorrect updating of counter metrics --- .../apache/beam/runners/jstorm/translation/MetricsReporter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index e7f3285dd54b..0315a59ad9b1 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -72,7 +72,7 @@ private void updateCounters(Iterable> counters) { AsmCounter counter = metricClient.registerCounter(metricName); Long incValue = (oldValue == null ? updateValue : updateValue - oldValue); counter.update(incValue); - reportedCounters.put(metricName, incValue); + reportedCounters.put(metricName, updateValue); } } }