Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20850][runtime] Removing usage of CoLocationConstraints #14584

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -33,8 +33,6 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -91,8 +89,6 @@ public class ExecutionVertex
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
private final String taskNameWithSubtask;

private CoLocationConstraint locationConstraint;

/** The current or latest execution attempt of this vertex's task. */
private Execution currentExecution; // this field must never be null

Expand Down Expand Up @@ -153,14 +149,6 @@ public class ExecutionVertex
createTimestamp,
timeout);

// create a co-location scheduling hint, if necessary
CoLocationGroup clg = jobVertex.getCoLocationGroup();
if (clg != null) {
this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
} else {
this.locationConstraint = null;
}

getExecutionGraph().registerExecution(currentExecution);

this.timeout = timeout;
Expand Down Expand Up @@ -237,10 +225,6 @@ public ExecutionEdge[][] getAllInputEdges() {
return inputEdges;
}

public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}

public InputSplit getNextInputSplit(String host) {
final int taskId = getParallelSubtaskIndex();
synchronized (inputSplits) {
Expand Down Expand Up @@ -647,11 +631,6 @@ private Execution resetForNewExecutionInternal(
}
}

CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) {
locationConstraint = grp.getLocationConstraint(subTaskIndex);
}

// register this execution at the execution graph, to receive call backs
getExecutionGraph().registerExecution(newExecution);

Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.IterableUtils;
Expand Down Expand Up @@ -331,15 +331,11 @@ public Set<SlotSharingGroup> getSlotSharingGroups() {
return Collections.unmodifiableSet(slotSharingGroups);
}

public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
// invoke distinct() on CoLocationGroup first to avoid creating
// multiple CoLocationGroupDec from one CoLocationGroup
final Set<CoLocationGroupDesc> coLocationGroups =
public Set<CoLocationGroup> getCoLocationGroup() {
XComp marked this conversation as resolved.
Show resolved Hide resolved
final Set<CoLocationGroup> coLocationGroups =
IterableUtils.toStream(getVertices())
.map(JobVertex::getCoLocationGroup)
.filter(Objects::nonNull)
.distinct()
.map(CoLocationGroupDesc::from)
.collect(Collectors.toSet());
return Collections.unmodifiableSet(coLocationGroups);
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -109,7 +110,7 @@ public class JobVertex implements java.io.Serializable {
@Nullable private SlotSharingGroup slotSharingGroup;

/** The group inside which the vertex subtasks share slots. */
@Nullable private CoLocationGroup coLocationGroup;
@Nullable private CoLocationGroupImpl coLocationGroup;

/**
* Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON
Expand Down Expand Up @@ -425,12 +426,12 @@ public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
"Strict co-location requires that both vertices are in the same slot sharing group.");
}

CoLocationGroup thisGroup = this.coLocationGroup;
CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
CoLocationGroupImpl thisGroup = this.coLocationGroup;
CoLocationGroupImpl otherGroup = strictlyCoLocatedWith.coLocationGroup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be cleaner to add the addVertex() method to the CoLocationGroup interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to have a readonly interface as the colocation groups do not need to be changed after initialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, this is only the initialization.


if (otherGroup == null) {
if (thisGroup == null) {
CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
CoLocationGroupImpl group = new CoLocationGroupImpl(this, strictlyCoLocatedWith);
this.coLocationGroup = group;
strictlyCoLocatedWith.coLocationGroup = group;
} else {
Expand All @@ -453,7 +454,7 @@ public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}

public void updateCoLocationGroup(CoLocationGroup group) {
public void updateCoLocationGroup(CoLocationGroupImpl group) {
this.coLocationGroup = group;
}

Expand Down
Expand Up @@ -18,130 +18,46 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A CoLocationConstraint manages the location of a set of tasks (Execution Vertices). In
* co-location groups, the different subtasks of different JobVertices need to be executed on the
* same slot. This is realized by creating a special shared slot that holds these tasks.
*
* <p>This class tracks the location and the shared slot for this set of tasks.
* A {@code CoLocationConstraint} stores the ID of {@link CoLocationGroup} and an ID referring to
* the actual subtask (i.e. {@link ExecutionVertex}). In co-location groups, the different subtasks
* of different {@link JobVertex} instances need to be executed on the same slot. This is realized
* by creating a special shared slot that holds these tasks.
*/
public class CoLocationConstraint {

private final CoLocationGroup group;

private volatile TaskManagerLocation lockedLocation;

private volatile SlotRequestId slotRequestId;

CoLocationConstraint(CoLocationGroup group) {
Preconditions.checkNotNull(group);
this.group = group;
this.slotRequestId = null;
}
private final AbstractID coLocationGroupId;

// ------------------------------------------------------------------------
// Status & Properties
// ------------------------------------------------------------------------
private final int constraintIndex;

/**
* Gets the ID that identifies the co-location group.
*
* @return The ID that identifies the co-location group.
*/
public AbstractID getGroupId() {
return this.group.getId();
CoLocationConstraint(final AbstractID coLocationGroupId, final int constraintIndex) {
this.coLocationGroupId = checkNotNull(coLocationGroupId);
this.constraintIndex = constraintIndex;
}

/**
* Checks whether the location of this constraint has been assigned. The location is locked via
* the {@link #lockLocation(TaskManagerLocation)} method.
*
* @return True if the location has been assigned, false otherwise.
*/
public boolean isAssigned() {
return lockedLocation != null;
}

/**
* Gets the location assigned to this slot. This method only succeeds after the location has
* been locked via the {@link #lockLocation(TaskManagerLocation)} method and {@link
* #isAssigned()} returns true.
*
* @return The instance describing the location for the tasks of this constraint.
* @throws IllegalStateException Thrown if the location has not been assigned, yet.
*/
public TaskManagerLocation getLocation() {
if (lockedLocation != null) {
return lockedLocation;
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (obj != null && obj.getClass() == getClass()) {
CoLocationConstraint that = (CoLocationConstraint) obj;
return Objects.equals(that.coLocationGroupId, this.coLocationGroupId)
&& that.constraintIndex == this.constraintIndex;
} else {
throw new IllegalStateException("Location not yet locked");
return false;
}
}

// ------------------------------------------------------------------------
// Assigning resources and location
// ------------------------------------------------------------------------

/**
* Locks the location of this slot. The location can be locked only once and only after a shared
* slot has been assigned.
*
* <p>Note: This method exists for compatibility reasons with the new {@link SlotPool}.
*
* @param taskManagerLocation to lock this co-location constraint to
*/
public void lockLocation(TaskManagerLocation taskManagerLocation) {
checkNotNull(taskManagerLocation);
checkState(lockedLocation == null, "Location is already locked.");

lockedLocation = taskManagerLocation;
}

/**
* Sets the slot request id of the currently assigned slot to the co-location constraint. All
* other tasks belonging to this co-location constraint will be deployed to the same slot.
*
* @param slotRequestId identifying the assigned slot for this co-location constraint
*/
public void setSlotRequestId(@Nullable SlotRequestId slotRequestId) {
this.slotRequestId = slotRequestId;
}

/**
* Returns the currently assigned slot request id identifying the slot to which tasks belonging
* to this co-location constraint will be deployed to.
*
* @return Slot request id of the assigned slot or null if none
*/
@Nullable
public SlotRequestId getSlotRequestId() {
return slotRequestId;
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

@Override
public String toString() {
return "CoLocationConstraint{"
+ "group="
+ group
+ ", lockedLocation="
+ lockedLocation
+ ", slotRequestId="
+ slotRequestId
+ '}';
public int hashCode() {
return 31 * coLocationGroupId.hashCode() + constraintIndex;
}
}

This file was deleted.