From aafd9020eae2769fd0ade9d922683aac2886412a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 5 Oct 2016 17:02:06 +0200 Subject: [PATCH] [FLINK-4343] [tm] Implement TM -> JM registration logic Upon requesting a slot for a new job, the TaskManager registers this job at the JobLeaderService. The job leader service is responsible to monitor job leader changes for all registered jobs. In case of a new job leader, the service will try to establish a connection to the new job leader. Upon establishing the connection the task manager is informed about it. The task manager will then offer all allocated but not yet active slots to the new job leader. Implement JobLeaderService The JobLeaderService is responsible for establishing a connection to the JM leader of a given job. Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots Add simple task submission test Add job leader detection test case Add task slot acceptance test Fix RpcCompletenessTest Add comments --- .../org/apache/flink/util/ReflectionUtil.java | 110 ++++ .../deployment/TaskDeploymentDescriptor.java | 11 +- .../executiongraph/ExecutionVertex.java | 2 + .../HighAvailabilityServices.java | 3 +- .../highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 2 +- .../jobmaster/JMTMRegistrationSuccess.java | 45 ++ .../flink/runtime/jobmaster/JobMaster.java | 19 + .../runtime/jobmaster/JobMasterGateway.java | 36 ++ .../registration/RegisteredRpcConnection.java | 2 +- .../resourcemanager/ResourceManager.java | 2 +- .../slotmanager/SlotManager.java | 8 +- .../taskexecutor/JobLeaderListener.java | 60 ++ .../taskexecutor/JobLeaderService.java | 390 +++++++++++++ .../taskexecutor/JobManagerConnection.java | 23 +- .../runtime/taskexecutor/JobManagerTable.java | 59 ++ .../runtime/taskexecutor/TaskExecutor.java | 522 +++++++++++++----- .../taskexecutor/TaskExecutorGateway.java | 25 +- ...skExecutorToResourceManagerConnection.java | 5 + .../taskexecutor/TaskManagerRunner.java | 2 + .../taskexecutor/TaskManagerServices.java | 24 +- .../exceptions/SlotAllocationException.java | 39 ++ .../taskexecutor/slot/TaskSlotTable.java | 39 +- .../flink/runtime/taskmanager/Task.java | 2 +- .../TaskDeploymentDescriptorTest.java | 6 +- .../TestingHighAvailabilityServices.java | 2 +- .../metrics/groups/TaskManagerGroupTest.java | 10 +- .../slotmanager/SlotManagerTest.java | 2 +- .../slotmanager/SlotProtocolTest.java | 8 +- .../runtime/rpc/RpcCompletenessTest.java | 29 +- .../taskexecutor/TaskExecutorTest.java | 431 ++++++++++++++- .../taskmanager/TaskAsyncCallTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java | 75 +-- .../runtime/taskmanager/TaskStopTest.java | 2 +- .../flink/runtime/taskmanager/TaskTest.java | 23 +- .../tasks/InterruptSensitiveRestoreTest.java | 38 +- .../runtime/tasks/StreamTaskTest.java | 25 +- 37 files changed, 1805 insertions(+), 283 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java index b851ebacdfd88..2883570400846 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java @@ -23,6 +23,9 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; @Internal public final class ReflectionUtil { @@ -150,6 +153,113 @@ public static Class[] getTemplateTypes(Class clazz) { return types; } + /** + * Extract the full template type information from the given type's template parameter at the + * given position. + * + * @param type type to extract the full template parameter information from + * @param templatePosition describing at which position the template type parameter is + * @return Full type information describing the template parameter's type + */ + public static FullTypeInfo getFullTemplateType(Type type, int templatePosition) { + if (type instanceof ParameterizedType) { + return getFullTemplateType(((ParameterizedType) type).getActualTypeArguments()[templatePosition]); + } else { + throw new IllegalArgumentException(); + } + } + + /** + * Extract the full type information from the given type. + * + * @param type to be analyzed + * @return Full type information describing the given type + */ + public static FullTypeInfo getFullTemplateType(Type type) { + if (type instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) type; + + FullTypeInfo[] templateTypeInfos = new FullTypeInfo[parameterizedType.getActualTypeArguments().length]; + + for (int i = 0; i < parameterizedType.getActualTypeArguments().length; i++) { + templateTypeInfos[i] = getFullTemplateType(parameterizedType.getActualTypeArguments()[i]); + } + + return new FullTypeInfo((Class)parameterizedType.getRawType(), templateTypeInfos); + } else { + return new FullTypeInfo((Class) type, null); + } + } + + /** + * Container for the full type information of a type. This means that it contains the + * {@link Class} object and for each template parameter it contains a full type information + * describing the type. + */ + public static class FullTypeInfo { + private final Class clazz; + private final FullTypeInfo[] templateTypeInfos; + + + public FullTypeInfo(Class clazz, FullTypeInfo[] templateTypeInfos) { + this.clazz = Preconditions.checkNotNull(clazz); + this.templateTypeInfos = templateTypeInfos; + } + + public Class getClazz() { + return clazz; + } + + public FullTypeInfo[] getTemplateTypeInfos() { + return templateTypeInfos; + } + + public Iterator> getClazzIterator() { + UnionIterator> unionIterator = new UnionIterator<>(); + + unionIterator.add(Collections.>singleton(clazz).iterator()); + + if (templateTypeInfos != null) { + for (int i = 0; i < templateTypeInfos.length; i++) { + unionIterator.add(templateTypeInfos[i].getClazzIterator()); + } + } + + return unionIterator; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append(clazz.getSimpleName()); + + if (templateTypeInfos != null) { + builder.append("<"); + + for (int i = 0; i < templateTypeInfos.length - 1; i++) { + builder.append(templateTypeInfos[i]).append(", "); + } + + builder.append(templateTypeInfos[templateTypeInfos.length - 1]); + builder.append(">"); + } + + return builder.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FullTypeInfo) { + FullTypeInfo other = (FullTypeInfo) obj; + + return clazz == other.getClazz() && Arrays.equals(templateTypeInfos, other.getTemplateTypeInfos()); + } else { + return false; + } + } + } + /** * Private constructor to prevent instantiation. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index b1ac665f59da5..884d6321010da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -59,7 +59,7 @@ public final class TaskDeploymentDescriptor implements Serializable { private final ExecutionAttemptID executionId; /** The allocation ID of the slot in which the task shall be run */ - private final AllocationID allocationID; + private final AllocationID allocationId; /** The task's name. */ private final String taskName; @@ -115,6 +115,7 @@ public final class TaskDeploymentDescriptor implements Serializable { */ public TaskDeploymentDescriptor( JobID jobID, + AllocationID allocationId, String jobName, JobVertexID vertexID, ExecutionAttemptID executionId, @@ -142,6 +143,7 @@ public TaskDeploymentDescriptor( checkArgument(attemptNumber >= 0); this.jobID = checkNotNull(jobID); + this.allocationId = checkNotNull(allocationId); this.jobName = checkNotNull(jobName); this.vertexID = checkNotNull(vertexID); this.executionId = checkNotNull(executionId); @@ -162,11 +164,11 @@ public TaskDeploymentDescriptor( this.operatorState = operatorState; this.keyGroupState = keyGroupState; this.partitionableOperatorState = partitionableOperatorStateHandles; - this.allocationID = new AllocationID(); } public TaskDeploymentDescriptor( JobID jobID, + AllocationID allocationId, String jobName, JobVertexID vertexID, ExecutionAttemptID executionId, @@ -187,6 +189,7 @@ public TaskDeploymentDescriptor( this( jobID, + allocationId, jobName, vertexID, executionId, @@ -327,8 +330,8 @@ public List getRequiredClasspaths() { return requiredClasspaths; } - public AllocationID getAllocationID() { - return allocationID; + public AllocationID getAllocationId() { + return allocationId; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 4837803cd911f..708a72c8ab039 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ -666,6 +667,7 @@ TaskDeploymentDescriptor createDeploymentDescriptor( return new TaskDeploymentDescriptor( getJobId(), + new AllocationID(), // TODO: Obtain the proper allocation id from the slot getExecutionGraph().getJobName(), getJobvertexId(), executionId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 5d78ffca28d94..484cddbda60fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -49,10 +49,11 @@ public interface HighAvailabilityServices { * Gets the leader retriever for the job JobMaster which is responsible for the given job * * @param jobID The identifier of the job. + * @param defaultAddress address under which the job manager is reachable * @return * @throws Exception */ - LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception; /** * Gets the leader election service for the cluster's resource manager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index d7fd2bfb68fcd..1c73c01d1d8aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -82,8 +82,8 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { - return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0)); + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception { + return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index 3a7736b91ca2a..bbe8ecb78e7e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -111,7 +111,7 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java new file mode 100644 index 0000000000000..7272cd4840afe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.util.Preconditions; + +public class JMTMRegistrationSuccess extends RegistrationResponse.Success { + private static final long serialVersionUID = -3528383155961318929L; + + private final ResourceID resourceID; + private final int blobPort; + + public JMTMRegistrationSuccess(ResourceID resourceID, int blobPort) { + Preconditions.checkArgument(0 < blobPort && 65536 > blobPort, "The blob port has to be 0 < blobPort < 65536."); + + this.resourceID = Preconditions.checkNotNull(resourceID); + this.blobPort = blobPort; + } + + public ResourceID getResourceID() { + return resourceID; + } + + public int getBlobPort() { + return blobPort; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index e11f3a1c5b02c..a7be47677799b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; @@ -646,6 +647,24 @@ public ClassloadingProps requestClassloadingProps() throws Exception { executionGraph.getRequiredClasspaths()); } + @RpcMethod + public Iterable offerSlots(final Iterable slots, UUID leaderId) { + throw new UnsupportedOperationException("Has to be implemented."); + } + + @RpcMethod + public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) { + throw new UnsupportedOperationException("Has to be implemented."); + } + + @RpcMethod + public RegistrationResponse registerTaskManager( + final String taskManagerAddress, + final ResourceID taskManagerProcessId, + final UUID leaderId) { + throw new UnsupportedOperationException("Has to be implemented."); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index b27b41c2a7a75..0f155a468d8d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; @@ -170,4 +172,38 @@ void notifyKvStateUnregistered( * Request the classloading props of this job. */ Future requestClassloadingProps(); + + /** + * Offer the given slots to the job manager. The response contains the set of accepted slots. + * + * @param slots to offer to the job manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call + * @return Future set of accepted slots. + */ + Future> offerSlots(final Iterable slots, UUID leaderId, @RpcTimeout final Time timeout); + + /** + * Fail the slot with the given allocation id and cause. + * + * @param allocationId identifying the slot to fail + * @param leaderId identifying the job leader + * @param cause of the failing + */ + void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause); + + /** + * Register the task manager at the job manager. + * + * @param taskManagerAddress address of the task manager + * @param taskManagerProcessId identifying the task manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call + * @return Future registration response indicating whether the registration was successful or not + */ + Future registerTaskManager( + final String taskManagerAddress, + final ResourceID taskManagerProcessId, + final UUID leaderId, + @RpcTimeout final Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index 76093b0fea26b..78d4dbc914baa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -35,7 +35,7 @@ /** * This utility class implements the basis of RPC connecting from one component to another component, * for example the RPC connection from TaskExecutor to ResourceManager. - * This {@code RegisteredRpcConnection} implements registration and get target gateway . + * This {@code RegisteredRpcConnection} implements registration and get target gateway. * *

The registration gives access to a future that is completed upon successful registration. * The RPC connection can be closed, for example when the target where it tries to register diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 31228046fef3a..6f6d525b11cc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -188,7 +188,7 @@ public Future registerJobMaster( } else { try { LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress); jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index e312ea26e0d6f..f055971c4bec5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -298,7 +298,13 @@ private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slot final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration(); final Future slotRequestReplyFuture = registration.getTaskExecutorGateway() - .requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout); + .requestSlot( + freeSlot.getSlotId(), + slotRequest.getJobId(), + allocationID, + "foobar", // TODO: set proper JM address + rmServices.getLeaderID(), + timeout); slotRequestReplyFuture.handleAsync(new BiFunction() { @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java new file mode 100644 index 0000000000000..f02a8c23694e7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; + +import java.util.UUID; + +/** + * Listener for the {@link JobLeaderService}. The listener is notified whenever a job manager + * gained leadership for a registered job and the service could establish a connection to it. + * Furthermore, the listener is notified when a job manager loses leadership for a job. In case + * of an error, the {@link #handleError(Throwable)}} is called. + */ +public interface JobLeaderListener { + + /** + * Callback if a job manager has gained leadership for the job identified by the job id and a + * connection could be established to this job manager. + * + * @param jobId identifying the job for which the job manager has gained leadership + * @param jobManagerGateway to the job leader + * @param jobLeaderId new leader id of the job leader + * @param registrationMessage containing further registration information + */ + void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, UUID jobLeaderId, JMTMRegistrationSuccess registrationMessage); + + /** + * Callback if the job leader for the job with the given job id lost its leadership. + * + * @param jobId identifying the job whose leader has lost leadership + * @param jobLeaderId old leader id + */ + void jobManagerLostLeadership(JobID jobId, UUID jobLeaderId); + + /** + * Callback for errors which might occur in the {@link JobLeaderService}. + * + * @param throwable cause + */ + void handleError(Throwable throwable); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java new file mode 100644 index 0000000000000..9e7134980c0fa --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistration; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executor; + +/** + * This service has the responsibility to monitor the job leaders (the job manager which is leader + * for a given job) for all registered jobs. Upon gaining leadership for a job and detection by the + * job leader service, the service tries to establish a connection to the job leader. After + * successfully establishing a connection, the job leader listener is notified about the new job + * leader and its connection. In case that a job leader loses leadership, the job leader listener + * is notified as well. + */ +public class JobLeaderService { + + private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class); + + /** Process id of the owning process */ + private final ResourceID ownerProcessId; + + /** The leader retrieval service and listener for each registered job */ + private final Map> jobLeaderServices; + + /** Internal state of the service */ + private volatile JobLeaderService.State state; + + /** Address of the owner of this service. This address is used for the job manager connection */ + private String ownerAddress; + + /** Rpc service to use for establishing connections */ + private RpcService rpcService; + + /** High availability services to create the leader retrieval services from */ + private HighAvailabilityServices highAvailabilityServices; + + /** Job leader listener listening for job leader changes */ + private JobLeaderListener jobLeaderListener; + + public JobLeaderService(ResourceID ownerProcessId) { + this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId); + + jobLeaderServices = new HashMap<>(4); + + state = JobLeaderService.State.CREATED; + + ownerAddress = null; + rpcService = null; + highAvailabilityServices = null; + jobLeaderListener = null; + } + + // ------------------------------------------------------------------------------- + // Methods + // ------------------------------------------------------------------------------- + + /** + * Start the job leader service with the given services. + * + * @param initialOwnerAddress to be used for establishing connections (source address) + * @param initialRpcService to be used to create rpc connections + * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs + * @param initialJobLeaderListener listening for job leader changes + */ + public void start( + final String initialOwnerAddress, + final RpcService initialRpcService, + final HighAvailabilityServices initialHighAvailabilityServices, + final JobLeaderListener initialJobLeaderListener) { + + if (JobLeaderService.State.CREATED != state) { + throw new IllegalStateException("The service has already been started."); + } else { + LOG.info("Start job leader service."); + + this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress); + this.rpcService = Preconditions.checkNotNull(initialRpcService); + this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices); + this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener); + state = JobLeaderService.State.STARTED; + } + } + + /** + * Stop the job leader services. This implies stopping all leader retrieval services for the + * different jobs and their leader retrieval listeners. + * + * @throws Exception if an error occurs while stopping the service + */ + public void stop() throws Exception { + LOG.info("Stop job leader service."); + + if (JobLeaderService.State.STARTED == state) { + + for (Tuple2 leaderRetrievalServiceEntry: jobLeaderServices.values()) { + LeaderRetrievalService leaderRetrievalService = leaderRetrievalServiceEntry.f0; + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = leaderRetrievalServiceEntry.f1; + + jobManagerLeaderListener.stop(); + leaderRetrievalService.stop(); + } + + jobLeaderServices.clear(); + } + + state = JobLeaderService.State.STOPPED; + } + + /** + * Check whether the service monitors the given job. + * + * @param jobId identifying the job + * @return True if the given job is monitored; otherwise false + */ + public boolean containsJob(JobID jobId) { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + return jobLeaderServices.containsKey(jobId); + } + + /** + * Remove the given job from being monitored by the job leader service. + * + * @param jobId identifying the job to remove from monitoring + * @throws Exception if an error occurred while stopping the leader retrieval service and listener + */ + public void removeJob(JobID jobId) throws Exception { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + Tuple2 entry = jobLeaderServices.remove(jobId); + + if (entry != null) { + LOG.info("Remove job {} from job leader monitoring.", jobId); + + LeaderRetrievalService leaderRetrievalService = entry.f0; + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = entry.f1; + + leaderRetrievalService.stop(); + jobManagerLeaderListener.stop(); + } + } + + /** + * Add the given job to be monitored. This means that the service tries to detect leaders for + * this job and then tries to establish a connection to it. + * + * @param jobId identifying the job to monitor + * @param defaultTargetAddress of the job leader + * @throws Exception if an error occurs while starting the leader retrieval service + */ + public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + LOG.info("Add job {} for job leader monitoring.", jobId); + + final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( + jobId, + defaultTargetAddress); + + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId); + + leaderRetrievalService.start(jobManagerLeaderListener); + + jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener)); + } + + /** + * Leader listener which tries to establish a connection to a newly detected job leader. + */ + private final class JobManagerLeaderListener implements LeaderRetrievalListener { + + /** Job id identifying the job to look for a leader */ + private final JobID jobId; + + /** Rpc connection to the job leader */ + private RegisteredRpcConnection rpcConnection; + + /** State of the listener */ + private volatile boolean stopped; + + /** Leader id of the current job leader */ + private volatile UUID currentLeaderId; + + private JobManagerLeaderListener(JobID jobId) { + this.jobId = Preconditions.checkNotNull(jobId); + + stopped = false; + rpcConnection = null; + currentLeaderId = null; + } + + public void stop() { + stopped = true; + + if (rpcConnection != null) { + rpcConnection.close(); + } + } + + @Override + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) { + if (stopped) { + LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " + + "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId); + } else { + LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", + jobId, leaderAddress, leaderId); + + if (leaderAddress == null || leaderAddress.isEmpty()) { + // the leader lost leadership but there is no other leader yet. + if (rpcConnection != null) { + rpcConnection.close(); + } + + jobLeaderListener.jobManagerLostLeadership(jobId, currentLeaderId); + + currentLeaderId = leaderId; + } else { + currentLeaderId = leaderId; + + if (rpcConnection != null) { + // check if we are already trying to connect to this leader + if (!leaderId.equals(rpcConnection.getTargetLeaderId())) { + rpcConnection.close(); + + rpcConnection = new JobManagerRegisteredRpcConnection( + LOG, + leaderAddress, + leaderId, + rpcService.getExecutor()); + } + } else { + rpcConnection = new JobManagerRegisteredRpcConnection( + LOG, + leaderAddress, + leaderId, + rpcService.getExecutor()); + } + + // double check for a concurrent stop operation + if (stopped) { + rpcConnection.close(); + } else { + LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId); + rpcConnection.start(); + } + } + } + } + + @Override + public void handleError(Exception exception) { + if (stopped) { + LOG.debug("{}'s leader retrieval listener reported an exception for job {}. " + + "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), + jobId, exception); + } else { + jobLeaderListener.handleError(exception); + } + } + + /** + * Rpc connection for the job manager <--> task manager connection. + */ + private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection { + + JobManagerRegisteredRpcConnection( + Logger log, + String targetAddress, + UUID targetLeaderId, + Executor executor) { + super(log, targetAddress, targetLeaderId, executor); + } + + @Override + protected RetryingRegistration generateRegistration() { + return new JobLeaderService.JobManagerRetryingRegistration( + LOG, + rpcService, + "JobManager", + JobMasterGateway.class, + getTargetAddress(), + getTargetLeaderId(), + ownerAddress, + ownerProcessId); + } + + @Override + protected void onRegistrationSuccess(JMTMRegistrationSuccess success) { + // filter out old registration attempts + if (getTargetLeaderId().equals(currentLeaderId)) { + log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId); + + jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success); + } else { + log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId()); + } + } + + @Override + protected void onRegistrationFailure(Throwable failure) { + // filter out old registration attempts + if (getTargetLeaderId().equals(currentLeaderId)) { + log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId); + jobLeaderListener.handleError(failure); + } else { + log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure); + } + } + } + } + + /** + * Retrying registration for the job manager <--> task manager connection. + */ + private static final class JobManagerRetryingRegistration extends RetryingRegistration { + + private final String taskManagerAddress; + private final ResourceID taskManagerProcessId; + + JobManagerRetryingRegistration( + Logger log, + RpcService rpcService, + String targetName, + Class targetType, + String targetAddress, + UUID leaderId, + String taskManagerAddress, + ResourceID taskManagerProcessId) { + + super(log, rpcService, targetName, targetType, targetAddress, leaderId); + + this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); + this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId); + } + + @Override + protected Future invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { + return gateway.registerTaskManager( + taskManagerAddress, + taskManagerProcessId, + leaderId, + Time.milliseconds(timeoutMillis)); + } + } + + /** + * Internal state of the service + */ + private enum State { + CREATED, STARTED, STOPPED + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java index 6fcd082e62d04..8d2057a8a7a8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java @@ -34,7 +34,7 @@ public class JobManagerConnection { // Job master leader session id - private final UUID jobMasterLeaderId; + private final UUID leaderId; // Gateway to the job master private final JobMasterGateway jobMasterGateway; @@ -55,15 +55,14 @@ public class JobManagerConnection { private final PartitionStateChecker partitionStateChecker; public JobManagerConnection( - UUID jobMasterLeaderId, - JobMasterGateway jobMasterGateway, - TaskManagerActions taskManagerActions, - CheckpointResponder checkpointResponder, - LibraryCacheManager libraryCacheManager, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionStateChecker partitionStateChecker) - { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); + JobMasterGateway jobMasterGateway, + UUID leaderId, + TaskManagerActions taskManagerActions, + CheckpointResponder checkpointResponder, + LibraryCacheManager libraryCacheManager, + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, + PartitionStateChecker partitionStateChecker) { + this.leaderId = Preconditions.checkNotNull(leaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions); this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder); @@ -72,8 +71,8 @@ public JobManagerConnection( this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker); } - public UUID getJobMasterLeaderId() { - return jobMasterLeaderId; + public UUID getLeaderId() { + return leaderId; } public JobMasterGateway getJobManagerGateway() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java new file mode 100644 index 0000000000000..00c467ef5552e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; + +import java.util.HashMap; +import java.util.Map; + +/** + * Container for multiple {@link JobManagerConnection} registered under their respective job id. + */ +public class JobManagerTable { + private final Map jobManagerConnections; + + public JobManagerTable() { + jobManagerConnections = new HashMap<>(4); + } + + public boolean contains(JobID jobId) { + return jobManagerConnections.containsKey(jobId); + } + + public boolean put(JobID jobId, JobManagerConnection jobManagerConnection) { + JobManagerConnection previousJMC = jobManagerConnections.put(jobId, jobManagerConnection); + + if (previousJMC != null) { + jobManagerConnections.put(jobId, previousJMC); + + return false; + } else { + return true; + } + } + + public JobManagerConnection remove(JobID jobId) { + return jobManagerConnections.remove(jobId); + } + + public JobManagerConnection get(JobID jobId) { + return jobManagerConnections.get(jobId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e642315bc23d5..3e3a5440419e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -34,7 +36,6 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; @@ -51,7 +53,6 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -59,6 +60,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException; import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; @@ -81,9 +83,10 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -97,6 +100,9 @@ public class TaskExecutor extends RpcEndpoint { /** The connection information of this task manager */ private final TaskManagerLocation taskManagerLocation; + /** Max blob port which is accepted */ + public static final int MAX_BLOB_PORT = 65536; + /** The access to the leader election and retrieval services */ private final HighAvailabilityServices haServices; @@ -121,10 +127,6 @@ public class TaskExecutor extends RpcEndpoint { private final TaskManagerMetricGroup taskManagerMetricGroup; private final BroadcastVariableManager broadcastVariableManager; - - /** Slots which have become available but haven't been confirmed by the RM */ - private final Set unconfirmedFreeSlots; - private final FileCache fileCache; @@ -140,6 +142,10 @@ public class TaskExecutor extends RpcEndpoint { private final TaskSlotTable taskSlotTable; + private final JobManagerTable jobManagerTable; + + private final JobLeaderService jobLeaderService; + // ------------------------------------------------------------------------ public TaskExecutor( @@ -155,6 +161,8 @@ public TaskExecutor( BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, + JobManagerTable jobManagerTable, + JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) { super(rpcService); @@ -173,10 +181,10 @@ public TaskExecutor( this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = checkNotNull(broadcastVariableManager); this.fileCache = checkNotNull(fileCache); + this.jobManagerTable = checkNotNull(jobManagerTable); + this.jobLeaderService = checkNotNull(jobLeaderService); this.jobManagerConnections = new HashMap<>(4); - - this.unconfirmedFreeSlots = new HashSet<>(); } // ------------------------------------------------------------------------ @@ -195,7 +203,10 @@ public void start() { } // tell the task slot table who's responsible for the task slot actions - taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout()); + taskSlotTable.start(new SlotActionsImpl()); + + // start the job leader service + jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); } /** @@ -207,7 +218,7 @@ public void shutDown() { taskSlotTable.stop(); - if (resourceManagerConnection.isConnected()) { + if (isConnectedToResourceManager()) { try { resourceManagerConnection.close(); } catch (Exception e) { @@ -248,30 +259,39 @@ public void shutDown() { log.info("Stopped TaskManager {}.", getAddress()); } - // ======================================================================== + // ====================================================================== // RPC methods - // ======================================================================== + // ====================================================================== // ---------------------------------------------------------------------- // Task lifecycle RPCs // ---------------------------------------------------------------------- @RpcMethod - public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException { + public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException { - JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID); + JobManagerConnection jobManagerConnection = jobManagerTable.get(tdd.getJobID()); if (jobManagerConnection == null) { - final String message = "Could not submit task because JobManager " + jobManagerID + - " was not associated."; + final String message = "Could not submit task because there is no JobManager " + + "associated for the job " + tdd.getJobID() + '.'; + + log.debug(message); + throw new TaskSubmissionException(message); + } + + if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { + final String message = "Rejecting the task submission because the job manager leader id " + + jobManagerLeaderId + " does not match the expected job manager leader id " + + jobManagerConnection.getLeaderId() + '.'; log.debug(message); throw new TaskSubmissionException(message); } - if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) { + if (!taskSlotTable.existsActiveSlot(tdd.getJobID(), tdd.getAllocationId())) { final String message = "No task slot allocated for job ID " + tdd.getJobID() + - " and allocation ID " + tdd.getAllocationID() + '.'; + " and allocation ID " + tdd.getAllocationId() + '.'; log.debug(message); throw new TaskSubmissionException(message); } @@ -279,7 +299,7 @@ public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManage TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd); InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( - jobManagerConnection.getJobMasterLeaderId(), + jobManagerConnection.getLeaderId(), jobManagerConnection.getJobManagerGateway(), tdd.getJobID(), tdd.getVertexID(), @@ -375,7 +395,7 @@ public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskEx // ---------------------------------------------------------------------- @RpcMethod - public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection partitionInfos) throws PartitionException { + public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable partitionInfos) throws PartitionException { final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { @@ -471,38 +491,319 @@ public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long } } + // ---------------------------------------------------------------------- + // Slot allocation RPCs + // ---------------------------------------------------------------------- + /** + * /** * Requests a slot from the TaskManager * - * @param slotID Slot id for the request - * @param allocationID id for the request - * @param resourceManagerLeaderID current leader id of the ResourceManager + * @param slotId identifying the requested slot + * @param jobId identifying the job for which the request is issued + * @param allocationId id for the request + * @param targetAddress of the job manager requesting the slot + * @param rmLeaderId current leader id of the ResourceManager + * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ @RpcMethod - public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) { - if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) { - return new TMSlotRequestRejected( - resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID); + public TMSlotRequestReply requestSlot( + final SlotID slotId, + final JobID jobId, + final AllocationID allocationId, + final String targetAddress, + final UUID rmLeaderId) throws SlotAllocationException { + log.info("Receive slot request {} for job {} from resource manager with leader id {}.", + allocationId, jobId, rmLeaderId); + + if (resourceManagerConnection == null) { + final String message = "TaskManager is not connected to a resource manager."; + log.debug(message); + throw new SlotAllocationException(message); } - if (unconfirmedFreeSlots.contains(slotID)) { - // check if request has not been blacklisted because the notification of a free slot - // has not been confirmed by the ResourceManager - return new TMSlotRequestRejected( - resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID); + + if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) { + final String message = "The leader id " + rmLeaderId + + " does not match with the leader id of the connected resource manager " + + resourceManagerConnection.getTargetLeaderId() + '.'; + + log.debug(message); + throw new SlotAllocationException(message); + } + + if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) { + log.info("Allocated slot for {}.", allocationId); + } else { + log.info("Could not allocate slot for {}.", allocationId); + throw new SlotAllocationException("Could not allocate slot."); + } + } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { + final String message = "The slot " + slotId + " has already been allocated for a different job."; + + log.info(message); + throw new SlotAllocationException(message); + } + + if (jobManagerTable.contains(jobId)) { + offerSlotsToJobManager(jobId); + } else { + try { + jobLeaderService.addJob(jobId, targetAddress); + } catch (Exception e) { + // free the allocated slot + try { + taskSlotTable.freeSlot(allocationId); + } catch (SlotNotFoundException slotNotFoundException) { + // slot no longer existent, this should actually never happen, because we've + // just allocated the slot. So let's fail hard in this case! + onFatalError(slotNotFoundException); + } + + // sanity check + if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + onFatalError(new Exception("Could not free slot " + slotId)); + } + + throw new SlotAllocationException("Could not add job to job leader service.", e); + } } - return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID); + return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId); } - // ------------------------------------------------------------------------ + // ====================================================================== // Internal methods + // ====================================================================== + + // ------------------------------------------------------------------------ + // Internal resource manager connection methods + // ------------------------------------------------------------------------ + + private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { + if (resourceManagerConnection != null) { + if (newLeaderAddress != null) { + // the resource manager switched to a new leader + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getTargetAddress(), newLeaderAddress); + } + else { + // address null means that the current leader is lost without a new leader being there, yet + log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", + resourceManagerConnection.getTargetAddress()); + } + + // drop the current connection or connection attempt + if (resourceManagerConnection != null) { + resourceManagerConnection.close(); + resourceManagerConnection = null; + } + } + + // establish a connection to the new leader + if (newLeaderAddress != null) { + log.info("Attempting to register at ResourceManager {}", newLeaderAddress); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection( + log, + this, + newLeaderAddress, + newLeaderId, + getMainThreadExecutor()); + resourceManagerConnection.start(); + } + } + // ------------------------------------------------------------------------ + // Internal job manager connection methods + // ------------------------------------------------------------------------ + + private void offerSlotsToJobManager(final JobID jobId) { + final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId); + + if (jobManagerConnection == null) { + log.debug("There is no job manager connection to the leader of job {}.", jobId); + } else { + if (taskSlotTable.hasAllocatedSlots(jobId)) { + log.info("Offer reserved slots to the leader of job {}.", jobId); + + final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway(); + + final Iterator reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); + final UUID leaderId = jobManagerConnection.getLeaderId(); + + final Collection reservedSlots = new HashSet<>(2); + + while (reservedSlotsIterator.hasNext()) { + reservedSlots.add(reservedSlotsIterator.next()); + } + + Future> acceptedSlotsFuture = jobMasterGateway.offerSlots( + reservedSlots, + leaderId, + taskManagerConfiguration.getTimeout()); + + acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction>() { + @Override + public void accept(Iterable acceptedSlots) { + // check if the response is still valid + if (isJobManagerConnectionValid(jobId, leaderId)) { + // mark accepted slots active + for (AllocationID acceptedSlot: acceptedSlots) { + try { + if (!taskSlotTable.markSlotActive(acceptedSlot)) { + // the slot is either free or releasing at the moment + final String message = "Could not mark slot " + jobId + " active."; + log.debug(message); + jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message)); + } + + // remove the assigned slots so that we can free the left overs + reservedSlots.remove(acceptedSlot); + } catch (SlotNotFoundException e) { + log.debug("Could not mark slot {} active.", acceptedSlot, e); + jobMasterGateway.failSlot(acceptedSlot, leaderId, e); + } + } + + final Exception e = new Exception("The slot was rejected by the JobManager."); + + for (AllocationID rejectedSlot: reservedSlots) { + freeSlot(rejectedSlot, e); + } + } else { + // discard the response since there is a new leader for the job + log.debug("Discard offer slot response since there is a new leader " + + "for the job {}.", jobId); + } + } + }, getMainThreadExecutor()); + + acceptedSlotsFuture.exceptionallyAsync(new ApplyFunction() { + @Override + public Void apply(Throwable throwable) { + if (throwable instanceof TimeoutException) { + // We ran into a timeout. Try again. + offerSlotsToJobManager(jobId); + } else { + // We encountered an exception. Free the slots and return them to the RM. + for (AllocationID reservedSlot: reservedSlots) { + freeSlot(reservedSlot, throwable); + } + } + + return null; + } + }, getMainThreadExecutor()); + } else { + log.debug("There are no unassigned slots for the job {}.", jobId); + } + } + } + + private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) { + log.info("Establish JobManager connection for job {}.", jobId); + + if (jobManagerTable.contains(jobId)) { + JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId); + + if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { + closeJobManagerConnection(jobId); + jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort())); + } + } else { + jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort())); + } - private JobManagerConnection getJobManagerConnection(ResourceID jobManagerID) { - return jobManagerConnections.get(jobManagerID); + offerSlotsToJobManager(jobId); } + private void closeJobManagerConnection(JobID jobId) { + log.info("Close JobManager connection for job {}.", jobId); + + // 1. fail tasks running under this JobID + Iterator tasks = taskSlotTable.getTasks(jobId); + + while (tasks.hasNext()) { + tasks.next().failExternally(new Exception("JobManager responsible for " + jobId + + " lost the leadership.")); + } + + // 2. Move the active slots to state allocated (possible to time out again) + Iterator activeSlots = taskSlotTable.getActiveSlots(jobId); + + while (activeSlots.hasNext()) { + AllocationID activeSlot = activeSlots.next(); + + try { + if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) { + freeSlot(activeSlot, new Exception("Slot could not be marked inactive.")); + } + } catch (SlotNotFoundException e) { + log.debug("Could not mark the slot {} inactive.", jobId, e); + } + } + + // 3. Disassociate from the JobManager + JobManagerConnection jobManagerConnection = jobManagerTable.remove(jobId); + + if (jobManagerConnection != null) { + try { + disassociateFromJobManager(jobManagerConnection); + } catch (IOException e) { + log.warn("Could not properly disassociate from JobManager {}.", + jobManagerConnection.getJobManagerGateway().getAddress(), e); + } + } + } + + private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) { + Preconditions.checkNotNull(jobManagerLeaderId); + Preconditions.checkNotNull(jobMasterGateway); + Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob port is out of range."); + + TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway); + + CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); + + InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); + + BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); + + LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( + blobCache, + taskManagerConfiguration.getCleanupInterval()); + + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( + jobManagerLeaderId, + jobMasterGateway, + getRpcService().getExecutor(), + taskManagerConfiguration.getTimeout()); + + PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway); + + return new JobManagerConnection( + jobMasterGateway, + jobManagerLeaderId, + taskManagerActions, + checkpointResponder, + libraryCacheManager, + resultPartitionConsumableNotifier, + partitionStateChecker); + } + + private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { + Preconditions.checkNotNull(jobManagerConnection); + JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); + jobManagerGateway.disconnectTaskManager(getResourceID()); + jobManagerConnection.getLibraryCacheManager().shutdown(); + } + + // ------------------------------------------------------------------------ + // Internal task methods + // ------------------------------------------------------------------------ + private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) { final Task task = taskSlotTable.getTask(executionAttemptID); @@ -571,94 +872,11 @@ private void unregisterTaskAndNotifyFinalState( } } - private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { - if (resourceManagerConnection != null) { - if (newLeaderAddress != null) { - // the resource manager switched to a new leader - log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), newLeaderAddress); - } - else { - // address null means that the current leader is lost without a new leader being there, yet - log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); - } - - // drop the current connection or connection attempt - if (resourceManagerConnection != null) { - resourceManagerConnection.close(); - resourceManagerConnection = null; - } - } - - unconfirmedFreeSlots.clear(); - - // establish a connection to the new leader - if (newLeaderAddress != null) { - log.info("Attempting to register at ResourceManager {}", newLeaderAddress); - resourceManagerConnection = - new TaskExecutorToResourceManagerConnection( - log, - this, - newLeaderAddress, - newLeaderId, - getMainThreadExecutor()); - resourceManagerConnection.start(); - } - } - - private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId, - JobMasterGateway jobMasterGateway, int blobPort) - { - Preconditions.checkNotNull(jobMasterLeaderId); - Preconditions.checkNotNull(jobMasterGateway); - Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range."); - - TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway); - - CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); - - InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); - - BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); - - LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( - blobCache, - taskManagerConfiguration.getCleanupInterval()); - - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( - jobMasterLeaderId, - jobMasterGateway, - getRpcService().getExecutor(), - taskManagerConfiguration.getTimeout()); - - PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway); - - return new JobManagerConnection( - jobMasterLeaderId, - jobMasterGateway, - taskManagerActions, - checkpointResponder, - libraryCacheManager, - resultPartitionConsumableNotifier, - partitionStateChecker); - } - - private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { - if (jobManagerConnection != null) { - JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); - - jobManagerGateway.disconnectTaskManager(getResourceID()); - - jobManagerConnection.getLibraryCacheManager().shutdown(); - } - } - - private void freeSlot(AllocationID allocationId) { + private void freeSlot(AllocationID allocationId, Throwable cause) { Preconditions.checkNotNull(allocationId); try { - int freedSlotIndex = taskSlotTable.freeSlot(allocationId); + int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause); if (freedSlotIndex != -1 && isConnectedToResourceManager()) { // the slot was freed. Tell the RM about it @@ -674,21 +892,35 @@ private void freeSlot(AllocationID allocationId) { } } + private void freeSlot(AllocationID allocationId) { + freeSlot(allocationId, new Exception("The slot " + allocationId + " is beeing freed.")); + } + private void timeoutSlot(AllocationID allocationId, UUID ticket) { Preconditions.checkNotNull(allocationId); Preconditions.checkNotNull(ticket); if (taskSlotTable.isValidTimeout(allocationId, ticket)) { - freeSlot(allocationId); + freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out.")); } else { log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket); } } + // ------------------------------------------------------------------------ + // Internal utility methods + // ------------------------------------------------------------------------ + private boolean isConnectedToResourceManager() { return (resourceManagerConnection != null && resourceManagerConnection.isConnected()); } + private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) { + JobManagerConnection jmConnection = jobManagerTable.get(jobId); + + return jmConnection != null && jmConnection.getLeaderId().equals(leaderId); + } + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -737,11 +969,6 @@ TaskExecutorToResourceManagerConnection getResourceManagerConnection() { return resourceManagerConnection; } - @VisibleForTesting - public void addUnconfirmedFreeSlotNotification(SlotID slotID) { - unconfirmedFreeSlots.add(slotID); - } - // ------------------------------------------------------------------------ // Utility classes // ------------------------------------------------------------------------ @@ -767,6 +994,44 @@ public void handleError(Exception exception) { } } + private final class JobLeaderListenerImpl implements JobLeaderListener { + + @Override + public void jobManagerGainedLeadership( + final JobID jobId, + final JobMasterGateway jobManagerGateway, + final UUID jobLeaderId, + final JMTMRegistrationSuccess registrationMessage) { + runAsync(new Runnable() { + @Override + public void run() { + establishJobManagerConnection( + jobId, + jobManagerGateway, + jobLeaderId, + registrationMessage); + } + }); + } + + @Override + public void jobManagerLostLeadership(final JobID jobId, final UUID jobLeaderId) { + log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobLeaderId); + + runAsync(new Runnable() { + @Override + public void run() { + closeJobManagerConnection(jobId); + } + }); + } + + @Override + public void handleError(Throwable throwable) { + onFatalErrorAsync(throwable); + } + } + private final class TaskManagerActionsImpl implements TaskManagerActions { private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; @@ -830,5 +1095,4 @@ public void run() { }); } } - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index f062b96c9f61f..1ffc407c0281e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -30,9 +30,9 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskmanager.Task; -import java.util.Collection; import java.util.UUID; /** @@ -43,28 +43,31 @@ public interface TaskExecutorGateway extends RpcGateway { /** * Requests a slot from the TaskManager * - * @param slotID slot id for the request - * @param allocationID id for the request - * @param resourceManagerLeaderID current leader id of the ResourceManager + * @param slotId slot id for the request + * @param allocationId id for the request + * @param resourceManagerLeaderId current leader id of the ResourceManager + * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ Future requestSlot( - SlotID slotID, - AllocationID allocationID, - UUID resourceManagerLeaderID, + SlotID slotId, + JobID jobId, + AllocationID allocationId, + String targetAddress, + UUID resourceManagerLeaderId, @RpcTimeout Time timeout); /** * Submit a {@link Task} to the {@link TaskExecutor}. * * @param tdd describing the task to submit - * @param jobManagerID identifying the submitting JobManager + * @param leaderId of the job leader * @param timeout of the submit operation * @return Future acknowledge of the successful operation */ Future submitTask( TaskDeploymentDescriptor tdd, - ResourceID jobManagerID, + UUID leaderId, @RpcTimeout Time timeout); /** @@ -74,7 +77,7 @@ Future submitTask( * @param partitionInfos telling where the partition can be retrieved from * @return Future acknowledge if the partitions have been successfully updated */ - Future updatePartitions(ExecutionAttemptID executionAttemptID, Collection partitionInfos); + Future updatePartitions(ExecutionAttemptID executionAttemptID, Iterable partitionInfos); /** * Fail all intermediate result partitions of the given task. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 2dbd9ebe2c556..53f030e727b60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -68,11 +68,16 @@ protected RetryingRegistration timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1)); final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService); + + final JobManagerTable jobManagerTable = new JobManagerTable(); + + final JobLeaderService jobLeaderService = new JobLeaderService(resourceID); return new TaskManagerServices( taskManagerLocation, @@ -200,7 +218,9 @@ public static TaskManagerServices fromConfiguration( taskManagerMetricGroup, broadcastVariableManager, fileCache, - taskSlotTable); + taskSlotTable, + jobManagerTable, + jobLeaderService); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java new file mode 100644 index 0000000000000..66d8102d5401c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +/** + * Exception indicating that the slot allocation on the task manager failed. + */ +public class SlotAllocationException extends TaskManagerException { + + private static final long serialVersionUID = -4764932098204266773L; + + public SlotAllocationException(String message) { + super(message); + } + + public SlotAllocationException(String message, Throwable cause) { + super(message, cause); + } + + public SlotAllocationException(Throwable cause) { + super(cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 42cb919592146..88123b4cc5724 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - - /** The timeout for allocated slots */ - private Time slotTimeout; - + /** Whether the table has been started */ private boolean started; @@ -104,7 +101,6 @@ public TaskSlotTable( slotsPerJob = new HashMap<>(4); slotActions = null; - slotTimeout = null; started = false; } @@ -112,11 +108,9 @@ public TaskSlotTable( * Start the task slot table with the given slot actions and slot timeout value. * * @param initialSlotActions to use for slot actions - * @param initialSlotTimeout to use for slot timeouts */ - public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { + public void start(SlotActions initialSlotActions) { this.slotActions = Preconditions.checkNotNull(initialSlotActions); - this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout); timerService.start(this); @@ -129,7 +123,6 @@ public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { public void stop() { started = false; timerService.stop(); - slotTimeout = null; slotActions = null; } @@ -144,9 +137,10 @@ public void stop() { * @param index of the task slot to allocate * @param jobId to allocate the task slot for * @param allocationId identifying the allocation + * @param slotTimeout until the slot times out * @return True if the task slot could be allocated; otherwise false */ - public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) { checkInit(); TaskSlot taskSlot = taskSlots.get(index); @@ -180,7 +174,7 @@ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { * * @param allocationId to identify the task slot to mark as active * @throws SlotNotFoundException if the slot could not be found for the given allocation id - * @return True if the slot could be marked active + * @return True if the slot could be marked active; otherwise false */ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { checkInit(); @@ -190,6 +184,8 @@ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundExce if (taskSlot != null) { if (taskSlot.markActive()) { // unregister a potential timeout + LOG.info("Activate slot {}.", allocationId); + timerService.unregisterTimeout(allocationId); return true; @@ -206,10 +202,11 @@ public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundExce * then a {@link SlotNotFoundException} is thrown. * * @param allocationId to identify the task slot to mark as inactive + * @param slotTimeout until the slot times out * @throws SlotNotFoundException if the slot could not be found for the given allocation id * @return True if the slot could be marked inactive */ - public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException { + public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException { checkInit(); TaskSlot taskSlot = getTaskSlot(allocationId); @@ -253,6 +250,12 @@ public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { */ public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { checkInit(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Free slot {}.", allocationId, cause); + } else { + LOG.info("Free slot {}.", allocationId); + } TaskSlot taskSlot = getTaskSlot(allocationId); @@ -322,8 +325,6 @@ public boolean isValidTimeout(AllocationID allocationId, UUID ticket) { * @return True if the given task slot is allocated for the given job and allocation id */ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { - checkInit(); - TaskSlot taskSlot = taskSlots.get(index); return taskSlot.isAllocated(jobId, allocationId); @@ -336,7 +337,7 @@ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { * @param allocationId identifying the allocation * @return True if there exists a task slot which is active for the given job and allocation id. */ - public boolean existActiveSlot(JobID jobId, AllocationID allocationId) { + public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) { TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null) { @@ -431,6 +432,8 @@ public boolean addTask(Task task) throws SlotNotFoundException, SlotNotActiveExc * @return The removed task if there is any for the given execution attempt id; otherwise null */ public Task removeTask(ExecutionAttemptID executionAttemptID) { + checkInit(); + TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID); if (taskSlotMapping != null) { @@ -481,6 +484,8 @@ public Iterator getTasks(JobID jobId) { @Override public void notifyTimeout(AllocationID key, UUID ticket) { + checkInit(); + if (slotActions != null) { slotActions.timeoutSlot(key, ticket); } @@ -493,9 +498,7 @@ public void notifyTimeout(AllocationID key, UUID ticket) { private TaskSlot getTaskSlot(AllocationID allocationId) { Preconditions.checkNotNull(allocationId); - TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId); - - return taskSlot; + return allocationIDTaskSlotMap.get(allocationId); } private void checkInit() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b67737d0f5085..90a829cfbbd18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -278,7 +278,7 @@ public Task( this.jobId = checkNotNull(tdd.getJobID()); this.vertexId = checkNotNull(tdd.getVertexID()); this.executionId = checkNotNull(tdd.getExecutionId()); - this.allocationId = checkNotNull(tdd.getAllocationID()); + this.allocationId = checkNotNull(tdd.getAllocationId()); this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks(); this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 39ea17610f8a8..993fd19f231c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -44,6 +45,7 @@ public class TaskDeploymentDescriptorTest { public void testSerialization() { try { final JobID jobID = new JobID(); + final AllocationID allocationId = new AllocationID(); final JobVertexID vertexID = new JobVertexID(); final ExecutionAttemptID execId = new ExecutionAttemptID(); final String jobName = "job name"; @@ -61,7 +63,7 @@ public void testSerialization() { final List requiredClasspaths = new ArrayList(0); final SerializedValue executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId, + final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, allocationId, jobName, vertexID, execId, executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47); @@ -69,12 +71,14 @@ public void testSerialization() { final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); assertFalse(orig.getJobID() == copy.getJobID()); + assertFalse(orig.getAllocationId() == copy.getAllocationId()); assertFalse(orig.getVertexID() == copy.getVertexID()); assertFalse(orig.getTaskName() == copy.getTaskName()); assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration()); assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration()); assertEquals(orig.getJobID(), copy.getJobID()); + assertEquals(orig.getAllocationId(), copy.getAllocationId()); assertEquals(orig.getVertexID(), copy.getVertexID()); assertEquals(orig.getTaskName(), copy.getTaskName()); assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index a255027f498a9..38e372d10c626 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -91,7 +91,7 @@ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Excepti } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 49f2268ae8fab..073aeacef918c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -75,7 +76,8 @@ public void addAndRemoveJobs() throws IOException { final ExecutionAttemptID execution21 = new ExecutionAttemptID(); TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid1, + jid1, + new AllocationID(), jobName1, vertex11, execution11, @@ -91,6 +93,7 @@ public void addAndRemoveJobs() throws IOException { TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex12, execution12, @@ -106,6 +109,7 @@ public void addAndRemoveJobs() throws IOException { TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( jid2, + new AllocationID(), jobName2, vertex21, execution21, @@ -121,6 +125,7 @@ public void addAndRemoveJobs() throws IOException { TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex13, execution13, @@ -192,6 +197,7 @@ public void testCloseClosesAll() throws IOException { TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex11, execution11, @@ -207,6 +213,7 @@ public void testCloseClosesAll() throws IOException { TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( jid1, + new AllocationID(), jobName1, vertex12, execution12, @@ -222,6 +229,7 @@ public void testCloseClosesAll() throws IOException { TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor( jid2, + new AllocationID(), jobName2, vertex21, execution21, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 558d3c2a7fba8..948c1290db741 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -68,7 +68,7 @@ public static void setUp() { taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class); TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class); Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway); - Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 24d959ef9b8e1..86cd1f828ace2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -144,7 +144,7 @@ public void testSlotsUnavailableRequest() throws Exception { Mockito .when( taskExecutorGateway - .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); @@ -161,7 +161,7 @@ public void testSlotsUnavailableRequest() throws Exception { // 4) Slot becomes available and TaskExecutor gets a SlotRequest verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); } /** @@ -189,7 +189,7 @@ public void testSlotAvailableRequest() throws Exception { TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); Mockito.when( taskExecutorGateway - .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) .thenReturn(new FlinkCompletableFuture()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); @@ -240,7 +240,7 @@ public void testSlotAvailableRequest() throws Exception { // 4) a SlotRequest is routed to the TaskExecutor verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); } private static TestingLeaderElectionService configureHA( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index e7143aea3e708..bbde33156b933 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -287,17 +288,27 @@ private boolean checkMethod(Method gatewayMethod, Method endpointMethod) { if (!futureClass.equals(RpcCompletenessTest.futureClass)) { return false; } else { - Class valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0); if (endpointMethod.getReturnType().equals(futureClass)) { - Class rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0); // check if we have the same future value types - if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) { - return false; + if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) { + Iterator> valueClasses = fullValueTypeInfo.getClazzIterator(); + Iterator> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator(); + + while (valueClasses.hasNext() && rpcClasses.hasNext()) { + if (!checkType(valueClasses.next(), rpcClasses.next())) { + return false; + } + } + + // both should be empty + return !valueClasses.hasNext() && !rpcClasses.hasNext(); } } else { - if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) { + if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) { return false; } } @@ -342,16 +353,16 @@ private String generateEndpointMethodSignature(Method method) { if (method.getReturnType().equals(Void.TYPE)) { builder.append("void").append(" "); } else if (method.getReturnType().equals(futureClass)) { - Class valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType()); + ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0); builder .append(futureClass.getSimpleName()) .append("<") - .append(valueClass != null ? valueClass.getSimpleName() : "") + .append(fullTypeInfo != null ? fullTypeInfo.toString() : "") .append(">"); - if (valueClass != null) { - builder.append("/").append(valueClass.getSimpleName()); + if (fullTypeInfo != null) { + builder.append("/").append(fullTypeInfo); } builder.append(" "); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index baae25141aeb4..23c6833e51ee1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -18,21 +18,45 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; @@ -40,19 +64,42 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Matchers; import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import static org.hamcrest.Matchers.contains; public class TaskExecutorTest extends TestLogger { + @Rule + public TestName name = new TestName(); + + @Test public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { final ResourceID resourceID = ResourceID.generate(); @@ -85,6 +132,8 @@ public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -142,6 +191,8 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -172,18 +223,386 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { } } + /** + * Tests that we can submit a task to the TaskManager given that we've allocated a slot there. + */ + @Test(timeout = 1000L) + public void testTaskSubmission() throws Exception { + final Configuration configuration = new Configuration(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final UUID jobManagerLeaderId = UUID.randomUUID(); + final JobVertexID jobVertexId = new JobVertexID(); + + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( + jobId, + allocationId, + name.getMethodName(), + jobVertexId, + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task", + 1, + 0, + 1, + 0, + configuration, + configuration, + TestInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); + when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader()); + + final JobManagerConnection jobManagerConnection = new JobManagerConnection( + mock(JobMasterGateway.class), + jobManagerLeaderId, + mock(TaskManagerActions.class), + mock(CheckpointResponder.class), + libraryCacheManager, + mock(ResultPartitionConsumableNotifier.class), + mock(PartitionStateChecker.class)); + + final JobManagerTable jobManagerTable = new JobManagerTable(); + jobManagerTable.put(jobId, jobManagerConnection); + + final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); + when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true); + when(taskSlotTable.addTask(any(Task.class))).thenReturn(true); + + final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); + + when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class)); + + final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); + + when(taskManagerMetricGroup.addTaskForJob(eq(tdd))).thenReturn(mock(TaskMetricGroup.class)); + + final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); + when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class)); + + try { + + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + mock(TaskManagerLocation.class), + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + networkEnvironment, + haServices, + mock(MetricRegistry.class), + taskManagerMetricGroup, + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + mock(JobLeaderService.class), + mock(FatalErrorHandler.class)); + + taskManager.start(); + + taskManager.submitTask(tdd, jobManagerLeaderId); + + Future completionFuture = TestInvokable.completableFuture; + + completionFuture.get(); + + } finally { + rpc.stopService(); + } + } + + /** + * Test invokable which completes the given future when executed. + */ + public static class TestInvokable extends AbstractInvokable { + + static final CompletableFuture completableFuture = new FlinkCompletableFuture<>(); + + @Override + public void invoke() throws Exception { + completableFuture.complete(true); + } + } + + /** + * Tests that a TaskManager detects a job leader for which has reserved slots. Upon detecting + * the job leader, it will offer all reserved slots to the JobManager. + */ + @Test + public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException { + final JobID jobId = new JobID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID resourceId = new ResourceID("foobar"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TimerService timerService = mock(TimerService.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); + + final String resourceManagerAddress = "rm"; + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + final InstanceID registrationId = new InstanceID(); + + when(resourceManagerGateway.registerTaskExecutor( + eq(resourceManagerLeaderId), + any(String.class), + eq(resourceId), + any(SlotReport.class), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final int blobPort = 42; + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(resourceId), + eq(jobManagerLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + + final AllocationID allocationId = new AllocationID(); + final SlotID slotId = new SlotID(resourceId, 0); + + try { + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + + taskManager.start(); + + // tell the task manager about the rm leader + resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId); + + // request slots from the task manager under the given allocation id + TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId); + + // this is hopefully successful :-) + assertTrue(reply instanceof TMSlotRequestRegistered); + + // now inform the task manager about the new job leader + jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId); + + // the job leader should get the allocation id offered + verify(jobMasterGateway).offerSlots((Iterable)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class)); + } finally { + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowException(); + + rpc.stopService(); + } + } + + /** + * Tests that accepted slots go into state assigned and the others are returned to the resource + * manager. + */ + @Test + public void testSlotAcceptance() throws Exception { + final JobID jobId = new JobID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + final Configuration configuration = new Configuration(); + final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + final ResourceID resourceId = new ResourceID("foobar"); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234); + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TimerService timerService = mock(TimerService.class); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService); + final JobManagerTable jobManagerTable = new JobManagerTable(); + final JobLeaderService jobLeaderService = new JobLeaderService(resourceId); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + final String resourceManagerAddress = "rm"; + final UUID resourceManagerLeaderId = UUID.randomUUID(); + + final String jobManagerAddress = "jm"; + final UUID jobManagerLeaderId = UUID.randomUUID(); + + final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId); + final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId); + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + final InstanceID registrationId = new InstanceID(); + + when(resourceManagerGateway.registerTaskExecutor( + eq(resourceManagerLeaderId), + any(String.class), + eq(resourceId), + any(SlotReport.class), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L))); + + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final int blobPort = 42; + + final AllocationID allocationId1 = new AllocationID(); + final AllocationID allocationId2 = new AllocationID(); + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + when(jobMasterGateway.registerTaskManager( + any(String.class), + eq(resourceId), + eq(jobManagerLeaderId), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); + when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); + + when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) + .thenReturn(FlinkCompletableFuture.completed((Iterable)Collections.singleton(allocationId1))); + + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + rpc.registerGateway(jobManagerAddress, jobMasterGateway); + + try { + TaskExecutor taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), + taskSlotTable, + jobManagerTable, + jobLeaderService, + testingFatalErrorHandler); + + taskManager.start(); + + taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L)); + taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L)); + + // we have to add the job after the TaskExecutor, because otherwise the service has not + // been properly started. + jobLeaderService.addJob(jobId, jobManagerAddress); + + verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); + + assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1)); + assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2)); + assertTrue(taskSlotTable.isSlotFree(1)); + } finally { + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowException(); + + rpc.stopService(); + } + } + + private static class TestingFatalErrorHandler implements FatalErrorHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class); + private final AtomicReference atomicThrowable; + + public TestingFatalErrorHandler() { + atomicThrowable = new AtomicReference<>(null); + } + + public void rethrowException() throws TestingException { + Throwable throwable = atomicThrowable.get(); + + if (throwable != null) { + throw new TestingException(throwable); + } + } + + public boolean hasExceptionOccurred() { + return atomicThrowable.get() != null; + } + + public Throwable getException() { + return atomicThrowable.get(); + } + + @Override + public void onFatalError(Throwable exception) { + LOG.error("OnFatalError:", exception); + atomicThrowable.compareAndSet(null, exception); + } + + //------------------------------------------------------------------ + // static utility classes + //------------------------------------------------------------------ + + private static final class TestingException extends Exception { + public TestingException(String message) { + super(message); + } + + public TestingException(String message, Throwable cause) { + super(message, cause); + } + + public TestingException(Throwable cause) { + super(cause); + } + + private static final long serialVersionUID = -4648195335470914498L; + } + } + /** * Tests that all allocation requests for slots are ignored if the slot has been reported as * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager. * * This is essential for the correctness of the state of the ResourceManager. */ + @Ignore @Test - public void testRejectAllocationRequestsForOutOfSyncSlots() { + public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException { final ResourceID resourceID = ResourceID.generate(); final String address1 = "/resource/manager/address/one"; final UUID leaderId = UUID.randomUUID(); + final JobID jobId = new JobID(); + final String jobManagerAddress = "foobar"; final TestingSerialRpcService rpc = new TestingSerialRpcService(); try { @@ -215,6 +634,8 @@ public void testRejectAllocationRequestsForOutOfSyncSlots() { mock(BroadcastVariableManager.class), mock(FileCache.class), mock(TaskSlotTable.class), + mock(JobManagerTable.class), + mock(JobLeaderService.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -232,14 +653,14 @@ public void testRejectAllocationRequestsForOutOfSyncSlots() { // test that allocating a slot works final SlotID slotID = new SlotID(resourceID, 0); - TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId); + TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered); + // TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID... // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1); - taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID); TMSlotRequestReply tmSlotRequestReply2 = - taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected); // re-register @@ -250,7 +671,7 @@ public void testRejectAllocationRequestsForOutOfSyncSlots() { // now we should be successful because the slots status has been synced // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM TMSlotRequestReply tmSlotRequestReply3 = - taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId); assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index eb7f3c5084d15..e027d7852e361 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; @@ -159,7 +160,7 @@ private static Task createTask() throws Exception { .thenReturn(mock(TaskKvStateRegistry.class)); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), + new JobID(), new AllocationID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue<>(new ExecutionConfig()), "Test Task", 1, 0, 1, 0, new Configuration(), new Configuration(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index d4efd24385839..d1909fe532be0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -163,12 +164,13 @@ protected void run() { final ExecutionAttemptID eid = new ExecutionAttemptID(); final SerializedValue executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, - "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(), - TestInvokableCorrect.class.getName(), - Collections.emptyList(), - Collections.emptyList(), - new ArrayList(), Collections.emptyList(), 0); + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, new AllocationID(), + "TestJob", vid, eid, executionConfig, + "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(), + TestInvokableCorrect.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + new ArrayList(), Collections.emptyList(), 0); new Within(d) { @@ -265,7 +267,7 @@ public void testJobSubmissionAndCanceling() { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid1, "TestJob1", vid1, eid1, + jid1, new AllocationID(), "TestJob1", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), @@ -274,7 +276,7 @@ public void testJobSubmissionAndCanceling() { new ArrayList(), Collections.emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid2, "TestJob2", vid2, eid2, + jid2, new AllocationID(), "TestJob2", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), @@ -404,13 +406,13 @@ public void testJobSubmissionAndStop() throws Exception { final SerializedValue executionConfig = new SerializedValue<>(new ExecutionConfig()); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, new AllocationID(), "TestJob", vid1, eid1, executionConfig, "TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, new AllocationID(), "TestJob", vid2, eid2, executionConfig, "TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), @@ -531,7 +533,7 @@ public void testGateChannelEdgeMismatch() { final ExecutionAttemptID eid2 = new ExecutionAttemptID(); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -540,7 +542,7 @@ public void testGateChannelEdgeMismatch() { new ArrayList(), Collections.emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), @@ -636,7 +638,7 @@ public void testRunJobWithForwardChannel() { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -644,7 +646,7 @@ public void testRunJobWithForwardChannel() { Collections.emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), @@ -781,7 +783,7 @@ public void testCancellingDependentAndStateUpdateFails() { ); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( - jid, "TestJob", vid1, eid1, + jid, new AllocationID(), "TestJob", vid1, eid1, new SerializedValue<>(new ExecutionConfig()), "Sender", 1, 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), @@ -789,7 +791,7 @@ public void testCancellingDependentAndStateUpdateFails() { new ArrayList(), Collections.emptyList(), 0); final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( - jid, "TestJob", vid2, eid2, + jid, new AllocationID(), "TestJob", vid2, eid2, new SerializedValue<>(new ExecutionConfig()), "Receiver", 7, 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), @@ -929,7 +931,7 @@ public void testRemotePartitionNotFound() throws Exception { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, "TestJob", vid, eid, + jid, new AllocationID(), "TestJob", vid, eid, new SerializedValue<>(new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), @@ -1022,7 +1024,7 @@ public void testLocalPartitionNotFound() throws Exception { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, "TestJob", vid, eid, + jid, new AllocationID(), "TestJob", vid, eid, new SerializedValue<>(new ExecutionConfig()), "Receiver", 1, 0, 1, 0, new Configuration(), new Configuration(), @@ -1097,24 +1099,25 @@ public void testTriggerStackTraceSampleMessage() throws Exception { // Single blocking task final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), - "Job", - new JobVertexID(), - new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Task", - 1, - 0, - 1, - 0, - new Configuration(), - new Configuration(), - Tasks.BlockingNoOpInvokable.class.getName(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0); + new JobID(), + new AllocationID(), + "Job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "Task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + Tasks.BlockingNoOpInvokable.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); // Submit the task new Within(d) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 5d3eb3a8c038a..7d466f37cf488 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -69,7 +69,7 @@ public void doMocking(AbstractInvokable taskMock) throws Exception { when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); - when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class)); + when(tddMock.getAllocationId()).thenReturn(mock(AllocationID.class)); task = new Task( tddMock, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 50fc181146453..c5a9f2d0e6dff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -665,16 +666,18 @@ private TaskDeploymentDescriptor createTaskDeploymentDescriptor(ClassemptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0); + new JobID(), + new AllocationID(), + "Test Job", new JobVertexID(), new ExecutionAttemptID(), + execConfig, + "Test Task", 1, 0, 1, 0, + new Configuration(), new Configuration(), + invokable.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index fb1b3b3b30f52..e84f594b1766a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -25,6 +25,7 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -129,24 +130,25 @@ private static TaskDeploymentDescriptor createTaskDeploymentDescriptor( List> partitionableOperatorState = Collections.emptyList(); return new TaskDeploymentDescriptor( - new JobID(), - "test job name", - new JobVertexID(), - new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "test task name", - 1, 0, 1, 0, - new Configuration(), - taskConfig, - SourceStreamTask.class.getName(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0, - operatorState, - keyGroupState, - partitionableOperatorState); + new JobID(), + new AllocationID(), + "test job name", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "test task name", + 1, 0, 1, 0, + new Configuration(), + taskConfig, + SourceStreamTask.class.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0, + operatorState, + keyGroupState, + partitionableOperatorState); } private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 95485950f16eb..abbf10c3d2044 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -220,17 +221,19 @@ private Task createTask( .thenReturn(mock(TaskKvStateRegistry.class)); TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Test Task", 1, 0, 1, 0, - new Configuration(), - taskConfig.getConfiguration(), - invokable.getName(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList(), - 0); + new JobID(), + new AllocationID(), + "Job Name", new JobVertexID(), new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "Test Task", 1, 0, 1, 0, + new Configuration(), + taskConfig.getConfiguration(), + invokable.getName(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); return new Task( tdd,