Skip to content

Commit

Permalink
[FLINK-8881][runtime] Send accumulator updates via heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored and tillrohrmann committed Mar 20, 2018
1 parent c90a757 commit 95d4c01
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 16 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState;
Expand Down Expand Up @@ -166,7 +168,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private final JobManagerJobMetricGroup jobMetricGroup; private final JobManagerJobMetricGroup jobMetricGroup;


/** The heartbeat manager with task managers. */ /** The heartbeat manager with task managers. */
private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;


/** The heartbeat manager with resource manager. */ /** The heartbeat manager with resource manager. */
private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
Expand Down Expand Up @@ -938,8 +940,8 @@ public void disconnectResourceManager(
} }


@Override @Override
public void heartbeatFromTaskManager(final ResourceID resourceID) { public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null); taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
} }


@Override @Override
Expand Down Expand Up @@ -1504,7 +1506,7 @@ public void jobStatusChanges(
} }
} }


private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> { private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {


private final JobMasterGateway jobMasterGateway; private final JobMasterGateway jobMasterGateway;


Expand All @@ -1522,8 +1524,10 @@ public void notifyHeartbeatTimeout(ResourceID resourceID) {
} }


@Override @Override
public void reportPayload(ResourceID resourceID, Void payload) { public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
// nothing to do since there is no payload for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
executionGraph.updateAccumulators(snapshot);
}
} }


@Override @Override
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -219,8 +220,11 @@ CompletableFuture<RegistrationResponse> registerTaskManager(
* Sends the heartbeat to job manager from task manager. * Sends the heartbeat to job manager from task manager.
* *
* @param resourceID unique id of the task manager * @param resourceID unique id of the task manager
* @param accumulatorReport report containing accumulator updates
*/ */
void heartbeatFromTaskManager(final ResourceID resourceID); void heartbeatFromTaskManager(
final ResourceID resourceID,
final AccumulatorReport accumulatorReport);


/** /**
* Sends heartbeat request from the resource manager. * Sends heartbeat request from the resource manager.
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;

/**
* A report about the current values of all accumulators of the TaskExecutor for a given job.
*/
public class AccumulatorReport implements Serializable {
private final Collection<AccumulatorSnapshot> accumulatorSnapshots;

public AccumulatorReport(List<AccumulatorSnapshot> accumulatorSnapshots) {
this.accumulatorSnapshots = accumulatorSnapshots;
}

public Collection<AccumulatorSnapshot> getAccumulatorSnapshots() {
return accumulatorSnapshots;
}
}
Expand Up @@ -107,6 +107,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
Expand Down Expand Up @@ -138,7 +139,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final TaskManagerConfiguration taskManagerConfiguration; private final TaskManagerConfiguration taskManagerConfiguration;


/** The heartbeat manager for job manager in the task manager. */ /** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; private final HeartbeatManager<Void, AccumulatorReport> jobManagerHeartbeatManager;


/** The heartbeat manager for resource manager in the task manager. */ /** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager; private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
Expand Down Expand Up @@ -1050,14 +1051,14 @@ private void establishJobManagerConnection(JobID jobId, final JobMasterGateway j
jobManagerTable.put(jobId, newJobManagerConnection); jobManagerTable.put(jobId, newJobManagerConnection);


// monitor the job manager as heartbeat target // monitor the job manager as heartbeat target
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() { jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
@Override @Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) { public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
jobMasterGateway.heartbeatFromTaskManager(resourceID); jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
} }


@Override @Override
public void requestHeartbeat(ResourceID resourceID, Void payload) { public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
// request heartbeat will never be called on the task manager side // request heartbeat will never be called on the task manager side
} }
}); });
Expand Down Expand Up @@ -1488,7 +1489,7 @@ public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
} }
} }


private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> { private class JobManagerHeartbeatListener implements HeartbeatListener<Void, AccumulatorReport> {


@Override @Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) { public void notifyHeartbeatTimeout(final ResourceID resourceID) {
Expand All @@ -1515,8 +1516,23 @@ public void reportPayload(ResourceID resourceID, Void payload) {
} }


@Override @Override
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { public CompletableFuture<AccumulatorReport> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null); validateRunsInMainThread();
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
if (jobManagerConnection != null) {
JobID jobId = jobManagerConnection.getJobID();

List<AccumulatorSnapshot> accumulatorSnapshots = new ArrayList<>(16);
Iterator<Task> allTasks = taskSlotTable.getTasks(jobId);

while (allTasks.hasNext()) {
Task task = allTasks.next();
accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot());
}
return CompletableFuture.completedFuture(new AccumulatorReport(accumulatorSnapshots));
} else {
return CompletableFuture.completedFuture(new AccumulatorReport(Collections.emptyList()));
}
} }
} }


Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse; import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -130,7 +131,7 @@ public CompletableFuture<RegistrationResponse> registerTaskManager(String taskMa
} }


@Override @Override
public void heartbeatFromTaskManager(ResourceID resourceID) { public void heartbeatFromTaskManager(ResourceID resourceID, AccumulatorReport accumulatorReport) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Expand Down

0 comments on commit 95d4c01

Please sign in to comment.