Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add a scoring-based approach to modifying packing plan #1572

Merged
merged 4 commits into from Nov 24, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions heron/packing/src/java/BUILD
Expand Up @@ -42,6 +42,7 @@ java_library(
srcs = glob(["**/packing/builder/*.java","**/packing/ResourceExceededException.java"]),
deps = heron_java_proto_files() + [
"@com_google_guava_guava//jar",
"//heron/api/src/java:classification",
"//heron/spi/src/java:packing-spi-java",
":utils",
],
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.packing.RamRequirement;
import com.twitter.heron.packing.ResourceExceededException;
import com.twitter.heron.packing.builder.ContainerIdScorer;
import com.twitter.heron.packing.builder.PackingPlanBuilder;
import com.twitter.heron.packing.utils.PackingUtils;
import com.twitter.heron.spi.common.Config;
Expand Down Expand Up @@ -284,11 +285,13 @@ private void removeInstancesFromContainers(PackingPlanBuilder packingPlanBuilder
ArrayList<RamRequirement> ramRequirements =
getSortedRAMInstances(componentsToScaleDown.keySet());

ContainerIdScorer scorer = new ContainerIdScorer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a reverse scorer for instance removal better? IMO, inserting and removing instances among the containers in reverse direct will result in better resource utilization.


for (RamRequirement ramRequirement : ramRequirements) {
String component = ramRequirement.getComponentName();
int numInstancesToRemove = -componentsToScaleDown.get(component);
for (int j = 0; j < numInstancesToRemove; j++) {
removeFFDInstance(packingPlanBuilder, component);
packingPlanBuilder.removeInstance(scorer, component);
}
}
}
Expand All @@ -302,30 +305,12 @@ private void placeFFDInstance(PackingPlanBuilder planBuilder,
if (this.numContainers == 0) {
planBuilder.updateNumContainers(++numContainers);
}
for (int containerId = 1; containerId <= numContainers; containerId++) {
try {
planBuilder.addInstance(containerId, instanceId);
return;
} catch (ResourceExceededException e) {
// ignore since we'll continue trying
}
}
planBuilder.updateNumContainers(++numContainers);
planBuilder.addInstance(numContainers, instanceId);
}

/**
* Remove an instance of a particular component from the containers
*
*/
private void removeFFDInstance(PackingPlanBuilder packingPlanBuilder, String component)
throws RuntimeException {
for (int containerId = 1; containerId <= numContainers; containerId++) {
if (packingPlanBuilder.removeInstance(containerId, component)) {
return;
}
try {
planBuilder.addInstance(new ContainerIdScorer(), instanceId);
} catch (ResourceExceededException e) {
planBuilder.updateNumContainers(++numContainers);
planBuilder.addInstance(numContainers, instanceId);
}
throw new PackingException("Cannot remove instance. No more instances of component "
+ component + " exist in the containers.");
}
}
Expand Up @@ -27,6 +27,7 @@
*/
class Container {

private int containerId;
private HashSet<PackingPlan.InstancePlan> instances;
private Resource capacity;
private int paddingPercentage;
Expand All @@ -50,12 +51,17 @@ public int getPaddingPercentage() {
* @param capacity the capacity of the container in terms of cpu, ram and disk
* @param paddingPercentage the padding percentage
*/
Container(Resource capacity, int paddingPercentage) {
Container(int containerId, Resource capacity, int paddingPercentage) {
this.containerId = containerId;
this.capacity = capacity;
this.instances = new HashSet<PackingPlan.InstancePlan>();
this.paddingPercentage = paddingPercentage;
}

public int getContainerId() {
return containerId;
}

/**
* Update the resources currently used by the container, when a new instance with specific
* resource requirements has been assigned to the container.
Expand Down
@@ -0,0 +1,48 @@
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.packing.builder;

/**
* Sorts containers ascending by container id. Id firstId and maxId as passed, starte with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a typo: starte -> start

* container with id firstId and ends with id firstId - 1
*/
public class ContainerIdScorer implements Scorer<Container> {

private Integer firstId;
private Integer maxId;

public ContainerIdScorer() {
this(0, 0);
}

public ContainerIdScorer(Integer firstId, Integer maxId) {
this.firstId = firstId;
this.maxId = maxId;
}

@Override
public boolean sortAscending() {
return true;
}

@Override
public double getScore(Container container) {
int containerId = container.getContainerId();
if (containerId >= firstId) {
return containerId - firstId;
} else {
return containerId + maxId;
}
}
}
Expand Up @@ -14,10 +14,15 @@
package com.twitter.heron.packing.builder;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
Expand Down Expand Up @@ -107,13 +112,80 @@ public PackingPlanBuilder addInstance(Integer containerId,
return this;
}

@SuppressWarnings("JavadocMethod")
/**
* Add an instance to the first container possible ranked by score.
* @return containerId of the container the instance was added to
* @throws com.twitter.heron.packing.ResourceExceededException if the instance could not be added
*/
public int addInstance(Scorer<Container> scorer,
InstanceId instanceId) throws ResourceExceededException {
List<Scorer<Container>> scorers = new LinkedList<>();
scorers.add(scorer);
return addInstance(scorers, instanceId);
}

@SuppressWarnings("JavadocMethod")
/**
* Add an instance to the first container possible ranked by score. If a scoring tie exists,
* uses the next scorer in the scorers list to break the tie.
* @return containerId of the container the instance was added to
* @throws com.twitter.heron.packing.ResourceExceededException if the instance could not be added
*/
private int addInstance(List<Scorer<Container>> scorers, InstanceId instanceId)
throws ResourceExceededException {
initContainers();
for (Container container : sortContainers(scorers, this.containers.values())) {
try {
addInstance(container.getContainerId(), instanceId);
return container.getContainerId();
} catch (ResourceExceededException e) {
// ignore since we'll continue trying
}
}
//Not enough containers.
throw new ResourceExceededException(String.format(
"Insufficient resources to add instance %s to any of the %d containers.",
instanceId, this.containers.size()));
}

public boolean removeInstance(Integer containerId, String componentName) {
initContainers();
Optional<PackingPlan.InstancePlan> instancePlan =
containers.get(containerId).removeAnyInstanceOfComponent(componentName);
return instancePlan.isPresent();
}

@SuppressWarnings("JavadocMethod")
/**
* Remove an instance from the first container possible ranked by score.
* @return containerId of the container the instance was removed from
* @throws com.twitter.heron.spi.packing.PackingException if the instance could not be removed
*/
public int removeInstance(Scorer<Container> scorer, String componentName) {
List<Scorer<Container>> scorers = new LinkedList<>();
scorers.add(scorer);
return removeInstance(scorers, componentName);
}

@SuppressWarnings("JavadocMethod")
/**
* Remove an instance from the first container possible ranked by score. If a scoring tie exists,
* uses the next scorer in the scorers list to break the tie.
* @return containerId of the container the instance was removed from
* @throws com.twitter.heron.spi.packing.PackingException if the instance could not be removed
*/
public int removeInstance(List<Scorer<Container>> scorers, String componentName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the RCRR implementation, during scale down we do not always start from the first container. For example if we need to scale down 2 components by 1 instance from containers [1...5] and the first component is removed from instance 2 then the removal for the next component will start from container 3. This will help with load balancing. This remove instance method does not capture that. However, it is fine if a good scoring method is used when we change the scale down approach as discussed in the other thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@billonahill I'm not sure if that is the same. In the previous example if we pass as input to the constructor container id 3 and containrId 5 then the search will stop at 5, right? Wehat we ideally want is to start from 3 and use this sequence: 4,5,1,2. Is that behavior captured?

initContainers();
for (Container container : sortContainers(scorers, this.containers.values())) {
if (removeInstance(container.getContainerId(), componentName)) {
return container.getContainerId();
}
}
throw new PackingException("Cannot remove instance. No more instances of component "
+ componentName + " exist in the containers.");
}

// build container plan sets by summing up instance resources
public PackingPlan build() {
assertResourceSettings();
Expand All @@ -133,8 +205,8 @@ private void initContainers() {
if (this.existingPacking == null) {
newContainerMap = new HashMap<>();
for (int containerId = 1; containerId <= numContainers; containerId++) {
newContainerMap.put(containerId,
new Container(this.maxContainerResource, this.requestedContainerPadding));
newContainerMap.put(containerId, new Container(
containerId, this.maxContainerResource, this.requestedContainerPadding));
}
} else {
newContainerMap = getContainers(
Expand All @@ -146,8 +218,9 @@ private void initContainers() {
SortedSet<Integer> sortedIds = new TreeSet<>(newContainerMap.keySet());
int nextContainerId = sortedIds.last() + 1;
for (int i = 0; i < numContainers - newContainerMap.size(); i++) {
newContainerMap.put(nextContainerId++, new Container(
newContainerMap.put(nextContainerId, new Container(nextContainerId,
newContainerMap.get(sortedIds.first()).getCapacity(), this.requestedContainerPadding));
nextContainerId++;
}
}

Expand Down Expand Up @@ -231,27 +304,6 @@ private static Set<PackingPlan.ContainerPlan> buildContainerPlans(
return containerPlans;
}

/**
* Sort the container plans based on the container Ids
*
* @return sorted array of container plans
*/
@VisibleForTesting
static PackingPlan.ContainerPlan[] sortOnContainerId(Set<PackingPlan.ContainerPlan> containers) {
ArrayList<Integer> containerIds = new ArrayList<>();
PackingPlan.ContainerPlan[] currentContainers =
new PackingPlan.ContainerPlan[containers.size()];
for (PackingPlan.ContainerPlan container : containers) {
containerIds.add(container.getId());
}
Collections.sort(containerIds);
for (PackingPlan.ContainerPlan container : containers) {
int position = containerIds.indexOf(container.getId());
currentContainers[position] = container;
}
return currentContainers;
}

/**
* Generates the containers that correspond to the current packing plan
* along with their associated instances.
Expand All @@ -263,18 +315,80 @@ static Map<Integer, Container> getContainers(PackingPlan currentPackingPlan,
int paddingPercentage) {
Map<Integer, Container> containers = new HashMap<>();

//sort containers based on containerIds;
PackingPlan.ContainerPlan[] currentContainerPlans =
sortOnContainerId(currentPackingPlan.getContainers());

Resource capacity = currentPackingPlan.getMaxContainerResources();
for (PackingPlan.ContainerPlan currentContainerPlan : currentContainerPlans) {
Container container = new Container(capacity, paddingPercentage);
for (PackingPlan.ContainerPlan currentContainerPlan : currentPackingPlan.getContainers()) {
Container container =
new Container(currentContainerPlan.getId(), capacity, paddingPercentage);
for (PackingPlan.InstancePlan instancePlan : currentContainerPlan.getInstances()) {
container.add(instancePlan);
}
containers.put(currentContainerPlan.getId(), container);
}
return containers;
}

@VisibleForTesting
static List<Container> sortContainers(List<Scorer<Container>> scorers,
Collection<Container> containers) {
List<Container> sorted = new ArrayList<>(containers);
Collections.sort(sorted, new ChainedContainerComparator<>(scorers));
return sorted;
}

private static class ChainedContainerComparator<T> implements Comparator<T> {
private final Comparator<T> comparator;
private final ChainedContainerComparator<T> tieBreaker;

ChainedContainerComparator(List<Scorer<T>> scorers) {
this((Queue<Scorer<T>>) new LinkedList<Scorer<T>>(scorers));
}

ChainedContainerComparator(Queue<Scorer<T>> scorers) {
if (scorers.isEmpty()) {
this.comparator = new EqualsComparator<T>();
this.tieBreaker = null;
} else {
this.comparator = new ContainerComparator<T>(scorers.remove());
this.tieBreaker = new ChainedContainerComparator<T>(scorers);
}
}

@Override
public int compare(T thisOne, T thatOne) {

int delta = comparator.compare(thisOne, thatOne);
if (delta != 0 || this.tieBreaker == null) {
return delta;
}
return tieBreaker.compare(thisOne, thatOne);
}
}

private static class ContainerComparator<T> implements Comparator<T> {
private Scorer<T> scorer;

ContainerComparator(Scorer<T> scorer) {
this.scorer = scorer;
}

@Override
public int compare(T thisOne, T thatOne) {
int sign = 1;
if (!scorer.sortAscending()) {
sign = -1;
}
return sign * (getScore(thisOne) - getScore(thatOne));
}

private int getScore(T container) {
return (int) (1000 * scorer.getScore(container));
}
}

private static class EqualsComparator<T> implements Comparator<T> {
@Override
public int compare(T thisOne, T thatOne) {
return 0;
}
}
}