Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8959][tests] Port AccumulatorLiveITCase to flip6 #5719

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public interface HeartbeatListener<I, O> {
* Retrieves the payload value for the next heartbeat message. Since the operation can happen
* asynchronously, the result is returned wrapped in a future.
*
* @param resourceID Resource ID identifying the receiver of the payload
* @return Future containing the next payload for heartbeats
*/
CompletableFuture<O> retrievePayload();
CompletableFuture<O> retrievePayload(ResourceID resourceID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.annotation.concurrent.ThreadSafe;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -106,8 +107,8 @@ HeartbeatListener<I, O> getHeartbeatListener() {
return heartbeatListener;
}

Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() {
return heartbeatTargets.values();
Collection<Map.Entry<ResourceID, HeartbeatMonitor<O>>> getHeartbeatTargets() {
return heartbeatTargets.entrySet();
}

//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -202,7 +203,7 @@ public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload)
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}

CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload();
CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin);

if (futurePayload != null) {
CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.slf4j.Logger;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -62,9 +63,9 @@ public HeartbeatManagerSenderImpl(
public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload();
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
for (Map.Entry<ResourceID, HeartbeatMonitor<O>> heartbeatTargetEntry : getHeartbeatTargets()) {
CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatTargetEntry.getKey());
final HeartbeatTarget<O> heartbeatTarget = heartbeatTargetEntry.getValue().getHeartbeatTarget();

if (futurePayload != null) {
CompletableFuture<Void> requestHeartbeatFuture = futurePayload.thenAcceptAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
Expand Down Expand Up @@ -166,7 +168,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private final JobManagerJobMetricGroup jobMetricGroup;

/** The heartbeat manager with task managers. */
private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
private final HeartbeatManager<AccumulatorReport, Void> taskManagerHeartbeatManager;

/** The heartbeat manager with resource manager. */
private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
Expand Down Expand Up @@ -938,8 +940,8 @@ public void disconnectResourceManager(
}

@Override
public void heartbeatFromTaskManager(final ResourceID resourceID) {
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
}

@Override
Expand Down Expand Up @@ -1504,7 +1506,7 @@ public void jobStatusChanges(
}
}

private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {

private final JobMasterGateway jobMasterGateway;

Expand All @@ -1522,12 +1524,14 @@ public void notifyHeartbeatTimeout(ResourceID resourceID) {
}

@Override
public void reportPayload(ResourceID resourceID, Void payload) {
// nothing to do since there is no payload
public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
	// The report is iterable: hand each contained accumulator snapshot to the
	// execution graph, in iteration order.
	payload.forEach(accumulatorSnapshot -> executionGraph.updateAccumulators(accumulatorSnapshot));
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand All @@ -1551,7 +1555,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -219,8 +220,11 @@ CompletableFuture<RegistrationResponse> registerTaskManager(
* Sends a heartbeat from the task manager to the job manager.
*
* @param resourceID unique id of the task manager
* @param accumulatorReport report containing accumulator updates
*/
void heartbeatFromTaskManager(final ResourceID resourceID);
void heartbeatFromTaskManager(
final ResourceID resourceID,
final AccumulatorReport accumulatorReport);

/**
* Sends heartbeat request from the resource manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ public void run() {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down Expand Up @@ -1109,7 +1109,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
return CompletableFuture.completedFuture(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;

/**
 * A report about the current values of all accumulators of the TaskExecutor for a given job.
 *
 * <p>The report can be iterated directly to visit every contained {@link AccumulatorSnapshot}.
 * The snapshots handed to the constructor are copied, so later changes to the caller's list are
 * not reflected in the report.
 */
public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {

	private static final long serialVersionUID = 1L;

	/** Private copy of the reported snapshots; never null. */
	private final List<AccumulatorSnapshot> accumulatorSnapshots;

	/**
	 * Creates a report containing the given snapshots.
	 *
	 * @param accumulatorSnapshots snapshots to include in the report; must not be null
	 * @throws NullPointerException if {@code accumulatorSnapshots} is null
	 */
	public AccumulatorReport(List<AccumulatorSnapshot> accumulatorSnapshots) {
		// Fail at construction time instead of deferring the failure to iterator().
		if (accumulatorSnapshots == null) {
			throw new NullPointerException("accumulatorSnapshots must not be null");
		}

		// Copy into an ArrayList so the stored list is a serializable List implementation
		// that is decoupled from the caller's list. The fully qualified name avoids
		// touching the file's import block.
		this.accumulatorSnapshots = new java.util.ArrayList<>(accumulatorSnapshots);
	}

	/**
	 * Returns an iterator over the snapshots held by this report.
	 *
	 * @return iterator over the contained accumulator snapshots
	 */
	@Override
	public Iterator<AccumulatorSnapshot> iterator() {
		return accumulatorSnapshots.iterator();
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -138,7 +139,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final TaskManagerConfiguration taskManagerConfiguration;

/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
private final HeartbeatManager<Void, AccumulatorReport> jobManagerHeartbeatManager;

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
Expand Down Expand Up @@ -1050,14 +1051,14 @@ private void establishJobManagerConnection(JobID jobId, final JobMasterGateway j
jobManagerTable.put(jobId, newJobManagerConnection);

// monitor the job manager as heartbeat target
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<Void>() {
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, new HeartbeatTarget<AccumulatorReport>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
jobMasterGateway.heartbeatFromTaskManager(resourceID);
public void receiveHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
}

@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {
public void requestHeartbeat(ResourceID resourceID, AccumulatorReport payload) {
// request heartbeat will never be called on the task manager side
}
});
Expand Down Expand Up @@ -1488,7 +1489,7 @@ public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
}
}

private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
private class JobManagerHeartbeatListener implements HeartbeatListener<Void, AccumulatorReport> {

@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
Expand All @@ -1515,8 +1516,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<Void> retrievePayload() {
return CompletableFuture.completedFuture(null);
public CompletableFuture<AccumulatorReport> retrievePayload(ResourceID resourceID) {
	final JobManagerConnection connection = jobManagerConnections.get(resourceID);

	// No connection registered under this resource ID: answer with an empty report.
	if (connection == null) {
		return CompletableFuture.completedFuture(new AccumulatorReport(Collections.emptyList()));
	}

	// Collect one accumulator snapshot per task that the slot table lists for the
	// job of this connection.
	final List<AccumulatorSnapshot> snapshots = new ArrayList<>(16);
	for (Iterator<Task> tasks = taskSlotTable.getTasks(connection.getJobID()); tasks.hasNext(); ) {
		snapshots.add(tasks.next().getAccumulatorRegistry().getSnapshot());
	}

	return CompletableFuture.completedFuture(new AccumulatorReport(snapshots));
}
}

Expand Down Expand Up @@ -1544,7 +1559,7 @@ public void reportPayload(ResourceID resourceID, Void payload) {
}

@Override
public CompletableFuture<SlotReport> retrievePayload() {
public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
return callAsync(
() -> taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
Expand Down