@@ -768,7 +768,10 @@ private static final class RocksDBIncrementalSnapshotOperation<K> {
/** The state meta data. */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();

/** Local filesystem for the RocksDB backup. */
private FileSystem backupFileSystem;

/** Local path for the RocksDB backup. */
private Path backupPath;

// Registry for all opened i/o streams
@@ -831,11 +834,12 @@ private StreamStateHandle materializeStateData(Path filePath) throws Exception {
return result;

} finally {
if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {

if (closeableRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}

if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
if (closeableRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
@@ -1041,7 +1045,13 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception

@Override
public void notifyCheckpointComplete(long completedCheckpointId) {

if (!enableIncrementalCheckpointing) {
return;
}

synchronized (materializedSstFiles) {

if (completedCheckpointId < lastCompletedCheckpointId) {
return;
}
@@ -1153,8 +1163,7 @@ private void restoreKeyGroupsInStateHandle()
restoreKVStateMetaData();
restoreKVStateData();
} finally {
if (currentStateHandleInStream != null
&& rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
@@ -1318,7 +1327,7 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBack

return serializationProxy.getStateMetaInfoSnapshots();
} finally {
if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}
}
@@ -1350,11 +1359,11 @@ private void readStateData(
outputStream.write(buffer, 0, numBytes);
}
} finally {
if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}

if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
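Aside on the close pattern in the hunks above: the explicit `inputStream != null` / `outputStream != null` guards are dropped before calling `unregisterCloseable`, which is only safe if the registry treats a `null` argument as "not registered" and returns `false`. A minimal sketch of that assumed contract, using a stand-in class rather than the actual `CloseableRegistry` implementation:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a closeable registry whose unregister is null-safe. */
final class SketchCloseableRegistry {

    private final Set<Closeable> registered = ConcurrentHashMap.newKeySet();

    boolean registerCloseable(Closeable c) {
        return c != null && registered.add(c);
    }

    /** Returns false for null, which is what lets callers drop their own null checks. */
    boolean unregisterCloseable(Closeable c) {
        return c != null && registered.remove(c);
    }

    /** The call-site shape used throughout the hunks above. */
    static void closeIfRegistered(SketchCloseableRegistry registry, Closeable stream) throws IOException {
        // No "stream != null" guard needed: unregisterCloseable(null) is simply false.
        if (registry.unregisterCloseable(stream)) {
            stream.close();
        }
    }
}
```

Under that contract every finally block collapses to the two-line `if (registry.unregisterCloseable(stream)) { stream.close(); }` form seen in the diff.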
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -42,10 +43,12 @@
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -132,21 +135,15 @@ public String getKey(String value) throws Exception {
final OneShotLatch delayCheckpointLatch = new OneShotLatch();
final OneShotLatch ensureCheckpointLatch = new OneShotLatch();

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize) {
CheckpointResponder checkpointResponderMock = new CheckpointResponder() {

@Override
public void acknowledgeCheckpoint(
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot checkpointStateHandles) {

super.acknowledgeCheckpoint(checkpointId, checkpointMetrics);

JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
// block on the latch, to verify that triggerCheckpoint returns below,
// even though the async checkpoint would not finish
try {
@@ -156,7 +153,7 @@ public void acknowledgeCheckpoint(
}

boolean hasManagedKeyedState = false;
for (Map.Entry<OperatorID, OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) {
for (Map.Entry<OperatorID, OperatorSubtaskState> entry : subtaskState.getSubtaskStateMappings()) {
OperatorSubtaskState state = entry.getValue();
if (state != null) {
hasManagedKeyedState |= state.getManagedKeyedState() != null;
@@ -169,8 +166,30 @@
// we now know that the checkpoint went through
ensureCheckpointLatch.trigger();
}

@Override
public void declineCheckpoint(
JobID jobID, ExecutionAttemptID executionAttemptID,
long checkpointId, Throwable cause) {

}
};

JobID jobID = new JobID();
ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(0L, 0L);
TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager(
jobID,
executionAttemptID,
checkpointResponderMock);

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize,
taskStateManagerTestMock);

testHarness.invoke(mockEnv);

// wait for the task to be running
@@ -260,12 +279,15 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS
streamConfig.setStreamOperator(new AsyncCheckpointOperator());
streamConfig.setOperatorID(new OperatorID());

TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager();

StreamMockEnvironment mockEnv = new StreamMockEnvironment(
testHarness.jobConfig,
testHarness.taskConfig,
testHarness.memorySize,
new MockInputSplitProvider(),
testHarness.bufferSize);
testHarness.bufferSize,
taskStateManagerTestMock);

blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());
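The test above no longer overrides `StreamMockEnvironment#acknowledgeCheckpoint`; instead it hands a `CheckpointResponder` to a `TestTaskStateManager`, which is then passed into the `StreamMockEnvironment` constructor. The core idea is a two-latch handshake between the responder and the test thread. A distilled, self-contained sketch of that handshake, with stand-in types rather than the Flink test classes:

```java
import java.util.concurrent.CountDownLatch;

/** Distilled form of the latch handshake used by the test above (stand-in types). */
class AckHandshakeSketch {

    interface Responder {
        void acknowledgeCheckpoint(long checkpointId);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch delayCheckpoint = new CountDownLatch(1);  // held open by the test
        CountDownLatch checkpointAcked = new CountDownLatch(1);  // released by the responder

        Responder responder = checkpointId -> {
            try {
                // Block the acknowledgement to prove that triggering the checkpoint already returned.
                delayCheckpoint.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            checkpointAcked.countDown();
        };

        // The acknowledgement arrives from a background thread, like the async checkpoint thread.
        new Thread(() -> responder.acknowledgeCheckpoint(42L)).start();

        // ... the test would trigger the checkpoint here, then release the responder ...
        delayCheckpoint.countDown();
        checkpointAcked.await();
    }
}
```

The first latch proves the checkpoint trigger returned before the asynchronous part completed; the second lets the test wait until the acknowledgement actually arrived.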
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -375,11 +376,13 @@ public Map<String, Object> getComponentConfiguration() {
when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());

final CloseableRegistry closeableRegistry = new CloseableRegistry();
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(execConfig);
when(mockTask.getCancelables()).thenReturn(closeableRegistry);

return mockTask;
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
* This interface represents an iterable that is also closeable.
*
* @param <T> type of the iterated objects.
*/
public interface CloseableIterable<T> extends Iterable<T>, Closeable {

class Empty<T> implements CloseableIterable<T> {

private Empty() {
}

@Override
public void close() throws IOException {

}

@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
}

static <T> CloseableIterable<T> empty() {
return new CloseableIterable.Empty<>();
}
}
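One hypothetical implementor of the new `CloseableIterable` interface, just to illustrate its intended shape: an iterable over the lines of a file that releases the underlying reader in `close()`. The `FileLines` class and the file path are made up for this example and are not part of the change:

```java
import org.apache.flink.util.CloseableIterable;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Iterator;

/** Hypothetical implementor: the lines of a file as a CloseableIterable. */
class FileLines implements CloseableIterable<String> {

    private final BufferedReader reader;

    FileLines(String path) throws IOException {
        this.reader = Files.newBufferedReader(Paths.get(path));
    }

    @Override
    public Iterator<String> iterator() {
        return reader.lines().iterator();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
```

A caller would typically consume it with try-with-resources: `try (FileLines lines = new FileLines("input.txt")) { for (String line : lines) { /* ... */ } }`.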
@@ -18,11 +18,38 @@

package org.apache.flink.util;

import org.apache.flink.annotation.Internal;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

/**
* Tagging interface for migration related classes.
* This interface represents an iterator that is also closeable.
*
* @param <T> type of the iterated objects.
*/
@Internal
public interface Migration {
public interface CloseableIterator<T> extends Iterator<T>, Closeable {

class Empty<T> implements CloseableIterator<T> {

private Empty() {
}

@Override
public boolean hasNext() {
return false;
}

@Override
public T next() {
return null;
}

@Override
public void close() throws IOException {
}
}

static <T> CloseableIterator<T> empty() {
return new Empty<>();
}
}
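`CloseableIterator` lends itself to a small adapter that couples an existing `Iterator` with the resource backing it, so the resource can be released via try-with-resources once iteration is done. A hypothetical sketch; the adapter below is not part of this change:

```java
import org.apache.flink.util.CloseableIterator;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

/** Hypothetical adapter: couple an existing iterator with the resource backing it. */
final class ResourceBackedIterator<T> implements CloseableIterator<T> {

    private final Iterator<T> delegate;
    private final Closeable resource;

    ResourceBackedIterator(Iterator<T> delegate, Closeable resource) {
        this.delegate = delegate;
        this.resource = resource;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() throws IOException {
        resource.close();
    }
}
```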
@@ -1084,7 +1084,7 @@ public boolean restoreLatestCheckpointedState(
final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

StateAssignmentOperation stateAssignmentOperation =
new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);
new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);

stateAssignmentOperation.assignStates();

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;

/**
* This class encapsulates the data from the job manager to restore a task.
*/
public class JobManagerTaskRestore implements Serializable {

private static final long serialVersionUID = 1L;

private final long restoreCheckpointId;

private final TaskStateSnapshot taskStateSnapshot;

public JobManagerTaskRestore(long restoreCheckpointId, TaskStateSnapshot taskStateSnapshot) {
this.restoreCheckpointId = restoreCheckpointId;
this.taskStateSnapshot = taskStateSnapshot;
}

public long getRestoreCheckpointId() {
return restoreCheckpointId;
}

public TaskStateSnapshot getTaskStateSnapshot() {
return taskStateSnapshot;
}

@Override
public String toString() {
return "TaskRestore{" +
"restoreCheckpointId=" + restoreCheckpointId +
", taskStateSnapshot=" + taskStateSnapshot +
'}';
}
}
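`JobManagerTaskRestore` is a plain value class. A minimal sketch of how either side might construct and read it, assuming a `TaskStateSnapshot` obtained elsewhere; the surrounding wiring (who builds it, who consumes it) is outside this diff:

```java
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;

class RestoreDescriptorSketch {

    /** Pair the id of the checkpoint being restored with the state handles to deploy. */
    static JobManagerTaskRestore describe(long restoredCheckpointId, TaskStateSnapshot snapshot) {
        return new JobManagerTaskRestore(restoredCheckpointId, snapshot);
    }

    /** The receiving side reads both pieces back through the getters defined above. */
    static long restoredCheckpointId(JobManagerTaskRestore restore) {
        return restore.getRestoreCheckpointId();
    }
}
```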