Skip to content

Commit

Permalink
[hotfix] Introduce SlotPoolResource and TestingRpcServiceResource
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jun 27, 2018
1 parent d5919d9 commit 217c312
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void setup() throws Exception {
final TestingSlotPool slotPool = new TestingSlotPool(
rpcService,
jobId,
PreviousAllocationSchedulingStrategy.getInstance());
LocationPreferenceSchedulingStrategy.getInstance());
testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);

final JobMasterId jobMasterId = JobMasterId.generate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -48,7 +53,15 @@
/**
* Test cases for {@link CoLocationConstraint} with the {@link SlotPool}.
*/
public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase {
public class SlotPoolCoLocationTest extends TestLogger {

@ClassRule
public static final TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();

@Rule
public final SlotPoolResource slotPoolResource = new SlotPoolResource(
rpcServiceResource.getTestingRpcService(),
PreviousAllocationSchedulingStrategy.getInstance());

/**
* Tests the scheduling of two tasks with a parallelism of 2 and a co-location constraint.
Expand All @@ -57,11 +70,14 @@ public class SlotPoolCoLocationTest extends SlotPoolSchedulingTestBase {
public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, InterruptedException {
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);

final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();

testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));

final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway();
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

CoLocationGroup group = new CoLocationGroup();
Expand All @@ -73,6 +89,7 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter
JobVertexID jobVertexId1 = new JobVertexID();
JobVertexID jobVertexId2 = new JobVertexID();

final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture11 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;

import org.junit.rules.ExternalResource;

import javax.annotation.Nonnull;

import java.util.concurrent.CompletableFuture;

/**
* {@link ExternalResource} which provides a {@link SlotPool}.
*/
public class SlotPoolResource extends ExternalResource {

@Nonnull
private final RpcService rpcService;

@Nonnull
private final SchedulingStrategy schedulingStrategy;

private SlotPool slotPool;

private SlotPoolGateway slotPoolGateway;

private TestingResourceManagerGateway testingResourceManagerGateway;

public SlotPoolResource(@Nonnull RpcService rpcService, @Nonnull SchedulingStrategy schedulingStrategy) {
this.rpcService = rpcService;
this.schedulingStrategy = schedulingStrategy;
slotPool = null;
slotPoolGateway = null;
testingResourceManagerGateway = null;
}

public SlotProvider getSlotProvider() {
checkInitialized();
return slotPool.getSlotProvider();
}

public TestingResourceManagerGateway getTestingResourceManagerGateway() {
checkInitialized();
return testingResourceManagerGateway;
}

public SlotPoolGateway getSlotPoolGateway() {
checkInitialized();
return slotPoolGateway;
}

private void checkInitialized() {
assert(slotPool != null);
}

@Override
protected void before() throws Throwable {
if (slotPool != null) {
terminateSlotPool();
}

testingResourceManagerGateway = new TestingResourceManagerGateway();

slotPool = new SlotPool(
rpcService,
new JobID(),
schedulingStrategy);

slotPool.start(JobMasterId.generate(), "foobar");

slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

slotPool.connectToResourceManager(testingResourceManagerGateway);
}

@Override
protected void after() {
if (slotPool != null) {
terminateSlotPool();
slotPool = null;
}
}

private void terminateSlotPool() {
slotPool.shutDown();
CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture();
terminationFuture.join();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -49,18 +54,29 @@
/**
* Test cases for slot sharing with the {@link SlotPool}.
*/
public class SlotPoolSlotSharingTest extends SlotPoolSchedulingTestBase {
public class SlotPoolSlotSharingTest extends TestLogger {

@ClassRule
public static final TestingRpcServiceResource testingRpcServiceResource = new TestingRpcServiceResource();

@Rule
public final SlotPoolResource slotPoolResource = new SlotPoolResource(
testingRpcServiceResource.getTestingRpcService(),
PreviousAllocationSchedulingStrategy.getInstance());

@Test
public void testSingleQueuedSharedSlotScheduling() throws Exception {
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));

LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway();
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture = slotProvider.allocateSlot(
new ScheduledUnit(
new JobVertexID(),
Expand Down Expand Up @@ -95,9 +111,11 @@ public void testSingleQueuedSharedSlotScheduling() throws Exception {
@Test
public void testFailingQueuedSharedSlotScheduling() throws ExecutionException, InterruptedException {
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture = slotProvider.allocateSlot(
new ScheduledUnit(
new JobVertexID(),
Expand All @@ -110,6 +128,7 @@ public void testFailingQueuedSharedSlotScheduling() throws ExecutionException, I
final AllocationID allocationId = allocationIdFuture.get();

// this should fail the returned logical slot future
final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway();
slotPoolGateway.failAllocation(allocationId, new FlinkException("Testing Exception"));

try {
Expand All @@ -126,17 +145,20 @@ public void testFailingQueuedSharedSlotScheduling() throws ExecutionException, I
@Test
public void testQueuedSharedSlotScheduling() throws InterruptedException, ExecutionException {
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));

final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway();
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
final JobVertexID jobVertexId1 = new JobVertexID();
final JobVertexID jobVertexId2 = new JobVertexID();

final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
Expand Down Expand Up @@ -223,6 +245,7 @@ public void testQueuedSharedSlotScheduling() throws InterruptedException, Execut
public void testQueuedMultipleSlotSharingGroups() throws ExecutionException, InterruptedException {
final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(4);

final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));

Expand All @@ -234,8 +257,10 @@ public void testQueuedMultipleSlotSharingGroups() throws ExecutionException, Int
final JobVertexID jobVertexId3 = new JobVertexID();
final JobVertexID jobVertexId4 = new JobVertexID();

final SlotPoolGateway slotPoolGateway = slotPoolResource.getSlotPoolGateway();
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
new ScheduledUnit(
jobVertexId1,
Expand Down

0 comments on commit 217c312

Please sign in to comment.