Skip to content

Commit

Permalink
[FLINK-9932] Harden slot allocation protocol
Browse files Browse the repository at this point in the history
Harden slot allocation protocol by accepting task submissions for slots which are only
allocated. Before, it was necessary that the slot was marked as active. This, however,
required that task submissions come strictly after completing the slot offering which
is not the case. With this change, we mark all allocated and active slots as active
if the TaskExecutor receives a task submission.
  • Loading branch information
tillrohrmann committed Oct 15, 2018
1 parent 1d2aa70 commit d7740bd
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 89 deletions.
Expand Up @@ -451,7 +451,7 @@ public CompletableFuture<Acknowledge> submitTask(
throw new TaskSubmissionException(message);
}

if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message = "No task slot allocated for job ID " + jobId +
" and allocation ID " + tdd.getAllocationId() + '.';
log.debug(message);
Expand Down Expand Up @@ -1062,18 +1062,6 @@ private void offerSlotsToJobManager(final JobID jobId) {

while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
try {
if (!taskSlotTable.markSlotActive(offer.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message));
}
} catch (SlotNotFoundException e) {
final String message = "Could not mark slot " + jobId + " active.";
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message));
continue;
}
reservedSlots.add(offer);
}

Expand Down Expand Up @@ -1103,6 +1091,24 @@ private void offerSlotsToJobManager(final JobID jobId) {
if (isJobManagerConnectionValid(jobId, jobMasterId)) {
// mark accepted slots active
for (SlotOffer acceptedSlot : acceptedSlots) {
try {
if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
// the slot is either free or releasing at the moment
final String message = "Could not mark slot " + jobId + " active.";
log.debug(message);
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}
} catch (SlotNotFoundException e) {
final String message = "Could not mark slot " + jobId + " active.";
jobMasterGateway.failSlot(
getResourceID(),
acceptedSlot.getAllocationId(),
new FlinkException(message));
}

reservedSlots.remove(acceptedSlot);
}

Expand Down
Expand Up @@ -379,17 +379,17 @@ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
}

/**
* Check whether there exists an active slot for the given job and allocation id.
* Try to mark the specified slot as active if it has been allocated by the given job.
*
* @param jobId of the allocated slot
* @param allocationId identifying the allocation
* @return True if there exists a task slot which is active for the given job and allocation id.
* @return True if the task slot could be marked active.
*/
public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) {
public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
TaskSlot taskSlot = getTaskSlot(allocationId);

if (taskSlot != null) {
return taskSlot.isActive(jobId, allocationId);
if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
return taskSlot.markActive();
} else {
return false;
}
Expand Down
Expand Up @@ -35,8 +35,6 @@
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -69,9 +67,6 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
Expand All @@ -90,10 +85,10 @@
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -725,7 +720,7 @@ public void testTaskSubmission() throws Exception {
jobManagerTable.put(jobId, jobManagerConnection);

final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true);
when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true);
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);

TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
Expand Down Expand Up @@ -990,8 +985,8 @@ public void testSlotAcceptance() throws Exception {

assertThat(instanceIDSlotIDAllocationIDTuple3, equalTo(expectedResult));

assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
assertTrue(taskSlotTable.tryMarkSlotActive(jobId, allocationId1));
assertFalse(taskSlotTable.tryMarkSlotActive(jobId, allocationId2));
assertTrue(taskSlotTable.isSlotFree(1));
} finally {
taskManager.shutDown();
Expand All @@ -1008,61 +1003,37 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

final String resourceManagerAddress = "rm";
final UUID resourceManagerLeaderId = UUID.randomUUID();

final String jobManagerAddress = "jm";
final JobMasterId jobMasterId = JobMasterId.generate();

resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
jobManagerLeaderRetriever.notifyListener(jobManagerAddress, jobMasterId.toUUID());

final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());

final CompletableFuture<Tuple3<InstanceID, SlotID, AllocationID>> availableSlotFuture = new CompletableFuture<>();
resourceManagerGateway.setNotifySlotAvailableConsumer(availableSlotFuture::complete);

final ResourceID jmResourceId = new ResourceID(jobManagerAddress);

final AllocationID allocationId1 = new AllocationID();
final AllocationID allocationId2 = new AllocationID();

final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);

final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);

when(jobMasterGateway.registerTaskManager(
any(String.class),
eq(taskManagerLocation),
any(Time.class)
)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));

rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);

final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());

final JobManagerConnection jobManagerConnection = new JobManagerConnection(
jobId,
jmResourceId,
jobMasterGateway,
mock(TaskManagerActions.class),
mock(CheckpointResponder.class),
libraryCacheManager,
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class));
final OneShotLatch offerSlotsLatch = new OneShotLatch();
final OneShotLatch taskInTerminalState = new OneShotLatch();
final CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
.setOfferSlotsFunction((resourceID, slotOffers) -> {
offerSlotsLatch.trigger();
return offerResultFuture;
})
.setUpdateTaskExecutionStateFunction(taskExecutionState -> {
if (taskExecutionState.getExecutionState().isTerminal()) {
taskInTerminalState.trigger();
}
return CompletableFuture.completedFuture(Acknowledge.get());
})
.build();

final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());

when(taskManagerMetricGroup.addTaskForJob(
any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
anyString(), anyInt(), anyInt())
).thenReturn(taskMetricGroup);
rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);

Expand All @@ -1086,7 +1057,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
haServices,
taskManagerServices,
new HeartbeatServices(1000L, 1000L),
taskManagerMetricGroup,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
testingFatalErrorHandler);
Expand Down Expand Up @@ -1114,7 +1085,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
"test task",
1,
1,
TestInvokable.class.getName(),
NoOpInvokable.class.getName(),
new Configuration());

SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
Expand All @@ -1130,37 +1101,30 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
0,
0,
null,
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList());

CompletableFuture<Collection<SlotOffer>> offerResultFuture = new CompletableFuture<>();

// submit task first and then return acceptance response
when(
jobMasterGateway.offerSlots(
any(ResourceID.class),
any(Collection.class),
any(Time.class)))
.thenReturn(offerResultFuture);
Collections.emptyList(),
Collections.emptyList());

// we have to add the job after the TaskExecutor, because otherwise the service has not
// been properly started. This will also offer the slots to the job master
jobLeaderService.addJob(jobId, jobManagerAddress);
jobLeaderService.addJob(jobId, jobMasterGateway.getAddress());

verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Collection.class), any(Time.class));
offerSlotsLatch.await();

// submit the task without having acknowledge the offered slots
tmGateway.submitTask(tdd, jobMasterId, timeout);
tmGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), timeout).get();

// acknowledge the offered slots
offerResultFuture.complete(Collections.singleton(offer1));

final Tuple3<InstanceID, SlotID, AllocationID> instanceIDSlotIDAllocationIDTuple3 = availableSlotFuture.get();
assertThat(instanceIDSlotIDAllocationIDTuple3.f1, equalTo(new SlotID(taskManagerLocation.getResourceID(), 1)));

assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
assertTrue(taskSlotTable.tryMarkSlotActive(jobId, allocationId1));
assertFalse(taskSlotTable.tryMarkSlotActive(jobId, allocationId2));
assertTrue(taskSlotTable.isSlotFree(1));

// wait for the task completion
taskInTerminalState.await();
} finally {
taskManager.shutDown();
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand Down
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskexecutor.slot;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

import org.apache.commons.collections.IteratorUtils;
import org.junit.Test;

import javax.annotation.Nonnull;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link TaskSlotTable}.
*/
public class TaskSlotTableTest extends TestLogger {

private static final Time SLOT_TIMEOUT = Time.seconds(100L);

/**
* Tests that one can can mark allocated slots as active.
*/
@Test
public void testTryMarkSlotActive() throws SlotNotFoundException {
final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.nCopies(3, ResourceProfile.UNKNOWN));

try {
taskSlotTable.start(new TestingSlotActionsBuilder().build());

final JobID jobId1 = new JobID();
final AllocationID allocationId1 = new AllocationID();
taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
final AllocationID allocationId2 = new AllocationID();
taskSlotTable.allocateSlot(1, jobId1, allocationId2, SLOT_TIMEOUT);
final AllocationID allocationId3 = new AllocationID();
final JobID jobId2 = new JobID();
taskSlotTable.allocateSlot(2, jobId2, allocationId3, SLOT_TIMEOUT);

taskSlotTable.markSlotActive(allocationId1);

assertThat(taskSlotTable.isAllocated(0, jobId1, allocationId1), is(true));
assertThat(taskSlotTable.isAllocated(1, jobId1, allocationId2), is(true));
assertThat(taskSlotTable.isAllocated(2, jobId2, allocationId3), is(true));

assertThat(IteratorUtils.toList(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(Arrays.asList(allocationId1))));

assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId1), is(true));
assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId2), is(true));
assertThat(taskSlotTable.tryMarkSlotActive(jobId1, allocationId3), is(false));

assertThat(Sets.newHashSet(taskSlotTable.getActiveSlots(jobId1)), is(equalTo(new HashSet<>(Arrays.asList(allocationId2, allocationId1)))));
} finally {
taskSlotTable.stop();
}
}

@Nonnull
private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) {
return new TaskSlotTable(
resourceProfiles,
new TimerService<>(TestingUtils.defaultExecutor(), 10000L));
}

}

0 comments on commit d7740bd

Please sign in to comment.