Skip to content

Commit

Permalink
[FLINK-6434] [tests] Harden and speed up SlotPoolRpcTest
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 10, 2017
1 parent f748197 commit 2f9eb51
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 148 deletions.
Expand Up @@ -42,10 +42,13 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock; import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.Preconditions;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.annotation.Nullable;

import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -279,23 +282,25 @@ public void returnAllocatedSlot(Slot slot) {
} }


@Override @Override
public void cancelSlotAllocation(AllocationID allocationID) { public CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId) {
if (waitingForResourceManager.remove(allocationID) == null) { final PendingRequest pendingRequest = removePendingRequest(allocationId);


PendingRequest request = pendingRequests.remove(allocationID); if (pendingRequest != null) {
if (request != null) { failPendingRequest(pendingRequest, new CancellationException("Allocation " + allocationId + " cancelled."));
failPendingRequest(request, new CancellationException("Allocation " + allocationID + " cancelled")); } else {
} else { final Slot slot = allocatedSlots.get(allocationId);


Slot slot = allocatedSlots.get(allocationID); if (slot != null) {
if (slot != null) { LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, allocationId);
LOG.info("Return allocated slot {} by cancelling allocation {}.", slot, allocationID); if (slot.markCancelled()) {
if (slot.markCancelled()) { internalReturnAllocatedSlot(slot);
internalReturnAllocatedSlot(slot);
}
} }
} else {
LOG.debug("There was no slot allocation with {} to be cancelled.", allocationId);
} }
} }

return CompletableFuture.completedFuture(Acknowledge.get());
} }


CompletableFuture<SimpleSlot> internalAllocateSlot( CompletableFuture<SimpleSlot> internalAllocateSlot(
Expand Down Expand Up @@ -328,6 +333,28 @@ CompletableFuture<SimpleSlot> internalAllocateSlot(
return future; return future;
} }


/**
 * Removes the pending request registered under the given allocation id, if any.
 *
 * <p>The lookup first consults {@code waitingForResourceManager}; only when no entry is
 * found there is {@code pendingRequests} consulted. In both cases the entry is removed
 * from the map it was found in.
 *
 * @param allocationId identifying the pending request
 * @return the removed pending request, or {@code null} if neither map contained one
 */
@Nullable
private PendingRequest removePendingRequest(AllocationID allocationId) {
    final PendingRequest waitingRequest = waitingForResourceManager.remove(allocationId);

    if (waitingRequest == null) {
        // not waiting for the resource manager, so fall back to the pending requests
        return pendingRequests.remove(allocationId);
    }

    // sanity check: a request found in the waiting map must not also be in pendingRequests
    assert !pendingRequests.containsKey(allocationId) : "A pending requests should only be part of either " +
        "the pendingRequests or waitingForResourceManager but not both.";

    return waitingRequest;
}

private void requestSlotFromResourceManager( private void requestSlotFromResourceManager(
final AllocationID allocationID, final AllocationID allocationID,
final CompletableFuture<SimpleSlot> future, final CompletableFuture<SimpleSlot> future,
Expand Down Expand Up @@ -396,6 +423,9 @@ private void checkTimeoutSlotAllocation(AllocationID allocationID) {
} }


private void failPendingRequest(PendingRequest pendingRequest, Exception e) { private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
Preconditions.checkNotNull(pendingRequest);
Preconditions.checkNotNull(e);

if (!pendingRequest.getFuture().isDone()) { if (!pendingRequest.getFuture().isDone()) {
pendingRequest.getFuture().completeExceptionally(e); pendingRequest.getFuture().completeExceptionally(e);
} }
Expand All @@ -422,8 +452,9 @@ public void run() {
private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
PendingRequest request = waitingForResourceManager.remove(allocationID); PendingRequest request = waitingForResourceManager.remove(allocationID);
if (request != null) { if (request != null) {
failPendingRequest(request, new NoResourceAvailableException( failPendingRequest(
"No slot available and no connection to Resource Manager established.")); request,
new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
} }
} }


Expand Down Expand Up @@ -632,10 +663,13 @@ else if (availableSlots.tryRemove(allocationID)) {
* Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool. * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
* *
* @param resourceID The id of the TaskManager * @param resourceID The id of the TaskManager
* @return Future acknowledge if the operation was successful
*/ */
@Override @Override
public void registerTaskManager(final ResourceID resourceID) { public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
registeredTaskManagers.add(resourceID); registeredTaskManagers.add(resourceID);

return CompletableFuture.completedFuture(Acknowledge.get());
} }


/** /**
Expand Down Expand Up @@ -684,13 +718,14 @@ AvailableSlots getAvailableSlots() {
return availableSlots; return availableSlots;
} }


public CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() { @VisibleForTesting
return CompletableFuture.completedFuture(waitingForResourceManager.size()); public HashMap<AllocationID, PendingRequest> getPendingRequests() {
return pendingRequests;
} }


@Override @VisibleForTesting
public CompletableFuture<Integer> getNumberOfPendingRequests() { public HashMap<AllocationID, PendingRequest> getWaitingForResourceManager() {
return CompletableFuture.completedFuture(pendingRequests.size()); return waitingForResourceManager;
} }


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down
Expand Up @@ -18,7 +18,6 @@


package org.apache.flink.runtime.instance; package org.apache.flink.runtime.instance;


import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.AllocationID;
Expand Down Expand Up @@ -72,7 +71,7 @@ public interface SlotPoolGateway extends RpcGateway {
// registering / un-registering TaskManagers and slots // registering / un-registering TaskManagers and slots
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


void registerTaskManager(ResourceID resourceID); CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);


CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID); CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);


Expand All @@ -96,20 +95,11 @@ CompletableFuture<SimpleSlot> allocateSlot(
void returnAllocatedSlot(Slot slot); void returnAllocatedSlot(Slot slot);


/** /**
* Cancel a slot allocation. * Cancel a slot allocation. This method should be called when the CompletableFuture returned by
* This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally. * allocateSlot completed exceptionally.
* *
* @param allocationID the unique id for the previous allocation * @param allocationId identifying the slot allocation request
* @return Future acknowledge if the slot allocation has been cancelled
*/ */
void cancelSlotAllocation(AllocationID allocationID); CompletableFuture<Acknowledge> cancelSlotAllocation(AllocationID allocationId);

// ------------------------------------------------------------------------
// exposing internal statistic, mainly for testing
// ------------------------------------------------------------------------

@VisibleForTesting
CompletableFuture<Integer> getNumberOfWaitingForResourceRequests();

@VisibleForTesting
CompletableFuture<Integer> getNumberOfPendingRequests();
} }

0 comments on commit 2f9eb51

Please sign in to comment.