From a144d021c31b22d70b6b4e59391dad150afea29b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 1 Nov 2016 21:14:40 +0100 Subject: [PATCH 1/2] [FLINK-4944] Replace Akka's death watch with own heartbeat on the TM side This PR introduces the HeartbeatActor which is used by the TaskManager to monitor the JobManager. The HeartbeatActor constantly sends Heartbeat messages to the JobManager which responds with a HeartbeatResponse. If the HeartbeatResponse fails to be received for an acceptable heartbeat pause, then the HeartbeatActor sends a HeartbeatTimeout message to the owner of the HeartbeatActor. The acceptable heartbeat pause can be extended by the HeartbeatActor if it detects that it has been stalled by garbage collection, for example. The HeartbeatActor is started as a child actor of the TaskManager. Add ClusterOptions Add comments --- docs/setup/config.md | 3 + .../flink/configuration/ClusterOptions.java | 53 ++++ .../TaskManagerConfiguration.java | 52 +++- .../taskmanager/heartbeat/HeartbeatActor.java | 231 +++++++++++++++ .../heartbeat/messages/Heartbeat.java | 54 ++++ .../heartbeat/messages/HeartbeatResponse.java | 38 +++ .../heartbeat/messages/HeartbeatTimeout.java | 51 ++++ .../messages/RequestAccumulatorSnapshots.java | 40 +++ .../messages/UpdateAccumulatorSnapshots.java | 45 +++ .../flink/runtime/jobmanager/JobManager.scala | 15 +- .../messages/TaskManagerMessages.scala | 23 -- .../runtime/taskmanager/TaskManager.scala | 166 +++++++---- ...kManagerComponentsStartupShutdownTest.java | 7 +- .../TaskManagerRegistrationTest.java | 8 +- .../runtime/taskmanager/TaskManagerTest.java | 4 +- .../heartbeat/HeartbeatActorTest.java | 264 ++++++++++++++++++ .../testingUtils/TestingJobManagerLike.scala | 2 +- .../testingUtils/TestingTaskManagerLike.scala | 24 +- .../jobmanager/JobManagerFailsITCase.scala | 4 +- 19 files changed, 975 insertions(+), 109 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java 
create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/Heartbeat.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatResponse.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatTimeout.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/RequestAccumulatorSnapshots.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/UpdateAccumulatorSnapshots.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java diff --git a/docs/setup/config.md b/docs/setup/config.md index c4a7354ea7e77..1d9a999e1f4c9 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -292,6 +292,9 @@ The following parameters configure Flink's JobManager and TaskManagers. - `blob.fetch.num-concurrent`: The number concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: **50**). - `blob.fetch.backlog`: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: **1000**). +- `heartbeat.interval`: Heartbeat interval for Flink's heartbeat mechanism which is used by all TaskManagers to monitor the JobManager. The interval is specified in milliseconds (DEFAULT: **5000**). +- `heartbeat.initial-acceptable-pause`: Initial acceptable heartbeat pause after which a heartbeat timeout is sent by Flink's heartbeat mechanism. This heartbeat mechanism is used by all TaskManagers to monitor the JobManager. The pause is specified in milliseconds (DEFAULT: **20000**). 
+- `heartbeat.max-acceptable-pause`: Maximum acceptable heartbeat pause after which a heartbeat timeout is sent by Flink's heartbeat mechanism. The heartbeat pause can be extended in case of stalling due to garbage collection. This heartbeat mechanism is used by all TaskManagers to monitor the JobManager. The pause is specified in milliseconds (DEFAULT: **60000**). - `task.cancellation-interval`: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: **30000**). diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java new file mode 100644 index 0000000000000..2ed8a922fb8ac --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * The set of options relevant for all components + */ +@PublicEvolving +public class ClusterOptions { + + /** + * The heartbeat interval in milliseconds + */ + public static final ConfigOption HEARTBEAT_INTERVAL = ConfigOptions + .key("heartbeat.interval") + .defaultValue(5000L); + + /** + * The initial acceptable heartbeat pause for heartbeat responses in milliseconds + */ + public static final ConfigOption HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE = ConfigOptions + .key("heartbeat.initial-acceptable-pause") + .defaultValue(20000L); + + /** + * The maximum acceptable heartbeat pause for heartbeat responses in milliseconds. + * The acceptable pause can be adapted when the heartbeat mechanism detects that it got + * delayed, e.g. in case of garbage collection stalls. This option defines the maximum to + * which the heartbeat response pause can be extended. + * + */ + public static final ConfigOption HEARTBEAT_MAX_ACCEPTABLE_PAUSE = ConfigOptions + .key("heartbeat.max-acceptable-pause") + .defaultValue(60000L); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index a6e47481084a6..27a2e6ed43f0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; @@ -54,21 +55,28 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo { private final long cleanupInterval; + private final Time heartbeatInterval; + private final Time initialHeartbeatPause; + private final Time maxHeartbeatPause; + private final UnmodifiableConfiguration configuration; private final boolean exitJvmOnOutOfMemory; public TaskManagerConfiguration( - int numberSlots, - String[] tmpDirectories, - Time timeout, - Time maxRegistrationDuration, - Time initialRegistrationPause, - Time maxRegistrationPause, - Time refusedRegistrationPause, - long cleanupInterval, - Configuration configuration, - boolean exitJvmOnOutOfMemory) { + int numberSlots, + String[] tmpDirectories, + Time timeout, + Time maxRegistrationDuration, + Time initialRegistrationPause, + Time maxRegistrationPause, + Time refusedRegistrationPause, + long cleanupInterval, + Time heartbeatInterval, + Time initialHeartbeatPause, + Time maxHeartbeatPause, + Configuration configuration, + boolean exitJvmOnOutOfMemory) { this.numberSlots = numberSlots; this.tmpDirectories = Preconditions.checkNotNull(tmpDirectories); @@ -78,6 +86,11 @@ public TaskManagerConfiguration( this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause); this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause); this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval); + + this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval); + this.initialHeartbeatPause = Preconditions.checkNotNull(initialHeartbeatPause); + this.maxHeartbeatPause = Preconditions.checkNotNull(maxHeartbeatPause); + this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory; } @@ -110,6 +123,18 @@ public long getCleanupInterval() { return cleanupInterval; } + public Time getHeartbeatInterval() { + return heartbeatInterval; + } + + public Time getInitialHeartbeatPause() { + return initialHeartbeatPause; + } + + public Time getMaxHeartbeatPause() { + 
return maxHeartbeatPause; + } + @Override public Configuration getConfiguration() { return configuration; @@ -219,6 +244,10 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura final boolean exitOnOom = configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY); + Time heartbeatInterval = Time.milliseconds(configuration.getLong(ClusterOptions.HEARTBEAT_INTERVAL)); + Time initialHeartbeatPause = Time.milliseconds(configuration.getLong(ClusterOptions.HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE)); + Time maxHeartbeatPause = Time.milliseconds(configuration.getLong(ClusterOptions.HEARTBEAT_MAX_ACCEPTABLE_PAUSE)); + return new TaskManagerConfiguration( numberSlots, tmpDirPaths, @@ -228,6 +257,9 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura maxRegistrationPause, refusedRegistrationPause, cleanupInterval, + heartbeatInterval, + initialHeartbeatPause, + maxHeartbeatPause, configuration, exitOnOom); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActor.java new file mode 100644 index 0000000000000..3ee144a5297f7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActor.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager.heartbeat; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Kill; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.Heartbeat; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.HeartbeatResponse; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.HeartbeatTimeout; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.RequestAccumulatorSnapshots; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.UpdateAccumulatorSnapshots; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import scala.concurrent.duration.FiniteDuration; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +/** + * Actor which monitors the given heartbeat target by constantly sending heartbeat requests to this + * target. If the heartbeat response fails to appear for a given heartbeat pause, then the owner + * of this actor is notified about the heartbeat timeout. + * + * The heartbeat pause can be extended if the actor detects that it has been stalled due to garbage + * collection, for example. 
+ */ +public class HeartbeatActor extends FlinkUntypedActor { + + /** Receiver of the heartbeat timeout and provider of the accumulator snapshots */ + private final ActorRef owner; + + /** Instance id of the owner */ + private final InstanceID instanceId; + + private final ActorRef heartbeatTarget; + + private final UUID leaderId; + + private final FiniteDuration heartbeatInterval; + + /** Initial acceptable heartbeat pause */ + private final long initialHeartbeatPause; + + /** Maximum acceptable heartbeat pause including garbage collection pauses */ + private final long maxHeartbeatPause; + + private final Logger log; + + private Cancellable heartbeatCancellable; + + private Collection lastAccumulatorSnapshot; + + /** Current acceptable heartbeat pause */ + private long acceptableHeartbeatPause; + + /** Time of last sent heartbeat request */ + private long lastHeartbeatRequest; + + /** Time of last received heartbeat response */ + private long lastHeartbeatResponse; + + private boolean running; + + public HeartbeatActor( + ActorRef owner, + InstanceID instanceId, + ActorRef heartbeatTarget, + UUID leaderId, + FiniteDuration heartbeatInterval, + FiniteDuration initialHeartbeatPause, + FiniteDuration maxHeartbeatPause, + Logger log) { + + this.owner = Preconditions.checkNotNull(owner); + this.instanceId = Preconditions.checkNotNull(instanceId); + this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget); + this.leaderId = leaderId; + + Preconditions.checkNotNull(heartbeatInterval); + Preconditions.checkArgument(heartbeatInterval.toMillis() > 0L, + "The heartbeat interval must be larger than 0."); + this.heartbeatInterval = heartbeatInterval; + + Preconditions.checkNotNull(initialHeartbeatPause); + Preconditions.checkArgument(initialHeartbeatPause.compare(heartbeatInterval) >= 0, + "The acceptable heartbeat pause should be larger than the heartbeat interval."); + this.initialHeartbeatPause = initialHeartbeatPause.toMillis(); + + 
Preconditions.checkNotNull(maxHeartbeatPause); + Preconditions.checkArgument(maxHeartbeatPause.compare(initialHeartbeatPause) >= 0, + "The max heartbeat interval must be larger than the heartbeat interval."); + this.maxHeartbeatPause = maxHeartbeatPause.toMillis(); + + this.log = Preconditions.checkNotNull(log); + + lastAccumulatorSnapshot = Collections.emptyList(); + heartbeatCancellable = null; + acceptableHeartbeatPause = 0L; + lastHeartbeatRequest = 0L; + lastHeartbeatResponse = 0L; + + running = true; + } + + @Override + public void preStart() throws Exception { + super.preStart(); + + lastHeartbeatRequest = System.currentTimeMillis(); + lastHeartbeatResponse = lastHeartbeatRequest; + + acceptableHeartbeatPause = initialHeartbeatPause; + + log.info("Start sending heartbeat requests to {}.", heartbeatTarget.path()); + + heartbeatCancellable = getContext().system().scheduler().schedule( + heartbeatInterval, + heartbeatInterval, + getSelf(), + decorateMessage(SendHeartbeat.getInstance()), + getContext().dispatcher(), + getSelf()); + } + + @Override + public void postStop() throws Exception { + log.info("Stop sending heartbeat requests to {}.", heartbeatTarget.path()); + stopHeartbeat(); + + super.postStop(); + } + + @Override + protected void handleMessage(Object message) throws Exception { + if (running) { + if (message instanceof SendHeartbeat) { + // 1. Adapt timeoutValue for garbage collection pause + long currentTime = System.currentTimeMillis(); + long intervalDeviation = Math.max(currentTime - lastHeartbeatRequest - heartbeatInterval.toMillis(), 0L); + + lastHeartbeatRequest = currentTime; + + acceptableHeartbeatPause = Math.min(acceptableHeartbeatPause + intervalDeviation, maxHeartbeatPause); + + // 2. check whether we've timed out + if (currentTime > lastHeartbeatResponse + acceptableHeartbeatPause) { + log.debug("Heartbeat timed out for {}@{}.", leaderId, heartbeatTarget.path()); + + // too bad, the job manager seems to be dead. 
Let's notify the TM + owner.tell( + decorateMessage(new HeartbeatTimeout(heartbeatTarget, leaderId)), + getSelf()); + } else { + // 1. trigger accumulator update + owner.tell( + decorateMessage(RequestAccumulatorSnapshots.getInstance()), getSelf()); + + // 2. send heartbeat + heartbeatTarget.tell( + decorateMessage(new Heartbeat(instanceId, lastAccumulatorSnapshot)), + getSelf()); + } + } else if (message instanceof HeartbeatResponse) { + // let's update the last seen heartbeat response + lastHeartbeatResponse = System.currentTimeMillis(); + acceptableHeartbeatPause = initialHeartbeatPause; + log.debug("Received heartbeat response for {}.", leaderId); + + } else if (message instanceof UpdateAccumulatorSnapshots) { + UpdateAccumulatorSnapshots updateAccumulatorSnapshots = (UpdateAccumulatorSnapshots) message; + + lastAccumulatorSnapshot = updateAccumulatorSnapshots.getAccumulatorSnapshots(); + } else { + log.warn("Received unknown message {}. Discarding this message.", message); + } + } else { + log.warn("Received message {} after being stopped. 
Commit suicide.", message); + getSelf().tell(Kill.getInstance(), getSelf()); + } + } + + @Override + protected UUID getLeaderSessionID() { + return leaderId; + } + + private void stopHeartbeat() { + running = false; + + if (heartbeatCancellable != null) { + heartbeatCancellable.cancel(); + heartbeatCancellable = null; + } + } + + private static final class SendHeartbeat implements Serializable, RequiresLeaderSessionID { + + private static final long serialVersionUID = -6248028961658787549L; + + private static final SendHeartbeat INSTANCE = new SendHeartbeat(); + + public static SendHeartbeat getInstance() { + return INSTANCE; + } + + private SendHeartbeat() {} + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/Heartbeat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/Heartbeat.java new file mode 100644 index 0000000000000..3aeeee0f82c2b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/Heartbeat.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskmanager.heartbeat.messages; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Collection; + +/** + * Heartbeat message sent by the {@link org.apache.flink.runtime.taskmanager.heartbeat.HeartbeatActor} + * to the heartbeat target. + */ +public class Heartbeat implements Serializable { + + private static final long serialVersionUID = -4741472008424382598L; + + /** Instance ID identifying the sender */ + private final InstanceID instanceId; + + /** Heartbeat payload: Set of accumulator snapshots */ + private final Collection accumulatorSnapshots; + + public Heartbeat(InstanceID instanceId, Collection accumulatorSnapshots) { + this.instanceId = Preconditions.checkNotNull(instanceId); + this.accumulatorSnapshots = Preconditions.checkNotNull(accumulatorSnapshots); + } + + public InstanceID getInstanceId() { + return instanceId; + } + + public Collection getAccumulatorSnapshots() { + return accumulatorSnapshots; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatResponse.java new file mode 100644 index 0000000000000..b83ed558849d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatResponse.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager.heartbeat.messages; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +import java.io.Serializable; + +/** + * Response to a {@link Heartbeat}. + */ +public class HeartbeatResponse implements Serializable, RequiresLeaderSessionID { + private static final long serialVersionUID = 5395932416824920529L; + + private static final HeartbeatResponse INSTANCE = new HeartbeatResponse(); + + public static HeartbeatResponse getInstance() { + return INSTANCE; + } + + private HeartbeatResponse() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatTimeout.java new file mode 100644 index 0000000000000..3f001df129e89 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/HeartbeatTimeout.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager.heartbeat.messages; + +import akka.actor.ActorRef; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Indicating that a heartbeat timeout has occurred for the given heartbeat target. + */ +public class HeartbeatTimeout implements Serializable, RequiresLeaderSessionID { + + private static final long serialVersionUID = 5288348091176172575L; + + private final ActorRef target; + + private final UUID leaderId; + + public HeartbeatTimeout(ActorRef target, UUID leaderId) { + this.target = Preconditions.checkNotNull(target); + this.leaderId = leaderId; + } + + public UUID getLeaderId() { + return leaderId; + } + + public ActorRef getTarget() { + return target; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/RequestAccumulatorSnapshots.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/RequestAccumulatorSnapshots.java new file mode 100644 index 0000000000000..9ffbfa4977788 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/RequestAccumulatorSnapshots.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager.heartbeat.messages; + +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; + +import java.io.Serializable; + +/** + * Requests the current accumulator snapshots. The response message is + * {@link UpdateAccumulatorSnapshots}. + */ +public class RequestAccumulatorSnapshots implements Serializable, RequiresLeaderSessionID { + + private static final long serialVersionUID = 2741933908968861202L; + + private static final RequestAccumulatorSnapshots INSTANCE = new RequestAccumulatorSnapshots(); + + public static RequestAccumulatorSnapshots getInstance() { + return INSTANCE; + } + + private RequestAccumulatorSnapshots() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/UpdateAccumulatorSnapshots.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/UpdateAccumulatorSnapshots.java new file mode 100644 index 0000000000000..0d41f0a513ec2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/heartbeat/messages/UpdateAccumulatorSnapshots.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager.heartbeat.messages; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Collection; + +/** + * Response to the {@link RequestAccumulatorSnapshots} message. The message contains the current + * accumulator snapshots. 
+ */ +public class UpdateAccumulatorSnapshots implements Serializable, RequiresLeaderSessionID { + + private static final long serialVersionUID = 8775975021442407735L; + + private final Collection accumulatorSnapshots; + + public UpdateAccumulatorSnapshots(Collection accumulatorSnapshots) { + this.accumulatorSnapshots = Preconditions.checkNotNull(accumulatorSnapshots); + } + + public Collection getAccumulatorSnapshots() { + return accumulatorSnapshots; + } +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 57a6415c31975..11149372b6f52 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -65,7 +65,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.{Acknowledge, StackTrace} -import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} @@ -82,6 +81,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.TaskExecutor import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.taskmanager.heartbeat.messages.{Heartbeat, HeartbeatResponse} import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, 
LeaderSessionMessageFilter, LogMessages} @@ -1091,12 +1091,15 @@ class JobManager( TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID))) ) - case Heartbeat(instanceID, accumulators) => - log.trace(s"Received heartbeat message from $instanceID.") + case msg: Heartbeat => + val heartbeat = msg.asInstanceOf[Heartbeat] + log.trace(s"Received heartbeat message from ${heartbeat.getInstanceId()}.") - updateAccumulators(accumulators) + sender ! decorateMessage(HeartbeatResponse.getInstance()) - instanceManager.reportHeartBeat(instanceID) + updateAccumulators(heartbeat.getAccumulatorSnapshots().asScala) + + instanceManager.reportHeartBeat(heartbeat.getInstanceId()) case message: AccumulatorMessage => handleAccumulatorMessage(message) @@ -1871,7 +1874,7 @@ class JobManager( * * @param accumulators list of accumulator snapshots */ - private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = { + private def updateAccumulators(accumulators : Iterable[AccumulatorSnapshot]): Unit = { accumulators.foreach( snapshot => { if (snapshot != null) { currentJobs.get(snapshot.getJobID) match { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index 79141f1bf9297..2b3e811536215 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -36,29 +36,6 @@ object TaskManagerMessages { * @param description The description of the problem */ case class FatalError(description: String, cause: Throwable) - - /** - * Tells the task manager to send a heartbeat message to the job manager. - */ - case object SendHeartbeat { - - /** - * Accessor for the case object instance, to simplify Java interoperability. - * - * @return The SendHeartbeat case object instance. 
- */ - def get() : SendHeartbeat.type = SendHeartbeat - } - - /** - * Reports liveliness of the TaskManager instance with the given instance ID to the - * This message is sent to the job. - * - * @param instanceID The instance ID of the reporting TaskManager. - * @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined] - */ - case class Heartbeat(instanceID: InstanceID, accumulators: Seq[AccumulatorSnapshot]) - // -------------------------------------------------------------------------- // Reporting the current TaskManager stack trace diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a3110a4909c90..b0f8670a433b7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -28,6 +28,7 @@ import java.util.{Collections, UUID} import _root_.akka.actor._ import _root_.akka.pattern.ask import _root_.akka.util.Timeout +import akka.actor.SupervisorStrategy.{Decider, Restart, Stop} import grizzled.slf4j.Logger import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.api.common.time.Time @@ -68,6 +69,8 @@ import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} +import org.apache.flink.runtime.taskmanager.heartbeat.HeartbeatActor +import org.apache.flink.runtime.taskmanager.heartbeat.messages._ import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} @@ -163,9 +166,9 @@ class TaskManager( /* The current leading JobManager URL 
*/ private var jobManagerAkkaURL: Option[String] = None - private var instanceID: InstanceID = null + protected var instanceID: InstanceID = null - private var heartbeatScheduler: Option[Cancellable] = None + protected var heartbeatActor: Option[ActorRef] = None var leaderSessionID: Option[UUID] = None @@ -178,6 +181,49 @@ class TaskManager( ResultPartitionConsumableNotifier, TaskManagerActions)] = None + // -------------------------------------------------------------------------- + // Supervisor strategy + // -------------------------------------------------------------------------- + + override val supervisorStrategy = new SupervisorStrategy { + val retriesWindow = (Option(10), Option((1 minute).toMillis.toInt)) + + override def handleChildTerminated( + context: ActorContext, + child: ActorRef, + children: Iterable[ActorRef]) + : Unit = () + + override def processFailure( + context: ActorContext, + restart: Boolean, + child: ActorRef, + cause: Throwable, + stats: ChildRestartStats, + children: Iterable[ChildRestartStats]) + : Unit = { + if (restart && stats.requestRestartPermission(retriesWindow)) { + restartChild(child, cause, suspendFirst = false) + } else { + context.stop(child) + + if (restart) { + // we exceeded our restarts so there seems to be something wrong with us + killTaskManagerFatal("The child actor " + child.path + " could not be restarted.", cause) + } + } + } + + override def decider: Decider = { + case x: ActorInitializationException => Stop + case x: ActorKilledException => Stop + case _: Exception => Restart + case t: Throwable => + killTaskManagerFatal("Encountered error in child actor. This indicates a corrupt state.", t) + Stop + } + } + // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- @@ -188,6 +234,8 @@ class TaskManager( * JobManager. 
*/ override def preStart(): Unit = { + super.preStart() + log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.") log.info(s"TaskManager data connection information: $location") log.info(s"TaskManager has $numberOfSlots task slot(s).") @@ -261,6 +309,8 @@ class TaskManager( case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) } + super.postStop() + log.info(s"Task manager ${self.path} is completely shut down.") } @@ -287,7 +337,9 @@ class TaskManager( // ----- miscellaneous messages ---- // periodic heart beats that transport metrics - case SendHeartbeat => sendHeartbeatToJobManager() + case msg: RequestAccumulatorSnapshots => + sender ! decorateMessage( + new UpdateAccumulatorSnapshots(getAccumulatorSnapshots.asJavaCollection)) // sends the stack trace of this TaskManager to the sender case SendStackTrace => sendStackTrace(sender()) @@ -301,8 +353,10 @@ class TaskManager( waitForRegistration += sender } - // this message indicates that some actor watched by this TaskManager has died - case Terminated(actor: ActorRef) => + // this message indicates that the job manager monitored by the heartbeat actor has died + case msg: HeartbeatTimeout => + val heartbeatTimeout = msg.asInstanceOf[HeartbeatTimeout] + val actor = heartbeatTimeout.getTarget() if (isConnected && actor == currentJobManager.orNull) { handleJobManagerDisconnect("JobManager is no longer reachable") triggerTaskManagerRegistration() @@ -980,19 +1034,29 @@ class TaskManager( MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup) MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network) - - // watch job manager to detect when it dies - context.watch(jobManager) - - // schedule regular heartbeat message for oneself - heartbeatScheduler = Some( - context.system.scheduler.schedule( - TaskManager.HEARTBEAT_INTERVAL, - TaskManager.HEARTBEAT_INTERVAL, + + heartbeatActor match { + case Some(actor) => + log.warn("Heartbeat is still active while 
associating to a new JobManager. Will stop the " + + "previous heartbeat.") + actor ! Kill + case None => + } + + // start the heartbeat actor + heartbeatActor = Some(context.actorOf( + Props( + classOf[HeartbeatActor], self, - decorateMessage(SendHeartbeat) - )(context.dispatcher) - ) + instanceID, + jobManager, + leaderSessionID.orNull, + FiniteDuration(config.getHeartbeatInterval().toMilliseconds, TimeUnit.MILLISECONDS), + FiniteDuration(config.getInitialHeartbeatPause().toMilliseconds, TimeUnit.MILLISECONDS), + FiniteDuration(config.getMaxHeartbeatPause().toMilliseconds, TimeUnit.MILLISECONDS), + log.logger), + "HeartbeatActor@" + leaderSessionID.getOrElse(UUID.randomUUID()) // unique actor name + )) // notify all the actors that listen for a successful registration for (listener <- waitForRegistration) { @@ -1015,16 +1079,12 @@ class TaskManager( log.info("Disassociating from JobManager") - // stop the periodic heartbeats - heartbeatScheduler foreach { - _.cancel() + heartbeatActor match { + case Some(actor) => context.stop(actor) + case None => log.warn("Disassociate from JobManager without having started a heartbeat.") } - heartbeatScheduler = None - // stop the monitoring of the JobManager - currentJobManager foreach { - jm => context.unwatch(jm) - } + heartbeatActor = None // de-register from the JobManager (faster detection of disconnect) currentJobManager foreach { @@ -1332,37 +1392,24 @@ class TaskManager( // Miscellaneous actions // -------------------------------------------------------------------------- - /** - * Sends a heartbeat message to the JobManager (if connected) with the current - * metrics report. 
- */ - protected def sendHeartbeatToJobManager(): Unit = { - try { - log.debug("Sending heartbeat to JobManager") + protected def getAccumulatorSnapshots: Iterable[AccumulatorSnapshot] = { + log.debug("Create accumulator snaphots.") - val accumulatorEvents = - scala.collection.mutable.Buffer[AccumulatorSnapshot]() + val accumulatorSnapshots = scala.collection.mutable.Buffer[AccumulatorSnapshot]() - runningTasks.asScala foreach { - case (execID, task) => - try { - val registry = task.getAccumulatorRegistry - val accumulators = registry.getSnapshot - accumulatorEvents.append(accumulators) - } catch { - case e: Exception => - log.warn("Failed to take accumulator snapshot for task {}.", - execID, ExceptionUtils.getRootCause(e)) - } - } - - currentJobManager foreach { - jm => jm ! decorateMessage(Heartbeat(instanceID, accumulatorEvents)) + for (task <- runningTasks.values().asScala) { + try { + val registry = task.getAccumulatorRegistry() + val accumulatorSnapshot = registry.getSnapshot() + accumulatorSnapshots.append(accumulatorSnapshot) + } catch { + case e: Exception => + log.warn("Failed to take accumulator snapshot for task {}.", + task.getExecutionId(), ExceptionUtils.getRootCause(e)) } } - catch { - case e: Exception => log.warn("Error sending the metric heartbeat to the JobManager", e) - } + + accumulatorSnapshots } /** @@ -1507,9 +1554,6 @@ object TaskManager { * connection attempts */ val STARTUP_CONNECT_LOG_SUPPRESS = 10000L - val HEARTBEAT_INTERVAL: FiniteDuration = 5000 milliseconds - - // -------------------------------------------------------------------------- // TaskManager standalone entry point // -------------------------------------------------------------------------- @@ -1985,4 +2029,16 @@ object TaskManager { throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e) } } + val heartbeatInterval = configuration.getLong(ClusterOptions.HEARTBEAT_INTERVAL) + val initialHeartbeatPause = configuration.getLong( + 
ClusterOptions.HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE) + // make sure that the max acceptable heartbeat pause is larger or equal than the initial + // heartbeat pause + val maxHeartbeatPause = math.max( + configuration.getLong(ClusterOptions.HEARTBEAT_MAX_ACCEPTABLE_PAUSE), + initialHeartbeatPause) + + FiniteDuration(heartbeatInterval, TimeUnit.MILLISECONDS), + FiniteDuration(initialHeartbeatPause, TimeUnit.MILLISECONDS), + FiniteDuration(maxHeartbeatPause, TimeUnit.MILLISECONDS)) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 2a4c0365d034c..200a602db1f85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -117,9 +117,12 @@ public void testComponentsStartupShutdown() throws Exception { Time.milliseconds(500), Time.seconds(30), Time.seconds(10), - 1000000, // cleanup interval + 1000000, // cleanup interval, + Time.milliseconds(5000), + Time.milliseconds(10000), + Time.milliseconds(30000), config, - false); // exit-jvm-on-fatal-error + false); final int networkBufNum = 32; // note: the network buffer memory configured here is not actually used below but set diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 92de31ac531d5..9039f3a92423f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -23,6 +23,7 @@ import akka.actor.InvalidActorNameException; import 
akka.actor.Terminated; import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -89,9 +90,8 @@ public class TaskManagerRegistrationTest extends TestLogger { public static void startActorSystem() { config = new Configuration(); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s"); - config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0); + config.setLong(ClusterOptions.HEARTBEAT_INTERVAL, 200L); + config.setLong(ClusterOptions.HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE, 2000L); actorSystem = AkkaUtils.createLocalActorSystem(config); } @@ -547,7 +547,7 @@ protected void run() { } Terminated terminatedMessage = (Terminated) message; - assertEquals(gateway.actor(), terminatedMessage.actor()); + assertEquals(gateway.actor(), terminatedMessage.getActor()); } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index e790ea814e63a..708bd6f3d4c4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -244,7 +244,7 @@ protected void run() { if (message.equals(toRunning)) { break; } - else if (!(message instanceof TaskManagerMessages.Heartbeat)) { + else { fail("Unexpected message: " + message); } } while (System.currentTimeMillis() < deadline); @@ -255,7 +255,7 @@ else if (!(message instanceof TaskManagerMessages.Heartbeat)) { if (message.equals(toFinished)) { break; } - else if (!(message instanceof TaskManagerMessages.Heartbeat)) { + else { fail("Unexpected message: " 
+ message); } } while (System.currentTimeMillis() < deadline); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java new file mode 100644 index 0000000000000..83c8d84bf13c8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskmanager.heartbeat; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.Heartbeat; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.HeartbeatResponse; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.HeartbeatTimeout; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.RequestAccumulatorSnapshots; +import org.apache.flink.runtime.taskmanager.heartbeat.messages.UpdateAccumulatorSnapshots; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class HeartbeatActorTest extends TestLogger { + + private static ActorSystem actorsSystem; + + private static final FiniteDuration timeout = new FiniteDuration(20L, TimeUnit.SECONDS); + + private 
static final Logger LOG = LoggerFactory.getLogger(HeartbeatActorTest.class); + + @BeforeClass + public static void setup() { + actorsSystem = AkkaUtils.createActorSystem(TestingUtils.getDefaultTestingActorSystemConfig()); + } + + @AfterClass + public static void tearDown() { + actorsSystem.shutdown(); + actorsSystem.awaitTermination(timeout); + } + + /** + * Test whether the heartbeat actor sends heartbeat requests to the heartbeat target. + */ + @Test + public void testHeartbeat() throws IOException { + new JavaTestKit(actorsSystem) {{ + final FiniteDuration heartbeatInterval = new FiniteDuration(20L, TimeUnit.MILLISECONDS); + final FiniteDuration initialHeartbeatPause = new FiniteDuration(100L, TimeUnit.MILLISECONDS); + final FiniteDuration maxHeartbeatPause = initialHeartbeatPause; + final int numHeartbeats = 10; + + + final UUID leaderId = UUID.randomUUID(); + final InstanceID instanceId = new InstanceID(); + + final ActorRef forwardingActor = actorsSystem.actorOf( + Props.create( + TestingUtils.ForwardingActor.class, + getRef(), + Option.apply(leaderId)), + "ForwardingActor"); + + final TestProbe taskManager = TestProbe.apply(actorsSystem); + + final JobID jobId = new JobID(); + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + final Collection<AccumulatorSnapshot> accumulatorSnapshots = Arrays.asList( + new AccumulatorSnapshot( + jobId, + executionAttemptId, + Collections.<String, Accumulator<?, ?>>emptyMap())); + + final ActorRef heartbeatActor = actorsSystem.actorOf( + Props.create( + HeartbeatActor.class, + taskManager.ref(), + instanceId, + forwardingActor, + leaderId, + heartbeatInterval, + initialHeartbeatPause, + maxHeartbeatPause, + log), + "heartbeatActor"); + + try { + final Deadline deadline = timeout.fromNow(); + + heartbeatActor.tell(new JobManagerMessages.LeaderSessionMessage(leaderId, new UpdateAccumulatorSnapshots(accumulatorSnapshots)), getRef()); + + for (int i = 0; i < numHeartbeats && deadline.hasTimeLeft(); ++i) { + JobManagerMessages.LeaderSessionMessage 
message = taskManager.expectMsgClass( + deadline.timeLeft(), + JobManagerMessages.LeaderSessionMessage.class); + assertTrue(message.message() instanceof RequestAccumulatorSnapshots); + + Heartbeat heartbeat = expectMsgClass( + deadline.timeLeft(), + Heartbeat.class); + + assertEquals(instanceId, heartbeat.getInstanceId()); + assertEquals(accumulatorSnapshots, heartbeat.getAccumulatorSnapshots()); + + getLastSender().tell( + new JobManagerMessages.LeaderSessionMessage( + leaderId, + HeartbeatResponse.getInstance()), + getRef()); + } + } finally { + TestingUtils.stopActor(forwardingActor); + TestingUtils.stopActor(heartbeatActor); + } + }}; + } + + /** + * Test that the heartbeat actor throws a HeartbeatTimeout if the heartbeat target stops sending + * heartbeat responses. + */ + @Test + public void testTimeout() throws IOException, InterruptedException { + new JavaTestKit(actorsSystem) {{ + final FiniteDuration heartbeatInterval = new FiniteDuration(20L, TimeUnit.MILLISECONDS); + final FiniteDuration initialHeartbeatPause = new FiniteDuration(500L, TimeUnit.MILLISECONDS); + final FiniteDuration maxHeartbeatPause = initialHeartbeatPause; + + final UUID leaderId = UUID.randomUUID(); + final InstanceID instanceId = new InstanceID(); + + final ActorRef simpleHeartbeatResponder = actorsSystem.actorOf( + Props.create( + SimpleHeartbeatResponder.class, + leaderId), + "HeartbeatResponder"); + + final ActorRef forwardingActor = actorsSystem.actorOf( + Props.create( + TestingUtils.ForwardingActor.class, + getRef(), + Option.apply(leaderId)), + "ForwardingActor"); + + final JobID jobId = new JobID(); + final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID(); + + final Collection<AccumulatorSnapshot> accumulatorSnapshots = Arrays.asList( + new AccumulatorSnapshot( + jobId, + executionAttemptId, + Collections.<String, Accumulator<?, ?>>emptyMap())); + + final ActorRef heartbeatActor = actorsSystem.actorOf( + Props.create( + HeartbeatActor.class, + forwardingActor, + instanceId, + simpleHeartbeatResponder, + 
leaderId, + heartbeatInterval, + initialHeartbeatPause, + maxHeartbeatPause, + log), + "heartbeatActor"); + + try { + heartbeatActor.tell(new JobManagerMessages.LeaderSessionMessage(leaderId, new UpdateAccumulatorSnapshots(accumulatorSnapshots)), getRef()); + + FiniteDuration maxTimeForTimeoutMessage = new FiniteDuration(initialHeartbeatPause.toMillis() * 2L, TimeUnit.MILLISECONDS); + + Deadline deadline = maxTimeForTimeoutMessage.fromNow(); + + while (deadline.hasTimeLeft()) { + // receive the request accumulator requests + expectMsgClass(deadline.timeLeft(), RequestAccumulatorSnapshots.class); + } + + TestingUtils.stopActor(simpleHeartbeatResponder); + + deadline = maxTimeForTimeoutMessage.fromNow(); + + HeartbeatTimeout heartbeatTimeout = null; + + while(deadline.hasTimeLeft()) { + Object msg = receiveOne(deadline.timeLeft()); + + if (msg instanceof HeartbeatTimeout) { + heartbeatTimeout = (HeartbeatTimeout) msg; + break; + } + } + + assertNotNull(heartbeatTimeout); + + assertEquals(leaderId, heartbeatTimeout.getLeaderId()); + + } finally { + TestingUtils.stopActor(forwardingActor); + TestingUtils.stopActor(heartbeatActor); + } + }}; + } + + private static class SimpleHeartbeatResponder extends FlinkUntypedActor { + + private final UUID leaderId; + + public SimpleHeartbeatResponder(UUID leaderId) { + this.leaderId = leaderId; + } + + @Override + protected void handleMessage(Object message) throws Exception { + if (message instanceof Heartbeat) { + getSender().tell(new JobManagerMessages.LeaderSessionMessage(leaderId, HeartbeatResponse.getInstance()), getSelf()); + } + } + + @Override + protected UUID getLeaderSessionID() { + return leaderId; + } + } +} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index a3d31f595ef34..bfe0155ea1dc3 100644 --- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -32,7 +32,7 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps} import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager -import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat +import org.apache.flink.runtime.taskmanager.heartbeat.messages.Heartbeat import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingMessages._ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala index 9db11d79ac030..c9752c7ba7d8c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala @@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered} import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState} import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.taskmanager.heartbeat.messages.{Heartbeat, HeartbeatResponse, HeartbeatTimeout} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved import 
org.apache.flink.runtime.testingUtils.TestingMessages._ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ @@ -158,23 +159,36 @@ trait TestingTaskManagerLike extends FlinkActor { /** * Message from task manager that accumulator values changed and need to be reported immediately * instead of lazily through the - * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this - * message to the job manager that it knows it should report to the listeners. + * [[org.apache.flink.runtime.taskmanager.heartbeat.messages.Heartbeat]] message. We forward + * this message to the job manager that it knows it should report to the listeners. */ case msg: AccumulatorsChanged => currentJobManager match { case Some(jobManager) => jobManager.forward(msg) - sendHeartbeatToJobManager() + val accumulatorSnapshots = getAccumulatorSnapshots + jobManager ! decorateMessage( + new Heartbeat(instanceID, accumulatorSnapshots.asJavaCollection)) sender ! true case None => } - case msg@Terminated(jobManager) => + case msg: HeartbeatResponse => + // forward the heartbeat response in case of a manual heartbeat triggered by + // AccumulatorsChanged + heartbeatActor match { + case Some(actor) => actor.tell(decorateMessage(msg), sender) + case None => log.warn("There is no heartbeat active at the moment.") + } + + + case msg: HeartbeatTimeout => + + val heartbeatTimeout = msg.asInstanceOf[HeartbeatTimeout] val currentJM = currentJobManager.getOrElse(ActorRef.noSender) - val leaderId = if (jobManager.equals(currentJM)) { + val leaderId = if (heartbeatTimeout.getTarget().equals(currentJM)) { leaderSessionID } else { None diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index e5f26c54e9676..3eb3806991176 100644 --- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{ClusterOptions, ConfigConstants, Configuration} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} @@ -133,6 +133,8 @@ class JobManagerFailsITCase(_system: ActorSystem) def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { val config = new Configuration() + config.setLong(ClusterOptions.HEARTBEAT_INTERVAL, 50L); + config.setLong(ClusterOptions.HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE, 500L); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) From 6cd1b4cda20d47dbd9d71a4d594d3ce7ac9925f2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 21 Feb 2017 16:32:21 +0100 Subject: [PATCH 2/2] Harden HeartbeatActorTest.testTimeout by increasing the maximum acceptable heartbeat pause --- .../flink/runtime/taskmanager/TaskManager.scala | 12 ------------ .../taskmanager/heartbeat/HeartbeatActorTest.java | 2 +- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index b0f8670a433b7..27d4151b3345f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -2029,16 +2029,4 @@ object TaskManager { throw new IOException("Could not connect to TaskManager at " + taskManagerUrl, e) } } - val heartbeatInterval = configuration.getLong(ClusterOptions.HEARTBEAT_INTERVAL) - val initialHeartbeatPause = configuration.getLong( - ClusterOptions.HEARTBEAT_INITIAL_ACCEPTABLE_PAUSE) - // make sure that the max acceptable heartbeat pause is larger or equal than the initial - // heartbeat pause - val maxHeartbeatPause = math.max( - configuration.getLong(ClusterOptions.HEARTBEAT_MAX_ACCEPTABLE_PAUSE), - initialHeartbeatPause) - - FiniteDuration(heartbeatInterval, TimeUnit.MILLISECONDS), - FiniteDuration(initialHeartbeatPause, TimeUnit.MILLISECONDS), - FiniteDuration(maxHeartbeatPause, TimeUnit.MILLISECONDS)) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java index 83c8d84bf13c8..f9921d2348c88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/heartbeat/HeartbeatActorTest.java @@ -163,7 +163,7 @@ public void testTimeout() throws IOException, InterruptedException { new JavaTestKit(actorsSystem) {{ final FiniteDuration heartbeatInterval = new FiniteDuration(20L, TimeUnit.MILLISECONDS); final FiniteDuration initialHeartbeatPause = new FiniteDuration(500L, TimeUnit.MILLISECONDS); - final FiniteDuration maxHeartbeatPause = initialHeartbeatPause; + final FiniteDuration maxHeartbeatPause = new FiniteDuration(2000L, TimeUnit.MILLISECONDS); final UUID leaderId = UUID.randomUUID(); final InstanceID instanceId = new InstanceID();