Skip to content

Commit

Permalink
[FLINK-9487][state] Prepare InternalTimerHeap for asynchronous snapshots
Browse files Browse the repository at this point in the history
This closes #6159.
  • Loading branch information
StefanRRichter committed Jun 15, 2018
1 parent 4a0dc82 commit 7e0eafa
Show file tree
Hide file tree
Showing 15 changed files with 921 additions and 139 deletions.
@@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;

/**
* Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
* with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
* single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
* better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
*
* @param <T> type of the partitioned elements.
*/
public class KeyGroupPartitioner<T> {

/**
* The input data for the partitioning. All elements to consider must be densely in the index interval
* [0, {@link #numberOfElements}[, without null values.
*/
@Nonnull
protected final T[] partitioningSource;

/**
* The output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
*/
@Nonnull
protected final T[] partitioningDestination;

/** Total number of input elements. */
@Nonnegative
protected final int numberOfElements;

/** The total number of key-groups in the job. */
@Nonnegative
protected final int totalKeyGroups;

/** The key-group range for the input data, covered in this partitioning. */
@Nonnull
protected final KeyGroupRange keyGroupRange;

/**
* This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
* a histogram by accumulation.
*/
@Nonnull
protected final int[] counterHistogram;

/**
* This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
*/
@Nonnull
protected final int[] elementKeyGroups;

/** Cached value of keyGroupRange#firstKeyGroup. */
@Nonnegative
protected final int firstKeyGroup;

/** Function to extract the key from a given element. */
@Nonnull
protected final KeyExtractorFunction<T> keyExtractorFunction;

/** Function to write an element to a {@link DataOutputView}. */
@Nonnull
protected final ElementWriterFunction<T> elementWriterFunction;

/** Cached result. */
@Nullable
protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;

/**
* Creates a new {@link KeyGroupPartitioner}.
*
* @param partitioningSource the input for the partitioning. All elements must be densely packed in the index
* interval [0, {@link #numberOfElements}[, without null values.
* @param numberOfElements the number of elements to consider from the input, starting at input index 0.
* @param partitioningDestination the output of the partitioning. Must have capacity of at least numberOfElements.
* @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
* @param totalKeyGroups the total number of key groups in the job.
* @param keyExtractorFunction this function extracts the partition key from an element.
*/
public KeyGroupPartitioner(
@Nonnull T[] partitioningSource,
@Nonnegative int numberOfElements,
@Nonnull T[] partitioningDestination,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int totalKeyGroups,
@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
@Nonnull ElementWriterFunction<T> elementWriterFunction) {

Preconditions.checkState(partitioningSource != partitioningDestination);
Preconditions.checkState(partitioningSource.length >= numberOfElements);
Preconditions.checkState(partitioningDestination.length >= numberOfElements);

this.partitioningSource = partitioningSource;
this.partitioningDestination = partitioningDestination;
this.numberOfElements = numberOfElements;
this.keyGroupRange = keyGroupRange;
this.totalKeyGroups = totalKeyGroups;
this.keyExtractorFunction = keyExtractorFunction;
this.elementWriterFunction = elementWriterFunction;
this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
this.elementKeyGroups = new int[numberOfElements];
this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
this.computedResult = null;
}

/**
* Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
*/
public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
if (computedResult == null) {
reportAllElementKeyGroups();
buildHistogramByAccumulatingCounts();
executePartitioning();
}
return computedResult;
}

/**
* This method iterates over the input data and reports the key-group for each element.
*/
protected void reportAllElementKeyGroups() {

Preconditions.checkState(partitioningSource.length >= numberOfElements);

for (int i = 0; i < numberOfElements; ++i) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
keyExtractorFunction.extractKeyFromElement(partitioningSource[i]), totalKeyGroups);
reportKeyGroupOfElementAtIndex(i, keyGroup);
}
}

/**
* This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
*/
protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
final int keyGroupIndex = keyGroup - firstKeyGroup;
elementKeyGroups[index] = keyGroupIndex;
++counterHistogram[keyGroupIndex];
}

/**
* This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
*/
private void buildHistogramByAccumulatingCounts() {
int sum = 0;
for (int i = 0; i < counterHistogram.length; ++i) {
int currentSlotValue = counterHistogram[i];
counterHistogram[i] = sum;
sum += currentSlotValue;
}

// sanity check that the sum matches the expected number of elements.
Preconditions.checkState(sum == numberOfElements);
}

private void executePartitioning() {

// We repartition the entries by their pre-computed key-groups, using the histogram values as write indexes
for (int inIdx = 0; inIdx < numberOfElements; ++inIdx) {
int effectiveKgIdx = elementKeyGroups[inIdx];
int outIdx = counterHistogram[effectiveKgIdx]++;
partitioningDestination[outIdx] = partitioningSource[inIdx];
}

this.computedResult = new PartitioningResult<>(
elementWriterFunction,
firstKeyGroup,
counterHistogram,
partitioningDestination);
}

/**
* This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
* w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
*/
public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {

/**
* Function to write one element to a {@link DataOutputView}.
*/
@Nonnull
private final ElementWriterFunction<T> elementWriterFunction;

/**
* The exclusive-end-offsets for all key-groups of the covered range for the partitioning. Exclusive-end-offset
* for key-group n is under keyGroupOffsets[n - firstKeyGroup].
*/
@Nonnull
private final int[] keyGroupOffsets;

/**
* Array with elements that are partitioned w.r.t. the covered key-group range. The start offset for each
* key-group is in {@link #keyGroupOffsets}.
*/
@Nonnull
private final T[] partitionedElements;

/**
* The first key-group of the range covered in the partitioning.
*/
@Nonnegative
private final int firstKeyGroup;

PartitioningResult(
@Nonnull ElementWriterFunction<T> elementWriterFunction,
@Nonnegative int firstKeyGroup,
@Nonnull int[] keyGroupEndOffsets,
@Nonnull T[] partitionedElements) {
this.elementWriterFunction = elementWriterFunction;
this.firstKeyGroup = firstKeyGroup;
this.keyGroupOffsets = keyGroupEndOffsets;
this.partitionedElements = partitionedElements;
}

@Nonnegative
private int getKeyGroupStartOffsetInclusive(int keyGroup) {
int idx = keyGroup - firstKeyGroup - 1;
return idx < 0 ? 0 : keyGroupOffsets[idx];
}

@Nonnegative
private int getKeyGroupEndOffsetExclusive(int keyGroup) {
return keyGroupOffsets[keyGroup - firstKeyGroup];
}

@Override
public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {

int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);

// write number of mappings in key-group
dov.writeInt(endOffset - startOffset);

// write mappings
for (int i = startOffset; i < endOffset; ++i) {
elementWriterFunction.writeElement(partitionedElements[i], dov);
}
}
}

/**
* @param <T> type of the element from which we extract the key.
*/
@FunctionalInterface
public interface KeyExtractorFunction<T> {

/**
* Returns the key for the given element by which the key-group can be computed.
*/
@Nonnull
Object extractKeyFromElement(@Nonnull T element);
}

/**
* This functional interface defines how one element is written to a {@link DataOutputView}.
*
* @param <T> type of the written elements.
*/
@FunctionalInterface
public interface ElementWriterFunction<T> {

/**
* This method defines how to write a single element to the output.
*
* @param element the element to be written.
* @param dov the output view to write the element.
* @throws IOException on write-related problems.
*/
void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
}
}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.core.memory.DataOutputView;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

import java.io.IOException;

/**
* General interface for state snapshots that should be written partitioned by key-groups.
* All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which
* typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced
* for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user
* calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is
* already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller
* to write the state by key-group. As a last step, when the state is completely written, the user calls
* {@link #release()}.
*/
public interface StateSnapshot {

/**
* This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}.
*/
@Nonnull
KeyGroupPartitionedSnapshot partitionByKeyGroup();

/**
* Release the snapshot. All snapshots should be released when they are no longer used because some implementation
* can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used
* after calling this method.
*/
void release();

/**
* Interface for writing a snapshot after it is partitioned into key-groups.
*/
interface KeyGroupPartitionedSnapshot {
/**
* Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once
* before first calling this method.
*
* @param dov the output.
* @param keyGroupId the key-group to write.
* @throws IOException on write-related problems.
*/
void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
}
}
Expand Up @@ -19,14 +19,15 @@
package org.apache.flink.runtime.state.heap; package org.apache.flink.runtime.state.heap;


import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


/** /**
* Abstract class to encapsulate the logic to take snapshots of {@link StateTable} implementations and also defines how * Abstract base class for snapshots of a {@link StateTable}. Offers a way to serialize the snapshot (by key-group).
* the snapshot is written during the serialization phase of checkpointing. * All snapshots should be released after usage.
*/ */
@Internal @Internal
abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateTableSnapshot { abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>> implements StateSnapshot {


/** /**
* The {@link StateTable} from which this snapshot was created. * The {@link StateTable} from which this snapshot was created.
Expand All @@ -48,4 +49,4 @@ abstract class AbstractStateTableSnapshot<K, N, S, T extends StateTable<K, N, S>
@Override @Override
public void release() { public void release() {
} }
} }

0 comments on commit 7e0eafa

Please sign in to comment.