Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8088] Associate logical slots with the slot request id #5089

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -124,13 +126,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

/**
* Entry point for Mesos session clusters.
*/
Expand Down Expand Up @@ -114,13 +116,14 @@ protected void startClusterComponents(

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
null);

jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);

Expand Down Expand Up @@ -272,7 +273,8 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;

protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected void startClusterComponents(
highAvailabilityServices,
heartbeatServices,
metricRegistry,
this);
this,
dispatcherRestEndpoint.getRestAddress());

dispatcher = createDispatcher(
configuration,
Expand Down Expand Up @@ -238,5 +239,6 @@ protected abstract ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception;
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;

import javax.annotation.Nullable;

/**
* Entry point for the standalone session cluster.
*/
Expand All @@ -52,7 +54,8 @@ protected ResourceManager<?> createResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
FatalErrorHandler fatalErrorHandler,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager.slots;
package org.apache.flink.runtime.instance;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotException;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -41,10 +48,7 @@
public class AllocatedSlot {

/** The ID under which the slot is allocated. Uniquely identifies the slot. */
private final AllocationID slotAllocationId;

/** The ID of the job this slot is allocated for */
private final JobID jobID;
private final AllocationID allocationId;

/** The location information of the TaskManager to which this slot belongs */
private final TaskManagerLocation taskManagerLocation;
Expand All @@ -56,23 +60,29 @@ public class AllocatedSlot {
private final TaskManagerGateway taskManagerGateway;

/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
private final int slotNumber;
private final int physicalSlotNumber;

private final SlotOwner slotOwner;

private final AtomicReference<LogicalSlot> logicalSlotReference;

// ------------------------------------------------------------------------

public AllocatedSlot(
AllocationID slotAllocationId,
JobID jobID,
AllocationID allocationId,
TaskManagerLocation location,
int slotNumber,
int physicalSlotNumber,
ResourceProfile resourceProfile,
TaskManagerGateway taskManagerGateway) {
this.slotAllocationId = checkNotNull(slotAllocationId);
this.jobID = checkNotNull(jobID);
TaskManagerGateway taskManagerGateway,
SlotOwner slotOwner) {
this.allocationId = checkNotNull(allocationId);
this.taskManagerLocation = checkNotNull(location);
this.slotNumber = slotNumber;
this.physicalSlotNumber = physicalSlotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway);
this.slotOwner = checkNotNull(slotOwner);

logicalSlotReference = new AtomicReference<>(null);
}

// ------------------------------------------------------------------------
Expand All @@ -82,8 +92,8 @@ public AllocatedSlot(
*
* @return The ID under which the slot is allocated
*/
public AllocationID getSlotAllocationId() {
return slotAllocationId;
public AllocationID getAllocationId() {
return allocationId;
}

/**
Expand All @@ -97,22 +107,13 @@ public ResourceID getTaskManagerId() {
return getTaskManagerLocation().getResourceID();
}

/**
* Returns the ID of the job this allocated slot belongs to.
*
* @return the ID of the job this allocated slot belongs to
*/
public JobID getJobID() {
return jobID;
}

/**
* Gets the number of the slot.
*
* @return The number of the slot on the TaskManager.
*/
public int getSlotNumber() {
return slotNumber;
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}

/**
Expand Down Expand Up @@ -144,6 +145,86 @@ public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}

/**
* Triggers the release of the logical slot.
*/
public void triggerLogicalSlotRelease() {
final LogicalSlot logicalSlot = logicalSlotReference.get();

if (logicalSlot != null) {
logicalSlot.releaseSlot();
}
}

/**
* Releases the logical slot.
*
* @return true if the logical slot could be released, false otherwise.
*/
public boolean releaseLogicalSlot() {
final LogicalSlot logicalSlot = logicalSlotReference.get();

if (logicalSlot != null) {
if (logicalSlot instanceof Slot) {
final Slot slot = (Slot) logicalSlot;
if (slot.markReleased()) {
logicalSlotReference.set(null);
return true;
}
} else {
throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass());
}

}

return false;
}

/**
* Allocates a logical {@link SimpleSlot}.
*
* @param slotRequestId identifying the corresponding slot request
* @param locality specifying the locality of the allocated slot
* @return an allocated logical simple slot
* @throws SlotException if we could not allocate a simple slot
*/
public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException {
final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);

final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);

if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
simpleSlot.setLocality(locality);
return simpleSlot;
} else {
throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used.");
}
}

/**
* Allocates a logical {@link SharedSlot}.
*
* @param slotRequestId identifying the corresponding slot request
* @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
* @return an allocated logical shared slot
* @throws SlotException if we could not allocate a shared slot
*/
public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {

final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
slotRequestId);
final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);

if (logicalSlotReference.compareAndSet(null, sharedSlot)) {


return sharedSlot;
} else {
throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used.");
}
}

// ------------------------------------------------------------------------

/**
Expand All @@ -164,6 +245,43 @@ public final boolean equals(Object obj) {

@Override
public String toString() {
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;
return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
}

/**
* Slot context for {@link AllocatedSlot}.
*/
private final class AllocatedSlotContext implements SlotContext {

private final SlotRequestID slotRequestId;

private AllocatedSlotContext(SlotRequestID slotRequestId) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
}

@Override
public SlotRequestID getSlotRequestId() {
return slotRequestId;
}

@Override
public AllocationID getAllocationId() {
return allocationId;
}

@Override
public TaskManagerLocation getTaskManagerLocation() {
return taskManagerLocation;
}

@Override
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}

@Override
public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}
}
}