diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 10e8f333049e..4d4551ed2f86 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -1258,6 +1258,7 @@ The Druid SQL server is configured through the following properties on the broke |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|1| |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M| |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true| +|`druid.sql.planner.awaitInitializationOnStart`|Boolean|Whether the the Broker will wait for its SQL metadata view to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.broker.segment.awaitInitializationOnStart`, a related setting.|true| |`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8| |`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000| |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000| @@ -1291,6 +1292,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.announcer.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Historical diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index 9a0fc54163a9..0abb45673631 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -33,6 +33,9 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedDataSources = null; + @JsonProperty + private boolean awaitInitializationOnStart = true; + public Set getWatchedTiers() { return watchedTiers; @@ -42,4 +45,9 @@ public Set getWatchedDataSources() { return watchedDataSources; } + + public boolean isAwaitInitializationOnStart() + { + return awaitInitializationOnStart; + } } diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 574cdc874b2f..0b28a1b9ebb6 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -27,10 +27,12 @@ import org.apache.druid.client.selector.QueryableDruidServer; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; +import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; @@ -49,6 +51,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.function.Function; @@ -56,6 +59,7 @@ /** */ +@ManageLifecycle public class BrokerServerView implements TimelineServerView { private static final Logger log = new Logger(BrokerServerView.class); @@ -74,19 +78,20 @@ public class BrokerServerView implements TimelineServerView private final FilteredServerInventoryView baseView; private final TierSelectorStrategy tierSelectorStrategy; private final ServiceEmitter emitter; + private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final Predicate> segmentFilter; - private volatile boolean initialized = false; + private final CountDownLatch initialized = new CountDownLatch(1); @Inject public BrokerServerView( - QueryToolChestWarehouse warehouse, - QueryWatcher queryWatcher, - @Smile ObjectMapper smileMapper, - @EscalatedClient HttpClient httpClient, - FilteredServerInventoryView baseView, - TierSelectorStrategy tierSelectorStrategy, - ServiceEmitter emitter, + final QueryToolChestWarehouse warehouse, + final QueryWatcher queryWatcher, + final @Smile ObjectMapper smileMapper, + final @EscalatedClient HttpClient httpClient, + final FilteredServerInventoryView baseView, + final TierSelectorStrategy tierSelectorStrategy, + final ServiceEmitter emitter, final BrokerSegmentWatcherConfig segmentWatcherConfig ) { @@ -97,6 +102,7 @@ public BrokerServerView( this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; + this.segmentWatcherConfig = segmentWatcherConfig; this.clients = new ConcurrentHashMap<>(); this.selectors = new HashMap<>(); this.timelines = new HashMap<>(); @@ -143,7 +149,7 @@ public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server @Override public CallbackAction segmentViewInitialized() { - initialized = true; + initialized.countDown(); runTimelineCallbacks(TimelineCallback::timelineInitialized); return ServerView.CallbackAction.CONTINUE; } @@ -165,9 +171,25 @@ public ServerView.CallbackAction serverRemoved(DruidServer server) ); } + @LifecycleStart + public void start() throws InterruptedException + { + if (segmentWatcherConfig.isAwaitInitializationOnStart()) { + final long startMillis = System.currentTimeMillis(); + log.info("%s waiting for initialization.", getClass().getSimpleName()); + awaitInitialization(); + log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis); + } + } + public boolean isInitialized() { - return initialized; + return initialized.getCount() == 0; + } + + public void awaitInitialization() throws InterruptedException + { + initialized.await(); } private QueryableDruidServer addServer(DruidServer server) @@ -183,7 +205,15 @@ private QueryableDruidServer addServer(DruidServer server) private DirectDruidClient makeDirectClient(DruidServer server) { - return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getScheme(), server.getHost(), emitter); + return new DirectDruidClient( + warehouse, + queryWatcher, + smileMapper, + httpClient, + server.getScheme(), + server.getHost(), + emitter + ); } private QueryableDruidServer removeServer(DruidServer server) diff --git a/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java b/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java deleted file mode 100644 index 41e6d9ad0b5c..000000000000 --- a/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordination.broker; - -import com.google.common.base.Predicates; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.inject.Inject; -import org.apache.druid.client.FilteredServerInventoryView; -import org.apache.druid.client.ServerView; -import org.apache.druid.curator.discovery.ServiceAnnouncer; -import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.java.util.common.lifecycle.LifecycleStart; -import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.server.DruidNode; - -@ManageLifecycle -public class DruidBroker -{ - private final DruidNode self; - private final ServiceAnnouncer serviceAnnouncer; - - private volatile boolean started = false; - - @Inject - public DruidBroker( - final FilteredServerInventoryView serverInventoryView, - final @Self DruidNode self, - final ServiceAnnouncer serviceAnnouncer - ) - { - this.self = self; - this.serviceAnnouncer = serviceAnnouncer; - - serverInventoryView.registerSegmentCallback( - MoreExecutors.sameThreadExecutor(), - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentViewInitialized() - { - serviceAnnouncer.announce(self); - return ServerView.CallbackAction.UNREGISTER; - } - }, - // We are not interested in any segment callbacks except view initialization - Predicates.alwaysFalse() - ); - } - - @LifecycleStart - public void start() - { - synchronized (self) { - if (started) { - return; - } - started = true; - } - } - - @LifecycleStop - public void stop() - { - synchronized (self) { - if (!started) { - return; - } - serviceAnnouncer.unannounce(self); - started = false; - } - } -} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 009a6fedbc54..982223aabd26 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -20,7 +20,6 @@ package org.apache.druid.cli; import com.google.common.collect.ImmutableList; -import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; @@ -51,7 +50,6 @@ import org.apache.druid.server.BrokerQueryResource; import org.apache.druid.server.ClientInfoResource; import org.apache.druid.server.ClientQuerySegmentWalker; -import org.apache.druid.server.coordination.broker.DruidBroker; import org.apache.druid.server.http.BrokerResource; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.MetricsModule; @@ -94,7 +92,7 @@ protected List getModules() binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true); binder.bind(CachingClusteredClient.class).in(LazySingleton.class); - binder.bind(BrokerServerView.class).in(LazySingleton.class); + LifecycleModule.register(binder, BrokerServerView.class); binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class); @@ -117,7 +115,6 @@ protected List getModules() Jerseys.addResource(binder, ClientInfoResource.class); LifecycleModule.register(binder, BrokerQueryResource.class); - LifecycleModule.register(binder, DruidBroker.class); Jerseys.addResource(binder, HttpServerInventoryViewResource.class); @@ -125,11 +122,14 @@ protected List getModules() LifecycleModule.register(binder, Server.class); - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class))) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); + + bindAnnouncer( + binder, + DiscoverySideEffectsProvider.builder(NodeType.BROKER) + .serviceClasses(ImmutableList.of(LookupNodeService.class)) + .useLegacyAnnouncer(true) + .build() + ); }, new LookupModule(), new SqlModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index dd4aa53caa0a..7f36e0cb6065 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -21,10 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Inject; -import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; @@ -217,12 +215,11 @@ public void configure(Binder binder) DruidCoordinatorCleanupPendingSegments.class ); - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .annotatedWith(Coordinator.class) - .toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of())) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class)); + bindAnnouncer( + binder, + Coordinator.class, + DiscoverySideEffectsProvider.builder(NodeType.COORDINATOR).build() + ); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index 7d3c7e8767b3..d8432d3d55c8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -20,13 +20,11 @@ package org.apache.druid.cli; import com.google.common.collect.ImmutableList; -import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CacheMonitor; -import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.CacheModule; @@ -103,16 +101,12 @@ protected List getModules() binder.install(new CacheModule()); MetricsModule.register(binder, CacheMonitor.class); - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .toProvider( - new DiscoverySideEffectsProvider( - NodeType.HISTORICAL, - ImmutableList.of(DataNodeService.class, LookupNodeService.class) - ) - ) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); + bindAnnouncer( + binder, + DiscoverySideEffectsProvider.builder(NodeType.HISTORICAL) + .serviceClasses(ImmutableList.of(LookupNodeService.class)) + .build() + ); }, new LookupModule() ); diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index e266233f8264..25c4e13e32dc 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -100,7 +100,7 @@ public void configure(Binder binder) binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null)); - binder.bind(new TypeLiteral>(){}) + binder.bind(new TypeLiteral>() {}) .toProvider(Providers.of(null)); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); PolyBind.createChoice( @@ -130,13 +130,12 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .toProvider( - new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class)) - ) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); + bindAnnouncer( + binder, + DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER) + .serviceClasses(ImmutableList.of(WorkerNodeService.class)) + .build() + ); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 540722195e8a..1cd3954936af 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -194,7 +194,7 @@ public void configure(Binder binder) binder.bind(SupervisorManager.class).in(LazySingleton.class); binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class); - binder.bind(new TypeLiteral>(){}) + binder.bind(new TypeLiteral>() {}) .toProvider(Providers.of(null)); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); @@ -237,12 +237,11 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); } - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .annotatedWith(IndexingService.class) - .toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of())) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class)); + bindAnnouncer( + binder, + IndexingService.class, + DiscoverySideEffectsProvider.builder(NodeType.OVERLORD).build() + ); } private void configureTaskStorage(Binder binder) @@ -284,10 +283,14 @@ private void configureRunners(Binder binder) biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); - biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME) + .to(RemoteTaskRunnerFactory.class) + .in(LazySingleton.class); binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); - biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME).to(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class); + biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME) + .to(HttpRemoteTaskRunnerFactory.class) + .in(LazySingleton.class); binder.bind(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class); JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null); diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index 8c0d83e4d938..4adeab86d0e5 100644 --- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; @@ -121,11 +120,10 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); DiscoveryModule.register(binder, Self.class); - binder - .bind(DiscoverySideEffectsProvider.Child.class) - .toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of())) - .in(LazySingleton.class); - LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); + bindAnnouncer( + binder, + DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build() + ); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java index 7f60f1f15d2c..77d409cb6e78 100644 --- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java +++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java @@ -20,19 +20,26 @@ package org.apache.druid.cli; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Injector; +import com.google.inject.Key; import com.google.inject.Provider; +import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.DruidService; import org.apache.druid.discovery.NodeType; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.DruidNode; +import java.lang.annotation.Annotation; import java.util.List; /** @@ -58,6 +65,32 @@ public void run() } } + public static void bindAnnouncer( + final Binder binder, + final DiscoverySideEffectsProvider provider + ) + { + binder.bind(DiscoverySideEffectsProvider.Child.class) + .toProvider(provider) + .in(LazySingleton.class); + + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); + } + + public static void bindAnnouncer( + final Binder binder, + final Class annotation, + final DiscoverySideEffectsProvider provider + ) + { + binder.bind(DiscoverySideEffectsProvider.Child.class) + .annotatedWith(annotation) + .toProvider(provider) + .in(LazySingleton.class); + + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, annotation)); + } + /** * This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode} * as part of {@link Lifecycle.Stage#LAST}. @@ -66,12 +99,50 @@ protected static class DiscoverySideEffectsProvider implements Provider> serviceClasses = ImmutableList.of(); + private boolean useLegacyAnnouncer; + + public Builder(final NodeType nodeType) + { + this.nodeType = nodeType; + } + + public Builder serviceClasses(final List> serviceClasses) + { + this.serviceClasses = serviceClasses; + return this; + } + + public Builder useLegacyAnnouncer(final boolean useLegacyAnnouncer) + { + this.useLegacyAnnouncer = useLegacyAnnouncer; + return this; + } + + public DiscoverySideEffectsProvider build() + { + return new DiscoverySideEffectsProvider(nodeType, serviceClasses, useLegacyAnnouncer); + } + } + + public static Builder builder(final NodeType nodeType) + { + return new Builder(nodeType); + } + + @Inject + @Self private DruidNode druidNode; @Inject private DruidNodeAnnouncer announcer; + @Inject + private ServiceAnnouncer legacyAnnouncer; + @Inject private Lifecycle lifecycle; @@ -80,11 +151,17 @@ public static class Child {} private final NodeType nodeType; private final List> serviceClasses; + private final boolean useLegacyAnnouncer; - public DiscoverySideEffectsProvider(NodeType nodeType, List> serviceClasses) + private DiscoverySideEffectsProvider( + final NodeType nodeType, + final List> serviceClasses, + final boolean useLegacyAnnouncer + ) { this.nodeType = nodeType; this.serviceClasses = serviceClasses; + this.useLegacyAnnouncer = useLegacyAnnouncer; } @Override @@ -105,11 +182,21 @@ public Child get() public void start() { announcer.announce(discoveryDruidNode); + + if (useLegacyAnnouncer) { + legacyAnnouncer.announce(discoveryDruidNode.getDruidNode()); + } } @Override public void stop() { + // Reverse order vs. start(). + + if (useLegacyAnnouncer) { + legacyAnnouncer.unannounce(discoveryDruidNode.getDruidNode()); + } + announcer.unannounce(discoveryDruidNode); } }, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index e46ff8b24734..fe9e72ffb4be 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -60,6 +60,9 @@ public class PlannerConfig @JsonProperty private boolean requireTimeCondition = false; + @JsonProperty + private boolean awaitInitializationOnStart = true; + @JsonProperty private DateTimeZone sqlTimeZone = DateTimeZone.UTC; @@ -113,6 +116,11 @@ public DateTimeZone getSqlTimeZone() return sqlTimeZone; } + public boolean isAwaitInitializationOnStart() + { + return awaitInitializationOnStart; + } + public PlannerConfig withOverrides(final Map context) { if (context == null) { @@ -142,6 +150,7 @@ public PlannerConfig withOverrides(final Map context) ); newConfig.requireTimeCondition = isRequireTimeCondition(); newConfig.sqlTimeZone = getSqlTimeZone(); + newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart(); return newConfig; } @@ -181,6 +190,7 @@ public boolean equals(final Object o) useApproximateTopN == that.useApproximateTopN && useFallback == that.useFallback && requireTimeCondition == that.requireTimeCondition && + awaitInitializationOnStart == that.awaitInitializationOnStart && Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) && Objects.equals(sqlTimeZone, that.sqlTimeZone); } @@ -199,6 +209,7 @@ public int hashCode() useApproximateTopN, useFallback, requireTimeCondition, + awaitInitializationOnStart, sqlTimeZone ); } @@ -216,6 +227,7 @@ public String toString() ", useApproximateTopN=" + useApproximateTopN + ", useFallback=" + useFallback + ", requireTimeCondition=" + requireTimeCondition + + ", awaitInitializationOnStart=" + awaitInitializationOnStart + ", sqlTimeZone=" + sqlTimeZone + '}'; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index ab43307527c3..40dcb56ee840 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,7 +19,6 @@ package org.apache.druid.sql.calcite.schema; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; @@ -98,7 +97,7 @@ public class DruidSchema extends AbstractSchema private final ConcurrentMap tables; // For awaitInitialization. - private final CountDownLatch initializationLatch = new CountDownLatch(1); + private final CountDownLatch initialized = new CountDownLatch(1); // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized private final Object lock = new Object(); @@ -175,7 +174,7 @@ public ServerView.CallbackAction segmentRemoved(final DataSegment segment) } @LifecycleStart - public void start() + public void start() throws InterruptedException { cacheExec.submit( new Runnable() @@ -254,7 +253,7 @@ public void run() } } - initializationLatch.countDown(); + initialized.countDown(); } catch (InterruptedException e) { // Fall through. @@ -288,6 +287,13 @@ public void run() } } ); + + if (config.isAwaitInitializationOnStart()) { + final long startMillis = System.currentTimeMillis(); + log.info("%s waiting for initialization.", getClass().getSimpleName()); + awaitInitialization(); + log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis); + } } @LifecycleStop @@ -296,10 +302,9 @@ public void stop() cacheExec.shutdownNow(); } - @VisibleForTesting public void awaitInitialization() throws InterruptedException { - initializationLatch.await(); + initialized.await(); } @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 56c570fecd86..488a3558dda4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -566,8 +566,8 @@ public static DruidSchema createMockSchema( TEST_AUTHENTICATOR_ESCALATOR ); - schema.start(); try { + schema.start(); schema.awaitInitialization(); } catch (InterruptedException e) {