
Coordinator drop segment selection through cost balancer #5529

Merged
merged 6 commits on Apr 3, 2018
@@ -21,7 +21,9 @@

import io.druid.timeline.DataSegment;

import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;

public interface BalancerStrategy
{
@@ -31,5 +33,10 @@

BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);

default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)

@gianm (Contributor) commented on Apr 2, 2018:
This should have some javadoc explaining what the returned iterator should be. It's non-obvious: the "obvious" return value would be a set of servers to drop from, where the order doesn't matter. But the actual return value is ordered by preferredness, and may contain more servers than we actually want to drop from.

@clintropolis (Author, Member) commented on Apr 2, 2018:
I went ahead and added javadoc for the whole interface, let me know if I got anything wrong.

{
return serverHolders.descendingIterator();

@gianm (Contributor) commented on Apr 2, 2018:
Would be nice to include a comment about why this descendingIterator is meaningful. (Like the comment on the old code: // Use the reverse order to get the holders with least available size first.)

}
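For reference, here is a self-contained sketch of the kind of javadoc and inline comment the two review threads above ask for. `DataSegment` and `ServerHolder` are minimal stand-ins for the io.druid classes, and the javadoc wording is an illustration, not the text that was actually merged:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;

public class BalancerSketch
{
  static class DataSegment {}

  static class ServerHolder
  {
    final String name;
    final long availableSize;

    ServerHolder(String name, long availableSize)
    {
      this.name = name;
      this.availableSize = availableSize;
    }
  }

  interface BalancerStrategy
  {
    /**
     * Pick the servers to drop {@code toDropSegment} from.
     *
     * The returned iterator is not an unordered set of servers to drop from: it is
     * ordered by preferredness (callers drop from the first servers it yields), and
     * it may contain more servers than the caller actually needs to drop from.
     */
    default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
    {
      // Use the reverse order to get the holders with the least available size first.
      return serverHolders.descendingIterator();
    }
  }

  public static void main(String[] args)
  {
    // The coordinator builds its tier sets with Collections.reverseOrder() (most
    // available size first), so descendingIterator() yields least available first.
    NavigableSet<ServerHolder> holders =
        new TreeSet<>(Comparator.comparingLong((ServerHolder h) -> h.availableSize).reversed());
    holders.add(new ServerHolder("big", 900));
    holders.add(new ServerHolder("small", 100));

    ServerHolder first = new BalancerStrategy() {}.pickServersToDrop(new DataSegment(), holders).next();
    System.out.println(first.name); // "small": least available size, preferred drop target
  }
}
```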

void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList);
}
@@ -32,9 +32,13 @@
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
{
@@ -219,6 +223,34 @@ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHo
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet<ServerHolder> serverHolders)
{
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();

for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
() -> Pair.of(computeCost(toDrop, server, true), server)
)
);
}

final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);

try {
List<Pair<Double, ServerHolder>> results = resultsFuture.get();
return results.stream()

@gianm (Contributor) commented on Apr 2, 2018:
This code is slightly obtuse (what's the double? why reverse it?) and would benefit from a comment.

.sorted(Comparator.comparingDouble((Pair<Double, ServerHolder> o) -> o.lhs).reversed())
.map(x -> x.rhs).collect(Collectors.toList())
.iterator();
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
}
return Collections.emptyIterator();
}
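gianm's question above (what's the double? why reverse it?) can be answered with a short comment: the `lhs` of each pair is the computed cost of keeping the segment on that server, and the reversed sort puts the costliest placements first so they are dropped from first. A self-contained sketch of just the sort step, using plain `Map.Entry` pairs instead of Druid's `Pair` (names hypothetical):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DropOrderSketch
{
  // The key of each entry stands in for Pair.lhs, the cost of placing the segment
  // on that server. Sorting descending by cost means we prefer to drop from the
  // server where keeping the segment is most expensive.
  static List<String> dropOrder(List<Map.Entry<Double, String>> costs)
  {
    return costs.stream()
        .sorted(Comparator.comparingDouble((Map.Entry<Double, String> e) -> e.getKey()).reversed())
        .map(Map.Entry::getValue)
        .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    List<Map.Entry<Double, String>> costs = List.of(
        Map.entry(1.5, "serverA"),
        Map.entry(3.2, "serverB"),
        Map.entry(0.7, "serverC")
    );
    System.out.println(dropOrder(costs)); // [serverB, serverA, serverC]
  }
}
```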

/**
* Calculates the initial cost of the Druid segment configuration.
*
@@ -341,14 +373,7 @@ protected double computeCost(
for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call()
{
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
)
);
}
@@ -19,8 +19,9 @@

package io.druid.server.coordinator.rules;

import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
@@ -39,6 +40,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

@@ -209,7 +211,7 @@ private ServerHolder assignPrimary(
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was
* assigned.
*/
@@ -320,7 +322,7 @@ private void drop(
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
numDropped = dropForTier(numToDrop, holders, segment);
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
}

stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
@@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster)
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment
final DataSegment segment,
final BalancerStrategy balancerStrategy
)
{
int numDropped = 0;

// Use the reverse order to get the holders with least available size first.
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator();
final NavigableSet<ServerHolder> isServingSubset =
holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new));

@gianm (Contributor) commented on Apr 2, 2018:
There's still a (now pointless) check for isServingSegment below. It should be removed lest it confuse people.

@clintropolis (Author, Member) commented on Apr 2, 2018:
I left the check in case something changed between the initial filtering and the loop and the server was no longer serving the segment, but I can remove the extra check if you think it's safe.

@gianm (Contributor) commented on Apr 2, 2018:
If there is a race between pickServersToDrop and this line, then there's still a race even with the check. I think it's ok to leave the check in, but maybe log a warning if it actually gets tripped. (We don't expect it to unless there is some race)

@clintropolis (Author, Member) commented on Apr 2, 2018:
👍 added log.warn


final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset);

while (numDropped < numToDrop) {
if (!iterator.hasNext()) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
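The drop loop is truncated in this diff view. The shape that the thread above converges on — keep the `isServingSegment` check as a guard against races with `pickServersToDrop`, but warn when it trips since the input was already filtered to serving servers — can be sketched as follows. This is an assumed reconstruction with stand-in types, not the merged code:

```java
import java.util.Iterator;
import java.util.List;

public class DropLoopSketch
{
  static class ServerHolder
  {
    final String name;
    final boolean serving;

    ServerHolder(String name, boolean serving)
    {
      this.name = name;
      this.serving = serving;
    }

    boolean isServingSegment(String segmentId)
    {
      return serving;
    }
  }

  static int dropForTier(int numToDrop, List<ServerHolder> dropOrder, String segmentId)
  {
    int numDropped = 0;
    Iterator<ServerHolder> iterator = dropOrder.iterator();
    while (numDropped < numToDrop) {
      if (!iterator.hasNext()) {
        System.out.printf("No servers serving [%s]?%n", segmentId);
        break;
      }
      ServerHolder holder = iterator.next();
      // pickServersToDrop already filtered to servers serving this segment, so this
      // check should only trip if something changed concurrently; warn when it does.
      if (holder.isServingSegment(segmentId)) {
        numDropped++; // the real code queues holder.getPeon().dropSegment(...) here
      } else {
        System.out.printf("Server [%s] is no longer serving segment [%s], skipping.%n", holder.name, segmentId);
      }
    }
    return numDropped;
  }
}
```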
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.DruidServer;
@@ -712,6 +713,98 @@ public void testDropTooManyInSameTier()
EasyMock.verify(mockPeon);
}

@Test
public void testDropTooManyInSameTierWithLoadQueue()
{
mockCoordinator();
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();

LoadQueuePeon loadingPeon = EasyMock.createMock(LoadQueuePeon.class);
mockLoadingPeon(loadingPeon, availableSegments.size() + 10, availableSegments.size());

EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);

DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server1.addDataSegment(availableSegments.get(0));
server1.addDataSegment(availableSegments.get(1));

DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments.subList(1, availableSegments.size())) {
server2.addDataSegment(segment);
}

DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
loadingPeon
),
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);

SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);

ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);

DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();

DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();

Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));

exec.shutdown();
EasyMock.verify(mockPeon);
EasyMock.verify(loadingPeon);
}

@Test
public void testDropTooManyInDifferentTiers()
{
@@ -761,9 +854,9 @@ public void testDropTooManyInDifferentTiers()
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
@@ -942,6 +1035,7 @@ public void testDropServerActuallyServesSegment()

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.replay(anotherMockPeon);

DruidCluster druidCluster = new DruidCluster(
@@ -1409,11 +1503,25 @@ private void mockCoordinator()

private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
mockEmptyPeon(mockPeon);
}

private void mockEmptyPeon(LoadQueuePeon peon)
{
EasyMock.expect(peon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(peon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(peon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(peon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(peon);
}

private void mockLoadingPeon(LoadQueuePeon peon, long size, int segments)
{
EasyMock.expect(peon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(peon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(peon.getLoadQueueSize()).andReturn(size).atLeastOnce();
EasyMock.expect(peon.getNumberOfSegmentsInQueue()).andReturn(segments).anyTimes();
EasyMock.replay(peon);
}

private CoordinatorDynamicConfig createCoordinatorDynamicConfig()
@@ -286,6 +286,9 @@ public void testDrop()
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
@@ -430,7 +433,9 @@ public void testDropWithNonExistentTier()
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(