Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

/** A {@link VertexParallelismInformation} implementation that provides common validation. */
public class DefaultVertexParallelismInfo implements VertexParallelismInformation {
private final int minParallelism;
private int parallelism;
private int maxParallelism;
private final Function<Integer, Optional<String>> rescaleMaxValidator;
Expand All @@ -45,6 +46,15 @@ public DefaultVertexParallelismInfo(
int parallelism,
int maxParallelism,
Function<Integer, Optional<String>> rescaleMaxValidator) {
this(1, parallelism, maxParallelism, rescaleMaxValidator);
}

public DefaultVertexParallelismInfo(
int minParallelism,
int parallelism,
int maxParallelism,
Function<Integer, Optional<String>> rescaleMaxValidator) {
this.minParallelism = minParallelism;
this.parallelism = checkInitialParallelism(parallelism);
this.maxParallelism = normalizeAndCheckMaxParallelism(maxParallelism);
this.rescaleMaxValidator = Preconditions.checkNotNull(rescaleMaxValidator);
Expand Down Expand Up @@ -79,6 +89,11 @@ private static int checkBounds(String name, int parallelism) {
return parallelism;
}

@Override
public int getMinParallelism() {
return minParallelism;
}

@Override
public int getParallelism() {
return this.parallelism;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -50,15 +51,20 @@ public static Optional<VertexParallelismStore> applyJobResourceRequirements(
for (final JobVertexID jobVertexId : jobResourceRequirements.getJobVertices()) {
final VertexParallelismInformation oldVertexParallelismInfo =
oldVertexParallelismStore.getParallelismInfo(jobVertexId);
final int parallelism =
jobResourceRequirements.getParallelism(jobVertexId).getUpperBound();
final JobVertexResourceRequirements.Parallelism parallelismSettings =
jobResourceRequirements.getParallelism(jobVertexId);
final int minParallelism = parallelismSettings.getLowerBound();
final int parallelism = parallelismSettings.getUpperBound();
newVertexParallelismStore.setParallelismInfo(
jobVertexId,
new DefaultVertexParallelismInfo(
minParallelism,
parallelism,
oldVertexParallelismInfo.getMaxParallelism(),
RESCALE_MAX_REJECT));
changed |= oldVertexParallelismInfo.getParallelism() != parallelism;
changed |=
oldVertexParallelismInfo.getMinParallelism() != minParallelism
|| oldVertexParallelismInfo.getParallelism() != parallelism;
}
return changed ? Optional.of(newVertexParallelismStore) : Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
* change during runtime.
*/
public interface VertexParallelismInformation {

/**
* Returns a vertex's min parallelism.
*
* @return the min parallelism for the vertex
*/
int getMinParallelism();

/**
* Returns a vertex's parallelism.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public JobVertexID getJobVertexID() {
return jobVertex.getID();
}

@Override
public int getMinParallelism() {
return parallelismInfo.getMinParallelism();
}

@Override
public int getParallelism() {
return parallelismInfo.getParallelism();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface JobInformation {
interface VertexInformation {
JobVertexID getJobVertexID();

int getMinParallelism();

int getParallelism();

int getMaxParallelism();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/** {@link SlotAllocator} implementation that supports slot sharing. */
Expand Down Expand Up @@ -72,37 +74,35 @@ public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
public ResourceCounter calculateRequiredSlots(
Iterable<JobInformation.VertexInformation> vertices) {
int numTotalRequiredSlots = 0;
for (Integer requiredSlots : getMaxParallelismForSlotSharingGroups(vertices).values()) {
numTotalRequiredSlots += requiredSlots;
for (SlotSharingGroupMetaInfo slotSharingGroupMetaInfo :
SlotSharingGroupMetaInfo.from(vertices).values()) {
numTotalRequiredSlots += slotSharingGroupMetaInfo.getMaxUpperBound();
}
return ResourceCounter.withResource(ResourceProfile.UNKNOWN, numTotalRequiredSlots);
}

private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
Iterable<JobInformation.VertexInformation> vertices) {
final Map<SlotSharingGroupId, Integer> maxParallelismForSlotSharingGroups = new HashMap<>();
for (JobInformation.VertexInformation vertex : vertices) {
maxParallelismForSlotSharingGroups.compute(
vertex.getSlotSharingGroup().getSlotSharingGroupId(),
(slotSharingGroupId, currentMaxParallelism) ->
currentMaxParallelism == null
? vertex.getParallelism()
: Math.max(currentMaxParallelism, vertex.getParallelism()));
}
return maxParallelismForSlotSharingGroups;
}

@Override
public Optional<VertexParallelism> determineParallelism(
JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots) {

// => less slots than slot-sharing groups
if (jobInformation.getSlotSharingGroups().size() > freeSlots.size()) {
final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> slotSharingGroupMetaInfo =
SlotSharingGroupMetaInfo.from(jobInformation.getVertices());

final int minimumRequiredSlots =
slotSharingGroupMetaInfo.values().stream()
.map(SlotSharingGroupMetaInfo::getMinLowerBound)
.reduce(0, Integer::sum);

if (minimumRequiredSlots > freeSlots.size()) {
return Optional.empty();
}

final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism =
determineSlotsPerSharingGroup(jobInformation, freeSlots.size());
determineSlotsPerSharingGroup(
jobInformation,
freeSlots.size(),
minimumRequiredSlots,
slotSharingGroupMetaInfo);

final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();

Expand Down Expand Up @@ -150,34 +150,58 @@ public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
* distributed over the remaining groups.
*/
private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(
JobInformation jobInformation, int freeSlots) {
JobInformation jobInformation,
int freeSlots,
int minRequiredSlots,
Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> slotSharingGroupMetaInfo) {

int numUnassignedSlots = freeSlots;
int numUnassignedSlotSharingGroups = jobInformation.getSlotSharingGroups().size();
int numMinSlotsRequiredByRemainingGroups = minRequiredSlots;

final Map<SlotSharingGroupId, Integer> slotSharingGroupParallelism = new HashMap<>();

for (Map.Entry<SlotSharingGroupId, Integer> slotSharingGroup :
sortSlotSharingGroupsByDesiredParallelism(jobInformation)) {
for (SlotSharingGroupId slotSharingGroup :
sortSlotSharingGroupsByHighestParallelismRange(slotSharingGroupMetaInfo)) {
final int minParallelism =
slotSharingGroupMetaInfo.get(slotSharingGroup).getMinLowerBound();

// if we reached this point we know we have more slots than we need to fulfill the
// minimum requirements for each slot sharing group.
// this means that a certain number of slots are already implicitly reserved (to fulfill
// the minimum requirement of other groups); so we only need to distribute the remaining
// "optional" slots while only accounting for the requirements beyond the minimum

// the number of slots this group can use beyond the minimum
final int maxOptionalSlots =
slotSharingGroupMetaInfo.get(slotSharingGroup).getMaxUpperBound()
- minParallelism;
// the number of slots that are not implicitly reserved for minimum requirements
final int freeOptionalSlots = numUnassignedSlots - numMinSlotsRequiredByRemainingGroups;
// the number of slots this group is allowed to use beyond the minimum requirements
final int optionalSlotShare = freeOptionalSlots / numUnassignedSlotSharingGroups;

final int groupParallelism =
Math.min(
slotSharingGroup.getValue(),
numUnassignedSlots / numUnassignedSlotSharingGroups);
minParallelism + Math.min(maxOptionalSlots, optionalSlotShare);

slotSharingGroupParallelism.put(slotSharingGroup.getKey(), groupParallelism);
slotSharingGroupParallelism.put(slotSharingGroup, groupParallelism);

numMinSlotsRequiredByRemainingGroups -= minParallelism;
numUnassignedSlots -= groupParallelism;
numUnassignedSlotSharingGroups--;
}

return slotSharingGroupParallelism;
}

private static List<Map.Entry<SlotSharingGroupId, Integer>>
sortSlotSharingGroupsByDesiredParallelism(JobInformation jobInformation) {
private static List<SlotSharingGroupId> sortSlotSharingGroupsByHighestParallelismRange(
Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> slotSharingGroupMetaInfo) {

return getMaxParallelismForSlotSharingGroups(jobInformation.getVertices()).entrySet()
.stream()
.sorted(Comparator.comparingInt(Map.Entry::getValue))
return slotSharingGroupMetaInfo.entrySet().stream()
.sorted(
Comparator.comparingInt(
entry -> entry.getValue().getMaxLowerUpperBoundRange()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -274,4 +298,68 @@ public Collection<ExecutionVertexID> getContainedExecutionVertices() {
return containedExecutionVertices;
}
}

private static class SlotSharingGroupMetaInfo {

private final int minLowerBound;
private final int maxUpperBound;
private final int maxLowerUpperBoundRange;

private SlotSharingGroupMetaInfo(
int minLowerBound, int maxUpperBound, int maxLowerUpperBoundRange) {
this.minLowerBound = minLowerBound;
this.maxUpperBound = maxUpperBound;
this.maxLowerUpperBoundRange = maxLowerUpperBoundRange;
}

public int getMinLowerBound() {
return minLowerBound;
}

public int getMaxUpperBound() {
return maxUpperBound;
}

public int getMaxLowerUpperBoundRange() {
return maxLowerUpperBoundRange;
}

public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from(
Iterable<JobInformation.VertexInformation> vertices) {

return getPerSlotSharingGroups(
vertices,
vertexInformation ->
new SlotSharingGroupMetaInfo(
vertexInformation.getMinParallelism(),
vertexInformation.getParallelism(),
vertexInformation.getParallelism()
- vertexInformation.getMinParallelism()),
(metaInfo1, metaInfo2) ->
new SlotSharingGroupMetaInfo(
Math.min(metaInfo1.getMinLowerBound(), metaInfo2.minLowerBound),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Schepler! May you explain here why it's not Math.max(metaInfo1.getMinLowerBound(), metaInfo2.minLowerBound) but min here?

Math.max(
metaInfo1.getMaxUpperBound(),
metaInfo2.getMaxUpperBound()),
Math.max(
metaInfo1.getMaxLowerUpperBoundRange(),
metaInfo2.getMaxLowerUpperBoundRange())));
}

private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(
Iterable<JobInformation.VertexInformation> vertices,
Function<JobInformation.VertexInformation, T> mapper,
BiFunction<T, T, T> reducer) {
final Map<SlotSharingGroupId, T> extractedPerSlotSharingGroups = new HashMap<>();
for (JobInformation.VertexInformation vertex : vertices) {
extractedPerSlotSharingGroups.compute(
vertex.getSlotSharingGroup().getSlotSharingGroupId(),
(slotSharingGroupId, currentData) ->
currentData == null
? mapper.apply(vertex)
: reducer.apply(currentData, mapper.apply(vertex)));
}
return extractedPerSlotSharingGroups;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public void testSetInfo() {
}

private static final class MockVertexParallelismInfo implements VertexParallelismInformation {
@Override
public int getMinParallelism() {
return 0;
}

@Override
public int getParallelism() {
return 0;
Expand Down
Loading