Skip to content

Commit

Permalink
[FLINK-5985] Report no task states for stateless tasks in checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Mar 13, 2017
1 parent 7456d78 commit af45e89
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 52 deletions.
Expand Up @@ -238,6 +238,10 @@ public int getKeyGroupPrefixBytes() {
return keyGroupPrefixBytes;
}

/**
* Reports whether {@code kvStateInformation} currently holds at least one entry. The snapshot method uses
* this to short-circuit and return a null-valued future when there is nothing registered.
*
* @return {@code true} if {@code kvStateInformation} is not empty, {@code false} otherwise.
*/
private boolean hasRegisteredState() {
return !kvStateInformation.isEmpty();
}

/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
Expand Down Expand Up @@ -265,13 +269,12 @@ public RunnableFuture<KeyGroupsStateHandle> snapshot(

if (db != null) {

if (kvStateInformation.isEmpty()) {
if (!hasRegisteredState()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
" . Returning null.");
}

return new DoneFuture<>(null);
return DoneFuture.nullValue();
}

snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
Expand Down
Expand Up @@ -161,7 +161,7 @@ public RunnableFuture<OperatorStateHandle> snapshot(
CheckpointOptions checkpointOptions) throws Exception {

if (registeredStates.isEmpty()) {
return new DoneFuture<>(null);
return DoneFuture.nullValue();
}

List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
Expand Down
Expand Up @@ -30,10 +30,13 @@
* @param <T> The type of object in this {@code Future}.
*/
public class DoneFuture<T> implements RunnableFuture<T> {
private final T keyGroupsStateHandle;

public DoneFuture(T keyGroupsStateHandle) {
this.keyGroupsStateHandle = keyGroupsStateHandle;
private static final DoneFuture<?> NULL_FUTURE = new DoneFuture<Object>(null);

private final T payload;

public DoneFuture(T payload) {
this.payload = payload;
}

@Override
Expand All @@ -53,7 +56,7 @@ public boolean isDone() {

@Override
public T get() throws InterruptedException, ExecutionException {
return keyGroupsStateHandle;
return payload;
}

@Override
Expand All @@ -67,4 +70,9 @@ public T get(
public void run() {

}

/**
* Returns the shared {@code NULL_FUTURE} instance, which was constructed with a {@code null} payload, typed as
* a {@code DoneFuture<T>}. Callers can use this instead of allocating a new {@code DoneFuture} around
* {@code null}.
*
* @param <T> The payload type expected by the caller.
* @return The shared {@code DoneFuture} instance whose payload is {@code null}.
*/
@SuppressWarnings("unchecked")
public static <T> DoneFuture<T> nullValue() {
// The unchecked cast is acceptable because the payload is null, so no value of a wrong type is ever exposed.
return (DoneFuture<T>) NULL_FUTURE;
}
}
Expand Up @@ -36,7 +36,8 @@
public interface ManagedInitializationContext {

/**
* Returns true, if some managed state was restored from the snapshot of a previous execution.
* Returns true, if state was restored from the snapshot of a previous execution. This always returns false for
* stateless tasks.
*/
boolean isRestored();

Expand Down
Expand Up @@ -18,12 +18,13 @@

package org.apache.flink.runtime.state;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;

import java.util.Collection;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;

/**
* Interface for operations that can perform snapshots of their state.
* Interface for operators that can perform snapshots of their state.
*
* @param <S> Generic type of the state object that is created as handle to snapshots.
*/
Expand Down
Expand Up @@ -144,6 +144,10 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
return stateTable;
}

/**
* Reports whether {@code stateTables} currently holds at least one entry. The snapshot method uses this to
* short-circuit and return a null-valued future when there is nothing registered.
*
* @return {@code true} if {@code stateTables} is not empty, {@code false} otherwise.
*/
private boolean hasRegisteredState() {
return !stateTables.isEmpty();
}

@Override
public <N, V> InternalValueState<N, V> createValueState(
TypeSerializer<N> namespaceSerializer,
Expand Down Expand Up @@ -219,8 +223,8 @@ public RunnableFuture<KeyGroupsStateHandle> snapshot(
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {

if (stateTables.isEmpty()) {
return new DoneFuture<>(null);
if (!hasRegisteredState()) {
return DoneFuture.nullValue();
}

try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.
Expand Down
Expand Up @@ -24,11 +24,10 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.mockito.Mockito;

import java.io.File;
Expand All @@ -48,14 +47,18 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;

public class PendingCheckpointTest {

private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<>();
private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();

static {
ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getMaxParallelism()).thenReturn(128);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
ACK_TASKS.put(ATTEMPT_ID, vertex);
}

@Rule
Expand Down Expand Up @@ -287,6 +290,32 @@ public void testPendingCheckpointStatsCallbacks() throws Exception {
}
}

/**
* FLINK-5985
* <p>
* Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they
* should not appear in the task states map of the checkpoint.
*/
@Test
public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
Assert.assertTrue(pending.getTaskStates().isEmpty());
}

/**
* FLINK-5985
* <p>
* This test checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that
* subtasks that acknowledge some state are given an entry in the task states of the checkpoint.
*/
@Test
public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetrics.class));
Assert.assertFalse(pending.getTaskStates().isEmpty());
}

// ------------------------------------------------------------------------

private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
Expand Down
Expand Up @@ -23,11 +23,14 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Test;

import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.RunnableFuture;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -143,6 +146,19 @@ public void testRegisterStates() throws Exception {
}
}

/**
* Verifies that snapshotting a freshly created operator state backend, on which this test registers no
* state, produces a {@code null} state handle.
*/
@Test
public void testSnapshotEmpty() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
CheckpointStreamFactory streamFactory =
abstractStateBackend.createStreamFactory(new JobID(), "testOperator");

// No state is registered on the backend before the snapshot is taken.
RunnableFuture<OperatorStateHandle> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());

// Obtain the result through the FutureUtil helper (its name suggests it runs the future first if not done).
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
Assert.assertNull(stateHandle);
}

@Test
public void testSnapshotRestore() throws Exception {
DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
Expand All @@ -166,7 +182,8 @@ public void testSnapshotRestore() throws Exception {
listState3.add(20);

CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));

try {

Expand Down

0 comments on commit af45e89

Please sign in to comment.