Skip to content

Commit

Permalink
Coordinator drop segment selection through cost balancer (apache#5529)
Browse files Browse the repository at this point in the history
* drop selection through cost balancer

* use Collections.emptyIterator()

* add test to ensure does not drop from server with larger loading queue with cost balancer

* javadocs and comments to clear things up

* random drop for completeness
  • Loading branch information
clintropolis authored and jon-wei committed Apr 4, 2018
1 parent efaeb5b commit 7b03411
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,61 @@

import io.druid.timeline.DataSegment;

import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;

/**
* This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
* to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy
* is used by {@link io.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by
* {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to migrate segments between historicals.
*/
public interface BalancerStrategy
{
  /**
   * Find the best server to move a {@link DataSegment} to according to the balancing strategy.
   * @param proposalSegment segment to move
   * @param serverHolders servers to consider as move destinations
   * @return The server to move to, or null if no move should be made or no server is suitable
   */
  ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy
   * @param proposalSegment segment to replicate
   * @param serverHolders servers to consider as replica holders
   * @return The server to replicate to, or null if no suitable server is found
   */
  ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
   * @param serverHolders set of historicals to consider for moving segments
   * @return {@link BalancerSegmentHolder} containing the segment to move and the server it currently resides on
   */
  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);

  /**
   * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
   * for a given drop strategy. One or more segments may be dropped, depending on how much the segment is
   * over-replicated.
   * @param toDropSegment segment to drop from one or more servers
   * @param serverHolders set of historicals to consider dropping from
   * @return Iterator for set of historicals, ordered by drop preference
   */
  default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
  {
    // By default, use the reverse order to get the holders with least available size first.
    return serverHolders.descendingIterator();
  }

  /**
   * Add balancing strategy stats during the 'balanceTier' operation of
   * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to be included
   * @param tier historical tier being balanced
   * @param stats stats object to add balancing strategy stats to
   * @param serverHolderList servers in tier being balanced
   */
  void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
{
Expand Down Expand Up @@ -220,6 +224,37 @@ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHo
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

/**
 * Orders the given servers by the cost of {@code toDrop} residing on them, highest cost first, so that the
 * coordinator drops an over-replicated segment from the most expensive servers before cheaper ones.
 * Cost computations are fanned out to {@code exec} (one task per server) and joined before sorting.
 *
 * @param toDrop segment being dropped from one or more servers
 * @param serverHolders servers to consider dropping from
 * @return iterator over the servers in drop-preference order, or an empty iterator if cost computation failed
 */
@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet<ServerHolder> serverHolders)
{
  List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();

  for (final ServerHolder server : serverHolders) {
    futures.add(
        exec.submit(
            () -> Pair.of(computeCost(toDrop, server, true), server)
        )
    );
  }

  final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);

  try {
    // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server
    List<Pair<Double, ServerHolder>> results = resultsFuture.get();
    return results.stream()
                  // Comparator.comparingDouble will order by lowest cost...
                  // reverse it because we want to drop from the highest cost servers first
                  .sorted(Comparator.comparingDouble((Pair<Double, ServerHolder> o) -> o.lhs).reversed())
                  .map(x -> x.rhs).collect(Collectors.toList())
                  .iterator();
  }
  catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe the interruption; fall through to the empty iterator.
    Thread.currentThread().interrupt();
    log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
  }
  catch (Exception e) {
    // Best-effort: alert rather than fail the coordinator run; an empty iterator means nothing is dropped.
    log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
  }
  return Collections.emptyIterator();
}

/**
* Calculates the initial cost of the Druid segment configuration.
*
Expand Down Expand Up @@ -342,14 +377,7 @@ protected Pair<Double, ServerHolder> chooseBestServer(
for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call() throws Exception
{
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import io.druid.timeline.DataSegment;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;

public class RandomBalancerStrategy implements BalancerStrategy
Expand Down Expand Up @@ -54,6 +57,12 @@ public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

/**
 * Returns the servers in a uniformly random order, so drops are spread randomly across holders.
 *
 * @param toDropSegment segment being dropped (not used by the random strategy)
 * @param serverHolders servers to consider dropping from
 * @return iterator over the servers in random order
 */
@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
{
  // Copy into a list and shuffle it. Do NOT sort with a comparator keyed on Random#nextDouble():
  // such a comparator yields a different key each time the same element is compared, violating the
  // comparator contract (TimSort may throw "Comparator violates its general contract!"), and it
  // would also allocate a new Random per comparison.
  final List<ServerHolder> servers = new ArrayList<>(serverHolders);
  Collections.shuffle(servers);
  return servers.iterator();
}

@Override
public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package io.druid.server.coordinator.rules;

import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
Expand All @@ -39,6 +40,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -209,7 +211,7 @@ private ServerHolder assignPrimary(
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was
* assigned.
*/
Expand Down Expand Up @@ -320,7 +322,7 @@ private void drop(
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
numDropped = dropForTier(numToDrop, holders, segment);
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
}

stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
Expand All @@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster)
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment
final DataSegment segment,
final BalancerStrategy balancerStrategy
)
{
int numDropped = 0;

// Use the reverse order to get the holders with least available size first.
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator();
final NavigableSet<ServerHolder> isServingSubset =
holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new));

final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset);

while (numDropped < numToDrop) {
if (!iterator.hasNext()) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
Expand All @@ -364,6 +370,12 @@ private static int dropForTier(
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(segment, null);
++numDropped;
} else {
log.warn(
"Server [%s] is no longer serving segment [%s], skipping drop.",
holder.getServer().getName(),
segment.getIdentifier()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ public void testDrop() throws Exception
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down Expand Up @@ -554,7 +557,9 @@ public void testDropWithNonExistentTier() throws Exception
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down

0 comments on commit 7b03411

Please sign in to comment.