Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18709][Coordination] Implement PhysicalSlotProvider #13018

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.jobmaster.SlotRequestId;

import java.util.concurrent.CompletableFuture;

/**
* The provider serves physical slot requests.
*/
interface PhysicalSlotProvider {

/**
* Submit a request to allocate a physical slot.
*
* <p>The physical slot can be either allocated from the slots, which are already available for the job,
* or a new one can be requeted from the resource manager.
*
* @param physicalSlotRequest slot requirements
* @return a future of the allocated slot
*/
CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest);

/**
* Cancels the slot request with the given {@link SlotRequestId}.
*
* <p>If the request is already fulfilled with a physical slot, the slot will be released.
*
* @param slotRequestId identifying the slot request to cancel
* @param cause of the cancellation
*/
void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause);
}
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
private static final Logger LOG = LoggerFactory.getLogger(PhysicalSlotProviderImpl.class);

private final SlotSelectionStrategy slotSelectionStrategy;

private final SlotPool slotPool;

PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
this.slotPool = checkNotNull(slotPool);
slotPool.disableBatchSlotRequestTimeoutCheck();
}

@Override
public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(PhysicalSlotRequest physicalSlotRequest) {
SlotRequestId slotRequestId = physicalSlotRequest.getSlotRequestId();
SlotProfile slotProfile = physicalSlotRequest.getSlotProfile();
ResourceProfile resourceProfile = slotProfile.getPhysicalSlotResourceProfile();

LOG.debug("Received slot request [{}] with resource requirements: {}", slotRequestId, resourceProfile);

Optional<PhysicalSlot> availablePhysicalSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);

CompletableFuture<PhysicalSlot> slotFuture;
slotFuture = availablePhysicalSlot
.map(CompletableFuture::completedFuture)
.orElseGet(() -> requestNewSlot(
slotRequestId,
resourceProfile,
physicalSlotRequest.willSlotBeOccupiedIndefinitely()));

return slotFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
}

private Optional<PhysicalSlot> tryAllocateFromAvailable(SlotRequestId slotRequestId, SlotProfile slotProfile) {
Collection<SlotSelectionStrategy.SlotInfoAndResources> slotInfoList =
slotPool.getAvailableSlotsInformation()
.stream()
.map(SlotSelectionStrategy.SlotInfoAndResources::fromSingleSlot)
.collect(Collectors.toList());

Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);

return selectedAvailableSlot.flatMap(
slotInfoAndLocality -> slotPool.allocateAvailableSlot(
slotRequestId,
slotInfoAndLocality.getSlotInfo().getAllocationId())
);
}

private CompletableFuture<PhysicalSlot> requestNewSlot(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
boolean willSlotBeOccupiedIndefinitely) {
return willSlotBeOccupiedIndefinitely ?
slotPool.requestNewAllocatedSlot(slotRequestId, resourceProfile, null) :
slotPool.requestNewAllocatedBatchSlot(slotRequestId, resourceProfile);
}

@Override
public void cancelSlotRequest(SlotRequestId slotRequestId, Throwable cause) {
slotPool.releaseSlot(slotRequestId, cause);
}
}
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws Exception

final CompletableFuture<Collection<PhysicalSlotRequest.Result>> slotFutures = allocateSlots(requests);

final Collection<PhysicalSlotRequest.Result> results = slotFutures.get(TIMEOUT.getSize(), TIMEOUT.getUnit());
final Collection<PhysicalSlotRequest.Result> results = slotFutures.get();
final Collection<SlotRequestId> resultRequestIds = results.stream()
.map(PhysicalSlotRequest.Result::getSlotRequestId)
.collect(Collectors.toList());
Expand All @@ -119,7 +120,7 @@ public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws Exception
}

@Test
public void testBulkSlotAllocationFulfilledWithNewSlots() {
public void testBulkSlotAllocationFulfilledWithNewSlots() throws ExecutionException, InterruptedException {
final List<PhysicalSlotRequest> requests = Arrays.asList(
createPhysicalSlotRequest(),
createPhysicalSlotRequest());
Expand All @@ -131,8 +132,7 @@ public void testBulkSlotAllocationFulfilledWithNewSlots() {

addSlotToSlotPool();

assertThat(slotFutures.isDone(), is(true));
assertThat(slotFutures.isCompletedExceptionally(), is(false));
slotFutures.get();
}

@Test
Expand Down
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link PhysicalSlotProviderImpl}.
*/
public class PhysicalSlotProviderImplTest {
private static ScheduledExecutorService singleThreadScheduledExecutorService;

private static ComponentMainThreadExecutor mainThreadExecutor;

private TestingSlotPoolImpl slotPool;

private PhysicalSlotProvider physicalSlotProvider;

@BeforeClass
public static void setupClass() {
singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
}

@AfterClass
public static void teardownClass() {
if (singleThreadScheduledExecutorService != null) {
singleThreadScheduledExecutorService.shutdownNow();
}
}

@Before
public void setup() throws Exception {
slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
physicalSlotProvider = new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
}

@After
public void teardown() {
CompletableFuture.runAsync(() -> slotPool.close(), mainThreadExecutor).join();
}

@Test
public void testBulkSlotAllocationFulfilledWithAvailableSlots() throws InterruptedException, ExecutionException {
PhysicalSlotRequest request = createPhysicalSlotRequest();
addSlotToSlotPool();
CompletableFuture<PhysicalSlotRequest.Result> slotFuture = allocateSlot(request);
PhysicalSlotRequest.Result result = slotFuture.get();
zhuzhurk marked this conversation as resolved.
Show resolved Hide resolved
assertThat(result.getSlotRequestId(), is(request.getSlotRequestId()));
}

@Test
public void testBulkSlotAllocationFulfilledWithNewSlots() throws ExecutionException, InterruptedException {
final CompletableFuture<PhysicalSlotRequest.Result> slotFuture = allocateSlot(createPhysicalSlotRequest());
assertThat(slotFuture.isDone(), is(false));
addSlotToSlotPool();
slotFuture.get();
}

private CompletableFuture<PhysicalSlotRequest.Result> allocateSlot(PhysicalSlotRequest request) {
return CompletableFuture
.supplyAsync(
() -> physicalSlotProvider.allocatePhysicalSlot(request),
mainThreadExecutor)
.thenCompose(Function.identity());
}

private void addSlotToSlotPool() {
SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(ResourceProfile.ANY));
}

private static PhysicalSlotRequest createPhysicalSlotRequest() {
return new PhysicalSlotRequest(
new SlotRequestId(),
SlotProfile.noLocality(ResourceProfile.UNKNOWN),
true);
}
}