Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool #5091

Merged
merged 7 commits into from
Dec 14, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -124,13 +126,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

/**
* Entry point for Mesos session clusters.
*/
Expand Down Expand Up @@ -114,13 +116,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
public abstract class AbstractQueryableStateTestBase extends TestLogger {

private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
public static final long RETRY_TIMEOUT = 50L;

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
Expand Down Expand Up @@ -196,11 +197,12 @@ public Integer getKey(Tuple2<Integer, Long> value) {

final AtomicLongArray counts = new AtomicLongArray(numKeys);

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);

boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);
futures.clear();

for (int i = 0; i < numKeys; i++) {
final int key = i;
Expand Down Expand Up @@ -712,7 +714,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -785,7 +787,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -877,7 +879,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -973,7 +975,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
results.put(key, res);
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1050,7 +1052,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1129,6 +1131,7 @@ private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple
private final int numKeys;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private volatile boolean isRunning = true;
private int counter = 0;

TestKeyRangeSource(int numKeys) {
this.numKeys = numKeys;
Expand All @@ -1151,9 +1154,13 @@ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
synchronized (ctx.getCheckpointLock()) {
record.f0 = random.nextInt(numKeys);
ctx.collect(record);
counter++;
}

if (counter % 50 == 0) {
// mild slow down
Thread.sleep(1L);
}
// mild slow down
Thread.sleep(1L);
}
}

Expand Down Expand Up @@ -1327,7 +1334,7 @@ private static <K, S extends State, V> CompletableFuture<S> getKvState(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

final CompletableFuture<S> resultFuture = new CompletableFuture<>();
getKvStateIgnoringCertainExceptions(
Expand All @@ -1346,10 +1353,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

if (!resultFuture.isDone()) {
Thread.sleep(100L);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
Expand All @@ -1360,13 +1366,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
) {
resultFuture.completeExceptionally(throwable.getCause());
} else if (deadline.hasTimeLeft()) {
try {
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
} catch (InterruptedException e) {
e.printStackTrace();
}
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
}
} else {
resultFuture.complete(result);
Expand Down Expand Up @@ -1410,7 +1412,7 @@ private void executeValueQuery(
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
null);

jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);

Expand Down Expand Up @@ -272,7 +273,8 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;

protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
dispatcherRestEndpoint.getRestAddress());

dispatcher = createDispatcher(
configuration,
Expand Down Expand Up @@ -238,5 +239,6 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;

import javax.annotation.Nullable;

/**
* Entry point for the standalone session cluster.
*/
Expand All @@ -52,7 +54,8 @@ protected ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
Expand All @@ -51,6 +52,8 @@

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -441,9 +444,11 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
// this method only works if the execution is in the state 'CREATED'
if (transitionState(CREATED, SCHEDULED)) {

final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;

ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, sharingGroup) :
new ScheduledUnit(this, sharingGroup, locationConstraint);
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);

// calculate the preferred locations
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
Expand All @@ -461,7 +466,7 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
return this;
} else {
// release the slot
logicalSlot.releaseSlot();
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));

throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
Expand Down Expand Up @@ -513,7 +518,7 @@ public void deploy() throws JobException {

// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
slot.releaseSlot();
slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
return;
}

Expand Down Expand Up @@ -622,7 +627,7 @@ else if (current == CREATED || current == SCHEDULED) {
try {
vertex.getExecutionGraph().deregisterExecution(this);

releaseAssignedResource();
releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
}
finally {
vertex.executionCanceled(this);
Expand Down Expand Up @@ -890,7 +895,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met

updateAccumulatorsAndMetrics(userAccumulators, metrics);

releaseAssignedResource();
releaseAssignedResource(null);

vertex.getExecutionGraph().deregisterExecution(this);
}
Expand Down Expand Up @@ -943,7 +948,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {

if (transitionState(current, CANCELED)) {
try {
releaseAssignedResource();
releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));

vertex.getExecutionGraph().deregisterExecution(this);
}
Expand Down Expand Up @@ -1035,7 +1040,7 @@ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumul
updateAccumulatorsAndMetrics(userAccumulators, metrics);

try {
releaseAssignedResource();
releaseAssignedResource(t);
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
Expand Down Expand Up @@ -1176,12 +1181,14 @@ private void sendUpdatePartitionInfoRpcCall(
/**
* Releases the assigned resource and completes the release future
* once the assigned resource has been successfully released
*
* @param cause for the resource release, null if none
*/
private void releaseAssignedResource() {
private void releaseAssignedResource(@Nullable Throwable cause) {
final LogicalSlot slot = assignedResource;

if (slot != null) {
slot.releaseSlot().whenComplete(
slot.releaseSlot(cause).whenComplete(
(Object ignored, Throwable throwable) -> {
if (throwable != null) {
releaseFuture.completeExceptionally(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down