Skip to content

Commit

Permalink
Exhibitor Support (#3664)
Browse files Browse the repository at this point in the history
* allow JsonConfigTesterBase to treat the fields of collections

* [Feature] Exhibitor Support (#3664)

This patch provides the integration of Druid & Netflix Exhibitor. Druid
currently use Apache Curator as ZooKeeper client. Curator can be
integrated with Exhibitor to achieve a live/updating list of the
ZooKeeper ensemble. This patch enables Druid to use this features.
  • Loading branch information
yuusaku-t authored and drcrallen committed Jan 2, 2017
1 parent 49d71e9 commit 02519d5
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 18 deletions.
17 changes: 17 additions & 0 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ The following path is used for service discovery. It is **not** affected by `dru
|--------|-----------|-------|
|`druid.discovery.curator.path`|Services announce themselves under this ZooKeeper path.|`/druid/discovery`|

### Exhibitor

[Exhibitor](https://github.com/Netflix/exhibitor/wiki) is a supervisor system for ZooKeeper.
Exhibitor can dynamically scale-up/down the cluster of ZooKeeper servers.
Druid can update self-owned list of ZooKeeper servers through Exhibitor without restarting.
That is, it allows Druid to keep the connections of Exhibitor-supervised ZooKeeper servers.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.exhibitor.service.hosts`|A JSON array which contains the hostnames of Exhibitor instances. Please specify this property if you want to use Exhibitor-supervised cluster.|none|
|`druid.exhibitor.service.port`|The REST port used to connect to Exhibitor.|`8080`|
|`druid.exhibitor.service.restUriPath`|The path of the REST call used to get the server set.|`/exhibitor/v1/cluster/list`|
|`druid.exhibitor.service.useSsl`|Boolean flag for whether or not to use https protocol.|`false`|
|`druid.exhibitor.service.pollingMs`|How ofter to poll the exhibitors for the list|`10000`|

Note that `druid.zk.service.host` is used as a backup in case an Exhibitor instance can't be contacted and therefore should still be set.

### Startup Logging

All nodes can log debugging information on startup.
Expand Down
88 changes: 72 additions & 16 deletions server/src/main/java/io/druid/curator/CuratorModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,62 @@
import com.google.inject.Module;
import com.google.inject.Provides;

import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;

import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.exhibitor.Exhibitors;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

import java.io.IOException;

import java.util.List;

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;

/**
*/
public class CuratorModule implements Module
{
static final String CURATOR_CONFIG_PREFIX = "druid.zk.service";

static final String EXHIBITOR_CONFIG_PREFIX = "druid.exhibitor.service";

private static final int BASE_SLEEP_TIME_MS = 1000;

private static final int MAX_SLEEP_TIME_MS = 45000;

private static final int MAX_RETRIES = 30;

private static final Logger log = new Logger(CuratorModule.class);

@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(
binder, "druid.zk.service",
CuratorConfig.class
);
JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class);
JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class);
}

@Provides
@LazySingleton
public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException
public CuratorFramework makeCurator(
CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle
) throws IOException
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(config.getZkHosts())
.ensembleProvider(ensembleProvider)
.sessionTimeoutMs(config.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.retryPolicy(new BoundedExponentialBackoffRetry(
BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES))
.compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression()))
.aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider())
.build();
Expand All @@ -91,6 +105,48 @@ public void stop()
return framework;
}

@Provides
@LazySingleton
public EnsembleProvider makeEnsembleProvider(CuratorConfig config, ExhibitorConfig exConfig)
{
if (exConfig.getHosts().isEmpty()) {
return new FixedEnsembleProvider(config.getZkHosts());
}

return new ExhibitorEnsembleProvider(
new Exhibitors(
exConfig.getHosts(),
exConfig.getRestPort(),
newBackupProvider(config.getZkHosts())
),
new DefaultExhibitorRestClient(exConfig.getUseSsl()),
exConfig.getRestUriPath(),
exConfig.getPollingMs(),
new BoundedExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_RETRIES)
)
{
@Override
public void start() throws Exception
{
log.info("Poll the list of zookeeper servers for initial ensemble");
this.pollForInitialEnsemble();
super.start();
}
};
}

private Exhibitors.BackupConnectionStringProvider newBackupProvider(final String zkHosts)
{
return new Exhibitors.BackupConnectionStringProvider()
{
@Override
public String getBackupConnectionString() throws Exception
{
return zkHosts;
}
};
}

class SecuredACLProvider implements ACLProvider
{
@Override
Expand Down
77 changes: 77 additions & 0 deletions server/src/main/java/io/druid/curator/ExhibitorConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.curator;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.List;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

/**
*/
public class ExhibitorConfig
{
@JsonProperty
private List<String> hosts = new ArrayList<>();

@JsonProperty("port")
@Min(0)
@Max(0xffff)
private int restPort = 8080;

@JsonProperty
private String restUriPath = "/exhibitor/v1/cluster/list";

@JsonProperty
private boolean useSsl = false;

@JsonProperty
@Min(0)
private int pollingMs = 10000;

public List<String> getHosts()
{
return hosts;
}

public int getRestPort()
{
return restPort;
}

public String getRestUriPath()
{
return restUriPath;
}

public boolean getUseSsl()
{
return useSsl;
}

public int getPollingMs()
{
return pollingMs;
}

}
139 changes: 139 additions & 0 deletions server/src/test/java/io/druid/curator/CuratorModuleTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.curator;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;

import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Properties;

import io.druid.guice.GuiceInjectors;
import io.druid.guice.LifecycleModule;

/**
*/
public final class CuratorModuleTest
{

private static final String curatorHostKey = CuratorModule.CURATOR_CONFIG_PREFIX + ".host";

private static final String exhibitorHostsKey = CuratorModule.EXHIBITOR_CONFIG_PREFIX + ".hosts";

@Test
public void defaultEnsembleProvider() throws NoSuchFieldException, IllegalAccessException
{
Injector injector = newInjector(new Properties());
injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'localhost'",
"localhost", ensembleProvider.getConnectionString()
);
}

@Test
public void fixedZkHosts()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostA");
Injector injector = newInjector(props);

injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'hostA'",
"hostA", ensembleProvider.getConnectionString()
);
}

@Test
public void exhibitorEnsembleProvider()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostA");
props.put(exhibitorHostsKey, "[\"hostB\"]");
Injector injector = newInjector(props);

injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be ExhibitorEnsembleProvider",
ensembleProvider instanceof ExhibitorEnsembleProvider
);
}

@Test
public void emptyExhibitorHosts()
{
Properties props = new Properties();
props.put(curatorHostKey, "hostB");
props.put(exhibitorHostsKey, "[]");
Injector injector = newInjector(props);

injector.getInstance(CuratorFramework.class); // initialize related components
EnsembleProvider ensembleProvider = injector.getInstance(EnsembleProvider.class);
Assert.assertTrue(
"EnsembleProvider should be FixedEnsembleProvider",
ensembleProvider instanceof FixedEnsembleProvider
);
Assert.assertEquals(
"The connectionString should be 'hostB'",
"hostB", ensembleProvider.getConnectionString()
);
}

private Injector newInjector(final Properties props)
{
List<Module> modules = ImmutableList.<Module>builder()
.addAll(GuiceInjectors.makeDefaultStartupModules())
.add(new LifecycleModule()).add(new CuratorModule()).build();
return Guice.createInjector(
Modules.override(modules).with(new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Properties.class).toInstance(props);
}
})
);
}

}
Loading

0 comments on commit 02519d5

Please sign in to comment.