Skip to content

Commit

Permalink
[FLINK-9456] Reuse TestingResourceActions in SlotManagerTest#testNotifyFailedAllocationWhenTaskManagerTerminated
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jul 1, 2018
1 parent 50c0ea8 commit 89cfeaa
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 68 deletions.
Expand Up @@ -90,6 +90,7 @@ public AllocationID getAllocationId() {
return allocationId;
}

/**
 * Returns the job id held by this object.
 *
 * @return the stored job id; may be {@code null}, as signalled by {@link Nullable}
 */
@Nullable
public JobID getJobId() {
	return this.jobId;
}
Expand Down
Expand Up @@ -541,6 +541,7 @@ private void registerSlot(
*
* @param slotId to update
* @param allocationId specifying the current allocation of the slot
* @param jobId specifying the job to which the slot is allocated
* @return True if the slot could be updated; otherwise false
*/
private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
Expand All @@ -565,10 +566,10 @@ private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId
}

private void updateSlotState(
TaskManagerSlot slot,
TaskManagerRegistration taskManagerRegistration,
@Nullable AllocationID allocationId,
@Nullable JobID jobId) {
TaskManagerSlot slot,
TaskManagerRegistration taskManagerRegistration,
@Nullable AllocationID allocationId,
@Nullable JobID jobId) {
if (null != allocationId) {
switch (slot.getState()) {
case PENDING:
Expand Down Expand Up @@ -773,10 +774,14 @@ private void removeSlot(SlotID slotId) {
}

AllocationID oldAllocationId = slot.getAllocationId();

if (oldAllocationId != null) {
fulfilledSlotRequests.remove(oldAllocationId);

resourceActions.notifyAllocationFailure(
slot.getJobId(), oldAllocationId, new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
slot.getJobId(),
oldAllocationId,
new FlinkException("The assigned slot " + slot.getSlotId() + " was removed."));
}
} else {
LOG.debug("There was no slot registered with slot id {}.", slotId);
Expand Down
Expand Up @@ -22,6 +22,10 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.util.Preconditions;

/**
* Exception which signals that a slot is already occupied by the given
* {@link AllocationID}.
*/
public class SlotOccupiedException extends SlotAllocationException {
private static final long serialVersionUID = -3986333914244338888L;

Expand All @@ -32,19 +36,7 @@ public class SlotOccupiedException extends SlotAllocationException {
public SlotOccupiedException(String message, AllocationID allocationId, JobID jobId) {
super(message);
this.allocationId = Preconditions.checkNotNull(allocationId);
this.jobId = jobId;
}

public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId, JobID jobId) {
super(message, cause);
this.allocationId = Preconditions.checkNotNull(allocationId);
this.jobId = jobId;
}

public SlotOccupiedException(Throwable cause, AllocationID allocationId, JobID jobId) {
super(cause);
this.allocationId = Preconditions.checkNotNull(allocationId);
this.jobId = jobId;
this.jobId = Preconditions.checkNotNull(jobId);
}

public AllocationID getAllocationId() {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -49,11 +50,14 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import javax.annotation.Nonnull;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand All @@ -64,9 +68,10 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -1161,7 +1166,8 @@ public void testReportAllocatedSlot() throws Exception {
*/
@Test
public void testSlotRequestFailure() throws Exception {
try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions())) {
try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
new TestingResourceActionsBuilder().createTestingResourceActions())) {

final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
slotManager.registerSlotRequest(slotRequest);
Expand Down Expand Up @@ -1216,93 +1222,106 @@ public void testSlotRequestFailure() throws Exception {
@Test
public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Exception {

final List<Tuple2<JobID, AllocationID>> notifiedTaskManagerInfos = new ArrayList<>();
final Queue<Tuple2<JobID, AllocationID>> allocationFailures = new ArrayDeque<>(5);

try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
@Override
public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
notifiedTaskManagerInfos.add(new Tuple2<>(jobId, allocationId));
}})) {
final TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
.setNotifyAllocationFailureConsumer(
(Tuple3<JobID, AllocationID, Exception> failureMessage) ->
allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
.createTestingResourceActions();

try (final SlotManager slotManager = createSlotManager(
ResourceManagerId.generate(),
resourceManagerActions)) {

// register slot request for job1.
JobID jobId1 = new JobID();
final SlotRequest slotRequest11 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
final SlotRequest slotRequest12 = new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
final SlotRequest slotRequest11 = createSlotRequest(jobId1);
final SlotRequest slotRequest12 = createSlotRequest(jobId1);
slotManager.registerSlotRequest(slotRequest11);
slotManager.registerSlotRequest(slotRequest12);

// create task-manager-1 with 2 slots.
final ResourceID taskExecutorResourceId1 = ResourceID.generate();
final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder()
.createTestingTaskExecutorGateway();
final TestingTaskExecutorGateway testingTaskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutionConnection1 = new TaskExecutorConnection(taskExecutorResourceId1, testingTaskExecutorGateway1);
final Set<SlotStatus> tm1SlotStatusList = new HashSet<>();
tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 0), ResourceProfile.UNKNOWN));
tm1SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId1, 1), ResourceProfile.UNKNOWN));
final SlotReport slotReport1 = createSlotReport(taskExecutorResourceId1, 2);

// register the task-manager-1 to the slot manager, this will trigger the slot allocation for job1.
slotManager.registerTaskManager(taskExecutionConnection1, new SlotReport(tm1SlotStatusList));
slotManager.registerTaskManager(taskExecutionConnection1, slotReport1);

// register slot request for job2.
JobID jobId2 = new JobID();
final SlotRequest slotRequest21 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
final SlotRequest slotRequest22 = new SlotRequest(jobId2, new AllocationID(), ResourceProfile.UNKNOWN, "foobar2");
final SlotRequest slotRequest21 = createSlotRequest(jobId2);
final SlotRequest slotRequest22 = createSlotRequest(jobId2);
slotManager.registerSlotRequest(slotRequest21);
slotManager.registerSlotRequest(slotRequest22);

// register slot request for job3.
JobID jobId3 = new JobID();
final SlotRequest slotRequest31 = new SlotRequest(jobId3, new AllocationID(), ResourceProfile.UNKNOWN, "foobar3");
final SlotRequest slotRequest31 = createSlotRequest(jobId3);
slotManager.registerSlotRequest(slotRequest31);

// create task-manager-2 with 3 slots.
final ResourceID taskExecutorResourceId2 = ResourceID.generate();
final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder()
.createTestingTaskExecutorGateway();
final TestingTaskExecutorGateway testingTaskExecutorGateway2 = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
final TaskExecutorConnection taskExecutionConnection2 = new TaskExecutorConnection(taskExecutorResourceId2, testingTaskExecutorGateway2);
final Set<SlotStatus> tm2SlotStatusList = new HashSet<>();
tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 0), ResourceProfile.UNKNOWN));
tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 1), ResourceProfile.UNKNOWN));
tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 2), ResourceProfile.UNKNOWN));
tm2SlotStatusList.add(new SlotStatus(new SlotID(taskExecutorResourceId2, 3), ResourceProfile.UNKNOWN));
final SlotReport slotReport2 = createSlotReport(taskExecutorResourceId2, 3);

// register the task-manager-2 to the slot manager, this will trigger the slot allocation for job2 and job3.
slotManager.registerTaskManager(taskExecutionConnection2, new SlotReport(tm2SlotStatusList));

// --------------------- valid the notify task manager terminated ------------------------
slotManager.registerTaskManager(taskExecutionConnection2, slotReport2);

// valid for job1.
// validate for job1.
slotManager.unregisterTaskManager(taskExecutionConnection1.getInstanceID());

assertEquals(2, notifiedTaskManagerInfos.size());
assertThat(allocationFailures, hasSize(2));

assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(0).f0));
assertThat(jobId1, equalTo(notifiedTaskManagerInfos.get(1).f0));
Tuple2<JobID, AllocationID> allocationFailure;
final Set<AllocationID> failedAllocations = new HashSet<>(2);

assertEquals(Stream.of(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()).collect(Collectors.toSet()),
Stream.of(notifiedTaskManagerInfos.get(0).f1, notifiedTaskManagerInfos.get(1).f1).collect(Collectors.toSet()));
while ((allocationFailure = allocationFailures.poll()) != null) {
assertThat(allocationFailure.f0, equalTo(jobId1));
failedAllocations.add(allocationFailure.f1);
}

notifiedTaskManagerInfos.clear();
assertThat(failedAllocations, containsInAnyOrder(slotRequest11.getAllocationId(), slotRequest12.getAllocationId()));

// valid the result for job2 and job3.
// validate the result for job2 and job3.
slotManager.unregisterTaskManager(taskExecutionConnection2.getInstanceID());

assertEquals(3, notifiedTaskManagerInfos.size());
assertThat(allocationFailures, hasSize(3));

Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = notifiedTaskManagerInfos.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));
Map<JobID, List<Tuple2<JobID, AllocationID>>> job2AndJob3FailedAllocationInfo = allocationFailures.stream().collect(Collectors.groupingBy(tuple -> tuple.f0));

assertEquals(2, job2AndJob3FailedAllocationInfo.size());
assertThat(job2AndJob3FailedAllocationInfo.entrySet(), hasSize(2));

// valid for job2
assertEquals(Stream.of(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()).collect(Collectors.toSet()),
job2AndJob3FailedAllocationInfo.get(jobId2).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
final Set<AllocationID> job2FailedAllocations = extractFailedAllocationsForJob(jobId2, job2AndJob3FailedAllocationInfo);
final Set<AllocationID> job3FailedAllocations = extractFailedAllocationsForJob(jobId3, job2AndJob3FailedAllocationInfo);

// valid for job3
assertEquals(Stream.of(slotRequest31.getAllocationId()).collect(Collectors.toSet()),
job2AndJob3FailedAllocationInfo.get(jobId3).stream().map(tuple2 -> tuple2.f1).collect(Collectors.toSet()));
assertThat(job2FailedAllocations, containsInAnyOrder(slotRequest21.getAllocationId(), slotRequest22.getAllocationId()));
assertThat(job3FailedAllocations, containsInAnyOrder(slotRequest31.getAllocationId()));
}
}

/**
 * Collects the allocation ids of all failures recorded for the given job.
 *
 * @param jobId job whose failed allocations are requested
 * @param failedAllocationsByJob recorded (job id, allocation id) failures, grouped by job id
 * @return the allocation ids recorded for {@code jobId}; empty if the job has no entry
 */
private Set<AllocationID> extractFailedAllocationsForJob(JobID jobId, Map<JobID, List<Tuple2<JobID, AllocationID>>> failedAllocationsByJob) {
	final List<Tuple2<JobID, AllocationID>> failuresOfJob = failedAllocationsByJob.get(jobId);

	if (failuresOfJob == null) {
		// No entry for this job: hand back an empty set so that the caller's matcher reports
		// the missing allocations instead of this helper failing with a NullPointerException.
		return new HashSet<>();
	}

	return failuresOfJob.stream().map(failure -> failure.f1).collect(Collectors.toSet());
}

/**
 * Builds a {@link SlotReport} with {@code numberSlots} slot statuses for the given task
 * executor. The slots are numbered from {@code 0} to {@code numberSlots - 1} and each one
 * is reported with {@link ResourceProfile#UNKNOWN}.
 *
 * @param taskExecutorResourceId resource id used for every generated {@link SlotID}
 * @param numberSlots number of slot statuses to generate
 * @return slot report wrapping the generated slot statuses
 */
@Nonnull
private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
	final Set<SlotStatus> statuses = new HashSet<>(numberSlots);

	int slotIndex = 0;
	while (slotIndex < numberSlots) {
		final SlotID slotId = new SlotID(taskExecutorResourceId, slotIndex);
		statuses.add(new SlotStatus(slotId, ResourceProfile.UNKNOWN));
		slotIndex++;
	}

	return new SlotReport(statuses);
}

/**
 * Creates a slot request for the given job with a freshly generated {@link AllocationID},
 * the {@link ResourceProfile#UNKNOWN} profile and a fixed target address.
 *
 * @param jobId job on whose behalf the slot is requested
 * @return new slot request for {@code jobId}
 */
@Nonnull
private SlotRequest createSlotRequest(JobID jobId) {
	final AllocationID allocationId = new AllocationID();
	return new SlotRequest(jobId, allocationId, ResourceProfile.UNKNOWN, "foobar1");
}

private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
Expand Down
Expand Up @@ -19,26 +19,52 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;

import javax.annotation.Nonnull;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * {@link ResourceActions} implementation for tests which forwards every callback to a
 * handler supplied at construction time.
 */
public class TestingResourceActions implements ResourceActions {

	/** Handler invoked by {@link #releaseResource(InstanceID, Exception)}. */
	@Nonnull
	private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;

	/** Handler invoked by {@link #allocateResource(ResourceProfile)}. */
	@Nonnull
	private final Consumer<ResourceProfile> allocateResourceConsumer;

	/** Handler invoked by {@link #notifyAllocationFailure(JobID, AllocationID, Exception)}. */
	@Nonnull
	private final Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer;

	/**
	 * Creates resource actions which delegate to the given handlers.
	 *
	 * @param releaseResourceConsumer receives the instance id and cause of each release call
	 * @param allocateResourceConsumer receives the resource profile of each allocate call
	 * @param notifyAllocationFailureConsumer receives job id, allocation id and cause of each
	 *                                        allocation failure, bundled as a {@link Tuple3}
	 */
	public TestingResourceActions(
			@Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
			@Nonnull Consumer<ResourceProfile> allocateResourceConsumer,
			@Nonnull Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
		this.releaseResourceConsumer = releaseResourceConsumer;
		this.allocateResourceConsumer = allocateResourceConsumer;
		this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
	}

	@Override
	public void releaseResource(InstanceID instanceId, Exception cause) {
		this.releaseResourceConsumer.accept(instanceId, cause);
	}

	@Override
	public void allocateResource(ResourceProfile resourceProfile) {
		this.allocateResourceConsumer.accept(resourceProfile);
	}

	@Override
	public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
		// The handler takes a single argument, so the three values travel as one tuple.
		final Tuple3<JobID, AllocationID, Exception> failure = Tuple3.of(jobId, allocationId, cause);
		this.notifyAllocationFailureConsumer.accept(failure);
	}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Fluent builder for {@link TestingResourceActions}. Every handler starts out as a no-op,
 * so a test only has to set the callbacks it wants to observe.
 */
public class TestingResourceActionsBuilder {
	private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (instanceId, cause) -> {};
	private Consumer<ResourceProfile> allocateResourceConsumer = resourceProfile -> {};
	private Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = failure -> {};

	/**
	 * Sets the handler passed to the built instance for release-resource calls.
	 *
	 * @param releaseResourceConsumer handler receiving the instance id and the cause
	 * @return this builder
	 */
	public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
		this.releaseResourceConsumer = releaseResourceConsumer;
		return this;
	}

	/**
	 * Sets the handler passed to the built instance for allocate-resource calls.
	 *
	 * @param allocateResourceConsumer handler receiving the requested resource profile
	 * @return this builder
	 */
	public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) {
		this.allocateResourceConsumer = allocateResourceConsumer;
		return this;
	}

	/**
	 * Sets the handler passed to the built instance for allocation-failure notifications.
	 *
	 * @param notifyAllocationFailureConsumer handler receiving job id, allocation id and cause
	 * @return this builder
	 */
	public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
		this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
		return this;
	}

	/**
	 * Creates a {@link TestingResourceActions} from the currently configured handlers.
	 *
	 * @return new testing resource actions
	 */
	public TestingResourceActions createTestingResourceActions() {
		return new TestingResourceActions(
			releaseResourceConsumer,
			allocateResourceConsumer,
			notifyAllocationFailureConsumer);
	}
}

0 comments on commit 89cfeaa

Please sign in to comment.