From cfb710d688701f7fbbf61d6e41c29397e9e3fabe Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Wed, 26 Apr 2017 15:35:38 -0700 Subject: [PATCH 1/4] SAMZA-1219 Add metrics for operator message received and execution times --- .../samza/operators/impl/OperatorImpl.java | 130 ++++++++-- .../operators/impl/OperatorImplGraph.java | 33 ++- .../impl/PartialJoinOperatorImpl.java | 39 ++- .../operators/impl/RootOperatorImpl.java | 36 ++- .../impl/SessionWindowOperatorImpl.java | 52 ---- .../operators/impl/SinkOperatorImpl.java | 25 +- .../operators/impl/StreamOperatorImpl.java | 27 ++- .../operators/impl/WindowOperatorImpl.java | 48 +++- .../samza/operators/spec/OperatorSpec.java | 10 +- .../spec/PartialJoinOperatorSpec.java | 8 - .../operators/spec/SinkOperatorSpec.java | 7 - .../operators/spec/StreamOperatorSpec.java | 7 - .../operators/spec/WindowOperatorSpec.java | 9 - .../apache/samza/task/StreamOperatorTask.java | 8 +- .../samza/operators/TestJoinOperator.java | 12 +- .../samza/operators/TestWindowOperator.java | 6 +- .../operators/impl/TestOperatorImpl.java | 226 ++++++++++++++---- .../operators/impl/TestOperatorImpls.java | 29 ++- .../operators/impl/TestSinkOperatorImpl.java | 7 +- .../impl/TestStreamOperatorImpl.java | 22 +- .../test/operator/RepartitionWindowApp.java | 3 +- 21 files changed, 503 insertions(+), 241 deletions(-) delete mode 100644 samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index b9a606b9fe..042dfd96fc 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -18,9 +18,17 @@ */ package org.apache.samza.operators.impl; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -29,60 +37,136 @@ * Abstract base class for all stream operator implementations. */ public abstract class OperatorImpl { + private static final String METRICS_GROUP = OperatorImpl.class.getName(); - private final Set> nextOperators = new HashSet<>(); + private Set> registeredOperators; + + private boolean initialized; + private Counter messageCounter; + private Timer handleMessageTimer; + private Timer handleTimerTimer; + + public OperatorImpl() {} + + /** + * Initialize this {@link OperatorImpl} and its user-defined functions. + * + * @param config the {@link Config} for the task + * @param context the {@link TaskContext} for the task + */ + public final void init(Config config, TaskContext context) { + String opName = getOpName(); + + if (initialized) { + throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName)); + } + + this.registeredOperators = new HashSet<>(); + MetricsRegistry metricsRegistry = context.getMetricsRegistry(); + this.messageCounter = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages"); + this.handleMessageTimer = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns"); + this.handleTimerTimer = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns"); + + doInit(config, context); + + initialized = true; + } /** - * Register the next operator in the chain that this operator should propagate its output to. - * @param nextOperator the next operator in the chain. + * Initialize this {@link OperatorImpl} and its user-defined functions. + * + * @param config the {@link Config} for the task + * @param context the {@link TaskContext} for the task + */ + protected abstract void doInit(Config config, TaskContext context); + + /** + * Register an operator that this operator should propagate its results to. + * + * @param nextOperator the next operator to propagate results to */ void registerNextOperator(OperatorImpl nextOperator) { - nextOperators.add(nextOperator); + if (!initialized) { + throw new IllegalStateException( + String.format("Attempted to register next operator before initializing operator %s.", getOpName())); + } + this.registeredOperators.add(nextOperator); } /** - * Perform the transformation required for this operator and call the downstream operators. + * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators. + *

+ * Delegates to {@link #handleMessage(Object, MessageCollector, TaskCoordinator)} for handling the message. * - * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly. + * @param message the input message + * @param collector the {@link MessageCollector} for this message + * @param coordinator the {@link TaskCoordinator} for this message + */ + public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + this.messageCounter.inc(); + long startNs = System.nanoTime(); + Collection results = handleMessage(message, collector, coordinator); + long endNs = System.nanoTime(); + this.handleMessageTimer.update(endNs - startNs); + + results.forEach(rm -> + this.registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + } + + /** + * Handle the incoming {@code message} and return the results to be propagated to registered operators. * * @param message the input message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context + * @return results of the transformation */ - public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); + protected abstract Collection handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator); /** - * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)} + * Handle timer ticks for this {@link OperatorImpl} and propagate the results and timer tick to registered operators. + *

+ * Delegates to {@link #handleTimer(MessageCollector, TaskCoordinator)} for handling the timer tick. * * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - public final void onTick(MessageCollector collector, TaskCoordinator coordinator) { - onTimer(collector, coordinator); - nextOperators.forEach(sub -> sub.onTick(collector, coordinator)); + public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + long startNs = System.nanoTime(); + Collection results = handleTimer(collector, coordinator); + long endNs = System.nanoTime(); + this.handleTimerTimer.update(endNs - startNs); + + results.forEach(rm -> + this.registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + this.registeredOperators.forEach(op -> op.onTimer(collector, coordinator)); } /** - * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output - * to registered downstream operators. + * Handle the the timer tick for this operator and return the results to be propagated to registered operators. + *

+ * Defaults to a no-op implementation. * * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context + * @return results of the timed operation */ - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + protected Collection handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + return Collections.emptyList(); } /** - * Helper method to propagate the output of this operator to all registered downstream operators. - * - * This method must be called from {@link #onNext} and {@link #onTimer} - * to propagate the operator output correctly. + * Get the {@link OperatorSpec} for this {@link OperatorImpl}. * - * @param outputMessage output message - * @param collector the {@link MessageCollector} in the context - * @param coordinator the {@link TaskCoordinator} in the context + * @return the {@link OperatorSpec} for this {@link OperatorImpl} */ - void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); + protected abstract OperatorSpec getOpSpec(); + + private String getOpName() { + OperatorSpec opSpec = getOpSpec(); + return String.format("%s-%s", opSpec.getOpCode().name().toLowerCase(), opSpec.getOpId()); } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 8e492dc966..e2b1cd5aaa 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -104,20 +104,22 @@ public Collection getAllRootOperators() { * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. * * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for - * @param the type of messagess in the {@code source} {@link MessageStreamImpl} * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param the type of messages in the {@code source} {@link MessageStreamImpl} * @return root node for the {@link OperatorImpl} DAG */ - private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, Config config, TaskContext context) { + private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, + Config config, TaskContext context) { // since the source message stream might have multiple operator specs registered on it, // create a new root node as a single point of entry for the DAG. RootOperatorImpl rootOperator = new RootOperatorImpl<>(); + rootOperator.init(config, context); // create the pipeline/topology starting from the source source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { - // pass in the source and context s.t. stateful stream operators can initialize their stores + // pass in the context so that operator implementations can initialize their functions OperatorImpl operatorImpl = - createAndRegisterOperatorImpl(registeredOperator, source, config, context); + createAndRegisterOperatorImpl(registeredOperator, config, context); rootOperator.registerNextOperator(operatorImpl); }); return rootOperator; @@ -127,27 +129,26 @@ private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding * {@link OperatorImpl}s. * - * @param operatorSpec the operatorSpec registered with the {@code source} - * @param source the source {@link MessageStreamImpl} - * @param type of input message + * @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param type of input message * @return the operator implementation for the operatorSpec */ private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec operatorSpec, - MessageStreamImpl source, Config config, TaskContext context) { + Config config, TaskContext context) { if (!operatorImpls.containsKey(operatorSpec)) { - OperatorImpl operatorImpl = createOperatorImpl(source, operatorSpec, config, context); + OperatorImpl operatorImpl = createOperatorImpl(operatorSpec, config, context); if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) { // this is the first time we've added the operatorImpl corresponding to the operatorSpec, // so traverse and initialize and register the rest of the DAG. // initialize the corresponding operator function - operatorSpec.init(config, context); + operatorImpl.init(config, context); MessageStreamImpl nextStream = operatorSpec.getNextStream(); if (nextStream != null) { Collection registeredSpecs = nextStream.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { - OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context); + OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context); operatorImpl.registerNextOperator(subImpl); }); } @@ -163,24 +164,22 @@ private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, /** * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. * - * @param source the source {@link MessageStreamImpl} - * @param type of input message * @param operatorSpec the immutable {@link OperatorSpec} definition. * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators + * @param type of input message * @return the {@link OperatorImpl} implementation instance */ - private OperatorImpl createOperatorImpl(MessageStreamImpl source, - OperatorSpec operatorSpec, Config config, TaskContext context) { + private OperatorImpl createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) { if (operatorSpec instanceof StreamOperatorSpec) { StreamOperatorSpec streamOpSpec = (StreamOperatorSpec) operatorSpec; - return new StreamOperatorImpl<>(streamOpSpec, source, config, context); + return new StreamOperatorImpl<>(streamOpSpec, config, context); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { - return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock); + return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock); } throw new IllegalArgumentException( String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index e4cb9c2325..1569606732 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -18,12 +18,10 @@ */ package org.apache.samza.operators.impl; -import java.util.ArrayList; -import java.util.List; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; @@ -35,6 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + /** * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream * with buffered messages of type {@code JM} in the other stream. @@ -47,35 +50,41 @@ class PartialJoinOperatorImpl extends OperatorImpl { private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class); + private final PartialJoinOperatorSpec partialJoinOpSpec; private final PartialJoinFunction thisPartialJoinFn; private final PartialJoinFunction otherPartialJoinFn; private final long ttlMs; - private final int opId; private final Clock clock; - PartialJoinOperatorImpl(PartialJoinOperatorSpec partialJoinOperatorSpec, MessageStreamImpl source, + PartialJoinOperatorImpl(PartialJoinOperatorSpec partialJoinOpSpec, Config config, TaskContext context, Clock clock) { - this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn(); - this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn(); - this.ttlMs = partialJoinOperatorSpec.getTtlMs(); - this.opId = partialJoinOperatorSpec.getOpId(); + this.partialJoinOpSpec = partialJoinOpSpec; + this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn(); + this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn(); + this.ttlMs = partialJoinOpSpec.getTtlMs(); this.clock = clock; } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + protected void doInit(Config config, TaskContext context) { + this.thisPartialJoinFn.init(config, context); + } + + @Override + public Collection handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { K key = thisPartialJoinFn.getKey(message); thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis())); PartialJoinMessage otherMessage = otherPartialJoinFn.getState().get(key); long now = clock.currentTimeMillis(); if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) { RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage()); - this.propagateResult(joinResult, collector, coordinator); + return Collections.singletonList(joinResult); } + return Collections.emptyList(); } @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + public Collection handleTimer(MessageCollector collector, TaskCoordinator coordinator) { long now = clock.currentTimeMillis(); KeyValueStore> thisState = thisPartialJoinFn.getState(); @@ -94,7 +103,11 @@ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { iterator.close(); thisState.deleteAll(keysToRemove); - LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now); + return Collections.emptyList(); } + @Override + protected OperatorSpec getOpSpec() { + return partialJoinOpSpec; + } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java index eb9b5e21f3..2d8387dd43 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -18,9 +18,16 @@ */ package org.apache.samza.operators.impl; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * A no-op operator implementation that forwards incoming messages to all of its subscribers. @@ -29,7 +36,32 @@ public final class RootOperatorImpl extends OperatorImpl { @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.propagateResult(message, collector, coordinator); + protected void doInit(Config config, TaskContext context) { + } + + @Override + public Collection handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(message); + } + + // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message + @Override + protected OperatorSpec getOpSpec() { + return new OperatorSpec() { + @Override + public MessageStreamImpl getNextStream() { + return null; + } + + @Override + public OpCode getOpCode() { + return OpCode.INPUT; + } + + @Override + public int getOpId() { + return -1; + } + }; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java deleted file mode 100644 index 2bb362c539..0000000000 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Default implementation class of a {@link WindowOperatorSpec} for a session window. - * - * @param the type of input message - * @param the type of window key - * @param the type of window state - */ -class SessionWindowOperatorImpl extends OperatorImpl> { - - private final WindowOperatorSpec windowSpec; - - SessionWindowOperatorImpl(WindowOperatorSpec windowSpec, MessageStreamImpl source, Config config, TaskContext context) { - this.windowSpec = windowSpec; - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - } - - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // This is to periodically check the timeout triggers to get the list of window states to be updated - } -} diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index f92fbfbb33..026f249d7a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -20,27 +20,44 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; +import java.util.Collections; + /** * Implementation for {@link SinkOperatorSpec} */ class SinkOperatorImpl extends OperatorImpl { + private final SinkOperatorSpec sinkOpSpec; private final SinkFunction sinkFn; - SinkOperatorImpl(SinkOperatorSpec sinkOp, Config config, TaskContext context) { - this.sinkFn = sinkOp.getSinkFn(); + SinkOperatorImpl(SinkOperatorSpec sinkOpSpec, Config config, TaskContext context) { + this.sinkOpSpec = sinkOpSpec; + this.sinkFn = sinkOpSpec.getSinkFn(); + } + + @Override + protected void doInit(Config config, TaskContext context) { + this.sinkFn.init(config, context); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + public Collection handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator) { this.sinkFn.apply(message, collector, coordinator); // there should be no further chained operators since this is a terminal operator. - // hence we don't call #propogateResult() here. + return Collections.emptyList(); + } + + @Override + protected OperatorSpec getOpSpec() { + return sinkOpSpec; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index 644de2031b..dbb2ea4cad 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -19,13 +19,15 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import java.util.Collection; + /** * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message. @@ -35,15 +37,28 @@ */ class StreamOperatorImpl extends OperatorImpl { + private final StreamOperatorSpec streamOpSpec; private final FlatMapFunction transformFn; - StreamOperatorImpl(StreamOperatorSpec streamOperatorSpec, MessageStreamImpl source, Config config, TaskContext context) { - this.transformFn = streamOperatorSpec.getTransformFn(); + StreamOperatorImpl(StreamOperatorSpec streamOpSpec, + Config config, TaskContext context) { + this.streamOpSpec = streamOpSpec; + this.transformFn = streamOpSpec.getTransformFn(); + } + + @Override + protected void doInit(Config config, TaskContext context) { + transformFn.init(config, context); + } + + @Override + public Collection handleMessage(M message, MessageCollector collector, + TaskCoordinator coordinator) { + return this.transformFn.apply(message); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - // call the transform function and then for each output call propagateResult() - this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator)); + protected OperatorSpec getOpSpec() { + return streamOpSpec; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index cd3b1bcd95..fbaedbb37f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -20,14 +20,16 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.config.Config; import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.triggers.RepeatingTriggerImpl; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.TriggerImpl; import org.apache.samza.operators.triggers.TriggerImpls; -import org.apache.samza.operators.triggers.FiringType; import org.apache.samza.operators.util.InternalInMemoryStore; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.WindowKey; @@ -36,6 +38,7 @@ import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import org.apache.samza.util.Clock; import org.slf4j.Logger; @@ -43,6 +46,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,22 +78,35 @@ public class WindowOperatorImpl extends OperatorImpl windowOpSpec; + private final Clock clock; private final WindowInternal window; private final KeyValueStore, WindowState> store = new InternalInMemoryStore<>(); - TriggerScheduler triggerScheduler ; + private TriggerScheduler triggerScheduler; + + // Results to be returned for the current handleMessage and handleTimer call. + private List> currentResults = new ArrayList<>(); // The trigger state corresponding to each {@link TriggerKey}. private final Map, TriggerImplHandler> triggers = new HashMap<>(); - private final Clock clock; - public WindowOperatorImpl(WindowOperatorSpec spec, Clock clock) { + public WindowOperatorImpl(WindowOperatorSpec windowOpSpec, Clock clock) { + this.windowOpSpec = windowOpSpec; this.clock = clock; - this.window = spec.getWindow(); + this.window = windowOpSpec.getWindow(); this.triggerScheduler= new TriggerScheduler(clock); } @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + protected void doInit(Config config, TaskContext context) { + WindowInternal window = windowOpSpec.getWindow(); + if (window.getFoldLeftFunction() != null) { + window.getFoldLeftFunction().init(config, context); + } + } + + @Override + public Collection> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Processing message envelope: {}", message); WindowKey storeKey = getStoreKey(message); @@ -111,10 +128,14 @@ public void onNext(M message, MessageCollector collector, TaskCoordinator coordi getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger()) .onMessage(triggerKey, message, collector, coordinator); } + + List> results = currentResults; + currentResults = new ArrayList<>(); + return Collections.unmodifiableList(results); } @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + public Collection> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { List> keys = triggerScheduler.runPendingCallbacks(); for (TriggerKey key : keys) { @@ -124,6 +145,14 @@ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { } } + List> results = currentResults; + currentResults = new ArrayList<>(); + return Collections.unmodifiableList(results); + } + + @Override + protected OperatorSpec> getOpSpec() { + return windowOpSpec; } /** @@ -200,7 +229,7 @@ private void onTriggerFired(TriggerKey triggerKey, MessageCollector collecto } WindowPane paneOutput = computePaneOutput(triggerKey, state); - super.propagateResult(paneOutput, collector, coordinator); + currentResults.add(paneOutput); // Handle accumulation modes. if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { @@ -315,5 +344,4 @@ public boolean isRepeating() { return this.impl instanceof RepeatingTriggerImpl; } } - } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 18090e24dd..9699ced020 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -19,9 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.task.TaskContext; /** @@ -34,6 +32,7 @@ public interface OperatorSpec { enum OpCode { + INPUT, MAP, FLAT_MAP, FILTER, @@ -63,11 +62,4 @@ enum OpCode { */ int getOpId(); - /** - * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP. - * - * @param config the {@link Config} object for this task - * @param context the {@link TaskContext} object for this task - */ - default void init(Config config, TaskContext context) { } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java index b1dc529321..e85626f762 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -18,10 +18,8 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.task.TaskContext; /** @@ -88,10 +86,4 @@ public OperatorSpec.OpCode getOpCode() { public int getOpId() { return this.opId; } - - @Override - public void init(Config config, TaskContext context) { - this.thisPartialJoinFn.init(config, context); - } - } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 7de85f357c..0d135d341a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -18,14 +18,12 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.stream.OutputStreamInternal; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -101,11 +99,6 @@ public int getOpId() { return this.opId; } - @Override - public void init(Config config, TaskContext context) { - this.sinkFn.init(config, context); - } - /** * Creates a {@link SinkFunction} to send messages to the provided {@code output}. * @param outputStream the {@link OutputStreamInternal} to send messages to diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index f9bbe2d3b1..204e566438 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -18,10 +18,8 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.task.TaskContext; /** @@ -71,9 +69,4 @@ public OperatorSpec.OpCode getOpCode() { public int getOpId() { return this.opId; } - - @Override - public void init(Config config, TaskContext context) { - this.transformFn.init(config, context); - } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 9515e38ca2..73b17b54bb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,11 +19,9 @@ package org.apache.samza.operators.spec; -import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; -import org.apache.samza.task.TaskContext; /** @@ -52,13 +50,6 @@ public class WindowOperatorSpec implements OperatorSpec> getNextStream() { return this.nextStream; diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index be525650d2..4720298d2d 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -118,17 +118,19 @@ public final void init(Config config, TaskContext context) throws Exception { public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream(); InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream); - // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder. RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream); if (rootOperatorImpl != null) { - rootOperatorImpl.onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator); + // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde + // before applying the msgBuilder. + Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()); + rootOperatorImpl.onMessage(message, collector, coordinator); } } @Override public final void window(MessageCollector collector, TaskCoordinator coordinator) { operatorImplGraph.getAllRootOperators() - .forEach(rootOperator -> rootOperator.onTick(collector, coordinator)); + .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator)); } @Override diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java index 7a6f9599e9..23b67aa01d 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java @@ -19,13 +19,10 @@ package org.apache.samza.operators; import com.google.common.collect.ImmutableSet; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; @@ -42,6 +39,11 @@ import org.apache.samza.util.SystemClock; import org.junit.Test; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -237,6 +239,8 @@ private StreamOperatorTask createStreamOperatorTask(Clock clock) throws Exceptio when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("insystem", "instream", new Partition(0)), new SystemStreamPartition("insystem2", "instream2", new Partition(0)))); + when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + Config config = mock(Config.class); StreamApplication sgb = new TestStreamApplication(); diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java index 660313703d..597244e2c6 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java +++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java @@ -26,9 +26,8 @@ import org.apache.samza.Partition; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.triggers.FiringType; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.testUtils.TestClock; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.AccumulationMode; @@ -36,11 +35,13 @@ import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamOperatorTask; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.testUtils.TestClock; import org.junit.Before; import org.junit.Test; @@ -71,6 +72,7 @@ public void setup() throws Exception { runner = mock(ApplicationRunner.class); when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka")); } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index f978c3cca1..47ca0254cd 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -18,61 +18,205 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.TestMessageEnvelope; -import org.apache.samza.operators.data.TestOutputMessageEnvelope; +import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -import org.hamcrest.core.IsEqual; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.argThat; +import java.util.Collection; +import java.util.Collections; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestOperatorImpl { - TestMessageEnvelope curInputMsg; - MessageCollector curCollector; - TaskCoordinator curCoordinator; + @Test(expected = IllegalStateException.class) + public void testMultipleInitShouldThrow() { + OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + opImpl.init(mock(Config.class), mockTaskContext); + opImpl.init(mock(Config.class), mockTaskContext); + } + + @Test(expected = IllegalStateException.class) + public void testRegisterNextOperatorBeforeInitShouldThrow() { + OperatorImpl opImpl = new TestOpImpl(mock(Object.class)); + opImpl.registerNextOperator(mock(OperatorImpl.class)); + } + + @Test + public void testOnMessagePropagatesResults() { + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // register a couple of operators + OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); + when(mockNextOpImpl1.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl1); + + OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); + when(mockNextOpImpl2.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl2); + + // send a message to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator); + + // verify that it propagates its handleMessage results to next operators + verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + } + + @Test + public void testOnMessageUpdatesMetrics() { + TaskContext mockTaskContext = mock(TaskContext.class); + MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + Counter mockCounter = mock(Counter.class); + Timer mockTimer = mock(Timer.class); + when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter); + when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // send a message to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onMessage(mock(Object.class), mockCollector, mockCoordinator); + + // verify that it updates message count and timer metrics + verify(mockCounter, times(1)).inc(); + verify(mockTimer, times(1)).update(anyLong()); + } + + @Test + public void testOnTimerPropagatesResultsAndTimer() { + TaskContext mockTaskContext = mock(TaskContext.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // register a couple of operators + OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); + when(mockNextOpImpl1.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl1.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl1); + + OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); + when(mockNextOpImpl2.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); + mockNextOpImpl2.init(mock(Config.class), mockTaskContext); + opImpl.registerNextOperator(mockNextOpImpl2); + + // send a timer tick to this operator + MessageCollector mockCollector = mock(MessageCollector.class); + TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); + opImpl.onTimer(mockCollector, mockCoordinator); + + // verify that it propagates its handleTimer results to next operators + verify(mockNextOpImpl1, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleMessage(mockTestOpImplOutput, mockCollector, mockCoordinator); + + // verify that it propagates the timer tick to next operators + verify(mockNextOpImpl1, times(1)).handleTimer(mockCollector, mockCoordinator); + verify(mockNextOpImpl2, times(1)).handleTimer(mockCollector, mockCoordinator); + } @Test - public void testSubscribers() { - this.curInputMsg = null; - this.curCollector = null; - this.curCoordinator = null; - OperatorImpl opImpl = new OperatorImpl() { - @Override - public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) { - TestOperatorImpl.this.curInputMsg = message; - TestOperatorImpl.this.curCollector = collector; - TestOperatorImpl.this.curCoordinator = coordinator; - } - @Override - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - - } - - }; - // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext() - OperatorImpl mockSub = mock(OperatorImpl.class); - opImpl.registerNextOperator(mockSub); - TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class); + public void testOnTimerUpdatesMetrics() { + TaskContext mockTaskContext = mock(TaskContext.class); + MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class); + when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry); + Counter mockMessageCounter = mock(Counter.class); + Timer mockTimer = mock(Timer.class); + when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter); + when(mockMetricsRegistry.newTimer(anyString(), anyString())).thenReturn(mockTimer); + + Object mockTestOpImplOutput = mock(Object.class); + OperatorImpl opImpl = new TestOpImpl(mockTestOpImplOutput); + opImpl.init(mock(Config.class), mockTaskContext); + + // send a message to this operator MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.propagateResult(xOutput, mockCollector, mockCoordinator); - verify(mockSub, times(1)).onNext( - argThat(new IsEqual<>(xOutput)), - argThat(new IsEqual<>(mockCollector)), - argThat(new IsEqual<>(mockCoordinator)) - ); - // verify onNext() is invoked correctly - TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class); - opImpl.onNext(mockInput, mockCollector, mockCoordinator); - assertEquals(mockInput, this.curInputMsg); - assertEquals(mockCollector, this.curCollector); - assertEquals(mockCoordinator, this.curCoordinator); + opImpl.onTimer(mockCollector, mockCoordinator); + + // verify that it updates metrics + verify(mockMessageCounter, times(0)).inc(); + verify(mockTimer, times(1)).update(anyLong()); + } + + private static class TestOpImpl extends OperatorImpl { + private final Object mockOutput; + + TestOpImpl(Object mockOutput) { + this.mockOutput = mockOutput; + } + + @Override + protected void doInit(Config config, TaskContext context) {} + + @Override + public Collection handleMessage(Object message, + MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(mockOutput); + } + + @Override + public Collection handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + return Collections.singletonList(mockOutput); + } + + @Override + protected OperatorSpec getOpSpec() { + return new TestOpSpec(); + } + } + + private static class TestOpSpec implements OperatorSpec { + @Override + public MessageStreamImpl getNextStream() { + return null; + } + + @Override + public OpCode getOpCode() { + return OpCode.INPUT; + } + + @Override + public int getOpId() { + return -1; + } } } + diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index 267cdfcfb8..a75fadbfa1 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TestMessageStreamImplUtil; @@ -26,11 +27,10 @@ import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.windows.Windows; @@ -62,14 +62,15 @@ public class TestOperatorImpls { @Before public void prep() throws NoSuchFieldException, NoSuchMethodException { - nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators"); + nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators"); nextOperatorsField.setAccessible(true); - createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class, + createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", OperatorSpec.class, Config.class, TaskContext.class); createOpMethod.setAccessible(true); - createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class); + createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, + Config.class, TaskContext.class); createOpsMethod.setAccessible(true); } @@ -79,13 +80,12 @@ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessExcep WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); WindowInternal windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING); when(mockWnd.getWindow()).thenReturn(windowInternal); - MessageStreamImpl mockStream = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); TaskContext mockContext = mock(TaskContext.class); OperatorImplGraph opGraph = new OperatorImplGraph(); OperatorImpl opImpl = (OperatorImpl) - createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext); + createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext); assertTrue(opImpl instanceof WindowOperatorImpl); Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window"); wndInternalField.setAccessible(true); @@ -96,7 +96,7 @@ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessExcep StreamOperatorSpec mockSimpleOp = mock(StreamOperatorSpec.class); FlatMapFunction mockTxfmFn = mock(FlatMapFunction.class); when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn); - opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext); + opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext); assertTrue(opImpl instanceof StreamOperatorImpl); Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn"); txfmFnField.setAccessible(true); @@ -106,7 +106,7 @@ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessExcep SinkFunction sinkFn = (m, mc, tc) -> { }; SinkOperatorSpec sinkOp = mock(SinkOperatorSpec.class); when(sinkOp.getSinkFn()).thenReturn(sinkFn); - opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext); + opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext); assertTrue(opImpl instanceof SinkOperatorImpl); Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn"); sinkFnField.setAccessible(true); @@ -114,8 +114,7 @@ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessExcep // get join operator PartialJoinOperatorSpec joinOp = mock(PartialJoinOperatorSpec.class); - PartialJoinFunction joinFn = mock(PartialJoinFunction.class); - opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext); + opImpl = (OperatorImpl) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext); assertTrue(opImpl instanceof PartialJoinOperatorImpl); } @@ -124,6 +123,7 @@ public void testEmptyChain() throws InvocationTargetException, IllegalAccessExce // test creation of empty chain MessageStreamImpl testStream = mock(MessageStreamImpl.class); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); OperatorImplGraph opGraph = new OperatorImplGraph(); RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext); @@ -134,8 +134,9 @@ public void testEmptyChain() throws InvocationTargetException, IllegalAccessExce public void testLinearChain() throws IllegalAccessException, InvocationTargetException { // test creation of linear chain StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); + MessageStreamImpl testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10))); OperatorImplGraph opGraph = new OperatorImplGraph(); @@ -154,8 +155,9 @@ public void testLinearChain() throws IllegalAccessException, InvocationTargetExc public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException { // test creation of broadcast chain StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - MessageStreamImpl testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); + MessageStreamImpl testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } }); testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m); @@ -187,6 +189,7 @@ public void testJoinChain() throws IllegalAccessException, InvocationTargetExcep MessageStreamImpl input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); MessageStreamImpl input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph); TaskContext mockContext = mock(TaskContext.class); + when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap()); Config mockConfig = mock(Config.class); input1 .join(input2, diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java index abd7740303..1c01e57f97 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java @@ -27,7 +27,10 @@ import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestSinkOperatorImpl { @@ -44,7 +47,7 @@ public void testSinkOperator() { MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator); + sinkImpl.handleMessage(mockMsg, mockCollector, mockCoordinator); verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator); } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java index 9dd161a6fb..36d7b92f06 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java @@ -18,10 +18,7 @@ */ package org.apache.samza.operators.impl; -import java.util.ArrayList; -import java.util.Collection; import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.data.TestMessageEnvelope; import org.apache.samza.operators.data.TestOutputMessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; @@ -31,7 +28,15 @@ import org.apache.samza.task.TaskCoordinator; import org.junit.Test; -import static org.mockito.Mockito.*; +import java.util.ArrayList; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestStreamOperatorImpl { @@ -42,10 +47,10 @@ public void testSimpleOperator() { StreamOperatorSpec mockOp = mock(StreamOperatorSpec.class); FlatMapFunction txfmFn = mock(FlatMapFunction.class); when(mockOp.getTransformFn()).thenReturn(txfmFn); - MessageStreamImpl mockInput = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); TaskContext mockContext = mock(TaskContext.class); - StreamOperatorImpl opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext)); + StreamOperatorImpl opImpl = + spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext)); TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class); TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class); Collection mockOutputs = new ArrayList() { { @@ -54,8 +59,9 @@ public void testSimpleOperator() { when(txfmFn.apply(inMsg)).thenReturn(mockOutputs); MessageCollector mockCollector = mock(MessageCollector.class); TaskCoordinator mockCoordinator = mock(TaskCoordinator.class); - opImpl.onNext(inMsg, mockCollector, mockCoordinator); + Collection results = opImpl + .handleMessage(inMsg, mockCollector, mockCoordinator); verify(txfmFn, times(1)).apply(inMsg); - verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator); + assertEquals(results, mockOutputs); } } diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java index 1e2acb258d..006ac77140 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -44,7 +44,8 @@ public class RepartitionWindowApp implements StreamApplication { public void init(StreamGraph graph, Config config) { MessageStream pageViews = graph.getInputStream("page-views", (k, v) -> v); - Function keyFn = pageView -> new PageView(pageView).getUserId(); + Function keyFn = pageView -> + new PageView(pageView).getUserId(); OutputStream>> outputStream = graph .getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); From abb6fadbe7be28adf8ebaecd24027110b31fb231 Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Wed, 26 Apr 2017 15:45:11 -0700 Subject: [PATCH 2/4] Minor cleanup. --- .../org/apache/samza/test/operator/RepartitionWindowApp.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java index 006ac77140..1e2acb258d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -44,8 +44,7 @@ public class RepartitionWindowApp implements StreamApplication { public void init(StreamGraph graph, Config config) { MessageStream pageViews = graph.getInputStream("page-views", (k, v) -> v); - Function keyFn = pageView -> - new PageView(pageView).getUserId(); + Function keyFn = pageView -> new PageView(pageView).getUserId(); OutputStream>> outputStream = graph .getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); From 1805cd04f55fe53c1179efd5104a6f5b48a19994 Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Thu, 27 Apr 2017 10:43:58 -0700 Subject: [PATCH 3/4] Addressed feedback from Jagadish. --- .../samza/operators/impl/OperatorImpl.java | 60 ++++++++-------- .../operators/impl/OperatorImplGraph.java | 3 +- .../impl/PartialJoinOperatorImpl.java | 13 ++-- .../operators/impl/RootOperatorImpl.java | 4 +- .../operators/impl/SinkOperatorImpl.java | 4 +- .../operators/impl/StreamOperatorImpl.java | 4 +- .../operators/impl/WindowOperatorImpl.java | 68 +++++++++++-------- .../samza/operators/spec/OperatorSpec.java | 8 +++ .../operators/impl/TestOperatorImpl.java | 12 ++-- 9 files changed, 103 insertions(+), 73 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 042dfd96fc..d547869c0d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; +import org.apache.samza.config.MetricsConfig; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; @@ -26,6 +27,7 @@ import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.HighResolutionClock; import java.util.Collection; import java.util.Collections; @@ -39,14 +41,12 @@ public abstract class OperatorImpl { private static final String METRICS_GROUP = OperatorImpl.class.getName(); - private Set> registeredOperators; - private boolean initialized; - private Counter messageCounter; - private Timer handleMessageTimer; - private Timer handleTimerTimer; - - public OperatorImpl() {} + private Set> registeredOperators; + private HighResolutionClock highResClock; + private Counter numMessage; + private Timer handleMessageNs; + private Timer handleTimerNs; /** * Initialize this {@link OperatorImpl} and its user-defined functions. @@ -55,19 +55,20 @@ public OperatorImpl() {} * @param context the {@link TaskContext} for the task */ public final void init(Config config, TaskContext context) { - String opName = getOpName(); + String opName = getOperatorSpec().getOpName(); if (initialized) { throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName)); } - this.registeredOperators = new HashSet<>(); + this.highResClock = createHighResClock(config); + registeredOperators = new HashSet<>(); MetricsRegistry metricsRegistry = context.getMetricsRegistry(); - this.messageCounter = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages"); - this.handleMessageTimer = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns"); - this.handleTimerTimer = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns"); + this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages"); + this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns"); + this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns"); - doInit(config, context); + handleInit(config, context); initialized = true; } @@ -78,7 +79,7 @@ public final void init(Config config, TaskContext context) { * @param config the {@link Config} for the task * @param context the {@link TaskContext} for the task */ - protected abstract void doInit(Config config, TaskContext context); + protected abstract void handleInit(Config config, TaskContext context); /** * Register an operator that this operator should propagate its results to. @@ -88,7 +89,8 @@ public final void init(Config config, TaskContext context) { void registerNextOperator(OperatorImpl nextOperator) { if (!initialized) { throw new IllegalStateException( - String.format("Attempted to register next operator before initializing operator %s.", getOpName())); + String.format("Attempted to register next operator before initializing operator %s.", + getOperatorSpec().getOpName())); } this.registeredOperators.add(nextOperator); } @@ -103,11 +105,11 @@ void registerNextOperator(OperatorImpl nextOperator) { * @param coordinator the {@link TaskCoordinator} for this message */ public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { - this.messageCounter.inc(); - long startNs = System.nanoTime(); + this.numMessage.inc(); + long startNs = this.highResClock.nanoTime(); Collection results = handleMessage(message, collector, coordinator); - long endNs = System.nanoTime(); - this.handleMessageTimer.update(endNs - startNs); + long endNs = this.highResClock.nanoTime(); + this.handleMessageNs.update(endNs - startNs); results.forEach(rm -> this.registeredOperators.forEach(op -> @@ -134,15 +136,16 @@ protected abstract Collection handleMessage(M message, MessageCollector coll * @param coordinator the {@link TaskCoordinator} in the context */ public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - long startNs = System.nanoTime(); + long startNs = this.highResClock.nanoTime(); Collection results = handleTimer(collector, coordinator); - long endNs = System.nanoTime(); - this.handleTimerTimer.update(endNs - startNs); + long endNs = this.highResClock.nanoTime(); + this.handleTimerNs.update(endNs - startNs); results.forEach(rm -> this.registeredOperators.forEach(op -> op.onMessage(rm, collector, coordinator))); - this.registeredOperators.forEach(op -> op.onTimer(collector, coordinator)); + this.registeredOperators.forEach(op -> + op.onTimer(collector, coordinator)); } /** @@ -163,10 +166,13 @@ protected Collection handleTimer(MessageCollector collector, TaskCoordinator * * @return the {@link OperatorSpec} for this {@link OperatorImpl} */ - protected abstract OperatorSpec getOpSpec(); + protected abstract OperatorSpec getOperatorSpec(); - private String getOpName() { - OperatorSpec opSpec = getOpSpec(); - return String.format("%s-%s", opSpec.getOpCode().name().toLowerCase(), opSpec.getOpId()); + private HighResolutionClock createHighResClock(Config config) { + if (new MetricsConfig(config).getMetricsTimerEnabled()) { + return System::nanoTime; + } else { + return () -> 0; + } } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index e2b1cd5aaa..d8ea5920af 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -172,8 +172,7 @@ private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, */ private OperatorImpl createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) { if (operatorSpec instanceof StreamOperatorSpec) { - StreamOperatorSpec streamOpSpec = (StreamOperatorSpec) operatorSpec; - return new StreamOperatorImpl<>(streamOpSpec, config, context); + return new StreamOperatorImpl<>((StreamOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index 1569606732..c7bdc22e5d 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; +import org.apache.samza.metrics.Counter; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage; import org.apache.samza.operators.spec.OperatorSpec; @@ -56,6 +57,8 @@ class PartialJoinOperatorImpl extends OperatorImpl { private final long ttlMs; private final Clock clock; + private Counter keysRemoved; + PartialJoinOperatorImpl(PartialJoinOperatorSpec partialJoinOpSpec, Config config, TaskContext context, Clock clock) { this.partialJoinOpSpec = partialJoinOpSpec; @@ -66,7 +69,9 @@ class PartialJoinOperatorImpl extends OperatorImpl { } @Override - protected void doInit(Config config, TaskContext context) { + protected void handleInit(Config config, TaskContext context) { + keysRemoved = context.getMetricsRegistry() + .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed"); this.thisPartialJoinFn.init(config, context); } @@ -96,18 +101,18 @@ public Collection handleTimer(MessageCollector collector, TaskCoordinator co if (entry.getValue().getReceivedTimeMs() < now - ttlMs) { keysToRemove.add(entry.getKey()); } else { - break; + break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order } } iterator.close(); thisState.deleteAll(keysToRemove); - + keysRemoved.inc(keysToRemove.size()); return Collections.emptyList(); } @Override - protected OperatorSpec getOpSpec() { + protected OperatorSpec getOperatorSpec() { return partialJoinOpSpec; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java index 2d8387dd43..0f18e973ef 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -36,7 +36,7 @@ public final class RootOperatorImpl extends OperatorImpl { @Override - protected void doInit(Config config, TaskContext context) { + protected void handleInit(Config config, TaskContext context) { } @Override @@ -46,7 +46,7 @@ public Collection handleMessage(M message, MessageCollector collector, TaskCo // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message @Override - protected OperatorSpec getOpSpec() { + protected OperatorSpec getOperatorSpec() { return new OperatorSpec() { @Override public MessageStreamImpl getNextStream() { diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index 026f249d7a..e82737fb3c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -44,7 +44,7 @@ class SinkOperatorImpl extends OperatorImpl { } @Override - protected void doInit(Config config, TaskContext context) { + protected void handleInit(Config config, TaskContext context) { this.sinkFn.init(config, context); } @@ -57,7 +57,7 @@ public Collection handleMessage(M message, MessageCollector collector, } @Override - protected OperatorSpec getOpSpec() { + protected OperatorSpec getOperatorSpec() { return sinkOpSpec; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index dbb2ea4cad..bd4dce105c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -47,7 +47,7 @@ class StreamOperatorImpl extends OperatorImpl { } @Override - protected void doInit(Config config, TaskContext context) { + protected void handleInit(Config config, TaskContext context) { transformFn.init(config, context); } @@ -58,7 +58,7 @@ public Collection handleMessage(M message, MessageCollector collector, } @Override - protected OperatorSpec getOpSpec() { + protected OperatorSpec getOperatorSpec() { return streamOpSpec; } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index fbaedbb37f..2ce0d04c42 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -46,7 +46,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,9 +83,6 @@ public class WindowOperatorImpl extends OperatorImpl, WindowState> store = new InternalInMemoryStore<>(); private TriggerScheduler triggerScheduler; - // Results to be returned for the current handleMessage and handleTimer call. - private List> currentResults = new ArrayList<>(); - // The trigger state corresponding to each {@link TriggerKey}. private final Map, TriggerImplHandler> triggers = new HashMap<>(); @@ -98,7 +94,7 @@ public WindowOperatorImpl(WindowOperatorSpec windowOpSpec, Clock cloc } @Override - protected void doInit(Config config, TaskContext context) { + protected void handleInit(Config config, TaskContext context) { WindowInternal window = windowOpSpec.getWindow(); if (window.getFoldLeftFunction() != null) { window.getFoldLeftFunction().init(config, context); @@ -106,52 +102,62 @@ protected void doInit(Config config, TaskContext context) { } @Override - public Collection> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) { + public Collection> handleMessage( + M message, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Processing message envelope: {}", message); + List> results = new ArrayList<>(); WindowKey storeKey = getStoreKey(message); WindowState existingState = store.get(storeKey); WindowState newState = applyFoldFunction(existingState, message); - LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp()); + LOG.trace("New window value: {}, earliest timestamp: {}", + newState.getWindowValue(), newState.getEarliestTimestamp()); store.put(storeKey, newState); if (window.getEarlyTrigger() != null) { TriggerKey triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); - getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger()) - .onMessage(triggerKey, message, collector, coordinator); + TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger()); + WindowPane triggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + if (triggeredPane != null) { + results.add(triggeredPane); + } } if (window.getDefaultTrigger() != null) { TriggerKey triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); - getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger()) - .onMessage(triggerKey, message, collector, coordinator); + TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger()); + WindowPane triggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + if (triggeredPane != null) { + results.add(triggeredPane); + } } - List> results = currentResults; - currentResults = new ArrayList<>(); - return Collections.unmodifiableList(results); + return results; } @Override public Collection> handleTimer(MessageCollector collector, TaskCoordinator coordinator) { + List> results = new ArrayList<>(); + List> keys = triggerScheduler.runPendingCallbacks(); for (TriggerKey key : keys) { TriggerImplHandler triggerImplHandler = triggers.get(key); if (triggerImplHandler != null) { - triggerImplHandler.onTimer(key, collector, coordinator); + WindowPane triggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); + if (triggeredPane != null) { + results.add(triggeredPane); + } } } - List> results = currentResults; - currentResults = new ArrayList<>(); - return Collections.unmodifiableList(results); + return results; } @Override - protected OperatorSpec> getOpSpec() { + protected OperatorSpec> getOperatorSpec() { return windowOpSpec; } @@ -197,7 +203,7 @@ private WindowState applyFoldFunction(WindowState existingState, M messa return newState; } - private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey triggerKey, Trigger trigger) { + private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey triggerKey, Trigger trigger) { TriggerImplHandler wrapper = triggers.get(triggerKey); if (wrapper != null) { LOG.trace("Returning existing trigger wrapper for {}", triggerKey); @@ -216,7 +222,8 @@ private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey triggerK /** * Handles trigger firings, and propagates results to downstream operators. */ - private void onTriggerFired(TriggerKey triggerKey, MessageCollector collector, TaskCoordinator coordinator) { + private WindowPane onTriggerFired( + TriggerKey triggerKey, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Trigger key {} fired." , triggerKey); TriggerImplHandler wrapper = triggers.get(triggerKey); @@ -225,11 +232,10 @@ private void onTriggerFired(TriggerKey triggerKey, MessageCollector collecto if (state == null) { LOG.trace("No state found for triggerKey: {}", triggerKey); - return; + return null; } WindowPane paneOutput = computePaneOutput(triggerKey, state); - currentResults.add(paneOutput); // Handle accumulation modes. if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { @@ -257,6 +263,8 @@ private void onTriggerFired(TriggerKey triggerKey, MessageCollector collecto if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) { cancelTrigger(triggerKey, false); } + + return paneOutput; } /** @@ -277,7 +285,8 @@ private WindowPane computePaneOutput(TriggerKey triggerKey, WindowSt windowVal = (WV) new ArrayList<>((Collection) windowVal); } - WindowPane paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); + WindowPane paneOutput = + new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); LOG.trace("Emitting pane output for trigger key {}", triggerKey); return paneOutput; } @@ -308,7 +317,8 @@ public TriggerImplHandler(TriggerKey key, TriggerImpl impl) { this.impl = impl; } - public void onMessage(TriggerKey triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { + public WindowPane onMessage(TriggerKey triggerKey, M message, + MessageCollector collector, TaskCoordinator coordinator) { if (!isCancelled) { LOG.trace("Forwarding callbacks for {}", message); impl.onMessage(message, triggerScheduler); @@ -318,12 +328,13 @@ public void onMessage(TriggerKey triggerKey, M message, MessageCollector col if (impl instanceof RepeatingTriggerImpl) { ((RepeatingTriggerImpl) impl).clear(); } - onTriggerFired(triggerKey, collector, coordinator); + return onTriggerFired(triggerKey, collector, coordinator); } } + return null; } - public void onTimer(TriggerKey key, MessageCollector collector, TaskCoordinator coordinator) { + public WindowPane onTimer(TriggerKey key, MessageCollector collector, TaskCoordinator coordinator) { if (impl.shouldFire() && !isCancelled) { LOG.trace("Triggering timer triggers"); @@ -331,8 +342,9 @@ public void onTimer(TriggerKey key, MessageCollector collector, TaskCoordina if (impl instanceof RepeatingTriggerImpl) { ((RepeatingTriggerImpl) impl).clear(); } - onTriggerFired(key, collector, coordinator); + return onTriggerFired(key, collector, coordinator); } + return null; } public void cancel() { diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 9699ced020..cc3c4ab065 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -62,4 +62,12 @@ enum OpCode { */ int getOpId(); + /** + * Get the name for this operator based on its opCode and opId. + * @return the name for this operator + */ + default String getOpName() { + return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId()); + } + } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 47ca0254cd..bd18f0b00c 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -70,13 +70,13 @@ public void testOnMessagePropagatesResults() { // register a couple of operators OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); - when(mockNextOpImpl1.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); mockNextOpImpl1.init(mock(Config.class), mockTaskContext); opImpl.registerNextOperator(mockNextOpImpl1); OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); - when(mockNextOpImpl2.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); mockNextOpImpl2.init(mock(Config.class), mockTaskContext); opImpl.registerNextOperator(mockNextOpImpl2); @@ -126,13 +126,13 @@ public void testOnTimerPropagatesResultsAndTimer() { // register a couple of operators OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class); - when(mockNextOpImpl1.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); mockNextOpImpl1.init(mock(Config.class), mockTaskContext); opImpl.registerNextOperator(mockNextOpImpl1); OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class); - when(mockNextOpImpl2.getOpSpec()).thenReturn(new TestOpSpec()); + when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec()); when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList()); mockNextOpImpl2.init(mock(Config.class), mockTaskContext); opImpl.registerNextOperator(mockNextOpImpl2); @@ -183,7 +183,7 @@ private static class TestOpImpl extends OperatorImpl { } @Override - protected void doInit(Config config, TaskContext context) {} + protected void handleInit(Config config, TaskContext context) {} @Override public Collection handleMessage(Object message, @@ -197,7 +197,7 @@ public Collection handleTimer(MessageCollector collector, TaskCoordinato } @Override - protected OperatorSpec getOpSpec() { + protected OperatorSpec getOperatorSpec() { return new TestOpSpec(); } } From 27be112eeffe0ba57c58a97bcadaf6a9b025d2db Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Thu, 27 Apr 2017 12:55:38 -0700 Subject: [PATCH 4/4] Minor cleanup. --- .../operators/impl/WindowOperatorImpl.java | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 2ce0d04c42..b99f7191ae 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; /** @@ -64,9 +65,8 @@ * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}. * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet * or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A - * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The - * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream - * operators. + * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. + * The {@link WindowOperatorImpl} checks if the trigger fired and returns the result of the firing. * * @param the type of the incoming message * @param the type of the key in this {@link org.apache.samza.operators.MessageStream} @@ -119,19 +119,17 @@ public Collection> handleMessage( TriggerKey triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getEarlyTrigger()); - WindowPane triggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); - if (triggeredPane != null) { - results.add(triggeredPane); - } + Optional> maybeTriggeredPane = + triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + maybeTriggeredPane.ifPresent(results::add); } if (window.getDefaultTrigger() != null) { TriggerKey triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); TriggerImplHandler triggerImplHandler = getOrCreateTriggerImplHandler(triggerKey, window.getDefaultTrigger()); - WindowPane triggeredPane = triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); - if (triggeredPane != null) { - results.add(triggeredPane); - } + Optional> maybeTriggeredPane = + triggerImplHandler.onMessage(triggerKey, message, collector, coordinator); + maybeTriggeredPane.ifPresent(results::add); } return results; @@ -146,10 +144,8 @@ public Collection> handleTimer(MessageCollector collector, Ta for (TriggerKey key : keys) { TriggerImplHandler triggerImplHandler = triggers.get(key); if (triggerImplHandler != null) { - WindowPane triggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); - if (triggeredPane != null) { - results.add(triggeredPane); - } + Optional> maybeTriggeredPane = triggerImplHandler.onTimer(key, collector, coordinator); + maybeTriggeredPane.ifPresent(results::add); } } @@ -220,9 +216,9 @@ private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey triggerK } /** - * Handles trigger firings, and propagates results to downstream operators. + * Handles trigger firings and returns the optional result. */ - private WindowPane onTriggerFired( + private Optional> onTriggerFired( TriggerKey triggerKey, MessageCollector collector, TaskCoordinator coordinator) { LOG.trace("Trigger key {} fired." , triggerKey); @@ -232,7 +228,7 @@ private WindowPane onTriggerFired( if (state == null) { LOG.trace("No state found for triggerKey: {}", triggerKey); - return null; + return Optional.empty(); } WindowPane paneOutput = computePaneOutput(triggerKey, state); @@ -264,7 +260,7 @@ private WindowPane onTriggerFired( cancelTrigger(triggerKey, false); } - return paneOutput; + return Optional.of(paneOutput); } /** @@ -317,7 +313,7 @@ public TriggerImplHandler(TriggerKey key, TriggerImpl impl) { this.impl = impl; } - public WindowPane onMessage(TriggerKey triggerKey, M message, + public Optional> onMessage(TriggerKey triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { if (!isCancelled) { LOG.trace("Forwarding callbacks for {}", message); @@ -331,10 +327,11 @@ public WindowPane onMessage(TriggerKey triggerKey, M message, return onTriggerFired(triggerKey, collector, coordinator); } } - return null; + return Optional.empty(); } - public WindowPane onTimer(TriggerKey key, MessageCollector collector, TaskCoordinator coordinator) { + public Optional> onTimer( + TriggerKey key, MessageCollector collector, TaskCoordinator coordinator) { if (impl.shouldFire() && !isCancelled) { LOG.trace("Triggering timer triggers"); @@ -344,7 +341,7 @@ public WindowPane onTimer(TriggerKey key, MessageCollector collector } return onTriggerFired(key, collector, coordinator); } - return null; + return Optional.empty(); } public void cancel() {