Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ The Druid SQL server is configured through the following properties on the broke
|`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|1|
|`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.awaitInitializationOnStart`|Whether the Broker will wait for its SQL metadata view to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.broker.segment.awaitInitializationOnStart`, a related setting.|true|
|`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
Expand Down Expand Up @@ -1291,6 +1292,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|`druid.announcer.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch|
|`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|

## Historical

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedDataSources = null;

@JsonProperty
private boolean awaitInitializationOnStart = true;

public Set<String> getWatchedTiers()
{
return watchedTiers;
Expand All @@ -42,4 +45,9 @@ public Set<String> getWatchedDataSources()
{
return watchedDataSources;
}

/**
 * Returns whether the Broker should block during startup until its view of segments is fully
 * initialized. Bound from the {@code awaitInitializationOnStart} JSON property; the field it
 * reads defaults to {@code true}.
 */
public boolean isAwaitInitializationOnStart()
{
return awaitInitializationOnStart;
}
}
52 changes: 41 additions & 11 deletions server/src/main/java/org/apache/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
Expand All @@ -49,13 +51,15 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
*/
@ManageLifecycle
public class BrokerServerView implements TimelineServerView
{
private static final Logger log = new Logger(BrokerServerView.class);
Expand All @@ -74,19 +78,20 @@ public class BrokerServerView implements TimelineServerView
private final FilteredServerInventoryView baseView;
private final TierSelectorStrategy tierSelectorStrategy;
private final ServiceEmitter emitter;
private final BrokerSegmentWatcherConfig segmentWatcherConfig;
private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;

private volatile boolean initialized = false;
private final CountDownLatch initialized = new CountDownLatch(1);

@Inject
public BrokerServerView(
QueryToolChestWarehouse warehouse,
QueryWatcher queryWatcher,
@Smile ObjectMapper smileMapper,
@EscalatedClient HttpClient httpClient,
FilteredServerInventoryView baseView,
TierSelectorStrategy tierSelectorStrategy,
ServiceEmitter emitter,
final QueryToolChestWarehouse warehouse,
final QueryWatcher queryWatcher,
final @Smile ObjectMapper smileMapper,
final @EscalatedClient HttpClient httpClient,
final FilteredServerInventoryView baseView,
final TierSelectorStrategy tierSelectorStrategy,
final ServiceEmitter emitter,
final BrokerSegmentWatcherConfig segmentWatcherConfig
)
{
Expand All @@ -97,6 +102,7 @@ public BrokerServerView(
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;
this.segmentWatcherConfig = segmentWatcherConfig;
this.clients = new ConcurrentHashMap<>();
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();
Expand Down Expand Up @@ -143,7 +149,7 @@ public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server
@Override
public CallbackAction segmentViewInitialized()
{
initialized = true;
initialized.countDown();
runTimelineCallbacks(TimelineCallback::timelineInitialized);
return ServerView.CallbackAction.CONTINUE;
}
Expand All @@ -165,9 +171,25 @@ public ServerView.CallbackAction serverRemoved(DruidServer server)
);
}

/**
 * Lifecycle start hook. When {@code awaitInitializationOnStart} is enabled in the segment
 * watcher config, this blocks the lifecycle (and therefore anything registered to start after
 * this instance — presumably the HTTP server and service announcement; confirm against the
 * module wiring) until the segment view has been initialized.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
@LifecycleStart
public void start() throws InterruptedException
{
if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
// Record wall-clock start so the wait duration can be logged below.
final long startMillis = System.currentTimeMillis();
log.info("%s waiting for initialization.", getClass().getSimpleName());
awaitInitialization();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
}
}

public boolean isInitialized()
{
return initialized;
return initialized.getCount() == 0;
}

/**
 * Blocks the calling thread until the segment view is initialized, i.e. until the
 * {@code initialized} latch is counted down by the {@code segmentViewInitialized} callback.
 * Returns immediately if initialization has already completed.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void awaitInitialization() throws InterruptedException
{
initialized.await();
}

private QueryableDruidServer addServer(DruidServer server)
Expand All @@ -183,7 +205,15 @@ private QueryableDruidServer addServer(DruidServer server)

private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getScheme(), server.getHost(), emitter);
return new DirectDruidClient(
warehouse,
queryWatcher,
smileMapper,
httpClient,
server.getScheme(),
server.getHost(),
emitter
);
}

private QueryableDruidServer removeServer(DruidServer server)
Expand Down

This file was deleted.

18 changes: 9 additions & 9 deletions services/src/main/java/org/apache/druid/cli/CliBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.cli;

import com.google.common.collect.ImmutableList;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
Expand Down Expand Up @@ -51,7 +50,6 @@
import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.coordination.broker.DruidBroker;
import org.apache.druid.server.http.BrokerResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.MetricsModule;
Expand Down Expand Up @@ -94,7 +92,7 @@ protected List<? extends Module> getModules()
binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);

binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
binder.bind(BrokerServerView.class).in(LazySingleton.class);
LifecycleModule.register(binder, BrokerServerView.class);
binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);

JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
Expand All @@ -117,19 +115,21 @@ protected List<? extends Module> getModules()
Jerseys.addResource(binder, ClientInfoResource.class);

LifecycleModule.register(binder, BrokerQueryResource.class);
LifecycleModule.register(binder, DruidBroker.class);

Jerseys.addResource(binder, HttpServerInventoryViewResource.class);

MetricsModule.register(binder, CacheMonitor.class);

LifecycleModule.register(binder, Server.class);

binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class)))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));

bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.BROKER)
.serviceClasses(ImmutableList.of(LookupNodeService.class))
.useLegacyAnnouncer(true)
.build()
);
},
new LookupModule(),
new SqlModule()
Expand Down
13 changes: 5 additions & 8 deletions services/src/main/java/org/apache/druid/cli/CliCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
Expand Down Expand Up @@ -217,12 +215,11 @@ public void configure(Binder binder)
DruidCoordinatorCleanupPendingSegments.class
);

binder
.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(Coordinator.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class));
bindAnnouncer(
binder,
Coordinator.class,
DiscoverySideEffectsProvider.builder(NodeType.COORDINATOR).build()
);
}

@Provides
Expand Down
18 changes: 6 additions & 12 deletions services/src/main/java/org/apache/druid/cli/CliHistorical.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
package org.apache.druid.cli;

import com.google.common.collect.ImmutableList;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CacheMonitor;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.CacheModule;
Expand Down Expand Up @@ -103,16 +101,12 @@ protected List<? extends Module> getModules()
binder.install(new CacheModule());
MetricsModule.register(binder, CacheMonitor.class);

binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(
NodeType.HISTORICAL,
ImmutableList.of(DataNodeService.class, LookupNodeService.class)
)
)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.HISTORICAL)
.serviceClasses(ImmutableList.of(LookupNodeService.class))
.build()
);
},
new LookupModule()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void configure(Binder binder)
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);

binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
PolyBind.createChoice(
Expand Down Expand Up @@ -130,13 +130,12 @@ public void configure(Binder binder)

LifecycleModule.register(binder, Server.class);

binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class))
)
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
bindAnnouncer(
binder,
DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER)
.serviceClasses(ImmutableList.of(WorkerNodeService.class))
.build()
);
}

@Provides
Expand Down
Loading