Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Se
{
final SortedMap<DruidServer, List<SegmentDescriptor>> serverSegments = new TreeMap<>();
for (SegmentServerSelector segmentServer : segments) {
final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick();
final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(query);

if (queryableDruidServer == null) {
log.makeAlert(
Expand Down Expand Up @@ -812,7 +812,8 @@ String computeResultLevelCachingEtag(
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) {
if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
QueryableDruidServer queryableServer = p.getServer().pick(query);
if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it should ideally fail if queryableServer is null, because it means there is no server to process this segment. However, this seems OK because this method will return immediately when it's null and fail at here.

hasOnlyHistoricalSegments = false;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
Expand All @@ -38,27 +39,29 @@ public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrateg
{
this.serverSelectorStrategy = serverSelectorStrategy;
}

@Nullable
@Override
public QueryableDruidServer pick(
public <T> QueryableDruidServer pick(
Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment
)
{
return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null);
return Iterables.getOnlyElement(pick(query, prioritizedServers, segment, 1), null);
}

@Override
public List<QueryableDruidServer> pick(
public <T> List<QueryableDruidServer> pick(
Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
)
{
List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size()));
result.addAll(serverSelectorStrategy.pick(query, priorityServers, segment, numServersToPick - result.size()));
if (result.size() == numServersToPick) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -34,6 +35,7 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
private static final Comparator<QueryableDruidServer> COMPARATOR =
Comparator.comparingInt(s -> ((DirectDruidClient) s.getQueryRunner()).getNumOpenConnections());

@Nullable
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import com.google.common.collect.Lists;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
@Nullable
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.query.Query;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
Expand All @@ -36,7 +37,7 @@

/**
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>, Overshadowable<ServerSelector>
public class ServerSelector implements Overshadowable<ServerSelector>
{

private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> historicalServers;
Expand Down Expand Up @@ -160,14 +161,13 @@ public List<DruidServerMetadata> getAllServers()
}

@Nullable
@Override
public QueryableDruidServer pick()
public <T> QueryableDruidServer pick(@Nullable Query<T> query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this can still return null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it sure can! thanks

{
synchronized (this) {
if (!historicalServers.isEmpty()) {
return strategy.pick(historicalServers, segment.get());
return strategy.pick(query, historicalServers, segment.get());
}
return strategy.pick(realtimeServers, segment.get());
return strategy.pick(query, realtimeServers, segment.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Iterables;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;

Expand All @@ -33,7 +36,29 @@
})
public interface ServerSelectorStrategy
{
QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
@Nullable
default <T> QueryableDruidServer pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment)
{
return Iterables.getOnlyElement(pick(query, servers, segment, 1), null);
}

default <T> List<QueryableDruidServer> pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment,
int numServersToPick)
{
return pick(servers, segment, numServersToPick);
}

@Deprecated
@Nullable
default QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
return pick(null, servers, segment);
}

@Deprecated
default List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
{
return pick(null, servers, segment, numServersToPick);
}

List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -41,12 +43,36 @@ public interface TierSelectorStrategy
{
Comparator<Integer> getComparator();

@Deprecated
@Nullable
QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
default QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
{
return pick(null, prioritizedServers, segment);
}

@Deprecated
default List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick)
{
return pick(null, prioritizedServers, segment, numServersToPick);
}

@Nullable
default <T> QueryableDruidServer pick(@Nullable Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment)
{
return pick(prioritizedServers, segment);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to keep old interfaces, I think the old ones should have a default implementation that calls this new interface with null query. Then we can remove the implementations of old interfaces in AbstractTierSelectorStrategy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done for backward compatibility - I was trying to make this change as minimal as possible on the first cut. We don't have any custom implementations here and I haven't seen any on a cursory look, but I don't want to presume. I think it's cleaner to remove the ones that don't take Query and having an interface of all defaults that call each other makes the abstract methods less obvious. What do you think? I'm happy to defer to your judgement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have strong opinion here. The current one looks good to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

works for me, current one it is!

}

List<QueryableDruidServer> pick(
default <T> List<QueryableDruidServer> pick(
@Nullable Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
);
int numServersToPick)
{
return pick(prioritizedServers, segment, numServersToPick);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment, selector.getSegment());
Assert.assertEquals(druidServer, selector.pick().getServer());
Assert.assertEquals(druidServer, selector.pick(null).getServer());

unannounceSegmentForServer(druidServer, segment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Expand Down Expand Up @@ -384,7 +384,7 @@ private void assertValues(
ServerSelector selector = ((SingleElementPartitionChunk<ServerSelector>) actualPartitionHolder.iterator()
.next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick().getServer());
Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null).getServer());
Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private SegmentServerSelector makeServerSelector(boolean isHistorical, int parti
0
);
expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes();
expect(serverSelector.pick()).andReturn(queryableDruidServer).anyTimes();
expect(serverSelector.pick(query)).andReturn(queryableDruidServer).anyTimes();
expect(queryableDruidServer.getServer()).andReturn(server).anyTimes();
expect(serverSelector.getSegment()).andReturn(segment).anyTimes();
replay(serverSelector, queryableDruidServer, server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void testRun() throws Exception

Assert.assertEquals(2, client2.getNumOpenConnections());

Assert.assertEquals(serverSelector.pick(), queryableDruidServer2);
Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2);

EasyMock.verify(httpClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Query;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
Expand All @@ -31,15 +32,37 @@
import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TierSelectorStrategyTest
{

@Test
public void testHighestPriorityTierSelectorStrategyRealtime()
{
DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
QueryableDruidServer lowPriority = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
client
);
QueryableDruidServer highPriority = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 1),
client
);

testTierSelectorStrategy(
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()),
highPriority, lowPriority
);
}

@Test
public void testHighestPriorityTierSelectorStrategy()
{
Expand Down Expand Up @@ -168,19 +191,19 @@ public void testIncompleteCustomPriorityTierSelectorStrategy()
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 3),
client
);

TierSelectorStrategy tierSelectorStrategy = new CustomTierSelectorStrategy(
new ConnectionCountServerSelectorStrategy(),
new CustomTierSelectorStrategyConfig()
{
@Override
public List<Integer> getPriorities()
{
return Arrays.asList(2, 0, -1);
}
}
);
testTierSelectorStrategy(
new CustomTierSelectorStrategy(
new ConnectionCountServerSelectorStrategy(),
new CustomTierSelectorStrategyConfig()
{
@Override
public List<Integer> getPriorities()
{
return Arrays.asList(2, 0, -1);
}
}
),
tierSelectorStrategy,
p3, p1, p0, p4, p2
);
}
Expand Down Expand Up @@ -216,9 +239,35 @@ private void testTierSelectorStrategy(
serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment());
}

Assert.assertEquals(expectedSelection[0], serverSelector.pick());
Assert.assertEquals(expectedSelection[0], serverSelector.pick(null));
Assert.assertEquals(expectedSelection[0], serverSelector.pick(EasyMock.createMock(Query.class)));
Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1));
Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2));
}

@Test
public void testServerSelectorStrategyDefaults()
{
DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class);
QueryableDruidServer p0 = new QueryableDruidServer(
new DruidServer("test1", "localhost", null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, -1),
client
);
Set<QueryableDruidServer> servers = new HashSet<>();
servers.add(p0);
RandomServerSelectorStrategy strategy = new RandomServerSelectorStrategy();
Assert.assertEquals(strategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0);
Assert.assertEquals(strategy.pick(EasyMock.createMock(Query.class), servers, EasyMock.createMock(DataSegment.class)), p0);
ServerSelectorStrategy defaultDeprecatedServerSelectorStrategy = new ServerSelectorStrategy() {
@Override
public <T> List<QueryableDruidServer> pick(@Nullable Query<T> query, Set<QueryableDruidServer> servers, DataSegment segment,
int numServersToPick)
{
return strategy.pick(servers, segment, numServersToPick);
}
};
Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class)), p0);
Assert.assertEquals(defaultDeprecatedServerSelectorStrategy.pick(servers, EasyMock.createMock(DataSegment.class), 1).get(0), p0);
}

}