Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.TestLogger;

import org.junit.Rule;
import org.junit.Test;
Expand All @@ -49,9 +50,11 @@
import static org.junit.Assert.assertThat;

/** Tests if the coordinator handles up and downscaling. */
public class CoordinatedSourceRescaleITCase {
public class CoordinatedSourceRescaleITCase extends TestLogger {

public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";

@Rule public final TemporaryFolder temp = new TemporaryFolder();

@Test
Expand Down Expand Up @@ -128,14 +131,16 @@ private StreamExecutionEnvironment createEnv(
private static class FailingMapFunction extends RichMapFunction<Long, Long>
implements CheckpointListener {
private static final long serialVersionUID = 699621912578369378L;
private boolean generateCheckpoint;
private final boolean generateCheckpoint;
private boolean processedRecord;

/**
* Creates the map function and fixes which of its two failure modes is active.
*
* @param generateCheckpoint if {@code true}, the function throws from {@code
*     notifyCheckpointComplete} with the {@code CREATED_CHECKPOINT} message; if {@code false},
*     it throws from {@code map} with the {@code RESTORED_CHECKPOINT} message
*/
FailingMapFunction(boolean generateCheckpoint) {
this.generateCheckpoint = generateCheckpoint;
}

@Override
public Long map(Long value) throws Exception {
processedRecord = true;
// run a bit before failing
if (!generateCheckpoint && value % 100 == 42) {
throw new Exception(RESTORED_CHECKPOINT);
Expand All @@ -145,7 +150,7 @@ public Long map(Long value) throws Exception {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (generateCheckpoint && checkpointId > 5) {
if (generateCheckpoint && processedRecord && checkpointId > 5) {
throw new Exception(CREATED_CHECKPOINT);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ public class Execution

private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;

/**
* Gets completed successfully when the task switched to {@link ExecutionState#INITIALIZING} or
* {@link ExecutionState#RUNNING}. If the task never switches to those states, but fails
* immediately, then this future never completes.
*/
private final CompletableFuture<?> initializingOrRunningFuture;

private volatile ExecutionState state = CREATED;

private LogicalSlot assignedResource;
Expand Down Expand Up @@ -214,6 +221,7 @@ public Execution(
this.terminalStateFuture = new CompletableFuture<>();
this.releaseFuture = new CompletableFuture<>();
this.taskManagerLocationFuture = new CompletableFuture<>();
this.initializingOrRunningFuture = new CompletableFuture<>();

this.assignedResource = null;
}
Expand Down Expand Up @@ -352,6 +360,23 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
this.taskRestore = taskRestore;
}

/**
* Gets a future that completes once the task execution reaches one of the states {@link
* ExecutionState#INITIALIZING} or {@link ExecutionState#RUNNING}. If this task never reaches
* these states (for example because the task is cancelled before it was properly deployed and
* restored), then this future will never complete.
*
* <p>The future is completed already in the {@link ExecutionState#INITIALIZING} state, because
* various running actions are already possible in that state (the task already accepts and
* sends events and network data for task recovery). (Note that in earlier versions, the
* INITIALIZING state was not separate but part of the RUNNING state).
*
* <p>This future is always completed from the job master's main thread.
*
* @return the future that is completed (with a {@code null} value) when the execution switches
*     to {@link ExecutionState#INITIALIZING} or {@link ExecutionState#RUNNING}; the same
*     future instance is returned on every call
*/
public CompletableFuture<?> getInitializingOrRunningFuture() {
return initializingOrRunningFuture;
}

/**
* Gets a future that completes once the task execution reaches a terminal state. The future
* will be completed with specific state that the execution reached. This future is always
Expand Down Expand Up @@ -854,6 +879,8 @@ private void triggerCheckpointHelper(
*/
public CompletableFuture<Acknowledge> sendOperatorEvent(
OperatorID operatorId, SerializedValue<OperatorEvent> event) {

assertRunningInJobMasterMainThread();
final LogicalSlot slot = assignedResource;

if (slot != null && (getState() == RUNNING || getState() == INITIALIZING)) {
Expand Down Expand Up @@ -1426,7 +1453,9 @@ private boolean transitionState(
}
}

if (targetState.isTerminal()) {
if (targetState == INITIALIZING || targetState == RUNNING) {
initializingOrRunningFuture.complete(null);
} else if (targetState.isTerminal()) {
// complete the terminal state future
terminalStateFuture.complete(targetState);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.operators.coordination;

import org.apache.flink.runtime.messages.Acknowledge;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

/**
* Simple interface for a component that takes and sends events.
*
* <p>The interface has a single abstract method, so implementations can be supplied as lambdas.
*/
@FunctionalInterface
interface EventSender {

/**
* Takes the given Callable and calls it at a certain point to send the event. The result of
* that Callable is bridged to the given result future.
*
* @param sendAction the action that performs the actual sending when called; it yields a
*     future for the acknowledgement of the sent event
* @param result the future to which the outcome of {@code sendAction} is forwarded
*/
void sendEvent(
Callable<CompletableFuture<Acknowledge>> sendAction,
CompletableFuture<Acknowledge> result);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.operators.coordination;

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * An implementation of the {@link SubtaskAccess} interface that uses the ExecutionGraph's classes,
 * specifically {@link Execution} and {@link ExecutionJobVertex} to access tasks.
 *
 * <p>Each instance is bound to exactly one {@link Execution} (one execution attempt of a subtask)
 * and one operator within that task. Futures of events sent through this access are tracked so
 * that they can be failed eagerly once the execution reaches a terminal state.
 */
final class ExecutionSubtaskAccess implements SubtaskAccess {

    /** The execution attempt that all calls are forwarded to. */
    private final Execution taskExecution;

    /** The operator (within the task) that is the target of the sent events. */
    private final OperatorID operator;

    /** Tracks the not-yet-completed futures of events sent to the task. */
    private final IncompleteFuturesTracker futuresTracker;

    /**
     * Creates the access for the given execution attempt and operator.
     *
     * @param taskExecution the execution attempt to access, must not be null
     * @param operator the ID of the target operator, must not be null
     * @throws NullPointerException if any argument is null
     */
    ExecutionSubtaskAccess(Execution taskExecution, OperatorID operator) {
        // validate eagerly, consistent with ExecutionJobVertexSubtaskAccess below
        this.taskExecution = checkNotNull(taskExecution);
        this.operator = checkNotNull(operator);
        this.futuresTracker = new IncompleteFuturesTracker();

        // this is a safeguard to speed up things: as soon as the task is in a terminal state, all
        // the pending futures from events sent to that task should fail immediately
        // without this, the futures would only fail after the RPC system hits the ask-timeout.
        taskExecution
                .getTerminalStateFuture()
                .thenAccept(
                        (state) ->
                                futuresTracker.failAllFutures(
                                        new FlinkException("Task is no longer running")));
    }

    /**
     * Creates the action that, when called, sends the given event to the operator on the task and
     * registers the resulting future with the tracker of incomplete futures.
     *
     * @param event the serialized event to send
     * @return the (not yet executed) send action
     */
    @Override
    public Callable<CompletableFuture<Acknowledge>> createEventSendAction(
            SerializedValue<OperatorEvent> event) {
        return () -> {
            final CompletableFuture<Acknowledge> result =
                    taskExecution.sendOperatorEvent(operator, event);
            futuresTracker.trackFutureWhileIncomplete(result);
            return result;
        };
    }

    @Override
    public int getSubtaskIndex() {
        return taskExecution.getParallelSubtaskIndex();
    }

    @Override
    public ExecutionAttemptID currentAttempt() {
        return taskExecution.getAttemptId();
    }

    @Override
    public String subtaskName() {
        return taskExecution.getVertexWithAttempt();
    }

    /**
     * Returns the execution's future that completes when the task has switched to {@link
     * ExecutionState#INITIALIZING} or {@link ExecutionState#RUNNING}.
     */
    @Override
    public CompletableFuture<?> hasSwitchedToRunning() {
        return taskExecution.getInitializingOrRunningFuture();
    }

    /**
     * Checks whether the execution is currently in state {@link ExecutionState#RUNNING} or {@link
     * ExecutionState#INITIALIZING}.
     */
    @Override
    public boolean isStillRunning() {
        // Read the state exactly once: with two separate reads, a transition from INITIALIZING
        // to RUNNING happening in between would make both comparisons fail and wrongly report
        // the task as not running.
        final ExecutionState state = taskExecution.getState();
        return state == ExecutionState.RUNNING || state == ExecutionState.INITIALIZING;
    }

    @Override
    public void triggerTaskFailover(Throwable cause) {
        taskExecution.fail(cause);
    }

    // ------------------------------------------------------------------------

    /**
     * A {@link SubtaskAccessFactory} that creates the accesses from the current execution attempts
     * of the subtasks of an {@link ExecutionJobVertex}.
     */
    static final class ExecutionJobVertexSubtaskAccess implements SubtaskAccessFactory {

        private final ExecutionJobVertex ejv;
        private final OperatorID operator;

        ExecutionJobVertexSubtaskAccess(ExecutionJobVertex ejv, OperatorID operator) {
            this.ejv = checkNotNull(ejv);
            this.operator = checkNotNull(operator);
        }

        /**
         * Creates the access to the current execution attempt of the given subtask.
         *
         * @param subtask the subtask index, in the range {@code [0, parallelism)}
         * @return the access bound to the subtask's current execution attempt
         * @throws IllegalArgumentException if the index is out of bounds
         */
        @Override
        public SubtaskAccess getAccessForSubtask(int subtask) {
            if (subtask < 0 || subtask >= ejv.getParallelism()) {
                throw new IllegalArgumentException(
                        "Subtask index out of bounds [0, " + ejv.getParallelism() + ')');
            }

            final Execution taskExecution =
                    ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt();
            return new ExecutionSubtaskAccess(taskExecution, operator);
        }
    }
}
Loading