Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -747,14 +747,15 @@ public void deploy() throws JobException {
final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

getVertex().notifyPendingDeployment(this);
// We run the submission in the future executor so that the serialization of large TDDs does not block
// the main thread and sync back to the main thread once submission is completed.
CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
.thenCompose(Function.identity())
.whenCompleteAsync(
(ack, failure) -> {
if (failure == null) {
vertex.notifyDeployment(this);
vertex.notifyCompletedDeployment(this);
} else {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;

/**
* A listener that is called when an execution has been deployed.
* A listener that is called when the deployment of an execution has been started/completed.
*/
public interface ExecutionDeploymentListener {
void onCompletedDeployment(ExecutionAttemptID execution, ResourceID host);
void onStartedDeployment(ExecutionAttemptID execution, ResourceID host);

void onCompletedDeployment(ExecutionAttemptID execution);
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static ExecutionGraph buildGraph(
shuffleMaster,
partitionTracker,
failoverStrategy,
(execution, host) -> {},
NoOpExecutionDeploymentListener.get(),
(execution, newState) -> {});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,16 +874,26 @@ void executionFinished(Execution execution) {
// Miscellaneous
// --------------------------------------------------------------------------------------------

void notifyDeployment(Execution execution) {
void notifyPendingDeployment(Execution execution) {
// only forward this notification if the execution is still the current execution
// otherwise we have an outdated execution
if (isCurrentExecution(execution)) {
getExecutionGraph().getExecutionDeploymentListener().onCompletedDeployment(
getExecutionGraph().getExecutionDeploymentListener().onStartedDeployment(
execution.getAttemptId(),
execution.getAssignedResourceLocation().getResourceID());
}
}

void notifyCompletedDeployment(Execution execution) {
// only forward this notification if the execution is still the current execution
// otherwise we have an outdated execution
if (isCurrentExecution(execution)) {
getExecutionGraph().getExecutionDeploymentListener().onCompletedDeployment(
execution.getAttemptId()
);
}
}

/**
* Simply forward this notification.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.clusterframework.types.ResourceID;

/**
* No-op implementation of {@link ExecutionDeploymentListener}.
*/
public enum NoOpExecutionDeploymentListener implements ExecutionDeploymentListener {
INSTANCE;

@Override
public void onStartedDeployment(ExecutionAttemptID execution, ResourceID host) {}

@Override
public void onCompletedDeployment(ExecutionAttemptID execution) {}

public static ExecutionDeploymentListener get() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -37,21 +38,21 @@ public DefaultExecutionDeploymentReconciler(ExecutionDeploymentReconciliationHan
}

@Override
public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> expectedDeployedExecutions) {
final Set<ExecutionAttemptID> unknownExecutions = new HashSet<>();
final Set<ExecutionAttemptID> expectedExecutions = new HashSet<>(expectedDeployedExecutions);

for (ExecutionAttemptID executionAttemptID : executionDeploymentReport.getExecutions()) {
boolean isTracked = expectedExecutions.remove(executionAttemptID);
if (!isTracked) {
unknownExecutions.add(executionAttemptID);
public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Map<ExecutionAttemptID, ExecutionDeploymentState> expectedDeployedExecutions) {
final Set<ExecutionAttemptID> unknownExecutions = new HashSet<>(executionDeploymentReport.getExecutions());
final Set<ExecutionAttemptID> missingExecutions = new HashSet<>();

for (Map.Entry<ExecutionAttemptID, ExecutionDeploymentState> execution : expectedDeployedExecutions.entrySet()) {
boolean deployed = unknownExecutions.remove(execution.getKey());
if (!deployed && execution.getValue() != ExecutionDeploymentState.PENDING) {
missingExecutions.add(execution.getKey());
}
}
if (!unknownExecutions.isEmpty()) {
handler.onUnknownDeploymentsOf(unknownExecutions, taskExecutorHost);
}
if (!expectedExecutions.isEmpty()) {
handler.onMissingDeploymentsOf(expectedExecutions, taskExecutorHost);
if (!missingExecutions.isEmpty()) {
handler.onMissingDeploymentsOf(missingExecutions, taskExecutorHost);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,32 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Default {@link ExecutionDeploymentTracker} implementation.
*/
public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker {

private final Set<ExecutionAttemptID> pendingDeployments = new HashSet<>();
private final Map<ResourceID, Set<ExecutionAttemptID>> executionsByHost = new HashMap<>();
private final Map<ExecutionAttemptID, ResourceID> hostByExecution = new HashMap<>();

@Override
public void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) {
public void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) {
pendingDeployments.add(executionAttemptId);
hostByExecution.put(executionAttemptId, host);
executionsByHost.computeIfAbsent(host, ignored -> new HashSet<>()).add(executionAttemptId);
}

@Override
public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) {
pendingDeployments.remove(executionAttemptId);
}

@Override
public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
pendingDeployments.remove(executionAttemptId);
ResourceID host = hostByExecution.remove(executionAttemptId);
if (host != null) {
executionsByHost.computeIfPresent(host, (resourceID, executionAttemptIds) -> {
Expand All @@ -55,7 +64,11 @@ public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) {
}

@Override
public Set<ExecutionAttemptID> getExecutionsOn(ResourceID host) {
return executionsByHost.getOrDefault(host, Collections.emptySet());
public Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host) {
return executionsByHost.getOrDefault(host, Collections.emptySet())
.stream()
.collect(Collectors.toMap(
x -> x,
x -> pendingDeployments.contains(x) ? ExecutionDeploymentState.PENDING : ExecutionDeploymentState.DEPLOYED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;

import java.util.Set;
import java.util.Map;

/**
* Component for reconciling the deployment state of executions.
Expand All @@ -40,8 +40,11 @@ interface Factory {
*
* @param taskExecutorHost hosting task executor
* @param executionDeploymentReport task executor report for deployed executions
* @param expectedDeployedExecutionIds set of expected deployed executions
* @param expectedDeployedExecutionIds map of expected executions and their current deployment status
*/
void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set<ExecutionAttemptID> expectedDeployedExecutionIds);
void reconcileExecutionDeployments(
ResourceID taskExecutorHost,
ExecutionDeploymentReport executionDeploymentReport,
Map<ExecutionAttemptID, ExecutionDeploymentState> expectedDeployedExecutionIds);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

/**
* Possible states for the deployment of an execution.
*/
public enum ExecutionDeploymentState {
/** The deployment has or is about to be started. */
PENDING,
/** The deployment has been acknowledged by the TaskExecutor. */
DEPLOYED
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import java.util.Set;
import java.util.Map;

/**
* A tracker for deployed executions.
*/
public interface ExecutionDeploymentTracker {

/**
* Starts tracking the given execution that was deployed on the given host.
* Starts tracking the given execution that is being deployed on the given host.
*
* @param executionAttemptId execution to start tracking
* @param host hosting task executor
*/
void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host);
void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host);

/**
* Marks the deployment of the given execution as complete.
*
* @param executionAttemptId execution whose deployment to mark as complete
*/
void completeDeploymentOf(ExecutionAttemptID executionAttemptId);

/**
* Stops tracking the given execution.
Expand All @@ -48,6 +55,5 @@ public interface ExecutionDeploymentTracker {
* @param host hosting task executor
* @return tracked executions
*/
Set<ExecutionAttemptID> getExecutionsOn(ResourceID host);

Map<ExecutionAttemptID, ExecutionDeploymentState> getExecutionsOn(ResourceID host);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;

/**
* An adapter for using an {@link ExecutionDeploymentTracker} as an {@link ExecutionDeploymentListener}.
*/
public class ExecutionDeploymentTrackerDeploymentListenerAdapter implements ExecutionDeploymentListener {
private final ExecutionDeploymentTracker executionDeploymentTracker;

public ExecutionDeploymentTrackerDeploymentListenerAdapter(ExecutionDeploymentTracker executionDeploymentTracker) {
this.executionDeploymentTracker = executionDeploymentTracker;
}

@Override
public void onStartedDeployment(ExecutionAttemptID execution, ResourceID host) {
executionDeploymentTracker.startTrackingPendingDeploymentOf(execution, host);
}

@Override
public void onCompletedDeployment(ExecutionAttemptID execution) {
executionDeploymentTracker.completeDeploymentOf(execution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void onUnknownDeploymentsOf(Collection<ExecutionAttemptID> executionAttem
for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(host);
if (taskManagerInfo != null) {
//taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
}
}
}
Expand Down Expand Up @@ -1234,7 +1234,10 @@ public void notifyHeartbeatTimeout(ResourceID resourceID) {
@Override
public void reportPayload(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) {
validateRunsInMainThread();
executionDeploymentReconciler.reconcileExecutionDeployments(resourceID, payload.getExecutionDeploymentReport(), executionDeploymentTracker.getExecutionsOn(resourceID));
executionDeploymentReconciler.reconcileExecutionDeployments(
resourceID,
payload.getExecutionDeploymentReport(),
executionDeploymentTracker.getExecutionsOn(resourceID));
for (AccumulatorSnapshot snapshot : payload.getAccumulatorReport().getAccumulatorSnapshots()) {
schedulerNG.updateAccumulators(snapshot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
Expand Down Expand Up @@ -272,7 +273,7 @@ private ExecutionGraph createExecutionGraph(
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker) throws JobExecutionException, JobException {

ExecutionDeploymentListener executionDeploymentListener = executionDeploymentTracker::startTrackingDeploymentOf;
ExecutionDeploymentListener executionDeploymentListener = new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {
if (newState.isTerminal()) {
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static TestingExecutionGraphBuilder newBuilder() {
private JobGraph jobGraph = new JobGraph();
private MetricGroup metricGroup = new UnregisteredMetricsGroup();
private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
private ExecutionDeploymentListener executionDeploymentListener = (execution, host) -> {};
private ExecutionDeploymentListener executionDeploymentListener = NoOpExecutionDeploymentListener.get();
private ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {};

private TestingExecutionGraphBuilder() {
Expand Down
Loading