diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java index 027c0dfd80ffe..5ea1999a94ca0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -135,14 +134,6 @@ default CompletableFuture releaseSlot() { */ SlotRequestId getSlotRequestId(); - /** - * Gets the slot sharing group id to which this slot belongs. - * - * @return slot sharing group id of this slot or null, if none. - */ - @Nullable - SlotSharingGroupId getSlotSharingGroupId(); - /** Payload for a logical slot. */ interface Payload { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java index f8e767b4125c3..ccf0bca5a6814 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -50,7 +49,6 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload { private final SlotContext slotContext; // null if the logical slot does not belong to a slot sharing group, otherwise non-null - @Nullable private final SlotSharingGroupId slotSharingGroupId; // locality of this slot wrt the requested preferred locations private final Locality locality; @@ -72,23 +70,20 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload { public SingleLogicalSlot( SlotRequestId slotRequestId, SlotContext slotContext, - @Nullable SlotSharingGroupId slotSharingGroupId, Locality locality, SlotOwner slotOwner) { - this(slotRequestId, slotContext, slotSharingGroupId, locality, slotOwner, true); + this(slotRequestId, slotContext, locality, slotOwner, true); } public SingleLogicalSlot( SlotRequestId slotRequestId, SlotContext slotContext, - @Nullable SlotSharingGroupId slotSharingGroupId, Locality locality, SlotOwner slotOwner, boolean willBeOccupiedIndefinitely) { this.slotRequestId = Preconditions.checkNotNull(slotRequestId); this.slotContext = Preconditions.checkNotNull(slotContext); - this.slotSharingGroupId = slotSharingGroupId; this.locality = Preconditions.checkNotNull(locality); this.slotOwner = Preconditions.checkNotNull(slotOwner); this.willBeOccupiedIndefinitely = willBeOccupiedIndefinitely; @@ -154,12 +149,6 @@ public SlotRequestId getSlotRequestId() { return slotRequestId; } - @Nullable - @Override - public SlotSharingGroupId getSlotSharingGroupId() { - return slotSharingGroupId; - } - public static SingleLogicalSlot allocateFromPhysicalSlot( final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot, @@ -171,7 +160,6 @@ public static SingleLogicalSlot allocateFromPhysicalSlot( new SingleLogicalSlot( slotRequestId, physicalSlot, - null, locality, slotOwner, slotWillBeOccupiedIndefinitely); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java index 4d180352ce9d0..24f0444df7d3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java @@ -183,7 +183,6 @@ private SingleLogicalSlot createLogicalSlot( return new SingleLogicalSlot( logicalSlotRequestId, physicalSlot, - null, Locality.UNKNOWN, this, slotWillBeOccupiedIndefinitely); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index a2e6f23756367..bc5d84a4a967a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -237,11 +237,7 @@ private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation loca LogicalSlot slot = new SingleLogicalSlot( - new SlotRequestId(), - slotContext, - null, - Locality.LOCAL, - mock(SlotOwner.class)); + new SlotRequestId(), slotContext, Locality.LOCAL, mock(SlotOwner.class)); if (!vertex.getCurrentExecutionAttempt().tryAssignResource(slot)) { throw new FlinkException("Could not assign resource."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java index 2245c7492f802..32e8359079eb8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlot.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -51,8 +50,6 @@ public class TestingLogicalSlot implements LogicalSlot { private final SlotRequestId slotRequestId; - @Nullable private final SlotSharingGroupId slotSharingGroupId; - private boolean released; TestingLogicalSlot( @@ -61,7 +58,6 @@ public class TestingLogicalSlot implements LogicalSlot { int slotNumber, AllocationID allocationId, SlotRequestId slotRequestId, - @Nullable SlotSharingGroupId slotSharingGroupId, boolean automaticallyCompleteReleaseFuture, SlotOwner slotOwner) { @@ -71,7 +67,6 @@ public class TestingLogicalSlot implements LogicalSlot { this.slotNumber = slotNumber; this.allocationId = Preconditions.checkNotNull(allocationId); this.slotRequestId = Preconditions.checkNotNull(slotRequestId); - this.slotSharingGroupId = slotSharingGroupId; this.releaseFuture = new CompletableFuture<>(); this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture; this.slotOwner = Preconditions.checkNotNull(slotOwner); @@ -141,12 +136,6 @@ public SlotRequestId getSlotRequestId() { return slotRequestId; } - @Nullable - @Override - public SlotSharingGroupId getSlotSharingGroupId() { - return slotSharingGroupId; - } - public CompletableFuture getReleaseFuture() { return releaseFuture; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java index f5c04439c522f..ac3366f8d08ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingLogicalSlotBuilder.java @@ -20,7 +20,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -33,7 +32,6 @@ public class TestingLogicalSlotBuilder { private int slotNumber = 0; private AllocationID allocationId = new AllocationID(); private SlotRequestId slotRequestId = new SlotRequestId(); - private SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); private SlotOwner slotOwner = new DummySlotOwner(); private boolean automaticallyCompleteReleaseFuture = true; @@ -63,11 +61,6 @@ public TestingLogicalSlotBuilder setSlotRequestId(SlotRequestId slotRequestId) { return this; } - public TestingLogicalSlotBuilder setSlotSharingGroupId(SlotSharingGroupId slotSharingGroupId) { - this.slotSharingGroupId = slotSharingGroupId; - return this; - } - public TestingLogicalSlotBuilder setAutomaticallyCompleteReleaseFuture( boolean automaticallyCompleteReleaseFuture) { this.automaticallyCompleteReleaseFuture = automaticallyCompleteReleaseFuture; @@ -86,7 +79,6 @@ public TestingLogicalSlot createTestingLogicalSlot() { slotNumber, allocationId, slotRequestId, - slotSharingGroupId, automaticallyCompleteReleaseFuture, slotOwner); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java index 99f75c2edadf7..ce981da04f590 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java @@ -71,7 +71,7 @@ private SingleLogicalSlot createSingleLogicalSlot() { private SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner) { return new SingleLogicalSlot( - new SlotRequestId(), createSlotContext(), null, Locality.LOCAL, slotOwner); + new SlotRequestId(), createSlotContext(), Locality.LOCAL, slotOwner); } private static SlotContext createSlotContext() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java index 885c2c9e243e7..ac633c62293f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java @@ -40,7 +40,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId; import static org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.createExecutionSlotSharingGroup; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -107,7 +106,6 @@ public void testLogicalSlotAllocation() { assertThat(logicalSlot.getTaskManagerGateway(), is(taskManagerGateway)); assertThat(logicalSlot.getPhysicalSlotNumber(), is(physicalSlotNumber)); assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN)); - assertThat(logicalSlot.getSlotSharingGroupId(), nullValue()); } @Test