From 43492000a49e81b6d9a2420148fb2df1735301b0 Mon Sep 17 00:00:00 2001 From: "basti.lj" Date: Fri, 8 Sep 2017 12:19:49 +0800 Subject: [PATCH] JStorm-runner: Performance improvement 1. remove some logs on critical path 2. register "TimestampedValue" in Kryo to reduce the serialized size of event value --- .../serialization/BeamUtilsSerializer.java | 2 ++ .../runners/jstorm/translation/DoFnExecutor.java | 3 +-- .../jstorm/translation/ExecutorsBolt.java | 1 - .../jstorm/translation/WindowAssignExecutor.java | 16 +++++++--------- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java index db1f037c7be8..8061a9f24ac7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; /** @@ -110,5 +111,6 @@ public static void registerSerializers(Config config) { Lists.newArrayList(w1), PaneInfo.NO_FIRING).getClass()); config.registerSerialization(WindowedValue.of(null, Instant.now(), Lists.newArrayList(w1, w2), PaneInfo.NO_FIRING).getClass()); + config.registerSerialization(TimestampedValue.class); } } diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 5425b6cdcd0a..4b021a355aa1 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -214,7 +214,6 @@ public void process(TupleTag tag, WindowedValue elem) { } protected void processMainInput(WindowedValue elem) { - LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem)); if (sideInputs.isEmpty()) { runner.processElement((WindowedValue) elem); } else { @@ -236,7 +235,7 @@ protected void processMainInput(WindowedValue elem) { } protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem)); + LOG.debug("Side inputs: tag={}, elem={}.", tag, elem); PCollectionView sideInputView = sideInputTagToView.get(tag); sideInputHandler.addSideInputValue(sideInputView, elem); diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index aca2ca4e594e..1e9a4ff678be 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -295,7 +295,6 @@ private void processElement(List values, String streamId) { public void processExecutorElem(TupleTag inputTag, WindowedValue elem) { if (elem != null) { - LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); Executor executor = inputTagToExecutor.get(inputTag); if (executor != null) { executor.process(inputTag, elem); diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java index 832c95c406dd..ffbfb1b63c01 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.jstorm.translation; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.collect.Iterables; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -46,13 +44,13 @@ class JStormAssignContext JStormAssignContext(WindowFn fn, WindowedValue value) { fn.super(); - checkArgument( - Iterables.size(value.getWindows()) == 1, - String.format( - "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), - Iterables.size(value.getWindows()), - value.getWindows())); + if (value.getWindows().size() != 1) { + throw new IllegalArgumentException(String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + } this.value = value; }