Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions flink-core/src/main/java/org/apache/flink/util/MathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
* Collection of simple mathematical routines.
*/
public final class MathUtils {

/**
* Computes the logarithm of the given value to the base of 2, rounded down. It corresponds to the
* position of the highest non-zero bit. The position is counted, starting with 0 from the least
* significant bit to the most significant bit. For example, <code>log2floor(16) = 4</code>, and
* <code>log2floor(10) = 3</code>.
*
*
* @param value The value to compute the logarithm for.
* @return The logarithm (rounded down) to the base of 2.
* @throws ArithmeticException Thrown, if the given value is zero.
Expand All @@ -40,11 +40,11 @@ public static int log2floor(int value) throws ArithmeticException {

return 31 - Integer.numberOfLeadingZeros(value);
}

/**
* Computes the logarithm of the given value to the base of 2. This method throws an error,
* if the given argument is not a power of 2.
*
*
* @param value The value to compute the logarithm for.
* @return The logarithm to the base of 2.
* @throws ArithmeticException Thrown, if the given value is zero.
Expand All @@ -59,25 +59,25 @@ public static int log2strict(int value) throws ArithmeticException, IllegalArgum
}
return 31 - Integer.numberOfLeadingZeros(value);
}

/**
* Decrements the given number down to the closest power of two. If the argument is a
* power of two, it remains unchanged.
*
*
* @param value The value to round down.
* @return The closest value that is a power of two and less or equal than the given value.
*/
public static int roundDownToPowerOf2(int value) {
return Integer.highestOneBit(value);
}

/**
* Casts the given value to a 32 bit integer, if it can be safely done. If the cast would change the numeric
* value, this method raises an exception.
* <p>
* This method is a protection in places where one expects to be able to safely case, but where unexpected
* situations could make the cast unsafe and would cause hidden problems that are hard to track down.
*
*
* @param value The value to be cast to an integer.
* @return The given value as an integer.
* @see Math#toIntExact(long)
Expand Down Expand Up @@ -172,8 +172,37 @@ public static int roundUpToPowerOfTwo(int x) {
return x + 1;
}

/**
* Pseudo-randomly maps a long (64-bit) to an integer (32-bit) using some bit-mixing for better distribution.
*
* @param in the long (64-bit)input.
* @return the bit-mixed int (32-bit) output
*/
public static int longToIntWithBitMixing(long in) {
in = (in ^ (in >>> 30)) * 0xbf58476d1ce4e5b9L;
in = (in ^ (in >>> 27)) * 0x94d049bb133111ebL;
in = in ^ (in >>> 31);
return (int) in;
}

/**
* Bit-mixing for pseudo-randomization of integers (e.g., to guard against bad hash functions). Implementation is
* from Murmur's 32 bit finalizer.
*
* @param in the input value
* @return the bit-mixed output value
*/
public static int bitMix(int in) {
in ^= in >>> 16;
in *= 0x85ebca6b;
in ^= in >>> 13;
in *= 0xc2b2ae35;
in ^= in >>> 16;
return in;
}

// ============================================================================================

/**
* Prevent Instantiation through private constructor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
import org.apache.flink.runtime.state.heap.async.InternalKeyContext;
import org.apache.flink.util.Preconditions;

import java.io.Closeable;
Expand All @@ -51,7 +53,7 @@
* @param <K> Type of the key by which state is keyed.
*/
public abstract class AbstractKeyedStateBackend<K>
implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable {
implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable, InternalKeyContext<K> {

/** {@link TypeSerializer} for our key. */
protected final TypeSerializer<K> keySerializer;
Expand Down Expand Up @@ -205,6 +207,7 @@ public int getNumberOfKeyGroups() {
/**
* @see KeyedStateBackend
*/
@Override
public KeyGroupRange getKeyGroupRange() {
return keyGroupRange;
}
Expand Down Expand Up @@ -293,10 +296,16 @@ public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T
@Override
@SuppressWarnings("unchecked,rawtypes")
public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
if (stateDescriptor instanceof ReducingStateDescriptor) {

State stateRef = getPartitionedState(target, namespaceSerializer, stateDescriptor);
if (stateRef instanceof AbstractHeapMergingState) {

((AbstractHeapMergingState) stateRef).mergeNamespaces(target, sources);
} else if (stateDescriptor instanceof ReducingStateDescriptor) {

ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
ReducingState state = (ReducingState) stateRef;
ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
KvState kvState = (KvState) state;
Object result = null;
for (N source: sources) {
Expand All @@ -314,7 +323,8 @@ public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T
state.add(result);
}
} else if (stateDescriptor instanceof ListStateDescriptor) {
ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);

ListState<Object> state = (ListState) stateRef;
KvState kvState = (KvState) state;
List<Object> result = new ArrayList<>();
for (N source: sources) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Internal;

/**
* Interface for a binary function that is used for push-down of state transformation into state backends. The
* function takes as inputs the old state and an element. From those inputs, the function computes the new state.
*
* @param <S> type of the previous state that is the bases for the computation of the new state.
* @param <T> type of the element value that is used to compute the change of state.
*/
@Internal
public interface StateTransformationFunction<S, T> {

/**
* Binary function that applies a given value to the given old state to compute the new state.
*
* @param previousState the previous state that is the basis for the transformation.
* @param value the value that the implementation applies to the old state to obtain the new state.
* @return the new state, computed by applying the given value on the given old state.
* @throws Exception if something goes wrong in applying the transformation function.
*/
S apply(S previousState, T value) throws Exception;
}
Loading