Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coordinator drop segment selection through cost balancer #5529

Merged
merged 6 commits into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import io.druid.timeline.DataSegment;

import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;

public interface BalancerStrategy
{
Expand All @@ -31,5 +33,10 @@ public interface BalancerStrategy

BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);

default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have some javadoc explaining what the returned iterator should be. It's non-obvious: the "obvious" return value would be a set of servers to drop from, where the order doesn't matter. But the actual return value is ordered by preferredness, and may contain more servers than we actually want to drop from.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and added javadoc for whole interface, lmk if I got anything wrong

{
return serverHolders.descendingIterator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to include a comment about why this descendingIterator is meaningful. (Like the comment on the old code: // Use the reverse order to get the holders with least available size first.)

}

void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
{
Expand Down Expand Up @@ -219,6 +223,34 @@ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHo
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet<ServerHolder> serverHolders)
{
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = Lists.newArrayList();

for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
() -> Pair.of(computeCost(toDrop, server, true), server)
)
);
}

final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);

try {
List<Pair<Double, ServerHolder>> results = resultsFuture.get();
return results.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is slightly obtuse (what's the double? why reverse it?) and would benefit from a comment.

.sorted(Comparator.comparingDouble((Pair<Double, ServerHolder> o) -> o.lhs).reversed())
.map(x -> x.rhs).collect(Collectors.toList())
.iterator();
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
}
return Collections.emptyIterator();
}

/**
* Calculates the initial cost of the Druid segment configuration.
*
Expand Down Expand Up @@ -341,14 +373,7 @@ protected Pair<Double, ServerHolder> chooseBestServer(
for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call()
{
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package io.druid.server.coordinator.rules;

import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
Expand All @@ -39,6 +40,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -209,7 +211,7 @@ private ServerHolder assignPrimary(
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was
* assigned.
*/
Expand Down Expand Up @@ -320,7 +322,7 @@ private void drop(
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
numDropped = dropForTier(numToDrop, holders, segment);
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
}

stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
Expand All @@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster)
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment
final DataSegment segment,
final BalancerStrategy balancerStrategy
)
{
int numDropped = 0;

// Use the reverse order to get the holders with least available size first.
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator();
final NavigableSet<ServerHolder> isServingSubset =
holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's still a (now pointless) check for isServingSegment below. It should be removed lest it confuse people.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the check in case in the time between the initial filtering and the loop something changed and the server was no longer serving the segment, but I can remove the extra check if you think it's safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a race between pickServersToDrop and this line, then there's still a race even with the check. I think it's ok to leave the check in, but maybe log a warning if it actually gets tripped. (We don't expect it to unless there is some race)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 added log.warn


final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset);

while (numDropped < numToDrop) {
if (!iterator.hasNext()) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.DruidServer;
Expand Down Expand Up @@ -712,6 +713,98 @@ public void testDropTooManyInSameTier()
EasyMock.verify(mockPeon);
}

// Verifies that when replicas must be dropped within a tier, the coordinator prefers to drop
// from a server whose load queue is idle rather than from one that is busy loading segments.
@Test
public void testDropTooManyInSameTierWithLoadQueue()
{
mockCoordinator();
// mockPeon (the idle peon) is the one expected to receive the drop request.
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();

// A second peon that reports a non-empty load queue, simulating a server busy loading.
LoadQueuePeon loadingPeon = EasyMock.createMock(LoadQueuePeon.class);
mockLoadingPeon(loadingPeon, availableSegments.size() + 10, availableSegments.size());

// Rules: keep 1 replica in "normal" for the first half of the day, drop everything else.
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"),
ImmutableMap.<String, Integer>of("normal", 1)
),
new IntervalDropRule(Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);

// server1 serves segments 0 and 1; segment 1 is therefore over-replicated
// (server2 serves it too), so one replica of it must be dropped.
DruidServer server1 = new DruidServer(
"serverNorm",
"hostNorm",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
server1.addDataSegment(availableSegments.get(0));
server1.addDataSegment(availableSegments.get(1));

// server2 serves every segment except segment 0.
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
null,
1000,
ServerType.HISTORICAL,
"normal",
0
);
for (DataSegment segment : availableSegments.subList(1, availableSegments.size())) {
server2.addDataSegment(segment);
}

// server1 is paired with the busy loadingPeon, server2 with the idle mockPeon.
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
loadingPeon
),
new ServerHolder(
server2.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);

SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster);

ListeningExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1));
// Use the real cost balancer strategy so the drop-server selection goes through
// pickServersToDrop rather than the plain reverse-order default.
BalancerStrategy balancerStrategy =
new CostBalancerStrategyFactory().createBalancerStrategy(exec);

DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.build();

DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();

// Exactly one replica dropped in "normal" (the over-replicated segment), and the
// remaining segments matched by the drop rule are deleted outright.
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));

exec.shutdown();
// loadingPeon must verify cleanly, i.e. it never received a dropSegment call.
EasyMock.verify(mockPeon);
EasyMock.verify(loadingPeon);
}

@Test
public void testDropTooManyInDifferentTiers()
{
Expand Down Expand Up @@ -761,9 +854,9 @@ public void testDropTooManyInDifferentTiers()
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
new ServerHolder(
server1.toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))),
"normal",
Expand Down Expand Up @@ -942,6 +1035,7 @@ public void testDropServerActuallyServesSegment()

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.replay(anotherMockPeon);

DruidCluster druidCluster = new DruidCluster(
Expand Down Expand Up @@ -1409,11 +1503,25 @@ private void mockCoordinator()

/**
 * Configures the shared {@code mockPeon} as an idle peon (nothing queued to load or drop)
 * and switches it to replay mode. Delegates to {@link #mockEmptyPeon(LoadQueuePeon)}.
 */
// Note: the scraped diff left the pre-refactor inline expectations in this span; keeping
// them alongside the delegation would record duplicate expectations and replay the mock
// twice, so only the delegating form is kept.
private void mockEmptyPeon()
{
  mockEmptyPeon(mockPeon);
}

/**
 * Configures the given peon as an idle peon: empty load/drop collections and a zero-size,
 * zero-count load queue, then switches the mock to replay mode.
 *
 * @param peon the mock to configure; must not yet be in replay mode
 */
private void mockEmptyPeon(LoadQueuePeon peon)
{
  // An empty peon is just a loading peon whose queue reports size 0 and 0 segments;
  // delegate instead of duplicating the expectation setup.
  mockLoadingPeon(peon, 0L, 0);
}

/**
 * Records expectations on {@code queuePeon} so it reports a load queue of the given byte
 * size and segment count while both segment collections stay empty, then switches the mock
 * into replay mode.
 *
 * @param queuePeon          the mock to configure; must not yet be in replay mode
 * @param queueSizeBytes     value returned by {@code getLoadQueueSize()}
 * @param queuedSegmentCount value returned by {@code getNumberOfSegmentsInQueue()}
 */
private void mockLoadingPeon(LoadQueuePeon queuePeon, long queueSizeBytes, int queuedSegmentCount)
{
  EasyMock.expect(queuePeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
  EasyMock.expect(queuePeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
  EasyMock.expect(queuePeon.getNumberOfSegmentsInQueue()).andReturn(queuedSegmentCount).anyTimes();
  EasyMock.expect(queuePeon.getLoadQueueSize()).andReturn(queueSizeBytes).atLeastOnce();
  EasyMock.replay(queuePeon);
}

private CoordinatorDynamicConfig createCoordinatorDynamicConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ public void testDrop()
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down Expand Up @@ -430,7 +433,9 @@ public void testDropWithNonExistentTier()
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down