From c53b2ec450f6d0a03ce88310c77f687e7c46058e Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 25 Sep 2017 14:18:52 +0200 Subject: [PATCH 1/3] [FLINK-7684][streaming] Add OptimizationTarget to the ExecutionConfig --- .../flink/api/common/ExecutionConfig.java | 20 +++++++++ .../flink/api/common/OptimizationTarget.java | 42 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/OptimizationTarget.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index fc66ccdbe0741..73ae5c91b1207 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -121,6 +121,8 @@ public class ExecutionConfig implements Serializable, Archiveable Date: Thu, 6 Jul 2017 13:56:13 +0200 Subject: [PATCH 2/3] [FLINK-7684][streaming] Serialize MergingWindowSet to ValueState> This avoids an unnecessary data copy --- .../operators/windowing/MergingWindowSet.java | 31 ++--- .../operators/windowing/WindowOperator.java | 80 ++++++++++-- .../windowing/MergingWindowSetTest.java | 121 +++++++++++------- 3 files changed, 154 insertions(+), 78 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java index 10ac2a6a3fb7d..cc1a18ab0e04d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java @@ -18,8 +18,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -68,7 +67,7 @@ public class MergingWindowSet { */ private final Map initialMapping; - private final ListState> state; + private final ValueState> state; /** * Our window assigner. @@ -78,21 +77,13 @@ public class MergingWindowSet { /** * Restores a {@link MergingWindowSet} from the given state. */ - public MergingWindowSet(MergingWindowAssigner windowAssigner, ListState> state) throws Exception { + public MergingWindowSet( + MergingWindowAssigner windowAssigner, + ValueState> state) throws Exception { this.windowAssigner = windowAssigner; - mapping = new HashMap<>(); - - Iterable> windowState = state.get(); - if (windowState != null) { - for (Tuple2 window: windowState) { - mapping.put(window.f0, window.f1); - } - } - + this.mapping = (state.value() == null) ? new HashMap<>() : state.value(); this.state = state; - - initialMapping = new HashMap<>(); - initialMapping.putAll(mapping); + this.initialMapping = new HashMap<>(mapping); } /** @@ -101,9 +92,11 @@ public MergingWindowSet(MergingWindowAssigner windowAssigner, ListState window : mapping.entrySet()) { - state.add(new Tuple2<>(window.getKey(), window.getValue())); + if (mapping.isEmpty()) { + state.clear(); + } + else { + state.update(mapping); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index fd90e65548a8b..e5c77379c05c5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -37,6 +37,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -47,6 +48,7 @@ import org.apache.flink.runtime.state.internal.InternalAppendingState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -66,9 +68,14 @@ import java.io.Serializable; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * An operator that implements the logic for windowing based on a {@link WindowAssigner} and @@ -146,7 +153,7 @@ public class WindowOperator private transient InternalMergingState windowMergingState; /** The state that holds the merging window metadata (the sets that describe what is merged). */ - private transient InternalListState> mergingSetsState; + private transient InternalValueState> mergingSetsState; /** * This is given to the {@code InternalWindowFunction} for emitting elements with a given @@ -211,7 +218,7 @@ public void open() throws Exception { timestampedCollector = new TimestampedCollector<>(output); internalTimerService = - getInternalTimerService("window-timers", windowSerializer, this); + getInternalTimerService("window-timers", windowSerializer, this); triggerContext = new Context(null, null); processContext = new WindowContext(null); @@ -243,24 +250,71 @@ public long getCurrentProcessingTime() { // throw new IllegalStateException( // "The window uses a merging assigner, but the window state is not mergeable."); // } + initializeMergingWindowSet(windowSerializer); + } + } + + private void initializeMergingWindowSet(TypeSerializer windowSerializer) throws Exception { + final MapSerializer mapSerializer = new MapSerializer<>(windowSerializer, windowSerializer); + final ValueStateDescriptor> mergingSetsStateDescriptor = + new ValueStateDescriptor<>("merging-window-map", mapSerializer); - @SuppressWarnings("unchecked") - final Class> typedTuple = (Class>) (Class) Tuple2.class; + mergingSetsState = (InternalValueState>) getOrCreateKeyedState( + VoidNamespaceSerializer.INSTANCE, + mergingSetsStateDescriptor); + + mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); + + migrateMergingWindowState(); + } - final TupleSerializer> tupleSerializer = new TupleSerializer<>( - typedTuple, - new TypeSerializer[] {windowSerializer, windowSerializer}); + private void migrateMergingWindowState() throws Exception { + InternalListState> deprecatedState = getDeprecatedMergingWindowSetState(windowSerializer); - final ListStateDescriptor> mergingSetsStateDescriptor = - new ListStateDescriptor<>("merging-window-set", tupleSerializer); + try (Stream keysStream = getKeyedStateBackend().getKeys("merging-window-set", VoidNamespace.INSTANCE)){ + Iterator keys = keysStream.iterator(); + while (keys.hasNext()) { + Object key = keys.next(); + getKeyedStateBackend().setCurrentKey(key); + + if (deprecatedState.get() == null) { + continue; + } + + if (mergingSetsState.value() == null) { + mergingSetsState.update(new HashMap<>()); + } - // get the state that stores the merging sets - mergingSetsState = (InternalListState>) - getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor); - mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); + Iterator> deprecatedStateIterator = deprecatedState.get().iterator(); + checkState(mergingSetsState.value().isEmpty() || !deprecatedStateIterator.hasNext(), "Both deprecated and new states are present"); + while (deprecatedStateIterator.hasNext()) { + Tuple2 tuple = deprecatedStateIterator.next(); + mergingSetsState.value().put(tuple.f0, tuple.f1); + } + + deprecatedState.clear(); + } } } + private InternalListState> getDeprecatedMergingWindowSetState(TypeSerializer windowSerializer) throws Exception { + @SuppressWarnings("unchecked") + final Class> typedTuple = (Class>) (Class) Tuple2.class; + + final TupleSerializer> tupleSerializer = new TupleSerializer<>( + typedTuple, + new TypeSerializer[] {windowSerializer, windowSerializer}); + + final ListStateDescriptor> oldMergingSetsStateDescriptor = + new ListStateDescriptor<>("merging-window-set", tupleSerializer); + + InternalListState> oldMergingSetsState = + (InternalListState>) getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, oldMergingSetsStateDescriptor); + + oldMergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); + return oldMergingSetsState; + } + @Override public void close() throws Exception { super.close(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index 019facabf5975..fe7d5427b1d6b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -19,9 +19,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; @@ -30,15 +29,15 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; - import org.junit.Test; -import org.mockito.Matchers; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.hasItem; @@ -51,11 +50,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Tests for verifying that {@link MergingWindowSet} correctly merges windows in various situations @@ -71,10 +65,10 @@ public class MergingWindowSetTest { @Test public void testNonEagerMerging() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); MergingWindowSet windowSet = - new MergingWindowSet<>(new NonEagerlyMergingWindowAssigner(3000), mockState); + new MergingWindowSet<>(new NonEagerlyMergingWindowAssigner(3000), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -100,10 +94,10 @@ public void testNonEagerMerging() throws Exception { @Test public void testIncrementalMerging() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); MergingWindowSet windowSet = - new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -202,9 +196,9 @@ public void testIncrementalMerging() throws Exception { @Test public void testLateMerging() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -275,9 +269,9 @@ public void testLateMerging() throws Exception { @Test public void testMergeLargeWindowCoveringSingleWindow() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -303,9 +297,9 @@ public void testMergeLargeWindowCoveringSingleWindow() throws Exception { @Test public void testAddingIdenticalWindows() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -327,9 +321,9 @@ public void testAddingIdenticalWindows() throws Exception { @Test public void testMergeLargeWindowCoveringMultipleWindows() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); + ValueState> mockState = new MockState(); - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); @@ -365,13 +359,11 @@ public void testMergeLargeWindowCoveringMultipleWindows() throws Exception { @Test public void testRestoreFromState() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); - when(mockState.get()).thenReturn(Lists.newArrayList( - new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)), - new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4)) - )); + MockState mockState = new MockState(); + mockState.value.put(new TimeWindow(17, 42), new TimeWindow(42, 17)); + mockState.value.put(new TimeWindow(1, 2), new TimeWindow(3, 4)); - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42))); assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2))); @@ -380,10 +372,9 @@ public void testRestoreFromState() throws Exception { @Test public void testPersist() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); - - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MockState mockState = new MockState(); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); TestingMergeFunction mergeFunction = new TestingMergeFunction(); windowSet.addWindow(new TimeWindow(1, 2), mergeFunction); @@ -394,30 +385,33 @@ public void testPersist() throws Exception { windowSet.persist(); - verify(mockState).add(eq(new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(1, 2)))); - verify(mockState).add(eq(new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(17, 42)))); - - verify(mockState, times(2)).add(Matchers.>anyObject()); + assertTrue(mockState.value().containsKey(new TimeWindow(1, 2))); + assertTrue(mockState.value().containsKey(new TimeWindow(17, 42))); + assertEquals(2, mockState.size()); } @Test - public void testPersistOnlyIfHaveUpdates() throws Exception { + public void testAlwaysPersist() throws Exception { @SuppressWarnings("unchecked") - ListState> mockState = mock(ListState.class); - when(mockState.get()).thenReturn(Lists.newArrayList( - new Tuple2<>(new TimeWindow(17, 42), new TimeWindow(42, 17)), - new Tuple2<>(new TimeWindow(1, 2), new TimeWindow(3, 4)) - )); - - MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + MockState mockState = new MockState(); + mockState.value.put(new TimeWindow(1, 10), new TimeWindow(1, 10)); - assertEquals(new TimeWindow(42, 17), windowSet.getStateWindow(new TimeWindow(17, 42))); - assertEquals(new TimeWindow(3, 4), windowSet.getStateWindow(new TimeWindow(1, 2))); + MergingWindowSet windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, false); + TestingMergeFunction mergeFunction = new TestingMergeFunction(); + windowSet.addWindow(new TimeWindow(5, 6), mergeFunction); windowSet.persist(); - verify(mockState, times(0)).add(Matchers.>anyObject()); + assertEquals(new TimeWindow(1, 10), mockState.value().get(new TimeWindow(1, 10))); + assertFalse(mockState.wasUpdated()); + windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState, true); + + windowSet.addWindow(new TimeWindow(5, 6), mergeFunction); + windowSet.persist(); + + assertEquals(new TimeWindow(1, 10), mockState.value().get(new TimeWindow(1, 10))); + assertTrue(mockState.wasUpdated()); } private static class TestingMergeFunction implements MergingWindowSet.MergeFunction { @@ -532,4 +526,39 @@ public void mergeWindows(Collection windows, MergingWindowAssigner.M } } } + + private static class MockState implements ValueState> { + private Map value = new HashMap<>(); + private boolean updated = false; + + public int size() { + return value != null ? value.size() : 0; + } + + @Override + public void clear() { + value = null; + } + + @Override + public Map value() throws IOException { + // simulate serialization + return new HashMap<>(value); + } + + @Override + public void update(Map newValue) throws IOException { + // simulate serialization + updated = true; + value = new HashMap<>(newValue); + } + + public boolean wasUpdated() { + return updated; + } + + public void resetUpdated() { + updated = false; + } + } } From bfc8858fc4b9125b8fc7acd03cb3f95c000926b2 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Mon, 25 Sep 2017 15:28:01 +0200 Subject: [PATCH 3/3] [FLINK-7684][streaming] Add always persist flag to MegringWindowSet --- .../operators/windowing/MergingWindowSet.java | 17 +++++++++++++---- .../operators/windowing/WindowOperator.java | 6 +++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java index cc1a18ab0e04d..bef8bbc24ecc6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * Utility for keeping track of merging {@link Window Windows} when using a @@ -65,7 +66,7 @@ public class MergingWindowSet { * Mapping when we created the {@code MergingWindowSet}. We use this to decide whether * we need to persist any changes to state. */ - private final Map initialMapping; + private final Optional> initialMapping; private final ValueState> state; @@ -76,14 +77,22 @@ public class MergingWindowSet { /** * Restores a {@link MergingWindowSet} from the given state. + * + * @param windowAssigner + * @param state + * @param alwaysPersist when set to true disables modification checking before persisting state. On a one hand + * modification check adds a constant per processed element CPU overhead while on the other + * hand unnecessary state writes can add additional state (IO) operations. + * @throws Exception */ public MergingWindowSet( MergingWindowAssigner windowAssigner, - ValueState> state) throws Exception { + ValueState> state, + boolean alwaysPersist) throws Exception { this.windowAssigner = windowAssigner; this.mapping = (state.value() == null) ? new HashMap<>() : state.value(); this.state = state; - this.initialMapping = new HashMap<>(mapping); + this.initialMapping = alwaysPersist ? Optional.empty() : Optional.of(new HashMap<>(mapping)); } /** @@ -91,7 +100,7 @@ public MergingWindowSet( * initialization. */ public void persist() throws Exception { - if (!mapping.equals(initialMapping)) { + if (!initialMapping.isPresent() || !mapping.equals(initialMapping.get())) { if (mapping.isEmpty()) { state.clear(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index e5c77379c05c5..a12ba2bbca2f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.OptimizationTarget; import org.apache.flink.api.common.state.AppendingState; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; @@ -608,7 +609,10 @@ protected void sideOutput(StreamRecord element){ protected MergingWindowSet getMergingWindowSet() throws Exception { @SuppressWarnings("unchecked") MergingWindowAssigner mergingAssigner = (MergingWindowAssigner) windowAssigner; - return new MergingWindowSet<>(mergingAssigner, mergingSetsState); + return new MergingWindowSet<>( + mergingAssigner, + mergingSetsState, + getExecutionConfig().getOptimizationTarget() == OptimizationTarget.CPU); } /**