Skip to content

Commit

Permalink
[FLINK-8360][checkpointing] Implement state storage for local recover…
Browse files Browse the repository at this point in the history
…y and integrate with task lifecycle
  • Loading branch information
StefanRRichter committed Feb 25, 2018
1 parent ea0d16d commit df3e6bb
Show file tree
Hide file tree
Showing 159 changed files with 8,768 additions and 1,834 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
Expand Down Expand Up @@ -389,34 +390,34 @@ public void testScaleUpAfterScalingDown() throws Exception {
final int parallelism3 = 3; final int parallelism3 = 3;
final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3)); final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));


List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute( List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
topic, topic,
Collections.emptyList(), Collections.emptyList(),
parallelism1, parallelism1,
maxParallelism, maxParallelism,
IntStream.range(0, parallelism1).boxed().iterator()); IntStream.range(0, parallelism1).boxed().iterator());


operatorStateHandles = repartitionAndExecute( operatorSubtaskState = repartitionAndExecute(
topic, topic,
operatorStateHandles, operatorSubtaskState,
parallelism2, parallelism2,
maxParallelism, maxParallelism,
IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator()); IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());


operatorStateHandles = repartitionAndExecute( operatorSubtaskState = repartitionAndExecute(
topic, topic,
operatorStateHandles, operatorSubtaskState,
parallelism3, parallelism3,
maxParallelism, maxParallelism,
IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator()); IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());


// After each previous repartitionAndExecute call, we are left with some lingering transactions, that would // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
// not allow us to read all committed messages from the topic. Thus we initialize operators from // not allow us to read all committed messages from the topic. Thus we initialize operators from
// operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions. // OperatorSubtaskState once more, but without any new data. This should terminate all ongoing transactions.


operatorStateHandles = repartitionAndExecute( operatorSubtaskState = repartitionAndExecute(
topic, topic,
operatorStateHandles, operatorSubtaskState,
1, 1,
maxParallelism, maxParallelism,
Collections.emptyIterator()); Collections.emptyIterator());
Expand Down Expand Up @@ -448,10 +449,10 @@ private List<OperatorStateHandle> repartitionAndExecute(
testHarness.setup(); testHarness.setup();


testHarness.initializeState(new OperatorSubtaskState( testHarness.initializeState(new OperatorSubtaskState(
inputStates, new StateObjectCollection<>(inputStates),
Collections.emptyList(), StateObjectCollection.empty(),
Collections.emptyList(), StateObjectCollection.empty(),
Collections.emptyList())); StateObjectCollection.empty()));
testHarness.open(); testHarness.open();


if (inputData.hasNext()) { if (inputData.hasNext()) {
Expand All @@ -460,9 +461,9 @@ private List<OperatorStateHandle> repartitionAndExecute(
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);


outputStates.addAll(snapshot.getManagedOperatorState()); outputStates.addAll(snapshot.getManagedOperatorState());
checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator state");
checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state"); checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed state");
checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state"); checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");


for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) { for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
testHarness.processElement(-nextValue, 0); testHarness.processElement(-nextValue, 0);
Expand Down
Expand Up @@ -66,6 +66,20 @@ public class CheckpointingOptions {
" complete checkpoint state. Some state backends may not support incremental checkpoints and ignore" + " complete checkpoint state. Some state backends may not support incremental checkpoints and ignore" +
" this option."); " this option.");


/**
* This option configures local recovery for this state backend.
*/
public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
.key("state.backend.local-recovery")
.defaultValue("DISABLED");

/**
* The config parameter defining the root directories for storing file-based state for local recovery.
*/
public static final ConfigOption<String> LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS = ConfigOptions
.key("taskmanager.state.local.root-dirs")
.noDefaultValue();

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Options specific to the file-system-based state backends // Options specific to the file-system-based state backends
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -18,24 +18,48 @@


package org.apache.flink.configuration; package org.apache.flink.configuration;


import javax.annotation.Nonnull;

import java.io.File; import java.io.File;


/** /**
* Utility class for {@link Configuration} related helper functions. * Utility class for {@link Configuration} related helper functions.
*/ */
public class ConfigurationUtils { public class ConfigurationUtils {


private static final String[] EMPTY = new String[0];

/** /**
* Extracts the task manager directories for temporary files as defined by * Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}. * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
* *
* @param configuration configuration object * @param configuration configuration object
* @return array of configured directories (in order) * @return array of configured directories (in order)
*/ */
@Nonnull
public static String[] parseTempDirectories(Configuration configuration) { public static String[] parseTempDirectories(Configuration configuration) {
return configuration.getString(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator); return splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
}

/**
* Extracts the local state directories as defined by
* {@link CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS}.
*
* @param configuration configuration object
* @return array of configured directories (in order)
*/
@Nonnull
public static String[] parseLocalStateDirectories(Configuration configuration) {
String configValue = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, "");
return splitPaths(configValue);
}

@Nonnull
private static String[] splitPaths(@Nonnull String separatedPaths) {
return separatedPaths.length() > 0 ? separatedPaths.split(",|" + File.pathSeparator) : EMPTY;
} }


// Make sure that we cannot instantiate this class // Make sure that we cannot instantiate this class
private ConfigurationUtils() {} private ConfigurationUtils() {
}
} }
Expand Up @@ -211,10 +211,12 @@ private boolean delete(final File f) throws IOException {


if (f.isDirectory()) { if (f.isDirectory()) {
final File[] files = f.listFiles(); final File[] files = f.listFiles();
for (File file : files) { if (files != null) {
final boolean del = delete(file); for (File file : files) {
if (!del) { final boolean del = delete(file);
return false; if (!del) {
return false;
}
} }
} }
} else { } else {
Expand Down
36 changes: 36 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/Disposable.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

/**
* Interface for classes that can be disposed, i.e. that have a dedicated lifecycle step to "destroy" the object. On
* reason for this is for example to release native resources. From this point, the interface fulfills a similar purpose
* as the {@link java.io.Closeable} interface, but sometimes both should be represented as isolated, independent
* lifecycle steps.
*/
public interface Disposable {

/**
* Disposes the object and releases all resources. After calling this method, calling any methods on the
* object may result in undefined behavior.
*
* @throws Exception if something goes wrong during disposal.
*/
void dispose() throws Exception;
}
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;

import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;

/**
* Helper class with a method that attempts to automatically test method forwarding between a delegate and a wrapper.
*/
public class MethodForwardingTestUtil {

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param <D> type of the delegate
* @param <W> type of the wrapper
*/
public static <D, W> void testMethodForwarding(
Class<D> delegateClass,
Function<D, W> wrapperFactory)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
testMethodForwarding(delegateClass, wrapperFactory, () -> spy(delegateClass), Collections.emptySet());
}

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory.
* @param <D> type of the delegate
* @param <W> type of the wrapper
* @param <I> type of the object created as delegate, is a subtype of D.
*/
public static <D, W, I extends D> void testMethodForwarding(
Class<D> delegateClass,
Function<I, W> wrapperFactory,
Supplier<I> delegateObjectSupplier)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
testMethodForwarding(delegateClass, wrapperFactory, delegateObjectSupplier, Collections.emptySet());
}

/**
* This is a best effort automatic test for method forwarding between a delegate and its wrapper, where the wrapper
* class is a subtype of the delegate. Methods can be remapped in case that the implementation does not call the
* original method. Remapping to null skips the method. This ignores methods that are inherited from Object.
*
* @param delegateClass the class for the delegate.
* @param wrapperFactory factory that produces a wrapper from a delegate.
* @param delegateObjectSupplier supplier for the delegate object passed to the wrapper factory.
* @param skipMethodSet set of methods to ignore.
* @param <D> type of the delegate
* @param <W> type of the wrapper
* @param <I> type of the object created as delegate, is a subtype of D.
*/
public static <D, W, I extends D> void testMethodForwarding(
Class<D> delegateClass,
Function<I, W> wrapperFactory,
Supplier<I> delegateObjectSupplier,
Set<Method> skipMethodSet) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {

Preconditions.checkNotNull(delegateClass);
Preconditions.checkNotNull(wrapperFactory);
Preconditions.checkNotNull(skipMethodSet);

I delegate = delegateObjectSupplier.get();

//check if we need to wrap the delegate object as a spy, or if it is already testable with Mockito.
MockUtil mockUtil = new MockUtil();
if (!mockUtil.isSpy(delegate) || !mockUtil.isMock(delegate)) {
delegate = spy(delegate);
}

W wrapper = wrapperFactory.apply(delegate);

// ensure that wrapper is a subtype of delegate
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));

for (Method delegateMethod : delegateClass.getMethods()) {

if (checkSkipMethodForwardCheck(delegateMethod, skipMethodSet)) {
continue;
}

// find the correct method to substitute the bridge for erased generic types.
// if this doesn't work, the user need to exclude the method and write an additional test.
Method wrapperMethod = wrapper.getClass().getMethod(
delegateMethod.getName(),
delegateMethod.getParameterTypes());

// things get a bit fuzzy here, best effort to find a match but this might end up with a wrong method.
if (wrapperMethod.isBridge()) {
for (Method method : wrapper.getClass().getMethods()) {
if (!method.isBridge()
&& method.getName().equals(wrapperMethod.getName())
&& method.getParameterCount() == wrapperMethod.getParameterCount()) {
wrapperMethod = method;
break;
}
}
}

Class<?>[] parameterTypes = wrapperMethod.getParameterTypes();
Object[] arguments = new Object[parameterTypes.length];
for (int j = 0; j < arguments.length; j++) {
Class<?> parameterType = parameterTypes[j];
if (parameterType.isArray()) {
arguments[j] = Array.newInstance(parameterType.getComponentType(), 0);
} else if (parameterType.isPrimitive()) {
if (boolean.class.equals(parameterType)) {
arguments[j] = false;
} else if (char.class.equals(parameterType)) {
arguments[j] = 'a';
} else {
arguments[j] = (byte) 0;
}
} else {
arguments[j] = Mockito.mock(parameterType);
}
}

wrapperMethod.invoke(wrapper, arguments);
delegateMethod.invoke(Mockito.verify(delegate, Mockito.times(1)), arguments);
reset(delegate);
}
}

/**
* Test if this method should be skipped in our check for proper forwarding, e.g. because it is just a bridge.
*/
private static boolean checkSkipMethodForwardCheck(Method delegateMethod, Set<Method> skipMethods) {

if (delegateMethod.isBridge()
|| delegateMethod.isDefault()
|| skipMethods.contains(delegateMethod)) {
return true;
}

// skip methods declared in Object (Mockito doesn't like them)
try {
Object.class.getMethod(delegateMethod.getName(), delegateMethod.getParameterTypes());
return true;
} catch (Exception ignore) {
}
return false;
}
}
Expand Up @@ -120,9 +120,7 @@ private static void createHtmlFile(DocumentingRestEndpoint restEndpoint, Path ou
List<MessageHeaders> specs = restEndpoint.getSpecs(); List<MessageHeaders> specs = restEndpoint.getSpecs();
specs.forEach(spec -> html.append(createHtmlEntry(spec))); specs.forEach(spec -> html.append(createHtmlEntry(spec)));


if (Files.exists(outputFile)) { Files.deleteIfExists(outputFile);
Files.delete(outputFile);
}
Files.write(outputFile, html.toString().getBytes(StandardCharsets.UTF_8)); Files.write(outputFile, html.toString().getBytes(StandardCharsets.UTF_8));
} }


Expand Down

0 comments on commit df3e6bb

Please sign in to comment.