Skip to content

Commit

Permalink
Add SelfDiscoveryResource; rename org.apache.druid.discovery.No… (#6702)
Browse files Browse the repository at this point in the history
* Add SelfDiscoveryResource

* Rename org.apache.druid.discovery.NodeType to NodeRole. Refactor CuratorDruidNodeDiscoveryProvider. Make SelfDiscoveryResource to listen to updates only about a single node (itself).

* Extended docs

* Fix brace

* Remove redundant throws in Lifecycle.Handler.stop()

* Import order

* Remove unresolvable link

* Address comments

* tmp

* tmp

* Rollback docker changes

* Remove extra .sh files

* Move filter

* Fix SecurityResourceFilterTest
  • Loading branch information
leventov committed Dec 8, 2019
1 parent 441515c commit 1c62987
Show file tree
Hide file tree
Showing 60 changed files with 1,055 additions and 666 deletions.
13 changes: 8 additions & 5 deletions core/src/main/java/org/apache/druid/guice/LifecycleModule.java
Expand Up @@ -37,10 +37,13 @@
*/
public class LifecycleModule implements Module
{
// this scope includes final logging shutdown, so all other handlers in this lifecycle scope should avoid logging in
// the 'stop' method, either failing silently or failing violently and throwing an exception causing an ungraceful exit
/**
* This scope includes final logging shutdown, so all other handlers in this lifecycle scope should avoid logging in
* their stop() method, either failing silently or failing violently and throwing an exception causing an ungraceful
* exit.
*/
private final LifecycleScope initScope = new LifecycleScope(Lifecycle.Stage.INIT);
private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope normalScope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope serverScope = new LifecycleScope(Lifecycle.Stage.SERVER);
private final LifecycleScope annoucementsScope = new LifecycleScope(Lifecycle.Stage.ANNOUNCEMENTS);

Expand Down Expand Up @@ -118,7 +121,7 @@ public void configure(Binder binder)
getEagerBinder(binder); // Load up the eager binder so that it will inject the empty set at a minimum.

binder.bindScope(ManageLifecycleInit.class, initScope);
binder.bindScope(ManageLifecycle.class, scope);
binder.bindScope(ManageLifecycle.class, normalScope);
binder.bindScope(ManageLifecycleServer.class, serverScope);
binder.bindScope(ManageLifecycleAnnouncements.class, annoucementsScope);
}
Expand All @@ -141,7 +144,7 @@ public void start() throws Exception
}
};
initScope.setLifecycle(lifecycle);
scope.setLifecycle(lifecycle);
normalScope.setLifecycle(lifecycle);
serverScope.setLifecycle(lifecycle);
annoucementsScope.setLifecycle(lifecycle);

Expand Down
Expand Up @@ -28,9 +28,9 @@
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle}
*
* This Scope gets defined by {@link LifecycleModule}
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
* {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#NORMAL} stage. This stage gets defined by {@link
* LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -28,9 +28,9 @@
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.ANNOUNCEMENTS
*
* This Scope gets defined by {@link LifecycleModule}
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
* {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#ANNOUNCEMENTS} stage. This stage gets defined by
* {@link LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -28,9 +28,9 @@
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.INIT
*
* This Scope gets defined by {@link LifecycleModule}
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
* {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#INIT} stage. This stage gets defined by {@link
* LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -28,9 +28,9 @@
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.SERVER
*
* This Scope gets defined by {@link LifecycleModule}
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
* {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#SERVER} stage. This stage gets defined by {@link
* LifecycleModule}.
*/
@Target({ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

/**
Expand Down Expand Up @@ -107,6 +108,13 @@ private Closer()
{
}

public <C extends Closeable> void registerAll(Collection<C> closeables)
{
for (C closeable : closeables) {
register(closeable);
}
}

/**
* Registers the given {@code Closeable} to be closed when this {@code Closer} is
* {@linkplain #close closed}.
Expand Down
Expand Up @@ -27,12 +27,18 @@

public final class JacksonUtils
{
public static final TypeReference<Map<String, Object>> TYPE_REFERENCE_MAP_STRING_OBJECT = new TypeReference<Map<String, Object>>()
{
};
public static final TypeReference<Map<String, String>> TYPE_REFERENCE_MAP_STRING_STRING = new TypeReference<Map<String, String>>()
{
};
public static final TypeReference<Map<String, Object>> TYPE_REFERENCE_MAP_STRING_OBJECT =
new TypeReference<Map<String, Object>>()
{
};
public static final TypeReference<Map<String, String>> TYPE_REFERENCE_MAP_STRING_STRING =
new TypeReference<Map<String, String>>()
{
};
public static final TypeReference<Map<String, Boolean>> TYPE_REFERENCE_MAP_STRING_BOOLEAN =
new TypeReference<Map<String, Boolean>>()
{
};

/** Silences Jackson's {@link IOException}. */
public static <T> T readValue(ObjectMapper mapper, byte[] bytes, Class<T> valueClass)
Expand Down
Expand Up @@ -42,27 +42,29 @@
* A manager of object Lifecycles.
*
* This object has methods for registering objects that should be started and stopped. The Lifecycle allows for
* four stages: Stage.INIT, Stage.NORMAL, Stage.SERVER, and Stage.ANNOUNCEMENTS.
* four stages: {@link Stage#INIT}, {@link Stage#NORMAL}, {@link Stage#SERVER}, and {@link Stage#ANNOUNCEMENTS}.
*
* Things added at Stage.INIT will be started first (in the order that they are added to the Lifecycle instance) and
* then things added at Stage.NORMAL, then Stage.SERVER, and finally, Stage.ANNOUNCEMENTS will be started.
* Things added at {@link Stage#INIT} will be started first (in the order that they are added to the Lifecycle instance)
* and then things added at {@link Stage#NORMAL}, then {@link Stage#SERVER}, and finally, {@link Stage#ANNOUNCEMENTS}
* will be started.
*
* The close operation goes in reverse order, starting with the last thing added at Stage.ANNOUNCEMENTS and working
* backwards.
* The close operation goes in reverse order, starting with the last thing added at {@link Stage#ANNOUNCEMENTS} and
* working backwards.
*
* Conceptually, the stages have the following purposes:
* - Stage.INIT: Currently, this stage is used exclusively for log4j initialization, since almost everything needs
* logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides something that
* should be initialized before nearly all other Lifecycle objects could also belong here (if it doesn't need
* logging during start or stop).
* - Stage.NORMAL: This is the default stage. Most objects will probably make the most sense to be registered at
* this level, with the exception of any form of server or service announcements
* - Stage.SERVER: This lifecycle stage is intended for all 'server' objects, and currently only contains the Jetty
* module, but any sort of 'server' that expects most Lifecycle objects to be initialized by the time it starts, and
* still available at the time it stops can logically live in this stage.
* - Stage.ANNOUNCEMENTS: Any object which announces to a cluster this servers location belongs in this stage. By being
* last, we can be sure that all servers are initialized before we advertise the endpoint locations, and also can be
* sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
* - {@link Stage#INIT}: Currently, this stage is used exclusively for log4j initialization, since almost everything
* needs logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides
* something that should be initialized before nearly all other Lifecycle objects could also belong here (if it
* doesn't need logging during start or stop).
* - {@link Stage#NORMAL}: This is the default stage. Most objects will probably make the most sense to be registered
* at this level, with the exception of any form of server or service announcements
* - {@link Stage#SERVER}: This lifecycle stage is intended for all 'server' objects, for example,
* org.apache.druid.server.initialization.jetty.JettyServerModule, but any sort of 'server' that expects most (or
* some specific) Lifecycle objects to be initialized by the time it starts, and still available at the time it stops
* can logically live in this stage.
* - {@link Stage#ANNOUNCEMENTS}: Any object which announces to a cluster this servers location belongs in this stage.
* By being last, we can be sure that all servers are initialized before we advertise the endpoint locations, and
* also can be sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
*
* There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that
* start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start
Expand Down Expand Up @@ -357,25 +359,27 @@ public void stop()
}
startStopLock.lock();
try {
RuntimeException thrown = null;
Exception thrown = null;

for (Stage s : handlers.navigableKeySet().descendingSet()) {
log.info("Stopping lifecycle [%s] stage [%s]", name, s.name());
for (Handler handler : Lists.reverse(handlers.get(s))) {
try {
handler.stop();
}
catch (RuntimeException e) {
catch (Exception e) {
log.warn(e, "Lifecycle [%s] encountered exception while stopping %s", name, handler);
if (thrown == null) {
thrown = e;
} else {
thrown.addSuppressed(e);
}
}
}
}

if (thrown != null) {
throw thrown;
throw new RuntimeException(thrown);
}
}
finally {
Expand Down
Expand Up @@ -28,6 +28,7 @@ public class Logger
{
private final org.slf4j.Logger log;
private final boolean stackTraces;
private final Logger noStackTraceLogger;

public Logger(String name)
{
Expand All @@ -43,6 +44,7 @@ protected Logger(org.slf4j.Logger log, boolean stackTraces)
{
this.log = log;
this.stackTraces = stackTraces;
noStackTraceLogger = stackTraces ? new Logger(log, false) : this;
}

protected org.slf4j.Logger getSlf4jLogger()
Expand All @@ -57,12 +59,12 @@ public String toString()
}

/**
* Create a copy of this Logger that does not log exception stack traces, unless the log level is DEBUG or lower.
* Returns a copy of this Logger that does not log exception stack traces, unless the log level is DEBUG or lower.
* Useful for writing code like: {@code log.noStackTrace().warn(e, "Something happened.");}
*/
public Logger noStackTrace()
{
return new Logger(log, false);
return noStackTraceLogger;
}

public void trace(String message, Object... formatArgs)
Expand Down
Expand Up @@ -24,8 +24,6 @@

import java.nio.charset.StandardCharsets;

/**
*/
public class StatusResponseHandler implements HttpResponseHandler<StatusResponseHolder, StatusResponseHolder>
{

Expand Down
22 changes: 21 additions & 1 deletion docs/operations/api-reference.md
Expand Up @@ -45,9 +45,29 @@ An endpoint that always returns a boolean "true" value with a 200 OK response, u

Returns the current configuration properties of the process.

* `/status/selfDiscoveredStatus`

Returns a JSON map of the form `{"selfDiscovered": true/false}`, indicating whether the node has received a confirmation
from the central node discovery mechanism (currently ZooKeeper) of the Druid cluster that the node has been added to the
cluster. It is recommended to not consider a Druid node "healthy" or "ready" in automated deployment/container
management systems until it returns `{"selfDiscovered": true}` from this endpoint. This is because a node may be
isolated from the rest of the cluster due to network issues and it doesn't make sense to consider nodes "healthy" in
this case. Also, when nodes such as Brokers use ZooKeeper segment discovery for building their view of the Druid cluster
(as opposed to HTTP segment discovery), they may be unusable until the ZooKeeper client is fully initialized and starts
to receive data from the ZooKeeper cluster. `{"selfDiscovered": true}` is a proxy event indicating that the ZooKeeper
client on the node has started to receive data from the ZooKeeper cluster and it's expected that all segments and other
nodes will be discovered by this node timely from this point.

* `/status/selfDiscovered`

Similar to `/status/selfDiscoveredStatus`, but returns 200 OK response with empty body if the node has discovered itself
and 503 SERVICE UNAVAILABLE if the node hasn't discovered itself yet. This endpoint might be useful because some
monitoring checks such as AWS load balancer health checks are not able to look at the response body.

## Master Server

This section documents the API endpoints for the processes that reside on Master servers (Coordinators and Overlords) in the suggested [three-server configuration](../design/processes.html#server-types).
This section documents the API endpoints for the processes that reside on Master servers (Coordinators and Overlords)
in the suggested [three-server configuration](../design/processes.html#server-types).

### Coordinator

Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -59,17 +59,17 @@ public class CommonCacheNotifier
private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);

/**
* {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* {@link NodeRole#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* from metadata storage.
*/
private static final List<NodeType> NODE_TYPES = Arrays.asList(
NodeType.BROKER,
NodeType.OVERLORD,
NodeType.HISTORICAL,
NodeType.PEON,
NodeType.ROUTER,
NodeType.MIDDLE_MANAGER,
NodeType.INDEXER
private static final List<NodeRole> NODE_TYPES = Arrays.asList(
NodeRole.BROKER,
NodeRole.OVERLORD,
NodeRole.HISTORICAL,
NodeRole.PEON,
NodeRole.ROUTER,
NodeRole.MIDDLE_MANAGER,
NodeRole.INDEXER
);

private final DruidNodeDiscoveryProvider discoveryProvider;
Expand Down Expand Up @@ -161,8 +161,8 @@ public void addUpdate(String updatedItemName, byte[] updatedItemData)
private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String updatedAuthenticatorPrefix, byte[] serializedEntity)
{
List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
for (NodeType nodeType : NODE_TYPES) {
DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType);
for (NodeRole nodeRole : NODE_TYPES) {
DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeRole(nodeRole);
Collection<DiscoveryDruidNode> nodes = nodeDiscovery.getAllNodes();
for (DiscoveryDruidNode node : nodes) {
URL listenerURL = getListenerURL(
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
Expand Down Expand Up @@ -744,7 +744,7 @@ private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox toolbox)
new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER));
return new DiscoveryDruidNode(
toolbox.getDruidNode(),
NodeType.PEON,
NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand Down Expand Up @@ -359,7 +359,7 @@ public String getVersion(final Interval interval)
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
NodeType.PEON,
NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Expand Up @@ -431,10 +431,8 @@ private void taskComplete(
private void startWorkersHandling() throws InterruptedException
{
final CountDownLatch workerViewInitialized = new CountDownLatch(1);

DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(
WorkerNodeService.DISCOVERY_SERVICE_KEY
);
DruidNodeDiscovery druidNodeDiscovery =
druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
Expand Down
Expand Up @@ -46,7 +46,7 @@
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
Expand Down Expand Up @@ -434,7 +434,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception

final DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
NodeType.PEON,
NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down

0 comments on commit 1c62987

Please sign in to comment.