Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -135,14 +134,6 @@ default CompletableFuture<?> releaseSlot() {
*/
SlotRequestId getSlotRequestId();

/**
* Gets the slot sharing group id to which this slot belongs.
*
* @return slot sharing group id of this slot or null, if none.
*/
@Nullable
SlotSharingGroupId getSlotSharingGroupId();

/** Payload for a logical slot. */
interface Payload {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
Expand Down Expand Up @@ -50,7 +49,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
private final SlotContext slotContext;

// null if the logical slot does not belong to a slot sharing group, otherwise non-null
@Nullable private final SlotSharingGroupId slotSharingGroupId;

// locality of this slot wrt the requested preferred locations
private final Locality locality;
Expand All @@ -72,23 +70,20 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
@Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
SlotOwner slotOwner) {

this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true);
this(slotRequestId, slotContext, locality, slotOwner, true);
}

public SingleLogicalSlot(
SlotRequestId slotRequestId,
SlotContext slotContext,
@Nullable SlotSharingGroupId slotSharingGroupId,
Locality locality,
SlotOwner slotOwner,
boolean willBeOccupiedIndefinitely) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.slotContext = Preconditions.checkNotNull(slotContext);
this.slotSharingGroupId = slotSharingGroupId;
this.locality = Preconditions.checkNotNull(locality);
this.slotOwner = Preconditions.checkNotNull(slotOwner);
this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely;
Expand Down Expand Up @@ -154,12 +149,6 @@ public SlotRequestId getSlotRequestId() {
return slotRequestId;
}

@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}

public static SingleLogicalSlot allocateFromPhysicalSlot(
final SlotRequestId slotRequestId,
final PhysicalSlot physicalSlot,
Expand All @@ -171,7 +160,6 @@ public static SingleLogicalSlot allocateFromPhysicalSlot(
new SingleLogicalSlot(
slotRequestId,
physicalSlot,
null,
locality,
slotOwner,
slotWillBeOccupiedIndefinitely);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ private SingleLogicalSlot createLogicalSlot(
return new SingleLogicalSlot(
logicalSlotRequestId,
physicalSlot,
null,
Locality.UNKNOWN,
this,
slotWillBeOccupiedIndefinitely);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,7 @@ private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation loca

LogicalSlot slot =
new SingleLogicalSlot(
new SlotRequestId(),
slotContext,
null,
Locality.LOCAL,
mock(SlotOwner.class));
new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class));

if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) {
throw new FlinkException("Could not assign resource.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.jobmaster;

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -51,8 +50,6 @@ public class TestingLogicalSlot implements LogicalSlot {

private final SlotRequestId slotRequestId;

@Nullable private final SlotSharingGroupId slotSharingGroupId;

private boolean released;

TestingLogicalSlot(
Expand All @@ -61,7 +58,6 @@ public class TestingLogicalSlot implements LogicalSlot {
int slotNumber,
AllocationID allocationId,
SlotRequestId slotRequestId,
@Nullable SlotSharingGroupId slotSharingGroupId,
boolean automaticallyCompleteReleaseFuture,
SlotOwner slotOwner) {

Expand All @@ -71,7 +67,6 @@ public class TestingLogicalSlot implements LogicalSlot {
this.slotNumber = slotNumber;
this.allocationId = Preconditions.checkNotNull(allocationId);
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.slotSharingGroupId = slotSharingGroupId;
this.releaseFuture = new CompletableFuture<>();
this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
this.slotOwner = Preconditions.checkNotNull(slotOwner);
Expand Down Expand Up @@ -141,12 +136,6 @@ public SlotRequestId getSlotRequestId() {
return slotRequestId;
}

@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}

public CompletableFuture<?> getReleaseFuture() {
return releaseFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
Expand All @@ -33,7 +32,6 @@ public class TestingLogicalSlotBuilder {
private int slotNumber = 0;
private AllocationID allocationId = new AllocationID();
private SlotRequestId slotRequestId = new SlotRequestId();
private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
private SlotOwner slotOwner = new DummySlotOwner();
private boolean automaticallyCompleteReleaseFuture = true;

Expand Down Expand Up @@ -63,11 +61,6 @@ public TestingLogicalSlotBuilder setSlotRequestId(SlotRequestId slotRequestId) {
return this;
}

public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) {
this.slotSharingGroupId = slotSharingGroupId;
return this;
}

public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture(
boolean automaticallyCompleteReleaseFuture) {
this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture;
Expand All @@ -86,7 +79,6 @@ public TestingLogicalSlot createTestingLogicalSlot() {
slotNumber,
allocationId,
slotRequestId,
slotSharingGroupId,
automaticallyCompleteReleaseFuture,
slotOwner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private SingleLogicalSlot createSingleLogicalSlot() {

private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) {
return new SingleLogicalSlot(
new SlotRequestId(), createSlotContext(), null, Locality.LOCAL, slotOwner);
new SlotRequestId(), createSlotContext(), Locality.LOCAL, slotOwner);
}

private static SlotContext createSlotContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -107,7 +106,6 @@ public void testLogicalSlotAllocation() {
assertThat(logicalSlot.getTaskManagerGateway(), is(taskManagerGateway));
assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlotNumber));
assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
}

@Test
Expand Down