From 9e95e1a5f36b6c4f0b021f38b2a2a4c2f8dacf3a Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 11 Jun 2018 14:48:06 +0200 Subject: [PATCH 1/8] Refactor/generalize key-group partitioning. --- .../AbstractKeyGroupPartitionedSnapshot.java | 76 ++++++ .../state/AbstractKeyGroupPartitioner.java | 227 ++++++++++++++++++ .../flink/runtime/state/StateSnapshot.java | 67 ++++++ .../heap/AbstractStateTableSnapshot.java | 9 +- .../state/heap/CopyOnWriteStateTable.java | 3 +- .../heap/CopyOnWriteStateTableSnapshot.java | 188 +++++++++------ .../state/heap/HeapKeyedStateBackend.java | 11 +- .../state/heap/NestedMapsStateTable.java | 14 +- .../flink/runtime/state/heap/StateTable.java | 3 +- .../state/heap/StateTableSnapshot.java | 45 ---- ...stractKeyGroupPartitionedSnapshotTest.java | 104 ++++++++ .../AbstractKeyGroupPartitionerTestBase.java | 82 +++++++ .../state/heap/CopyOnWriteStateTableTest.java | 7 +- .../StateTableKeyGroupPartitionerTest.java | 72 ++++++ .../StateTableSnapshotCompatibilityTest.java | 10 +- 15 files changed, 785 insertions(+), 133 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java new file mode 100644 index 0000000000000..4b562f8f6b139 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.memory.DataOutputView; + +import javax.annotation.Nonnull; + +import java.io.IOException; + + +/** + * Abstract base class for implementations of + * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a + * {@link AbstractKeyGroupPartitioner}. + * + * @param type of the written elements. 
+ */
+public abstract class AbstractKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+
+	/** The partitioning result to be written by key-group. */
+	@Nonnull
+	private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
+
+	public AbstractKeyGroupPartitionedSnapshot(
+		@Nonnull AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult) {
+		this.partitioningResult = partitioningResult;
+	}
+
+	@Override
+	public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+
+		final T[] groupedOut = partitioningResult.getPartitionedElements();
+
+		int startOffset = partitioningResult.getKeyGroupStartOffsetInclusive(keyGroupId);
+		int endOffset = partitioningResult.getKeyGroupEndOffsetExclusive(keyGroupId);
+
+		// write number of mappings in key-group
+		dov.writeInt(endOffset - startOffset);
+
+		// write mappings
+		for (int i = startOffset; i < endOffset; ++i) {
+			if(groupedOut[i] == null) {
+				throw new IllegalStateException();
+			}
+			writeElement(groupedOut[i], dov);
+			groupedOut[i] = null; // free asap for GC
+		}
+	}
+
+	/**
+	 * This method defines how to write a single element to the output.
+	 *
+	 * @param element the element to be written.
+	 * @param dov the output view to write the element.
+	 * @throws IOException on write-related problems.
+	 */
+	protected abstract void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
+}
+
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java
new file mode 100644
index 0000000000000..79874a23add42
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ */
+public abstract class AbstractKeyGroupPartitioner<T> {
+
+	/** Total number of input elements. */
+	@Nonnegative
+	protected final int numberOfElements;
+
+	/** The total number of key-groups in the job.
*/ + @Nonnegative + protected final int totalKeyGroups; + + /** The key-group range for the input data, covered in this partitioning. */ + @Nonnull + protected final KeyGroupRange keyGroupRange; + + /** + * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into + * a histogram by accumulation. + */ + @Nonnull + protected final int[] counterHistogram; + + /** + * This is a helper array that caches the key-group for each element, so we do not have to compute them twice. + */ + @Nonnull + protected final int[] elementKeyGroups; + + /** Cached value of keyGroupRange#firstKeyGroup. */ + @Nonnegative + protected final int firstKeyGroup; + + /** Cached result. */ + protected PartitioningResult computedResult; + + /** + * @param keyGroupRange the key-group range of the data that will be partitioned by this instance. + * @param totalKeyGroups the total number of key groups in the job. + */ + public AbstractKeyGroupPartitioner( + @Nonnegative int numberOfElements, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + this.numberOfElements = numberOfElements; + this.keyGroupRange = keyGroupRange; + this.totalKeyGroups = totalKeyGroups; + this.firstKeyGroup = keyGroupRange.getStartKeyGroup(); + this.elementKeyGroups = new int[numberOfElements]; + this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()]; + this.computedResult = null; + } + + /** + * Partitions the data into key-groups and returns the result via {@link PartitioningResult}. + */ + public PartitioningResult partitionByKeyGroup() { + if (computedResult == null) { + reportAllElementKeyGroups(); + buildHistogramFromCounts(); + executePartitioning(); + } + return computedResult; + } + + /** + * This method iterates over the input data and reports the key-group for each element. + */ + protected void reportAllElementKeyGroups() { + final T[] input = getPartitioningInput(); + + Preconditions.checkState(input.length >= numberOfElements); + + for (int i = 0; i < numberOfElements; ++i) { + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups); + reportKeyGroupOfElementAtIndex(i, keyGroup); + } + } + + /** + * Returns the key for the given element by which the key-group can be computed. + */ + @Nonnull + protected abstract Object extractKeyFromElement(T element); + + /** + * Returns the input data for the partitioning. All elements to consider must be densely in the index interval + * [0, {@link #numberOfElements}[, without null values. + */ + @Nonnull + protected abstract T[] getPartitioningInput(); + + /** + * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger). + */ + @Nonnull + protected abstract T[] getPartitioningOutput(); + + /** + * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group. + */ + protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) { + final int keyGroupIndex = keyGroup - firstKeyGroup; + elementKeyGroups[index] = keyGroupIndex; + ++counterHistogram[keyGroupIndex]; + } + + /** + * This method creates a histogram from the counts per key-group in {@link #counterHistogram}. 
+ */ + private void buildHistogramFromCounts() { + int sum = 0; + for (int i = 0; i < counterHistogram.length; ++i) { + int currentSlotValue = counterHistogram[i]; + counterHistogram[i] = sum; + sum += currentSlotValue; + } + } + + private void executePartitioning() { + + final T[] in = getPartitioningInput(); + final T[] out = getPartitioningOutput(); + + Preconditions.checkState(in != out); + Preconditions.checkState(in.length >= numberOfElements); + Preconditions.checkState(out.length >= numberOfElements); + + // We repartition the entries by their pre-computed key-groups, using the histogram values as write indexes + for (int inIdx = 0; inIdx < numberOfElements; ++inIdx) { + int effectiveKgIdx = elementKeyGroups[inIdx]; + int outIdx = counterHistogram[effectiveKgIdx]++; + out[outIdx] = in[inIdx]; + } + + this.computedResult = new PartitioningResult<>(firstKeyGroup, counterHistogram, out); + } + + /** + * This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned + * w.r.t. {@link AbstractKeyGroupPartitioner#keyGroupRange}. + */ + public static class PartitioningResult { + + /** + * The exclusive-end-offsets for all key-groups of the covered range for the partitioning. Exclusive-end-offset + * for key-group n is under keyGroupOffsets[n - firstKeyGroup]. + */ + @Nonnull + private final int[] keyGroupOffsets; + + /** + * Array with elements that are partitioned w.r.t. the covered key-group range. The start offset for each + * key-group is in {@link #keyGroupOffsets}. + */ + @Nonnull + private final T[] partitionedElements; + + private final int firstKeyGroup; + + PartitioningResult( + @Nonnegative int firstKeyGroup, + @Nonnull int[] keyGroupEndOffsets, + @Nonnull T[] partitionedElements) { + + this.firstKeyGroup = firstKeyGroup; + this.keyGroupOffsets = keyGroupEndOffsets; + this.partitionedElements = partitionedElements; + } + + @Nonnull + public T[] getPartitionedElements() { + return partitionedElements; + } + + @Nonnegative + public int getKeyGroupStartOffsetInclusive(int keyGroup) { + int idx = keyGroup - firstKeyGroup - 1; + return idx < 0 ? 0 : keyGroupOffsets[idx]; + } + + @Nonnegative + public int getKeyGroupEndOffsetExclusive(int keyGroup) { + return keyGroupOffsets[keyGroup - firstKeyGroup]; + } + + @Nonnegative + public int getFirstKeyGroup() { + return firstKeyGroup; + } + + @Nonnegative + public int getNumberOfKeyGroups() { + return keyGroupOffsets.length; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java new file mode 100644 index 0000000000000..1fcac5c140baa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.memory.DataOutputView; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.IOException; + +/** + * General interface for state snapshots that should be written partitioned by key-groups. + * All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which + * typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced + * for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user + * calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is + * already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller + * to write the state by key-group. As a last step, when the state is completely written, the user calls + * {@link #release()}. + */ +public interface StateSnapshot { + + /** + * This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}. + */ + @Nonnull + KeyGroupPartitionedSnapshot partitionByKeyGroup(); + + /** + * Release the snapshot. All snapshots should be released when they are no longer used because some implementation + * can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used + * after calling this method. + */ + void release(); + + /** + * Interface for writing a snapshot after it is partitioned into key-groups. + */ + interface KeyGroupPartitionedSnapshot { + /** + * Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once + * before first calling this method. + * + * @param dov the output. + * @param keyGroupId the key-group to write. + * @throws IOException on write-related problems. + */ + void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java index b0d7727c360ec..03f253ba6a8bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java @@ -19,14 +19,15 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.StateSnapshot; import org.apache.flink.util.Preconditions; /** - * Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how - * the snapshot is written during the serialization phase of checkpointing. + * Abstract base class for snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). + * All snapshots should be released after usage. */ @Internal -abstract class AbstractStateTableSnapshot> implements StateTableSnapshot { +abstract class AbstractStateTableSnapshot> implements StateSnapshot { /** * The {@link StateTable} from which this snapshot was created. 
@@ -48,4 +49,4 @@ abstract class AbstractStateTableSnapshot @Override public void release() { } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java index 4ecb0edd98474..01be0ede80fda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java @@ -878,7 +878,8 @@ void releaseSnapshot(CopyOnWriteStateTableSnapshot snapshotToRelease) { * @param type of namespace. * @param type of state. */ - static class StateTableEntry implements StateEntry { + @VisibleForTesting + protected static class StateTableEntry implements StateEntry { /** * The key. Assumed to be immutable and not null. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java index 2ac88b3e78f27..afb9c51c28c20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java @@ -19,11 +19,18 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitionedSnapshot; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; /** @@ -52,39 +59,43 @@ public class CopyOnWriteStateTableSnapshot */ private final int snapshotVersion; - /** - * The number of entries in the {@link CopyOnWriteStateTable} at the time of creating this snapshot. - */ - private final int stateTableSize; - /** * The state table entries, as by the time this snapshot was created. Objects in this array may or may not be deep * copies of the current entries in the {@link CopyOnWriteStateTable} that created this snapshot. This depends for each entry * on whether or not it was subject to copy-on-write operations by the {@link CopyOnWriteStateTable}. */ + @Nonnull private final CopyOnWriteStateTable.StateTableEntry[] snapshotData; - /** - * Offsets for the individual key-groups. This is lazily created when the snapshot is grouped by key-group during - * the process of writing this snapshot to an output as part of checkpointing. - */ - private int[] keyGroupOffsets; + /** The number of (non-null) entries in snapshotData. */ + @Nonnegative + private final int numberOfEntriesInSnapshotData; /** * A local duplicate of the table's key serializer. */ + @Nonnull private final TypeSerializer localKeySerializer; /** * A local duplicate of the table's namespace serializer. */ + @Nonnull private final TypeSerializer localNamespaceSerializer; /** * A local duplicate of the table's state serializer. */ + @Nonnull private final TypeSerializer localStateSerializer; + /** + * Result of partitioning the snapshot by key-group. 
This is lazily created in the process of writing this snapshot + * to an output as part of checkpointing. + */ + @Nullable + private PartitionedStateTableSnapshot partitionedStateTableSnapshot; + /** * Creates a new {@link CopyOnWriteStateTableSnapshot}. * @@ -95,7 +106,8 @@ public class CopyOnWriteStateTableSnapshot super(owningStateTable); this.snapshotData = owningStateTable.snapshotTableArrays(); this.snapshotVersion = owningStateTable.getStateTableVersion(); - this.stateTableSize = owningStateTable.size(); + this.numberOfEntriesInSnapshotData = owningStateTable.size(); + // We create duplicates of the serializers for the async snapshot, because TypeSerializer // might be stateful and shared with the event processing thread. @@ -103,7 +115,7 @@ public class CopyOnWriteStateTableSnapshot this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate(); this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate(); - this.keyGroupOffsets = null; + this.partitionedStateTableSnapshot = null; } /** @@ -123,47 +135,27 @@ int getSnapshotVersion() { * As a possible future optimization, we could perform the repartitioning in-place, using a scheme similar to the * cuckoo cycles in cuckoo hashing. This can trade some performance for a smaller memory footprint. */ + @Nonnull @SuppressWarnings("unchecked") - private void partitionEntriesByKeyGroup() { + @Override + public KeyGroupPartitionedSnapshot partitionByKeyGroup() { - // We only have to perform this step once before the first key-group is written - if (null != keyGroupOffsets) { - return; - } + if (partitionedStateTableSnapshot == null) { - final KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange(); - final int totalKeyGroups = owningStateTable.keyContext.getNumberOfKeyGroups(); - final int baseKgIdx = keyGroupRange.getStartKeyGroup(); - final int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1]; - - CopyOnWriteStateTable.StateTableEntry[] unfold = new CopyOnWriteStateTable.StateTableEntry[stateTableSize]; - - // 1) In this step we i) 'unfold' the linked list of entries to a flat array and ii) build a histogram for key-groups - int unfoldIndex = 0; - for (CopyOnWriteStateTable.StateTableEntry entry : snapshotData) { - while (null != entry) { - int effectiveKgIdx = - KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1; - ++histogram[effectiveKgIdx]; - unfold[unfoldIndex++] = entry; - entry = entry.next; - } - } + final InternalKeyContext keyContext = owningStateTable.keyContext; + final KeyGroupRange keyGroupRange = keyContext.getKeyGroupRange(); + final int numberOfKeyGroups = keyContext.getNumberOfKeyGroups(); - // 2) We accumulate the histogram bins to obtain key-group ranges in the final array - for (int i = 1; i < histogram.length; ++i) { - histogram[i] += histogram[i - 1]; - } + StateTableKeyGroupPartitioner keyGroupPartitioner = new StateTableKeyGroupPartitioner<>( + snapshotData, + numberOfEntriesInSnapshotData, + keyGroupRange, + numberOfKeyGroups); - // 3) We repartition the entries by key-group, using the histogram values as write indexes - for (CopyOnWriteStateTable.StateTableEntry t : unfold) { - int effectiveKgIdx = - KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx; - snapshotData[histogram[effectiveKgIdx]++] = t; + partitionedStateTableSnapshot = new PartitionedStateTableSnapshot(keyGroupPartitioner.partitionByKeyGroup()); } - // 4) As 
byproduct, we also created the key-group offsets - this.keyGroupOffsets = histogram; + return partitionedStateTableSnapshot; } @Override @@ -171,36 +163,94 @@ public void release() { owningStateTable.releaseSnapshot(this); } - @Override - public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException { + /** + * Returns true iff the given state table is the owner of this snapshot object. + */ + boolean isOwner(CopyOnWriteStateTable stateTable) { + return stateTable == owningStateTable; + } - if (null == keyGroupOffsets) { - partitionEntriesByKeyGroup(); + /** + * This class is the implementation of {@link AbstractKeyGroupPartitioner} for {@link CopyOnWriteStateTable}. + * + * @param type of key. + * @param type of namespace. + * @param type of state value. + */ + @VisibleForTesting + protected static final class StateTableKeyGroupPartitioner + extends AbstractKeyGroupPartitioner> { + + @Nonnull + private final CopyOnWriteStateTable.StateTableEntry[] snapshotData; + + @Nonnull + private final CopyOnWriteStateTable.StateTableEntry[] flattenedData; + + @SuppressWarnings("unchecked") + StateTableKeyGroupPartitioner( + @Nonnull CopyOnWriteStateTable.StateTableEntry[] snapshotData, + @Nonnegative int stateTableSize, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + super(stateTableSize, keyGroupRange, totalKeyGroups); + this.snapshotData = snapshotData; + this.flattenedData = new CopyOnWriteStateTable.StateTableEntry[numberOfElements]; } - final CopyOnWriteStateTable.StateTableEntry[] groupedOut = snapshotData; - KeyGroupRange keyGroupRange = owningStateTable.keyContext.getKeyGroupRange(); - int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1; - int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx]; - int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1]; - - // write number of mappings in key-group - dov.writeInt(endOffset - startOffset); - - // write mappings - for (int i = startOffset; i < endOffset; ++i) { - CopyOnWriteStateTable.StateTableEntry toWrite = groupedOut[i]; - groupedOut[i] = null; // free asap for GC - localNamespaceSerializer.serialize(toWrite.namespace, dov); - localKeySerializer.serialize(toWrite.key, dov); - localStateSerializer.serialize(toWrite.state, dov); + @Override + protected void reportAllElementKeyGroups() { + // In this step we i) 'flatten' the linked list of entries to a second array and ii) report key-groups. + int flattenIndex = 0; + for (CopyOnWriteStateTable.StateTableEntry entry : snapshotData) { + while (null != entry) { + final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(entry.key, totalKeyGroups); + reportKeyGroupOfElementAtIndex(flattenIndex, keyGroup); + flattenedData[flattenIndex++] = entry; + entry = entry.next; + } + } + } + + @Nonnull + @Override + protected Object extractKeyFromElement(CopyOnWriteStateTable.StateTableEntry element) { + return element.getKey(); + } + + @Nonnull + @Override + protected CopyOnWriteStateTable.StateTableEntry[] getPartitioningInput() { + return flattenedData; + } + + @Nonnull + @Override + protected CopyOnWriteStateTable.StateTableEntry[] getPartitioningOutput() { + return snapshotData; } } /** - * Returns true iff the given state table is the owner of this snapshot object. + * This class represents a {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} for + * {@link CopyOnWriteStateTable}. 
*/ - boolean isOwner(CopyOnWriteStateTable stateTable) { - return stateTable == owningStateTable; + private final class PartitionedStateTableSnapshot + extends AbstractKeyGroupPartitionedSnapshot> { + + public PartitionedStateTableSnapshot( + @Nonnull AbstractKeyGroupPartitioner.PartitioningResult> partitioningResult) { + super(partitioningResult); + } + + @Override + protected void writeElement( + @Nonnull CopyOnWriteStateTable.StateTableEntry element, + @Nonnull DataOutputView dov) throws IOException { + localNamespaceSerializer.serialize(element.namespace, dov); + localKeySerializer.serialize(element.key, dov); + localStateSerializer.serialize(element.state, dov); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index ab91ee175ed3c..76479b0bec19a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -55,6 +55,7 @@ import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategy; +import org.apache.flink.runtime.state.StateSnapshot; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; @@ -599,7 +600,7 @@ public RunnableFuture> performSnapshot( final Map kVStateToId = new HashMap<>(stateTables.size()); - final Map cowStateStableSnapshots = + final Map cowStateStableSnapshots = new HashMap<>(stateTables.size()); for (Map.Entry> kvState : stateTables.entrySet()) { @@ -653,7 +654,7 @@ protected void releaseResources() { unregisterAndCloseStreamAndResultExtractor(); - for (StateTableSnapshot tableSnapshot : cowStateStableSnapshots.values()) { + for (StateSnapshot tableSnapshot : cowStateStableSnapshots.values()) { tableSnapshot.release(); } } @@ -689,12 +690,14 @@ protected SnapshotResult performOperation() throws Exception { keyGroupRangeOffsets[keyGroupPos] = localStream.getPos(); outView.writeInt(keyGroupId); - for (Map.Entry kvState : cowStateStableSnapshots.entrySet()) { + for (Map.Entry kvState : cowStateStableSnapshots.entrySet()) { + StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot = + kvState.getValue().partitionByKeyGroup(); try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) { String stateName = kvState.getKey(); DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); kgCompressionView.writeShort(kVStateToId.get(stateName)); - kvState.getValue().writeMappingsInKeyGroup(kgCompressionView, keyGroupId); + partitionedSnapshot.writeMappingsInKeyGroup(kgCompressionView, keyGroupId); } // this will just close the outer compression stream } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java index 870ecbf76db27..18551b574ae78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java @@ -23,9 +23,12 @@ import 
org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateSnapshot; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -333,12 +336,19 @@ public NestedMapsStateTableSnapshot createSnapshot() { * @param type of state. */ static class NestedMapsStateTableSnapshot - extends AbstractStateTableSnapshot> { + extends AbstractStateTableSnapshot> + implements StateSnapshot.KeyGroupPartitionedSnapshot { NestedMapsStateTableSnapshot(NestedMapsStateTable owningTable) { super(owningTable); } + @Nonnull + @Override + public KeyGroupPartitionedSnapshot partitionByKeyGroup() { + return this; + } + /** * Implementation note: we currently chose the same format between {@link NestedMapsStateTable} and * {@link CopyOnWriteStateTable}. @@ -349,7 +359,7 @@ static class NestedMapsStateTableSnapshot * implementations). */ @Override - public void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException { + public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException { final Map> keyGroupMap = owningStateTable.getMapForKeyGroup(keyGroupId); if (null != keyGroupMap) { TypeSerializer keySerializer = owningStateTable.keyContext.getKeySerializer(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java index 8c07b2523b7a6..de2290a837d98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateSnapshot; import org.apache.flink.runtime.state.StateTransformationFunction; import org.apache.flink.util.Preconditions; @@ -182,7 +183,7 @@ public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo metaInfo) { // Snapshot / Restore ------------------------------------------------------------------------- - abstract StateTableSnapshot createSnapshot(); + abstract StateSnapshot createSnapshot(); public abstract void put(K key, int keyGroup, N namespace, S state); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java deleted file mode 100644 index d4244d7e5d358..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableSnapshot.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.heap; - -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; - -/** - * Interface for the snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group). All - * snapshots should be released after usage. - */ -interface StateTableSnapshot { - - /** - * Writes the data for the specified key-group to the output. - * - * @param dov the output - * @param keyGroupId the key-group to write - * @throws IOException on write related problems - */ - void writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException; - - /** - * Release the snapshot. All snapshots should be released when they are no longer used because some implementation - * can only release resources after a release. - */ - void release(); -} \ No newline at end of file diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java new file mode 100644 index 0000000000000..c575352b5126d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; + +/** + * Tests for {@link AbstractKeyGroupPartitionedSnapshot}. 
+ */ +public class AbstractKeyGroupPartitionedSnapshotTest { + + @Test + public void testWriteMappingsInKeyGroup() throws IOException { + doTestWriteMappingsInKeyGroup(KeyGroupRange.of(0, 0)); + doTestWriteMappingsInKeyGroup(KeyGroupRange.of(0, 3)); + doTestWriteMappingsInKeyGroup(KeyGroupRange.of(1, 2)); + doTestWriteMappingsInKeyGroup(KeyGroupRange.of(3, 8)); + } + + private void doTestWriteMappingsInKeyGroup(KeyGroupRange testRange) throws IOException { + int[] offsets = new int[testRange.getNumberOfKeyGroups()]; + Integer[] data = new Integer[gaussSumZeroToX(offsets.length)]; + + int pos = 0; + for (int i = 0; i < offsets.length; ++i) { + + int endIdx = gaussSumZeroToX(i); + offsets[i] = endIdx; + + while (pos < endIdx) { + data[pos] = i; + ++pos; + } + } + + AbstractKeyGroupPartitioner.PartitioningResult result = + new AbstractKeyGroupPartitioner.PartitioningResult<>(testRange.getStartKeyGroup(), offsets, data); + + + TestAbstractKeyGroupPartitionedSnapshot testInstance = new TestAbstractKeyGroupPartitionedSnapshot(result); + DataOutputView dummyOut = new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos()); + + + for (Integer keyGroup : testRange) { + testInstance.writeMappingsInKeyGroup(dummyOut, keyGroup); + } + + testInstance.validateCounts(); + } + + /** + * Simple test implementation with validation of {@link AbstractKeyGroupPartitionedSnapshot}. + */ + static final class TestAbstractKeyGroupPartitionedSnapshot extends AbstractKeyGroupPartitionedSnapshot { + + @Nonnull + private final int[] countCheck; + + TestAbstractKeyGroupPartitionedSnapshot( + @Nonnull AbstractKeyGroupPartitioner.PartitioningResult partitioningResult) { + super(partitioningResult); + this.countCheck = new int[partitioningResult.getNumberOfKeyGroups()]; + } + + @Override + protected void writeElement(@Nonnull Integer element, @Nonnull DataOutputView dov) throws IOException { + ++countCheck[element]; + } + + void validateCounts() { + for (int i = 0; i < countCheck.length; ++i) { + Assert.assertEquals("Key-group count does not match expectation.", i, countCheck[i]); + } + } + } + + private static int gaussSumZeroToX(int x) { + return (x * (x + 1)) / 2; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java new file mode 100644 index 0000000000000..16736573f7d75 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Abstract test base for implementations of {@link AbstractKeyGroupPartitioner}.
+ */
+public abstract class AbstractKeyGroupPartitionerTestBase<T> extends TestLogger {
+
+	@Test
+	public void testPartitionByKeyGroup() {
+
+		final Random random = new Random(0x42);
+		testPartitionByKeyGroupForSize(0, random);
+		testPartitionByKeyGroupForSize(1, random);
+		testPartitionByKeyGroupForSize(2, random);
+		testPartitionByKeyGroupForSize(1000, random);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void testPartitionByKeyGroupForSize(int testSize, Random random) {
+
+		final T[] data = generateTestData(random, testSize);
+
+		// Test with 5 key-groups.
+		final KeyGroupRange range = new KeyGroupRange(0, 4);
+		final int numberOfKeyGroups = range.getNumberOfKeyGroups();
+		final AbstractKeyGroupPartitioner<T> testInstance = createPartitioner(data, testSize, range, numberOfKeyGroups);
+		final AbstractKeyGroupPartitioner.PartitioningResult<T> result = testInstance.partitionByKeyGroup();
+
+		final T[] partitionedElements = result.getPartitionedElements();
+
+		Assert.assertEquals(numberOfKeyGroups, result.getNumberOfKeyGroups());
+		Assert.assertEquals(range.getStartKeyGroup(), result.getFirstKeyGroup());
+		Assert.assertEquals(data.length, partitionedElements.length);
+		Assert.assertEquals(0, result.getKeyGroupStartOffsetInclusive(range.getStartKeyGroup()));
+		Assert.assertEquals(testSize, result.getKeyGroupEndOffsetExclusive(range.getEndKeyGroup()));
+
+		for (int keyGroup = 0; keyGroup < result.getNumberOfKeyGroups(); ++keyGroup) {
+			int start = result.getKeyGroupStartOffsetInclusive(keyGroup);
+			int end = result.getKeyGroupEndOffsetExclusive(keyGroup);
+			for (int i = start; i < end; ++i) {
+				Assert.assertEquals("Mismatch at index " + i,
+					keyGroup,
+					KeyGroupRangeAssignment.assignToKeyGroup(
+						testInstance.extractKeyFromElement(partitionedElements[i]),
+						numberOfKeyGroups));
+			}
+		}
+	}
+
+	protected abstract T[] generateTestData(Random random, int numElementsToGenerate);
+
+	protected abstract AbstractKeyGroupPartitioner<T> createPartitioner(
+		T[] data,
+		int numElements,
+		KeyGroupRange keyGroupRange,
+		int totalKeyGroups);
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 09a0620be96d5..4f36d62592249 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -32,6 +32,7 @@
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -442,15 +443,15 @@ public TypeSerializer getKeySerializer() {
 		table.put(2, 0, 1, 2);
-		CopyOnWriteStateTableSnapshot snapshot = table.createSnapshot();
+		final CopyOnWriteStateTableSnapshot snapshot = table.createSnapshot();
 		try {
-
+			final StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot = snapshot.partitionByKeyGroup();
 			namespaceSerializer.disable();
keySerializer.disable(); stateSerializer.disable(); - snapshot.writeMappingsInKeyGroup( + partitionedSnapshot.writeMappingsInKeyGroup( new DataOutputViewStreamWrapper( new ByteArrayOutputStreamWithPos(1024)), 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java new file mode 100644 index 0000000000000..177a604ba155b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitionerTestBase; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.StateTableEntry; + +import java.util.Random; + +/** + * Test for {@link org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner}. + */ +public class StateTableKeyGroupPartitionerTest extends + AbstractKeyGroupPartitionerTestBase> { + + @Override + @SuppressWarnings("unchecked") + protected StateTableEntry[] generateTestData( + Random random, + int numElementsToGenerate) { + + // we let the array size differ a bit from the test size to check this works + final int arraySize = numElementsToGenerate > 1 ? 
numElementsToGenerate + 5 : numElementsToGenerate; + final StateTableEntry[] data = new StateTableEntry[arraySize]; + for (int i = 0; i < numElementsToGenerate; ++i) { + Integer key = random.nextInt() & Integer.MAX_VALUE; + data[i] = new CopyOnWriteStateTable.StateTableEntry<>( + key, + VoidNamespace.INSTANCE, + 42, + key.hashCode(), + null, + 0, + 0); + } + + return data; + } + + @Override + protected AbstractKeyGroupPartitioner> createPartitioner( + StateTableEntry[] data, + int numElements, + KeyGroupRange keyGroupRange, + int totalKeyGroups) { + + return new CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner<>( + data, + numElements, + keyGroupRange, + totalKeyGroups); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java index 85bc1774b58f9..0c8e8fe6480cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateSnapshot; + import org.junit.Assert; import org.junit.Test; @@ -67,7 +69,7 @@ public void checkCompatibleSerializationFormats() throws IOException { cowStateTable.put(r.nextInt(10), r.nextInt(2), list); } - StateTableSnapshot snapshot = cowStateTable.createSnapshot(); + StateSnapshot snapshot = cowStateTable.createSnapshot(); final NestedMapsStateTable> nestedMapsStateTable = new NestedMapsStateTable<>(keyContext, metaInfo); @@ -95,14 +97,14 @@ public void checkCompatibleSerializationFormats() throws IOException { private static void restoreStateTableFromSnapshot( StateTable stateTable, - StateTableSnapshot snapshot, + StateSnapshot snapshot, KeyGroupRange keyGroupRange) throws IOException { final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024); final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); - + final StateSnapshot.KeyGroupPartitionedSnapshot keyGroupPartitionedSnapshot = snapshot.partitionByKeyGroup(); for (Integer keyGroup : keyGroupRange) { - snapshot.writeMappingsInKeyGroup(dov, keyGroup); + keyGroupPartitionedSnapshot.writeMappingsInKeyGroup(dov, keyGroup); } final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf()); From 800abd9743c5483f50835d0cf938ad50f550ae49 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 11 Jun 2018 17:44:10 +0200 Subject: [PATCH 2/8] Introduce TimerHeap snapshots and key-group-partitioning --- .../api/operators/InternalTimerHeap.java | 18 ++- .../operators/InternalTimerHeapSnapshot.java | 151 ++++++++++++++++++ .../api/operators/TimerPartitionerTest.java | 55 +++++++ 3 files changed, 221 insertions(+), 3 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java index 94b3572e0e999..0abc29db2bdba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.StateSnapshot; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; @@ -54,7 +55,7 @@ * @param type of the key of the internal timers managed by this priority queue. * @param type of the namespace of the internal timers managed by this priority queue. */ -public class InternalTimerHeap implements Iterable> {//implements Queue>, Set> { +public class InternalTimerHeap implements Iterable> { /** * A safe maximum size for arrays in the JVM. @@ -136,7 +137,7 @@ public InternalTimer peek() { * @param namespace the timer namespace. * @return true iff a new timer with given timestamp, key, and namespace was added to the heap. */ - public boolean scheduleTimer(long timestamp, K key, N namespace) { + public boolean scheduleTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { return addInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); } @@ -148,7 +149,7 @@ public boolean scheduleTimer(long timestamp, K key, N namespace) { * @param namespace the timer namespace. * @return true iff a timer with given timestamp, key, and namespace was found and removed from the heap. */ - public boolean stopTimer(long timestamp, K key, N namespace) { + public boolean stopTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { return removeInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); } @@ -210,12 +211,14 @@ void bulkAddRestoredTimers(Collection> restoredTim /** * Returns an unmodifiable set of all timers in the given key-group. */ + @Nonnull Set> getTimersForKeyGroup(@Nonnegative int keyGroupIdx) { return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet()); } @VisibleForTesting @SuppressWarnings("unchecked") + @Nonnull List>> getTimersByKeyGroup() { List>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length); for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) { @@ -224,6 +227,15 @@ List>> getTimersByKeyGroup() { return result; } + @Nonnull + StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer serializer) { + return new InternalTimerHeapSnapshot<>( + Arrays.copyOfRange(queue, 1, size + 1), + serializer, + keyGroupRange, + totalNumberOfKeyGroups); + } + private boolean addInternal(TimerHeapInternalTimer timer) { if (getDedupMapForTimer(timer).putIfAbsent(timer, timer) == null) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java new file mode 100644 index 0000000000000..cd00b7d442f03 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitionedSnapshot; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.StateSnapshot; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * This class represents the snapshot of a {@link InternalTimerHeap}. + * + * @param type of key. + * @param type of namespace. + */ +public class InternalTimerHeapSnapshot implements StateSnapshot { + + /** Copy of the heap array containing all the (immutable timers). */ + @Nonnull + private final TimerHeapInternalTimer[] timerHeapArrayCopy; + + /** The timer serializer. */ + @Nonnull + private final TimerHeapInternalTimer.TimerSerializer timerSerializer; + + /** The key-group range covered by this snapshot. */ + @Nonnull + private final KeyGroupRange keyGroupRange; + + /** The total number of key-groups in the job. */ + @Nonnegative + private final int totalKeyGroups; + + /** Result of partitioning the snapshot by key-group. */ + @Nullable + private PartitionedInternalTimerHeapSnapshot partitionedSnapshot; + + InternalTimerHeapSnapshot( + @Nonnull TimerHeapInternalTimer[] timerHeapArrayCopy, + @Nonnull TimerHeapInternalTimer.TimerSerializer timerSerializer, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + this.timerHeapArrayCopy = timerHeapArrayCopy; + this.timerSerializer = timerSerializer; + this.keyGroupRange = keyGroupRange; + this.totalKeyGroups = totalKeyGroups; + } + + @Nonnull + @Override + public KeyGroupPartitionedSnapshot partitionByKeyGroup() { + + if (partitionedSnapshot == null) { + TimerPartitioner timerPartitioner = + new TimerPartitioner<>(timerHeapArrayCopy, keyGroupRange, totalKeyGroups); + partitionedSnapshot = new PartitionedInternalTimerHeapSnapshot(timerPartitioner.partitionByKeyGroup()); + } + + return partitionedSnapshot; + } + + @Override + public void release() { + } + + /** + * Implementation of {@link AbstractKeyGroupPartitioner} for {@link TimerHeapInternalTimer}. + * + * @param type of key. + * @param type of namespace. 
+ */ + static final class TimerPartitioner extends AbstractKeyGroupPartitioner> { + + @Nonnull + private final TimerHeapInternalTimer[] timersIn; + + @Nonnull + private final TimerHeapInternalTimer[] timersOut; + + @SuppressWarnings("unchecked") + TimerPartitioner( + @Nonnull TimerHeapInternalTimer[] timers, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + super(timers.length, keyGroupRange, totalKeyGroups); + this.timersIn = timers; + this.timersOut = new TimerHeapInternalTimer[timers.length]; + } + + @Nonnull + @Override + protected Object extractKeyFromElement(TimerHeapInternalTimer element) { + return element.getKey(); + } + + @Nonnull + @Override + protected TimerHeapInternalTimer[] getPartitioningInput() { + return timersIn; + } + + @Nonnull + @Override + protected TimerHeapInternalTimer[] getPartitioningOutput() { + return timersOut; + } + } + + /** + * Implementation of {@link AbstractKeyGroupPartitionedSnapshot} for {@link InternalTimerHeap}. + */ + private final class PartitionedInternalTimerHeapSnapshot + extends AbstractKeyGroupPartitionedSnapshot> { + + PartitionedInternalTimerHeapSnapshot( + @Nonnull AbstractKeyGroupPartitioner.PartitioningResult> partitioningResult) { + super(partitioningResult); + } + + @Override + protected void writeElement( + @Nonnull TimerHeapInternalTimer element, + @Nonnull DataOutputView dov) throws IOException { + timerSerializer.serialize(element, dov); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java new file mode 100644 index 0000000000000..5caf3179250ab --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; +import org.apache.flink.runtime.state.AbstractKeyGroupPartitionerTestBase; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; + +import java.util.Random; + +/** + * Test for {@link org.apache.flink.streaming.api.operators.InternalTimerHeapSnapshot.TimerPartitioner}. 
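+ *
+ * <p>Relies on the shared partitioner test base: random timers are generated, and the test asserts
+ * that the partitioned output groups every timer under the key-group computed from its key.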
+ */ +public class TimerPartitionerTest + extends AbstractKeyGroupPartitionerTestBase> { + + @Override + @SuppressWarnings("unchecked") + protected TimerHeapInternalTimer[] generateTestData( + Random random, + int numElementsToGenerate) { + final TimerHeapInternalTimer[] data = new TimerHeapInternalTimer[numElementsToGenerate]; + for (int i = 0; i < numElementsToGenerate; ++i) { + Integer key = random.nextInt() & Integer.MAX_VALUE; + data[i] = new TimerHeapInternalTimer<>(42L, key, VoidNamespace.INSTANCE); + } + return data; + } + + @Override + protected AbstractKeyGroupPartitioner> createPartitioner( + TimerHeapInternalTimer[] data, + int numElements, + KeyGroupRange keyGroupRange, + int totalKeyGroups) { + return new InternalTimerHeapSnapshot.TimerPartitioner<>(data, keyGroupRange, totalKeyGroups); + } +} From caacceecc5dcb6596b7309c2171cf87590551b71 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Wed, 13 Jun 2018 20:31:43 +0200 Subject: [PATCH 3/8] PR fixups --- .../runtime/state/AbstractKeyGroupPartitionedSnapshot.java | 3 --- .../flink/runtime/state/AbstractKeyGroupPartitioner.java | 2 ++ 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java index 4b562f8f6b139..9700c6ab57167 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java @@ -56,9 +56,6 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) // write mappings for (int i = startOffset; i < endOffset; ++i) { - if(groupedOut[i] == null) { - throw new IllegalStateException(); - } writeElement(groupedOut[i], dov); groupedOut[i] = null; // free asap for GC } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java index 79874a23add42..6d31111cb343e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java @@ -22,6 +22,7 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works @@ -61,6 +62,7 @@ public abstract class AbstractKeyGroupPartitioner { protected final int firstKeyGroup; /** Cached result. */ + @Nullable protected PartitioningResult computedResult; /** From 5b8137ceb1abcff92605287b9a15cfdbf3c092d4 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 15 Jun 2018 10:48:23 +0200 Subject: [PATCH 4/8] Unify partitioned snapshots and pass writer strategy in constructor. 
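
Instead of requiring a subclass of the partitioned snapshot per backend, the write strategy is
now passed to a single implementation as a functional interface. The intended call-site shape,
as it appears below for the timer heap:

    partitionedSnapshot = new KeyGroupPartitionedSnapshotImpl<>(
        timerPartitioner.partitionByKeyGroup(),
        timerSerializer::serialize);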
--- ...a => KeyGroupPartitionedSnapshotImpl.java} | 33 +++++++++++----- .../heap/CopyOnWriteStateTableSnapshot.java | 38 +++++-------------- ...stractKeyGroupPartitionedSnapshotTest.java | 23 ++++++----- .../operators/InternalTimerHeapSnapshot.java | 30 +++------------ 4 files changed, 50 insertions(+), 74 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/state/{AbstractKeyGroupPartitionedSnapshot.java => KeyGroupPartitionedSnapshotImpl.java} (66%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java similarity index 66% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java index 9700c6ab57167..a668354156a50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java @@ -32,15 +32,21 @@ * * @param type of the written elements. */ -public abstract class AbstractKeyGroupPartitionedSnapshot implements StateSnapshot.KeyGroupPartitionedSnapshot { +public class KeyGroupPartitionedSnapshotImpl implements StateSnapshot.KeyGroupPartitionedSnapshot { /** The partitioning result to be written by key-group. */ @Nonnull private final AbstractKeyGroupPartitioner.PartitioningResult partitioningResult; - public AbstractKeyGroupPartitionedSnapshot( - @Nonnull AbstractKeyGroupPartitioner.PartitioningResult partitioningResult) { + /** This function defines how one element from the result is written to a {@link DataOutputView}. */ + @Nonnull + private final ElementWriterFunction elementWriterFunction; + + public KeyGroupPartitionedSnapshotImpl( + @Nonnull AbstractKeyGroupPartitioner.PartitioningResult partitioningResult, + @Nonnull ElementWriterFunction elementWriterFunction) { this.partitioningResult = partitioningResult; + this.elementWriterFunction = elementWriterFunction; } @Override @@ -56,18 +62,27 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) // write mappings for (int i = startOffset; i < endOffset; ++i) { - writeElement(groupedOut[i], dov); + elementWriterFunction.writeElement(groupedOut[i], dov); groupedOut[i] = null; // free asap for GC } } /** - * This method defines how to write a single element to the output. + * This functional interface defines how one element is written to a {@link DataOutputView}. * - * @param element the element to be written. - * @param dov the output view to write the element. - * @throws IOException on write-related problems. + * @param type of the written elements. */ - protected abstract void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException; + @FunctionalInterface + public interface ElementWriterFunction { + + /** + * This method defines how to write a single element to the output. + * + * @param element the element to be written. + * @param dov the output view to write the element. + * @throws IOException on write-related problems. 
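+		 *
+		 * <p>Implementations are typically method references to a serializer; a sketch with a
+		 * hypothetical {@code TypeSerializer<T> serializer}:
+		 * <pre>{@code
+		 * ElementWriterFunction<T> writer = serializer::serialize;
+		 * }</pre>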
+ */ + void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java index afb9c51c28c20..e5d6c2c09f2b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java @@ -21,18 +21,16 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.AbstractKeyGroupPartitionedSnapshot; import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupPartitionedSnapshotImpl; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.StateSnapshot; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.IOException; - /** * This class represents the snapshot of a {@link CopyOnWriteStateTable} and has a role in operator state checkpointing. Besides * holding the {@link CopyOnWriteStateTable}s internal entries at the time of the snapshot, this class is also responsible for @@ -94,7 +92,7 @@ public class CopyOnWriteStateTableSnapshot * to an output as part of checkpointing. */ @Nullable - private PartitionedStateTableSnapshot partitionedStateTableSnapshot; + private StateSnapshot.KeyGroupPartitionedSnapshot partitionedStateTableSnapshot; /** * Creates a new {@link CopyOnWriteStateTableSnapshot}. @@ -152,7 +150,13 @@ public KeyGroupPartitionedSnapshot partitionByKeyGroup() { keyGroupRange, numberOfKeyGroups); - partitionedStateTableSnapshot = new PartitionedStateTableSnapshot(keyGroupPartitioner.partitionByKeyGroup()); + partitionedStateTableSnapshot = new KeyGroupPartitionedSnapshotImpl<>( + keyGroupPartitioner.partitionByKeyGroup(), + (element, dov) -> { + localNamespaceSerializer.serialize(element.namespace, dov); + localKeySerializer.serialize(element.key, dov); + localStateSerializer.serialize(element.state, dov); + }); } return partitionedStateTableSnapshot; @@ -231,26 +235,4 @@ protected CopyOnWriteStateTable.StateTableEntry[] getPartitioningOutput return snapshotData; } } - - /** - * This class represents a {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} for - * {@link CopyOnWriteStateTable}. 
- */ - private final class PartitionedStateTableSnapshot - extends AbstractKeyGroupPartitionedSnapshot> { - - public PartitionedStateTableSnapshot( - @Nonnull AbstractKeyGroupPartitioner.PartitioningResult> partitioningResult) { - super(partitioningResult); - } - - @Override - protected void writeElement( - @Nonnull CopyOnWriteStateTable.StateTableEntry element, - @Nonnull DataOutputView dov) throws IOException { - localNamespaceSerializer.serialize(element.namespace, dov); - localKeySerializer.serialize(element.key, dov); - localStateSerializer.serialize(element.state, dov); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java index c575352b5126d..d882a62901333 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java @@ -24,12 +24,13 @@ import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import java.io.IOException; /** - * Tests for {@link AbstractKeyGroupPartitionedSnapshot}. + * Tests for {@link KeyGroupPartitionedSnapshotImpl}. */ public class AbstractKeyGroupPartitionedSnapshotTest { @@ -60,34 +61,32 @@ private void doTestWriteMappingsInKeyGroup(KeyGroupRange testRange) throws IOExc AbstractKeyGroupPartitioner.PartitioningResult result = new AbstractKeyGroupPartitioner.PartitioningResult<>(testRange.getStartKeyGroup(), offsets, data); - - TestAbstractKeyGroupPartitionedSnapshot testInstance = new TestAbstractKeyGroupPartitionedSnapshot(result); + final TestElementWriter testElementWriter = new TestElementWriter(result.getNumberOfKeyGroups()); + StateSnapshot.KeyGroupPartitionedSnapshot testInstance = + new KeyGroupPartitionedSnapshotImpl<>(result, testElementWriter); DataOutputView dummyOut = new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos()); - for (Integer keyGroup : testRange) { testInstance.writeMappingsInKeyGroup(dummyOut, keyGroup); } - testInstance.validateCounts(); + testElementWriter.validateCounts(); } /** - * Simple test implementation with validation of {@link AbstractKeyGroupPartitionedSnapshot}. + * Simple test implementation with validation of {@link KeyGroupPartitionedSnapshotImpl.ElementWriterFunction}. 
*/ - static final class TestAbstractKeyGroupPartitionedSnapshot extends AbstractKeyGroupPartitionedSnapshot { + static final class TestElementWriter implements KeyGroupPartitionedSnapshotImpl.ElementWriterFunction { @Nonnull private final int[] countCheck; - TestAbstractKeyGroupPartitionedSnapshot( - @Nonnull AbstractKeyGroupPartitioner.PartitioningResult partitioningResult) { - super(partitioningResult); - this.countCheck = new int[partitioningResult.getNumberOfKeyGroups()]; + TestElementWriter(@Nonnegative int numberOfKeyGroups) { + this.countCheck = new int[numberOfKeyGroups]; } @Override - protected void writeElement(@Nonnull Integer element, @Nonnull DataOutputView dov) throws IOException { + public void writeElement(@Nonnull Integer element, @Nonnull DataOutputView dov) { ++countCheck[element]; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java index cd00b7d442f03..e512a091a5193 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java @@ -18,9 +18,8 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.AbstractKeyGroupPartitionedSnapshot; import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupPartitionedSnapshotImpl; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.StateSnapshot; @@ -28,8 +27,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.IOException; - /** * This class represents the snapshot of a {@link InternalTimerHeap}. * @@ -56,7 +53,7 @@ public class InternalTimerHeapSnapshot implements StateSnapshot { /** Result of partitioning the snapshot by key-group. */ @Nullable - private PartitionedInternalTimerHeapSnapshot partitionedSnapshot; + private KeyGroupPartitionedSnapshot partitionedSnapshot; InternalTimerHeapSnapshot( @Nonnull TimerHeapInternalTimer[] timerHeapArrayCopy, @@ -77,7 +74,9 @@ public KeyGroupPartitionedSnapshot partitionByKeyGroup() { if (partitionedSnapshot == null) { TimerPartitioner timerPartitioner = new TimerPartitioner<>(timerHeapArrayCopy, keyGroupRange, totalKeyGroups); - partitionedSnapshot = new PartitionedInternalTimerHeapSnapshot(timerPartitioner.partitionByKeyGroup()); + partitionedSnapshot = new KeyGroupPartitionedSnapshotImpl<>( + timerPartitioner.partitionByKeyGroup(), + timerSerializer::serialize); } return partitionedSnapshot; @@ -129,23 +128,4 @@ protected TimerHeapInternalTimer[] getPartitioningOutput() { return timersOut; } } - - /** - * Implementation of {@link AbstractKeyGroupPartitionedSnapshot} for {@link InternalTimerHeap}. 
- */
-	private final class PartitionedInternalTimerHeapSnapshot
-		extends AbstractKeyGroupPartitionedSnapshot<TimerHeapInternalTimer<K, N>> {
-
-		PartitionedInternalTimerHeapSnapshot(
-			@Nonnull AbstractKeyGroupPartitioner.PartitioningResult<TimerHeapInternalTimer<K, N>> partitioningResult) {
-			super(partitioningResult);
-		}
-
-		@Override
-		protected void writeElement(
-			@Nonnull TimerHeapInternalTimer<K, N> element,
-			@Nonnull DataOutputView dov) throws IOException {
-			timerSerializer.serialize(element, dov);
-		}
-	}
 }
From 22371e23482ba0417574c135a80a255e4ed401a3 Mon Sep 17 00:00:00 2001
From: Stefan Richter
Date: Fri, 15 Jun 2018 17:01:59 +0200
Subject: [PATCH 5/8] Additional refactorings to remove unnecessary classes.

---
 .../KeyGroupPartitionedSnapshotImpl.java      |  88 ---------
 ...titioner.java => KeyGroupPartitioner.java} | 176 ++++++++++++------
 .../state/heap/CopyOnWriteStateTable.java     |  41 ++--
 .../heap/CopyOnWriteStateTableSnapshot.java   |  62 +++---
 ...stractKeyGroupPartitionedSnapshotTest.java | 103 ----------
 .../AbstractKeyGroupPartitionerTestBase.java  |  82 --------
 .../state/KeyGroupPartitionerTestBase.java    | 176 ++++++++++++++++++
 .../StateTableKeyGroupPartitionerTest.java    |  68 +++++--
 .../operators/InternalTimerHeapSnapshot.java  |  68 ++-----
 .../KeyGroupPartitionerForTimersTest.java     |  36 ++++
 .../api/operators/TimerPartitionerTest.java   |  55 ------
 11 files changed, 445 insertions(+), 510 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java
 rename flink-runtime/src/main/java/org/apache/flink/runtime/state/{AbstractKeyGroupPartitioner.java => KeyGroupPartitioner.java} (54%)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyGroupPartitionerForTimersTest.java
 delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java
deleted file mode 100644
index a668354156a50..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitionedSnapshotImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.core.memory.DataOutputView; - -import javax.annotation.Nonnull; - -import java.io.IOException; - - -/** - * Abstract base class for implementations of - * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a - * {@link AbstractKeyGroupPartitioner}. - * - * @param type of the written elements. - */ -public class KeyGroupPartitionedSnapshotImpl implements StateSnapshot.KeyGroupPartitionedSnapshot { - - /** The partitioning result to be written by key-group. */ - @Nonnull - private final AbstractKeyGroupPartitioner.PartitioningResult partitioningResult; - - /** This function defines how one element from the result is written to a {@link DataOutputView}. */ - @Nonnull - private final ElementWriterFunction elementWriterFunction; - - public KeyGroupPartitionedSnapshotImpl( - @Nonnull AbstractKeyGroupPartitioner.PartitioningResult partitioningResult, - @Nonnull ElementWriterFunction elementWriterFunction) { - this.partitioningResult = partitioningResult; - this.elementWriterFunction = elementWriterFunction; - } - - @Override - public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException { - - final T[] groupedOut = partitioningResult.getPartitionedElements(); - - int startOffset = partitioningResult.getKeyGroupStartOffsetInclusive(keyGroupId); - int endOffset = partitioningResult.getKeyGroupEndOffsetExclusive(keyGroupId); - - // write number of mappings in key-group - dov.writeInt(endOffset - startOffset); - - // write mappings - for (int i = startOffset; i < endOffset; ++i) { - elementWriterFunction.writeElement(groupedOut[i], dov); - groupedOut[i] = null; // free asap for GC - } - } - - /** - * This functional interface defines how one element is written to a {@link DataOutputView}. - * - * @param type of the written elements. - */ - @FunctionalInterface - public interface ElementWriterFunction { - - /** - * This method defines how to write a single element to the output. - * - * @param element the element to be written. - * @param dov the output view to write the element. - * @throws IOException on write-related problems. - */ - void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException; - } -} - diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java similarity index 54% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java index 6d31111cb343e..9d1065b13b629 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java @@ -18,19 +18,35 @@ package org.apache.flink.runtime.state; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.IOException; + /** * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works * with two array (input, output) for optimal algorithmic complexity. 
Notice that this could also be implemented over a * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity. */ -public abstract class AbstractKeyGroupPartitioner { +public class KeyGroupPartitioner { + + /** + * The input data for the partitioning. All elements to consider must be densely in the index interval + * [0, {@link #numberOfElements}[, without null values. + */ + @Nonnull + protected final T[] partitioningSource; + + /** + * The output array for the partitioning. The size must be {@link #numberOfElements} (or bigger). + */ + @Nonnull + protected final T[] partitioningDestination; /** Total number of input elements. */ @Nonnegative @@ -61,22 +77,49 @@ public abstract class AbstractKeyGroupPartitioner { @Nonnegative protected final int firstKeyGroup; + /** Function to extract the key from a given element. */ + @Nonnull + protected final KeyExtractorFunction keyExtractorFunction; + + /** Function to write an element to a {@link DataOutputView}. */ + @Nonnull + protected final ElementWriterFunction elementWriterFunction; + /** Cached result. */ @Nullable - protected PartitioningResult computedResult; + protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult; /** + * Creates a new {@link KeyGroupPartitioner}. + * + * @param partitioningSource the input for the partitioning. All elements must be densely packed in the index + * interval [0, {@link #numberOfElements}[, without null values. + * @param numberOfElements the number of elements to consider from the input, starting at input index 0. + * @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements. * @param keyGroupRange the key-group range of the data that will be partitioned by this instance. * @param totalKeyGroups the total number of key groups in the job. + * @param keyExtractorFunction this function extracts the partition key from an element. */ - public AbstractKeyGroupPartitioner( + public KeyGroupPartitioner( + @Nonnull T[] partitioningSource, @Nonnegative int numberOfElements, + @Nonnull T[] partitioningDestination, @Nonnull KeyGroupRange keyGroupRange, - @Nonnegative int totalKeyGroups) { + @Nonnegative int totalKeyGroups, + @Nonnull KeyExtractorFunction keyExtractorFunction, + @Nonnull ElementWriterFunction elementWriterFunction) { + Preconditions.checkState(partitioningSource != partitioningDestination); + Preconditions.checkState(partitioningSource.length >= numberOfElements); + Preconditions.checkState(partitioningDestination.length >= numberOfElements); + + this.partitioningSource = partitioningSource; + this.partitioningDestination = partitioningDestination; this.numberOfElements = numberOfElements; this.keyGroupRange = keyGroupRange; this.totalKeyGroups = totalKeyGroups; + this.keyExtractorFunction = keyExtractorFunction; + this.elementWriterFunction = elementWriterFunction; this.firstKeyGroup = keyGroupRange.getStartKeyGroup(); this.elementKeyGroups = new int[numberOfElements]; this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()]; @@ -86,10 +129,10 @@ public AbstractKeyGroupPartitioner( /** * Partitions the data into key-groups and returns the result via {@link PartitioningResult}. 
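+	 *
+	 * <p>Conceptually a counting sort over key-groups; the phases map onto the methods of this class:
+	 * <pre>{@code
+	 * reportAllElementKeyGroups();          // 1) record the key-group of every input element
+	 * buildHistogramByAccumulatingCounts(); // 2) accumulate counts into per-key-group start offsets
+	 * executePartitioning();                // 3) scatter the elements into the output array
+	 * }</pre>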
*/ - public PartitioningResult partitionByKeyGroup() { + public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() { if (computedResult == null) { reportAllElementKeyGroups(); - buildHistogramFromCounts(); + buildHistogramByAccumulatingCounts(); executePartitioning(); } return computedResult; @@ -99,35 +142,16 @@ public PartitioningResult partitionByKeyGroup() { * This method iterates over the input data and reports the key-group for each element. */ protected void reportAllElementKeyGroups() { - final T[] input = getPartitioningInput(); - Preconditions.checkState(input.length >= numberOfElements); + Preconditions.checkState(partitioningSource.length >= numberOfElements); for (int i = 0; i < numberOfElements; ++i) { - int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups); + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup( + keyExtractorFunction.extractKeyFromElement(partitioningSource[i]), totalKeyGroups); reportKeyGroupOfElementAtIndex(i, keyGroup); } } - /** - * Returns the key for the given element by which the key-group can be computed. - */ - @Nonnull - protected abstract Object extractKeyFromElement(T element); - - /** - * Returns the input data for the partitioning. All elements to consider must be densely in the index interval - * [0, {@link #numberOfElements}[, without null values. - */ - @Nonnull - protected abstract T[] getPartitioningInput(); - - /** - * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger). - */ - @Nonnull - protected abstract T[] getPartitioningOutput(); - /** * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group. */ @@ -140,39 +164,45 @@ protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) { /** * This method creates a histogram from the counts per key-group in {@link #counterHistogram}. */ - private void buildHistogramFromCounts() { + private void buildHistogramByAccumulatingCounts() { int sum = 0; for (int i = 0; i < counterHistogram.length; ++i) { int currentSlotValue = counterHistogram[i]; counterHistogram[i] = sum; sum += currentSlotValue; } + + // sanity check that the sum matches the expected number of elements. + Preconditions.checkState(sum == numberOfElements); } private void executePartitioning() { - final T[] in = getPartitioningInput(); - final T[] out = getPartitioningOutput(); - - Preconditions.checkState(in != out); - Preconditions.checkState(in.length >= numberOfElements); - Preconditions.checkState(out.length >= numberOfElements); - // We repartition the entries by their pre-computed key-groups, using the histogram values as write indexes for (int inIdx = 0; inIdx < numberOfElements; ++inIdx) { int effectiveKgIdx = elementKeyGroups[inIdx]; int outIdx = counterHistogram[effectiveKgIdx]++; - out[outIdx] = in[inIdx]; + partitioningDestination[outIdx] = partitioningSource[inIdx]; } - this.computedResult = new PartitioningResult<>(firstKeyGroup, counterHistogram, out); + this.computedResult = new PartitioningResult<>( + elementWriterFunction, + firstKeyGroup, + counterHistogram, + partitioningDestination); } /** * This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned - * w.r.t. {@link AbstractKeyGroupPartitioner#keyGroupRange}. + * w.r.t. {@link KeyGroupPartitioner#keyGroupRange}. 
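+	 *
+	 * <p>For example, if the result covers key-groups 0..2 and the end-offsets are {@code [2, 2, 5]},
+	 * then key-group 0 occupies indexes {@code [0, 2)} of {@link #partitionedElements}, key-group 1 is
+	 * empty, and key-group 2 occupies {@code [2, 5)}.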
*/ - public static class PartitioningResult { + public static class PartitioningResult implements StateSnapshot.KeyGroupPartitionedSnapshot { + + /** + * Function to write one element to a {@link DataOutputView}. + */ + @Nonnull + private final ElementWriterFunction elementWriterFunction; /** * The exclusive-end-offsets for all key-groups of the covered range for the partitioning. Exclusive-end-offset @@ -188,42 +218,78 @@ public static class PartitioningResult { @Nonnull private final T[] partitionedElements; + /** + * The first key-group of the range covered in the partitioning. + */ + @Nonnegative private final int firstKeyGroup; PartitioningResult( + @Nonnull ElementWriterFunction elementWriterFunction, @Nonnegative int firstKeyGroup, @Nonnull int[] keyGroupEndOffsets, @Nonnull T[] partitionedElements) { - + this.elementWriterFunction = elementWriterFunction; this.firstKeyGroup = firstKeyGroup; this.keyGroupOffsets = keyGroupEndOffsets; this.partitionedElements = partitionedElements; } - @Nonnull - public T[] getPartitionedElements() { - return partitionedElements; - } - @Nonnegative - public int getKeyGroupStartOffsetInclusive(int keyGroup) { + private int getKeyGroupStartOffsetInclusive(int keyGroup) { int idx = keyGroup - firstKeyGroup - 1; return idx < 0 ? 0 : keyGroupOffsets[idx]; } @Nonnegative - public int getKeyGroupEndOffsetExclusive(int keyGroup) { + private int getKeyGroupEndOffsetExclusive(int keyGroup) { return keyGroupOffsets[keyGroup - firstKeyGroup]; } - @Nonnegative - public int getFirstKeyGroup() { - return firstKeyGroup; - } + @Override + public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException { - @Nonnegative - public int getNumberOfKeyGroups() { - return keyGroupOffsets.length; + int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId); + int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId); + + // write number of mappings in key-group + dov.writeInt(endOffset - startOffset); + + // write mappings + for (int i = startOffset; i < endOffset; ++i) { + elementWriterFunction.writeElement(partitionedElements[i], dov); + } } } + + /** + * @param type of the element from which we extract the key. + */ + @FunctionalInterface + public interface KeyExtractorFunction { + + /** + * Returns the key for the given element by which the key-group can be computed. + */ + @Nonnull + Object extractKeyFromElement(@Nonnull T element); + } + + /** + * This functional interface defines how one element is written to a {@link DataOutputView}. + * + * @param type of the written elements. + */ + @FunctionalInterface + public interface ElementWriterFunction { + + /** + * This method defines how to write a single element to the output. + * + * @param element the element to be written. + * @param dov the output view to write the element. + * @throws IOException on write-related problems. 
+ */ + void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java index 01be0ede80fda..d28ed4673ace9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java @@ -28,6 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.ConcurrentModificationException; import java.util.Iterator; @@ -129,7 +132,8 @@ public class CopyOnWriteStateTable extends StateTable implemen /** * Empty entry that we use to bootstrap our {@link CopyOnWriteStateTable.StateEntryIterator}. */ - private static final StateTableEntry ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry<>(); + private static final StateTableEntry ITERATOR_BOOTSTRAP_ENTRY = + new StateTableEntry<>(new Object(), new Object(), new Object(), 0, null, 0, 0); /** * Maintains an ordered set of version ids that are still in use by unreleased snapshots. @@ -291,10 +295,9 @@ public S get(K key, N namespace) { @Override public Stream getKeys(N namespace) { - Iterable> iterable = () -> iterator(); - return StreamSupport.stream(iterable.spliterator(), false) + return StreamSupport.stream(spliterator(), false) .filter(entry -> entry.getNamespace().equals(namespace)) - .map(entry -> entry.getKey()); + .map(StateEntry::getKey); } @Override @@ -554,6 +557,7 @@ public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo metaInfo) { // Iteration ------------------------------------------------------------------------------------------------------ + @Nonnull @Override public Iterator> iterator() { return new StateEntryIterator(); @@ -884,22 +888,26 @@ protected static class StateTableEntry implements StateEntry { /** * The key. Assumed to be immutable and not null. */ + @Nonnull final K key; /** * The namespace. Assumed to be immutable and not null. */ + @Nonnull final N namespace; /** * The state. This is not final to allow exchanging the object for copy-on-write. Can be null. */ + @Nullable S state; /** * Link to another {@link StateTableEntry}. This is used to resolve collisions in the * {@link CopyOnWriteStateTable} through chaining. 
*/ + @Nullable StateTableEntry next; /** @@ -917,22 +925,18 @@ protected static class StateTableEntry implements StateEntry { */ final int hash; - StateTableEntry() { - this(null, null, null, 0, null, 0, 0); - } - StateTableEntry(StateTableEntry other, int entryVersion) { this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion); } StateTableEntry( - K key, - N namespace, - S state, - int hash, - StateTableEntry next, - int entryVersion, - int stateVersion) { + @Nonnull K key, + @Nonnull N namespace, + @Nullable S state, + int hash, + @Nullable StateTableEntry next, + int entryVersion, + int stateVersion) { this.key = key; this.namespace = namespace; this.hash = hash; @@ -942,7 +946,7 @@ protected static class StateTableEntry implements StateEntry { this.stateVersion = stateVersion; } - public final void setState(S value, int mapVersion) { + public final void setState(@Nullable S value, int mapVersion) { // naturally, we can update the state version every time we replace the old state with a different object if (value != state) { this.state = value; @@ -950,16 +954,19 @@ public final void setState(S value, int mapVersion) { } } + @Nonnull @Override public K getKey() { return key; } + @Nonnull @Override public N getNamespace() { return namespace; } + @Nullable @Override public S getState() { return state; @@ -1011,7 +1018,7 @@ class StateEntryIterator implements Iterator> { private StateTableEntry[] activeTable; private int nextTablePosition; private StateTableEntry nextEntry; - private int expectedModCount = modCount; + private int expectedModCount; StateEntryIterator() { this.activeTable = primaryTable; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java index e5d6c2c09f2b6..cf0056ecff81d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java @@ -21,8 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; -import org.apache.flink.runtime.state.KeyGroupPartitionedSnapshotImpl; +import org.apache.flink.runtime.state.KeyGroupPartitioner; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.StateSnapshot; @@ -144,19 +143,18 @@ public KeyGroupPartitionedSnapshot partitionByKeyGroup() { final KeyGroupRange keyGroupRange = keyContext.getKeyGroupRange(); final int numberOfKeyGroups = keyContext.getNumberOfKeyGroups(); - StateTableKeyGroupPartitioner keyGroupPartitioner = new StateTableKeyGroupPartitioner<>( + final StateTableKeyGroupPartitioner keyGroupPartitioner = new StateTableKeyGroupPartitioner<>( snapshotData, numberOfEntriesInSnapshotData, keyGroupRange, - numberOfKeyGroups); - - partitionedStateTableSnapshot = new KeyGroupPartitionedSnapshotImpl<>( - keyGroupPartitioner.partitionByKeyGroup(), + numberOfKeyGroups, (element, dov) -> { localNamespaceSerializer.serialize(element.namespace, dov); localKeySerializer.serialize(element.key, dov); localStateSerializer.serialize(element.state, dov); - }); + }); + + partitionedStateTableSnapshot = 
keyGroupPartitioner.partitionByKeyGroup(); } return partitionedStateTableSnapshot; @@ -175,7 +173,9 @@ boolean isOwner(CopyOnWriteStateTable stateTable) { } /** - * This class is the implementation of {@link AbstractKeyGroupPartitioner} for {@link CopyOnWriteStateTable}. + * This class is the implementation of {@link KeyGroupPartitioner} for {@link CopyOnWriteStateTable}. This class + * swaps input and output in {@link #reportAllElementKeyGroups()} for performance reasons, so that we can reuse + * the non-flattened original snapshot array as partitioning output. * * @param type of key. * @param type of namespace. @@ -183,56 +183,38 @@ boolean isOwner(CopyOnWriteStateTable stateTable) { */ @VisibleForTesting protected static final class StateTableKeyGroupPartitioner - extends AbstractKeyGroupPartitioner> { - - @Nonnull - private final CopyOnWriteStateTable.StateTableEntry[] snapshotData; - - @Nonnull - private final CopyOnWriteStateTable.StateTableEntry[] flattenedData; + extends KeyGroupPartitioner> { @SuppressWarnings("unchecked") StateTableKeyGroupPartitioner( @Nonnull CopyOnWriteStateTable.StateTableEntry[] snapshotData, @Nonnegative int stateTableSize, @Nonnull KeyGroupRange keyGroupRange, - @Nonnegative int totalKeyGroups) { + @Nonnegative int totalKeyGroups, + @Nonnull ElementWriterFunction> elementWriterFunction) { - super(stateTableSize, keyGroupRange, totalKeyGroups); - this.snapshotData = snapshotData; - this.flattenedData = new CopyOnWriteStateTable.StateTableEntry[numberOfElements]; + super( + new CopyOnWriteStateTable.StateTableEntry[stateTableSize], + stateTableSize, + snapshotData, + keyGroupRange, + totalKeyGroups, + CopyOnWriteStateTable.StateTableEntry::getKey, + elementWriterFunction); } @Override protected void reportAllElementKeyGroups() { // In this step we i) 'flatten' the linked list of entries to a second array and ii) report key-groups. int flattenIndex = 0; - for (CopyOnWriteStateTable.StateTableEntry entry : snapshotData) { + for (CopyOnWriteStateTable.StateTableEntry entry : partitioningDestination) { while (null != entry) { final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(entry.key, totalKeyGroups); reportKeyGroupOfElementAtIndex(flattenIndex, keyGroup); - flattenedData[flattenIndex++] = entry; + partitioningSource[flattenIndex++] = entry; entry = entry.next; } } } - - @Nonnull - @Override - protected Object extractKeyFromElement(CopyOnWriteStateTable.StateTableEntry element) { - return element.getKey(); - } - - @Nonnull - @Override - protected CopyOnWriteStateTable.StateTableEntry[] getPartitioningInput() { - return flattenedData; - } - - @Nonnull - @Override - protected CopyOnWriteStateTable.StateTableEntry[] getPartitioningOutput() { - return snapshotData; - } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java deleted file mode 100644 index d882a62901333..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshotTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.runtime.state; - -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; - -import org.junit.Assert; -import org.junit.Test; - -import javax.annotation.Nonnegative; -import javax.annotation.Nonnull; - -import java.io.IOException; - -/** - * Tests for {@link KeyGroupPartitionedSnapshotImpl}. - */ -public class AbstractKeyGroupPartitionedSnapshotTest { - - @Test - public void testWriteMappingsInKeyGroup() throws IOException { - doTestWriteMappingsInKeyGroup(KeyGroupRange.of(0, 0)); - doTestWriteMappingsInKeyGroup(KeyGroupRange.of(0, 3)); - doTestWriteMappingsInKeyGroup(KeyGroupRange.of(1, 2)); - doTestWriteMappingsInKeyGroup(KeyGroupRange.of(3, 8)); - } - - private void doTestWriteMappingsInKeyGroup(KeyGroupRange testRange) throws IOException { - int[] offsets = new int[testRange.getNumberOfKeyGroups()]; - Integer[] data = new Integer[gaussSumZeroToX(offsets.length)]; - - int pos = 0; - for (int i = 0; i < offsets.length; ++i) { - - int endIdx = gaussSumZeroToX(i); - offsets[i] = endIdx; - - while (pos < endIdx) { - data[pos] = i; - ++pos; - } - } - - AbstractKeyGroupPartitioner.PartitioningResult result = - new AbstractKeyGroupPartitioner.PartitioningResult<>(testRange.getStartKeyGroup(), offsets, data); - - final TestElementWriter testElementWriter = new TestElementWriter(result.getNumberOfKeyGroups()); - StateSnapshot.KeyGroupPartitionedSnapshot testInstance = - new KeyGroupPartitionedSnapshotImpl<>(result, testElementWriter); - DataOutputView dummyOut = new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos()); - - for (Integer keyGroup : testRange) { - testInstance.writeMappingsInKeyGroup(dummyOut, keyGroup); - } - - testElementWriter.validateCounts(); - } - - /** - * Simple test implementation with validation of {@link KeyGroupPartitionedSnapshotImpl.ElementWriterFunction}. 
- */ - static final class TestElementWriter implements KeyGroupPartitionedSnapshotImpl.ElementWriterFunction { - - @Nonnull - private final int[] countCheck; - - TestElementWriter(@Nonnegative int numberOfKeyGroups) { - this.countCheck = new int[numberOfKeyGroups]; - } - - @Override - public void writeElement(@Nonnull Integer element, @Nonnull DataOutputView dov) { - ++countCheck[element]; - } - - void validateCounts() { - for (int i = 0; i < countCheck.length; ++i) { - Assert.assertEquals("Key-group count does not match expectation.", i, countCheck[i]); - } - } - } - - private static int gaussSumZeroToX(int x) { - return (x * (x + 1)) / 2; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java deleted file mode 100644 index 16736573f7d75..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionerTestBase.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Random; - -/** - * Abstract test base for implementations of {@link AbstractKeyedStateBackend}. - */ -public abstract class AbstractKeyGroupPartitionerTestBase extends TestLogger { - - @Test - public void testPartitionByKeyGroup() { - - final Random random = new Random(0x42); - testPartitionByKeyGroupForSize(0, random); - testPartitionByKeyGroupForSize(1, random); - testPartitionByKeyGroupForSize(2, random); - testPartitionByKeyGroupForSize(1000, random); - } - - @SuppressWarnings("unchecked") - private void testPartitionByKeyGroupForSize(int testSize, Random random) { - - final T[] data = generateTestData(random, testSize); - - // Test with 5 key-groups. 
- final KeyGroupRange range = new KeyGroupRange(0, 4); - final int numberOfKeyGroups = range.getNumberOfKeyGroups(); - final AbstractKeyGroupPartitioner testInstance = createPartitioner(data, testSize, range, numberOfKeyGroups); - final AbstractKeyGroupPartitioner.PartitioningResult result = testInstance.partitionByKeyGroup(); - - final T[] partitionedElements = result.getPartitionedElements(); - - Assert.assertEquals(numberOfKeyGroups, result.getNumberOfKeyGroups()); - Assert.assertEquals(range.getStartKeyGroup(), result.getFirstKeyGroup()); - Assert.assertEquals(data.length, partitionedElements.length); - Assert.assertEquals(0, result.getKeyGroupStartOffsetInclusive(range.getStartKeyGroup())); - Assert.assertEquals(testSize, result.getKeyGroupEndOffsetExclusive(range.getEndKeyGroup())); - - for (int keyGroup = 0; keyGroup < result.getNumberOfKeyGroups(); ++keyGroup) { - int start = result.getKeyGroupStartOffsetInclusive(keyGroup); - int end = result.getKeyGroupEndOffsetExclusive(keyGroup); - for (int i = start; i < end; ++i) { - Assert.assertEquals("Mismatch at index " + i, - keyGroup, - KeyGroupRangeAssignment.assignToKeyGroup( - testInstance.extractKeyFromElement(partitionedElements[i]), - numberOfKeyGroups)); - } - } - } - - abstract protected T[] generateTestData(Random random, int numElementsToGenerate); - - abstract protected AbstractKeyGroupPartitioner createPartitioner( - T[] data, - int numElements, - KeyGroupRange keyGroupRange, - int totalKeyGroups); -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java new file mode 100644 index 0000000000000..9298743b95d75 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; + +/** + * Abstract test base for implementations of {@link KeyGroupPartitioner}. 
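+ *
+ * <p>Subclasses inject an element generator and a key extractor via the constructor; a hypothetical
+ * subclass could look like this:
+ * <pre>{@code
+ * public class MyElementPartitionerTest extends KeyGroupPartitionerTestBase<MyElement> {
+ *     public MyElementPartitionerTest() {
+ *         super(random -> new MyElement(random.nextInt()), MyElement::getKey);
+ *     }
+ * }
+ * }</pre>
+ * where {@code MyElement} is an illustrative element type with a non-null key.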
+ */
+public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
+
+	private static final DataOutputView DUMMY_OUT_VIEW =
+		new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(0));
+
+	@Nonnull
+	protected final KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction;
+
+	@Nonnull
+	protected final Function<Random, T> elementGenerator;
+
+	protected KeyGroupPartitionerTestBase(
+		@Nonnull Function<Random, T> elementGenerator,
+		@Nonnull KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction) {
+
+		this.elementGenerator = elementGenerator;
+		this.keyExtractorFunction = keyExtractorFunction;
+	}
+
+	@Test
+	public void testPartitionByKeyGroup() throws IOException {
+
+		final Random random = new Random(0x42);
+		testPartitionByKeyGroupForSize(0, random);
+		testPartitionByKeyGroupForSize(1, random);
+		testPartitionByKeyGroupForSize(2, random);
+		testPartitionByKeyGroupForSize(10, random);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void testPartitionByKeyGroupForSize(int testSize, Random random) throws IOException {
+
+		final Set<T> allElementsIdentitySet = Collections.newSetFromMap(new IdentityHashMap<>());
+		final T[] data = generateTestInput(random, testSize, allElementsIdentitySet);
+
+		Assert.assertEquals(testSize, allElementsIdentitySet.size());
+
+		// Test with 5 key-groups.
+		final KeyGroupRange range = new KeyGroupRange(0, 4);
+		final int numberOfKeyGroups = range.getNumberOfKeyGroups();
+
+		final ValidatingElementWriterDummy<T> validatingElementWriter =
+			new ValidatingElementWriterDummy<>(keyExtractorFunction, numberOfKeyGroups, allElementsIdentitySet);
+
+		final KeyGroupPartitioner<T> testInstance = createPartitioner(data, testSize, range, numberOfKeyGroups, validatingElementWriter);
+		final StateSnapshot.KeyGroupPartitionedSnapshot result = testInstance.partitionByKeyGroup();
+
+		for (int keyGroup = 0; keyGroup < numberOfKeyGroups; ++keyGroup) {
+			validatingElementWriter.setCurrentKeyGroup(keyGroup);
+			result.writeMappingsInKeyGroup(DUMMY_OUT_VIEW, keyGroup);
+		}
+
+		validatingElementWriter.validateAllElementsSeen();
+	}
+
+	@SuppressWarnings("unchecked")
+	protected T[] generateTestInput(Random random, int numElementsToGenerate, Set<T> allElementsIdentitySet) {
+
+		final int arraySize = numElementsToGenerate > 1 ? numElementsToGenerate + 5 : numElementsToGenerate;
+		T element = elementGenerator.apply(random);
+		final T[] partitioningIn = (T[]) Array.newInstance(element.getClass(), arraySize);
+
+		for (int i = 0; i < numElementsToGenerate; ++i) {
+			partitioningIn[i] = element;
+			allElementsIdentitySet.add(element);
+			element = elementGenerator.apply(random);
+		}
+
+		Assert.assertEquals(numElementsToGenerate, allElementsIdentitySet.size());
+		return partitioningIn;
+	}
+
+	@SuppressWarnings("unchecked")
+	protected KeyGroupPartitioner<T> createPartitioner(
+		T[] data,
+		int numElements,
+		KeyGroupRange keyGroupRange,
+		int totalKeyGroups,
+		KeyGroupPartitioner.ElementWriterFunction<T> elementWriterFunction) {
+
+		final T[] partitioningOut = (T[]) Array.newInstance(data.getClass().getComponentType(), numElements);
+		return new KeyGroupPartitioner<>(
+			data,
+			numElements,
+			partitioningOut,
+			keyGroupRange,
+			totalKeyGroups,
+			keyExtractorFunction,
+			elementWriterFunction);
+	}
+
+
+	/**
+	 * Simple test implementation of {@link KeyGroupPartitioner.ElementWriterFunction} with validation.
+ */ + static final class ValidatingElementWriterDummy implements KeyGroupPartitioner.ElementWriterFunction { + + @Nonnull + private final KeyGroupPartitioner.KeyExtractorFunction keyExtractorFunction; + @Nonnegative + private final int numberOfKeyGroups; + @Nonnull + private final Set allElementsSet; + @Nonnegative + private int currentKeyGroup; + + ValidatingElementWriterDummy( + @Nonnull KeyGroupPartitioner.KeyExtractorFunction keyExtractorFunction, + @Nonnegative int numberOfKeyGroups, + @Nonnull Set allElementsSet) { + this.keyExtractorFunction = keyExtractorFunction; + this.numberOfKeyGroups = numberOfKeyGroups; + this.allElementsSet = allElementsSet; + } + + @Override + public void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) { + Assert.assertTrue(allElementsSet.remove(element)); + Assert.assertEquals( + currentKeyGroup, + KeyGroupRangeAssignment.assignToKeyGroup( + keyExtractorFunction.extractKeyFromElement(element), + numberOfKeyGroups)); + } + + void validateAllElementsSeen() { + Assert.assertTrue(allElementsSet.isEmpty()); + } + + void setCurrentKeyGroup(int currentKeyGroup) { + this.currentKeyGroup = currentKeyGroup; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java index 177a604ba155b..745719a445b75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java @@ -18,55 +18,85 @@ package org.apache.flink.runtime.state.heap; -import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner; -import org.apache.flink.runtime.state.AbstractKeyGroupPartitionerTestBase; +import org.apache.flink.runtime.state.KeyGroupPartitioner; +import org.apache.flink.runtime.state.KeyGroupPartitionerTestBase; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.StateTableEntry; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.util.Random; +import java.util.Set; /** * Test for {@link org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner}. */ public class StateTableKeyGroupPartitionerTest extends - AbstractKeyGroupPartitionerTestBase> { + KeyGroupPartitionerTestBase> { + + public StateTableKeyGroupPartitionerTest() { + super(random -> generateElement(random, null), StateTableEntry::getKey); + } - @Override @SuppressWarnings("unchecked") - protected StateTableEntry[] generateTestData( + @Override + protected StateTableEntry[] generateTestInput( Random random, - int numElementsToGenerate) { + int numElementsToGenerate, + Set> allElementsIdentitySet) { // we let the array size differ a bit from the test size to check this works final int arraySize = numElementsToGenerate > 1 ? 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java
index 177a604ba155b..745719a445b75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableKeyGroupPartitionerTest.java
@@ -18,55 +18,85 @@
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner;
-import org.apache.flink.runtime.state.AbstractKeyGroupPartitionerTestBase;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupPartitionerTestBase;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.StateTableEntry;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.util.Random;
+import java.util.Set;
 
 /**
  * Test for {@link org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner}.
  */
 public class StateTableKeyGroupPartitionerTest extends
-	AbstractKeyGroupPartitionerTestBase<StateTableEntry<Integer, VoidNamespace, Integer>> {
+	KeyGroupPartitionerTestBase<StateTableEntry<Integer, VoidNamespace, Integer>> {
+
+	public StateTableKeyGroupPartitionerTest() {
+		super(random -> generateElement(random, null), StateTableEntry::getKey);
+	}
 
-	@Override
 	@SuppressWarnings("unchecked")
-	protected StateTableEntry<Integer, VoidNamespace, Integer>[] generateTestData(
+	@Override
+	protected StateTableEntry<Integer, VoidNamespace, Integer>[] generateTestInput(
 		Random random,
-		int numElementsToGenerate) {
+		int numElementsToGenerate,
+		Set<StateTableEntry<Integer, VoidNamespace, Integer>> allElementsIdentitySet) {
 
 		// we let the array size differ a bit from the test size to check this works
 		final int arraySize = numElementsToGenerate > 1 ?
 			numElementsToGenerate + 5 : numElementsToGenerate;
 		final StateTableEntry<Integer, VoidNamespace, Integer>[] data = new StateTableEntry[arraySize];
-		for (int i = 0; i < numElementsToGenerate; ++i) {
-			Integer key = random.nextInt() & Integer.MAX_VALUE;
-			data[i] = new CopyOnWriteStateTable.StateTableEntry<>(
-				key,
-				VoidNamespace.INSTANCE,
-				42,
-				key.hashCode(),
-				null,
-				0,
-				0);
+
+		while (numElementsToGenerate > 0) {
+
+			final int generateAsChainCount = Math.min(1 + random.nextInt(3), numElementsToGenerate);
+
+			StateTableEntry<Integer, VoidNamespace, Integer> element = null;
+			for (int i = 0; i < generateAsChainCount; ++i) {
+				element = generateElement(random, element);
+				allElementsIdentitySet.add(element);
+			}
+
+			data[data.length - numElementsToGenerate + random.nextInt(generateAsChainCount)] = element;
+			numElementsToGenerate -= generateAsChainCount;
 		}
 
 		return data;
 	}
 
 	@Override
-	protected AbstractKeyGroupPartitioner<StateTableEntry<Integer, VoidNamespace, Integer>> createPartitioner(
+	protected KeyGroupPartitioner<StateTableEntry<Integer, VoidNamespace, Integer>> createPartitioner(
 		StateTableEntry<Integer, VoidNamespace, Integer>[] data,
 		int numElements,
 		KeyGroupRange keyGroupRange,
-		int totalKeyGroups) {
+		int totalKeyGroups,
+		KeyGroupPartitioner.ElementWriterFunction<
+			StateTableEntry<Integer, VoidNamespace, Integer>> elementWriterFunction) {
 
 		return new CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner<>(
 			data,
 			numElements,
 			keyGroupRange,
-			totalKeyGroups);
+			totalKeyGroups,
+			elementWriterFunction);
+	}
+
+	private static StateTableEntry<Integer, VoidNamespace, Integer> generateElement(
+		@Nonnull Random random,
+		@Nullable StateTableEntry<Integer, VoidNamespace, Integer> next) {
+
+		Integer generatedKey = random.nextInt() & Integer.MAX_VALUE;
+		return new StateTableEntry<>(
+			generatedKey,
+			VoidNamespace.INSTANCE,
+			random.nextInt(),
+			generatedKey.hashCode(),
+			next,
+			0,
+			0);
+	}
 }
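The chained test input above exists because a CopyOnWriteStateTable snapshot array only holds the head entry of each hash bucket; colliding entries hang off the head through their next references, and the partitioner has to visit all of them, not just the heads. A simplified sketch of that unchaining step over a stand-in entry type (illustrative only; the real traversal lives in StateTableKeyGroupPartitioner):

    final class ChainFlatteningSketch {

        /** Simplified stand-in for StateTableEntry; only the chaining aspect matters here. */
        static final class ChainedEntry {
            final int key;
            final ChainedEntry next;

            ChainedEntry(int key, ChainedEntry next) {
                this.key = key;
                this.next = next;
            }
        }

        /** Flattens all bucket chains into a dense array so every entry takes part in partitioning. */
        static ChainedEntry[] flattenChains(ChainedEntry[] buckets, int totalEntries) {
            final ChainedEntry[] flat = new ChainedEntry[totalEntries];
            int pos = 0;
            for (ChainedEntry head : buckets) {
                // walk the collision chain hanging off this bucket's head entry
                for (ChainedEntry entry = head; entry != null; entry = entry.next) {
                    flat[pos++] = entry;
                }
            }
            return flat;
        }
    }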
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
index e512a091a5193..6eb40578ecb00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
@@ -18,8 +18,7 @@
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupPartitionedSnapshotImpl;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StateSnapshot;
@@ -28,7 +27,7 @@
 import javax.annotation.Nullable;
 
 /**
- * This class represents the snapshot of a {@link InternalTimerHeap}.
+ * This class represents the snapshot of an {@link InternalTimerHeap}.
  *
  * @param <K> type of key.
  * @param <N> type of namespace.
@@ -67,16 +66,26 @@ public class InternalTimerHeapSnapshot<K, N> implements StateSnapshot {
 		this.totalKeyGroups = totalKeyGroups;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Nonnull
 	@Override
 	public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
 
 		if (partitionedSnapshot == null) {
-			TimerPartitioner<K, N> timerPartitioner =
-				new TimerPartitioner<>(timerHeapArrayCopy, keyGroupRange, totalKeyGroups);
-			partitionedSnapshot = new KeyGroupPartitionedSnapshotImpl<>(
-				timerPartitioner.partitionByKeyGroup(),
-				timerSerializer::serialize);
+
+			TimerHeapInternalTimer<K, N>[] partitioningOutput = new TimerHeapInternalTimer[timerHeapArrayCopy.length];
+
+			KeyGroupPartitioner<TimerHeapInternalTimer<K, N>> timerPartitioner =
+				new KeyGroupPartitioner<>(
+					timerHeapArrayCopy,
+					timerHeapArrayCopy.length,
+					partitioningOutput,
+					keyGroupRange,
+					totalKeyGroups,
+					TimerHeapInternalTimer::getKey,
+					timerSerializer::serialize);
+
+			partitionedSnapshot = timerPartitioner.partitionByKeyGroup();
 		}
 
 		return partitionedSnapshot;
@@ -85,47 +94,4 @@ public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
 	@Override
 	public void release() {
 	}
-
-	/**
-	 * Implementation of {@link AbstractKeyGroupPartitioner} for {@link TimerHeapInternalTimer}.
-	 *
-	 * @param <K> type of key.
-	 * @param <N> type of namespace.
-	 */
-	static final class TimerPartitioner<K, N> extends AbstractKeyGroupPartitioner<TimerHeapInternalTimer<K, N>> {
-
-		@Nonnull
-		private final TimerHeapInternalTimer<K, N>[] timersIn;
-
-		@Nonnull
-		private final TimerHeapInternalTimer<K, N>[] timersOut;
-
-		@SuppressWarnings("unchecked")
-		TimerPartitioner(
-			@Nonnull TimerHeapInternalTimer<K, N>[] timers,
-			@Nonnull KeyGroupRange keyGroupRange,
-			@Nonnegative int totalKeyGroups) {
-			super(timers.length, keyGroupRange, totalKeyGroups);
-			this.timersIn = timers;
-			this.timersOut = new TimerHeapInternalTimer[timers.length];
-		}
-
-		@Nonnull
-		@Override
-		protected Object extractKeyFromElement(TimerHeapInternalTimer<K, N> element) {
-			return element.getKey();
-		}
-
-		@Nonnull
-		@Override
-		protected TimerHeapInternalTimer<K, N>[] getPartitioningInput() {
-			return timersIn;
-		}
-
-		@Nonnull
-		@Override
-		protected TimerHeapInternalTimer<K, N>[] getPartitioningOutput() {
-			return timersOut;
-		}
-	}
 }
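With the bespoke TimerPartitioner removed, timers follow the same snapshot protocol as keyed state: partition once, then drain the result key-group by key-group. A minimal sketch of such a consumer loop (the output-view setup is illustrative; each writeMappingsInKeyGroup call emits the element count for the key-group followed by the serialized elements):

    import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.StateSnapshot;

    import java.io.IOException;

    final class SnapshotConsumerSketch {

        /** Writes every key-group of a partitioned snapshot to a fresh output view. */
        static void writeAllKeyGroups(
                StateSnapshot.KeyGroupPartitionedSnapshot snapshot,
                KeyGroupRange range) throws IOException {

            final DataOutputView out =
                new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(64));

            // drain the partitioning result one key-group at a time
            for (int keyGroup = range.getStartKeyGroup(); keyGroup <= range.getEndKeyGroup(); ++keyGroup) {
                snapshot.writeMappingsInKeyGroup(out, keyGroup);
            }
        }
    }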
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyGroupPartitionerForTimersTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyGroupPartitionerForTimersTest.java
new file mode 100644
index 0000000000000..8ad71155dd71d
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyGroupPartitionerForTimersTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupPartitionerTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+
+/**
+ * Test of {@link KeyGroupPartitioner} for timers.
+ */
+public class KeyGroupPartitionerForTimersTest
+	extends KeyGroupPartitionerTestBase<TimerHeapInternalTimer<Integer, VoidNamespace>> {
+
+	public KeyGroupPartitionerForTimersTest() {
+		super(
+			(random -> new TimerHeapInternalTimer<>(42L, random.nextInt() & Integer.MAX_VALUE, VoidNamespace.INSTANCE)),
+			TimerHeapInternalTimer::getKey);
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java
deleted file mode 100644
index 5caf3179250ab..0000000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner;
-import org.apache.flink.runtime.state.AbstractKeyGroupPartitionerTestBase;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-
-import java.util.Random;
-
-/**
- * Test for {@link org.apache.flink.streaming.api.operators.InternalTimerHeapSnapshot.TimerPartitioner}.
- */
-public class TimerPartitionerTest
-	extends AbstractKeyGroupPartitionerTestBase<TimerHeapInternalTimer<Integer, VoidNamespace>> {
-
-	@Override
-	@SuppressWarnings("unchecked")
-	protected TimerHeapInternalTimer<Integer, VoidNamespace>[] generateTestData(
-		Random random,
-		int numElementsToGenerate) {
-		final TimerHeapInternalTimer<Integer, VoidNamespace>[] data = new TimerHeapInternalTimer[numElementsToGenerate];
-		for (int i = 0; i < numElementsToGenerate; ++i) {
-			Integer key = random.nextInt() & Integer.MAX_VALUE;
-			data[i] = new TimerHeapInternalTimer<>(42L, key, VoidNamespace.INSTANCE);
-		}
-		return data;
-	}
-
-	@Override
-	protected AbstractKeyGroupPartitioner<TimerHeapInternalTimer<Integer, VoidNamespace>> createPartitioner(
-		TimerHeapInternalTimer<Integer, VoidNamespace>[] data,
-		int numElements,
-		KeyGroupRange keyGroupRange,
-		int totalKeyGroups) {
-		return new InternalTimerHeapSnapshot.TimerPartitioner<>(data, keyGroupRange, totalKeyGroups);
-	}
-}
From d744742b1d43d65cb471188039fcb6ffb4a3d3ea Mon Sep 17 00:00:00 2001
From: Stefan Richter
Date: Fri, 15 Jun 2018 17:12:43 +0200
Subject: [PATCH 6/8] Introduce hard limit for CopyOnWriteStateTable size.
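The resize threshold stays at 3/4 of the capacity while the table can still grow. Once the bucket array has reached its maximum size of 2^30, the threshold is lifted to MAX_ARRAY_SIZE so that inserts keep working, at the cost of longer collision chains, and an IllegalStateException is only thrown when the number of entries can no longer fit into a Java array. Condensed as a sketch (constants and branch structure follow the diff below; the helper itself is illustrative):

    final class SizingPolicySketch {

        static final int MAXIMUM_CAPACITY = 1 << 30;
        static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

        /** Next resize threshold after growing (or failing to grow) to newCapacity buckets. */
        static int nextThreshold(int newCapacity, int currentSize) {
            if (newCapacity < MAXIMUM_CAPACITY) {
                return (newCapacity >> 1) + (newCapacity >> 2); // resize again at 3/4 load
            }
            if (currentSize >= MAX_ARRAY_SIZE) {
                // hard limit: no Java array can hold more entries
                throw new IllegalStateException("CopyOnWriteStateTable is completely full");
            }
            return MAX_ARRAY_SIZE; // capacity is maxed out; collision chains grow from here
        }
    }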
---
 .../state/heap/CopyOnWriteStateTable.java     | 21 +++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index d28ed4673ace9..a57f0c137d67b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -105,6 +105,9 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
 
+	/** Maximum safe array size to allocate in a JVM. */
+	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
 	/**
 	 * Min capacity (other than zero) for a {@link CopyOnWriteStateTable}. Must be a power of two
 	 * greater than 1 (and less than 1 << 30).
@@ -630,13 +633,23 @@ StateTableEntry<K, N, S>[] snapshotTableArrays() {
 	 */
 	private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
 
-		if (MAXIMUM_CAPACITY == newCapacity) {
-			LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead " +
-				"to more collisions and lower performance. Please consider scaling-out your job or using a " +
+		if (newCapacity < MAXIMUM_CAPACITY) {
+			threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+		} else {
+			if (size() >= MAX_ARRAY_SIZE) {
+
+				throw new IllegalStateException("Maximum capacity of CopyOnWriteStateTable is reached and the job " +
+					"cannot continue. Please consider scaling-out your job or using a different keyed state backend " +
+					"implementation!");
+			} else {
+
+				LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can " +
+					"lead to more collisions and lower performance. Please consider scaling-out your job or using a " +
 					"different keyed state backend implementation!");
+				threshold = MAX_ARRAY_SIZE;
+			}
 		}
-		threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
+
 		@SuppressWarnings("unchecked")
 		StateTableEntry<K, N, S>[] newTable = (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity];
 		return newTable;
From 671a0240872c1a5644b7b5430c16fde1bddbf50d Mon Sep 17 00:00:00 2001
From: Stefan Richter
Date: Fri, 15 Jun 2018 17:16:34 +0200
Subject: [PATCH 7/8] Add annotation description in KeyGroupPartitioner.

---
 .../org/apache/flink/runtime/state/KeyGroupPartitioner.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 9d1065b13b629..673a0ef13140d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -32,6 +32,8 @@
  * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
  * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
  * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ *
+ * @param <T> type of the partitioned elements.
  */
 public class KeyGroupPartitioner<T> {
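The two-array scheme described in the javadoc above amounts to a counting sort over key-group ids: one pass over the input builds a histogram, a prefix sum turns the histogram into per-group start offsets, and a second pass scatters every element into its slot of the output array, O(n + k) overall. A self-contained sketch of the idea (KeyGroupPartitioner's actual bookkeeping differs in detail):

    import java.util.function.ToIntFunction;

    final class TwoArrayPartitionSketch {

        /**
         * Partitions in[0..n) into out, grouped by key-group id in [0, k), and returns the
         * start offsets: offsets[g] (inclusive) to offsets[g + 1] (exclusive) is group g's range.
         */
        static <T> int[] partition(T[] in, int n, T[] out, int k, ToIntFunction<T> keyGroupOf) {
            final int[] counts = new int[k + 1];

            // pass 1: histogram, shifted by one so the prefix sum yields start offsets
            for (int i = 0; i < n; ++i) {
                ++counts[keyGroupOf.applyAsInt(in[i]) + 1];
            }

            // prefix sum: counts[g] becomes the first output index of key-group g
            for (int g = 1; g <= k; ++g) {
                counts[g] += counts[g - 1];
            }
            final int[] offsets = counts.clone();

            // pass 2: scatter each element into its key-group's slot
            for (int i = 0; i < n; ++i) {
                out[counts[keyGroupOf.applyAsInt(in[i])]++] = in[i];
            }
            return offsets;
        }
    }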
From 579ba404f94ac565625665777bb0e26ba520872c Mon Sep 17 00:00:00 2001
From: Stefan Richter
Date: Fri, 15 Jun 2018 17:18:39 +0200
Subject: [PATCH 8/8] Minor fixup

---
 .../apache/flink/runtime/state/heap/CopyOnWriteStateTable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index a57f0c137d67b..bb90c37d3c33d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -636,7 +636,7 @@ private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
 		if (newCapacity < MAXIMUM_CAPACITY) {
 			threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity
 		} else {
-			if (size() >= MAX_ARRAY_SIZE) {
+			if (size() > MAX_ARRAY_SIZE) {
 
 				throw new IllegalStateException("Maximum capacity of CopyOnWriteStateTable is reached and the job " +
 					"cannot continue. Please consider scaling-out your job or using a different keyed state backend " +