Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EtcdConfigurationSource #318

Merged
merged 3 commits into from Jun 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,159 @@
package com.netflix.config.source;

import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.netflix.config.WatchedConfigurationSource;
import com.netflix.config.WatchedUpdateListener;
import com.netflix.config.WatchedUpdateResult;
import org.boon.core.Handler;
import org.boon.etcd.Etcd;
import org.boon.etcd.Node;
import org.boon.etcd.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.google.common.collect.Maps.newHashMap;
import static com.netflix.config.WatchedUpdateResult.createIncremental;


/**
* Implementation of the dynamic {@link WatchedConfigurationSource} for Etcd
*
* This implementation requires the path to the Etcd directory that contains
* nodes that represent each managed configuration property.
*
* An example Etcd configuration path is /<my-app>/config
* An example Etcd property node path is /<my-app>/config/com.fluxcapacitor.my.property
*
* When a property is mutated via Etcd a callback will be notified and the value managed
* by EtcdConfigurationSource will be updated. Similar to other dynamic configuration
* source (ie. DynamoDB, etc.)
*
* @author spoon16
*/
public class EtcdConfigurationSource implements WatchedConfigurationSource {
private static final Logger logger = LoggerFactory.getLogger(EtcdConfigurationSource.class);
private static final Splitter keySplitter = Splitter.on('/');

private final Map<String, Object> valueCache = Maps.newConcurrentMap();
private final List<WatchedUpdateListener> listeners = new CopyOnWriteArrayList<WatchedUpdateListener>();

private final Etcd etcd;
private final String configPath;

private Handler<Response> updateHandler = new UpdateHandler();

/**
* Initialize EtcdConfigurationSource with property values @ configPath
*
* @param Etcd etcd
*/
public EtcdConfigurationSource(Etcd etcd, String configPath) {
this.etcd = etcd;
this.configPath = Objects.firstNonNull(configPath, "").replaceAll("^/+","");
init();
}

private void init() {
final Response listResponse = etcd.list(configPath);
cacheValues(listResponse.node());
etcd.waitRecursive(updateHandler, configPath);
}

private void cacheValues(Node configNode) {
for (Node valueNode : configNode.getNodes()) {
final String etcdKey = valueNode.key();
final String sourceKey = Iterables.getLast(keySplitter.split(etcdKey));
final String value = valueNode.getValue();
valueCache.put(sourceKey, value);
}
}

@Override
public Map<String, Object> getCurrentData() throws Exception {
return valueCache;
}

@Override
public void addUpdateListener(WatchedUpdateListener l) {
if (l != null) {
listeners.add(l);
}
}

@Override
public void removeUpdateListener(WatchedUpdateListener l) {
if (l != null) {
listeners.remove(l);
}
}

private void updateConfiguration(WatchedUpdateResult result) {
for (WatchedUpdateListener l : listeners) {
try {
l.updateConfiguration(result);
} catch (Throwable ex) {
logger.error("Error in invoking WatchedUpdateListener", ex);
}
}
}

private class UpdateHandler implements Handler<Response> {
@Override
public void handle(Response updateResponse) {
if (updateResponse.wasError()) {
logger.error("Etcd failed with an error response: %s", updateResponse);
etcd.waitRecursive(updateHandler, configPath);
return;
}

logger.debug("Etcd updateResponse: ", updateResponse);
final Node node = updateResponse.node();

if (node != null ) {
final String etcdKey = node.key();
final String sourceKey = Iterables.getLast(keySplitter.split(etcdKey));
final String value = node.getValue();
final String action = getUpdateAction(node, updateResponse.action());

switch (action) {
case "create":
valueCache.put(sourceKey, value);
updateConfiguration(createIncremental(null, ImmutableMap.<String, Object>of(sourceKey, value), null));
break;

case "set":
valueCache.put(sourceKey, value);
updateConfiguration(createIncremental(ImmutableMap.<String, Object>of(sourceKey, value), null, null));
break;

case "delete":
valueCache.remove(sourceKey);
updateConfiguration(createIncremental(null, null, ImmutableMap.<String, Object>of(sourceKey, "")));
break;

default:
logger.warn("unrecognized action, response: %s", updateResponse);
break;
}
}

etcd.waitRecursive(updateHandler, configPath);
}

private String getUpdateAction(Node updateNode, String responseAction) {
final String value = updateNode.getValue();
if (value == null) {
return "delete";
}
return responseAction.toLowerCase();
}
}
}
@@ -0,0 +1,168 @@
package com.netflix.config.source;

import com.google.common.collect.Lists;
import com.netflix.config.*;
import org.boon.core.Handler;
import org.boon.etcd.ClientBuilder;
import org.boon.etcd.Etcd;
import org.boon.etcd.Node;
import org.boon.etcd.Response;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;

/**
* Tests the implementation of {@link EtcdConfigurationSource}.
*
* @author spoon16
*/
public class EtcdConfigurationSourceTest {
private static final Logger logger = LoggerFactory.getLogger(EtcdConfigurationSourceTest.class);

private static final Etcd ETCD = mock(Etcd.class);

// uncomment to use local/vagrant CoreOS VM running Etcd
// private static final Etcd ETCD = ClientBuilder.builder().hosts(URI.create("http://172.17.8.101:4001")).createClient();

private static final String CONFIG_PATH = "config";
private static final Response ETCD_LIST_RESPONSE = new Response("get", 200,
new Node("/config", null, 1378, 1378, 0, true, Lists.newArrayList(
new Node("/config/test.key1", "test.value1-etcd", 19311, 19311, 0, false, null),
new Node("/config/test.key4", "test.value4-etcd", 1388, 1388, 0, false, null),
new Node("/config/test.key6", "test.value6-etcd", 1232, 1232, 0, false, null),
new Node("/config/test.key7", "test.value7-etcd", 1234, 1234, 0, false, null)
)));
private static Handler<Response> ETCD_UPDATE_HANDLER;
private static final Answer WITH_ETCD_UPDATE_HANDLER = new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
ETCD_UPDATE_HANDLER = (Handler<Response>) invocation.getArguments()[0];
return null;
}
};
private static EtcdConfigurationSource ETCD_CONFIGURATION_SOURCE;
private static DynamicWatchedConfiguration ETCD_CONFIGURATION;
private static final ConcurrentMapConfiguration MAP_CONFIGURATION = new ConcurrentMapConfiguration();
private static final ConcurrentMapConfiguration SYSTEM_CONFIGURATION = new ConcurrentMapConfiguration();

@BeforeClass
public static void before() throws Exception {
final ConcurrentCompositeConfiguration compositeConfig = new ConcurrentCompositeConfiguration();

doReturn(ETCD_LIST_RESPONSE).when(ETCD).list(anyString());
doAnswer(WITH_ETCD_UPDATE_HANDLER).when(ETCD).waitRecursive(any(Handler.class), anyString());
ETCD_CONFIGURATION_SOURCE = new EtcdConfigurationSource(ETCD, CONFIG_PATH);
ETCD_CONFIGURATION = new DynamicWatchedConfiguration(ETCD_CONFIGURATION_SOURCE);

compositeConfig.addConfiguration(ETCD_CONFIGURATION, "etcd dynamic override configuration");

MAP_CONFIGURATION.addProperty("test.key1", "test.value1-map");
MAP_CONFIGURATION.addProperty("test.key2", "test.value2-map");
MAP_CONFIGURATION.addProperty("test.key3", "test.value3-map");
MAP_CONFIGURATION.addProperty("test.key4", "test.value4-map");
MAP_CONFIGURATION.addProperty("test.key7", "test.value7-map");
compositeConfig.addConfiguration(MAP_CONFIGURATION, "map configuration");

System.setProperty("test.key4", "test.value4-system");
System.setProperty("test.key5", "test.value5-system");
SYSTEM_CONFIGURATION.loadProperties(System.getProperties());
compositeConfig.addConfiguration(SYSTEM_CONFIGURATION, "system configuration");

ConfigurationManager.install(compositeConfig);
}

/**
* should return value from EtcdConfigurationSource when EtcdConfigurationSource provides key
*/
@Test
public void testEtcdPropertyOverride() throws Exception {
// there is a etcd value for this key
assertEquals("test.value1-etcd", DynamicPropertyFactory.getInstance().getStringProperty("test.key1", "default").get());
}

/**
* should return map configuration source value when EtcdConfigurationSource does not provide key
*/
@Test
public void testNoEtcdPropertyOverride() throws Exception {
// there is not etcd value for this key but there is a configuration source that provides this key
assertEquals("test.value2-map", DynamicPropertyFactory.getInstance().getStringProperty("test.key2", "default").get());
}

/**
* should return default value when no configuration source provides key
*/
@Test
public void testDefault() throws Exception {
// no configuration source for key
assertEquals("default", DynamicPropertyFactory.getInstance().getStringProperty("test.key99", "default").get());
}

/**
* should select lower priority configuration sources selected when EtcdConfigurationSource does not provide key
*/
@Test
public void testSystemPropertyOverride() throws Exception {
// system configuration provides key, etcd configuration provides key, source = etcd configuration
assertEquals("test.value4-etcd", DynamicPropertyFactory.getInstance().getStringProperty("test.key4", "default").get());

// system configuration provides key, etcd configuration does not provide key, source = system configuration
assertEquals("test.value5-system", DynamicPropertyFactory.getInstance().getStringProperty("test.key5", "default").get());
}

/**
* should not override EtcdConfigurationSource when lower priority configuration source is updated
*/
@Test
public void testUpdateOverriddenProperty() throws Exception {
final String updateProperty = "test.key1";

// update the map config's property and assert that the value is still the overridden value
MAP_CONFIGURATION.setProperty(updateProperty, "prop1");
assertEquals("test.value1-etcd", DynamicPropertyFactory.getInstance().getStringProperty(updateProperty, "default").get());
}

/**
* should update EtcdConfigurationSource when Etcd client handles writes
*/
@Test
public void testUpdateEtcdProperty() throws Exception {
final String updateProperty = "test.key6";
final String updateKey = CONFIG_PATH + "/" + updateProperty;
final String updateValue = "test.value6-etcd-override";
final String initialValue = "test.value6-etcd";

assertEquals(initialValue, DynamicPropertyFactory.getInstance().getStringProperty(updateProperty, "default").get());

ETCD_UPDATE_HANDLER.handle(new Response("set", 200, new Node(updateKey, updateValue, 19444, 19444, 0, false, null)));
assertEquals(updateValue, DynamicPropertyFactory.getInstance().getStringProperty(updateProperty, "default").get());
}

/**
* should delete from EtcdConfigurationSource when Etcd client handles a delete event
*/
@Test
public void testDeleteEtcdProperty() throws Exception {
final String deleteProperty = "test.key7";
final String deleteKey = CONFIG_PATH + "/" + deleteProperty;
final String initialValue = "test.value7-etcd";

assertEquals(initialValue, DynamicPropertyFactory.getInstance().getStringProperty(deleteProperty, "default").get());

ETCD_UPDATE_HANDLER.handle(new Response("delete", 200, new Node(deleteKey, null, 12345, 12345, 0, false, null)));
assertEquals("test.value7-map", DynamicPropertyFactory.getInstance().getStringProperty(deleteProperty, "default").get());
}
}
13 changes: 13 additions & 0 deletions build.gradle
Expand Up @@ -95,6 +95,19 @@ project(':archaius-zookeeper') {
}
}

project(':archaius-etcd') {
sourceCompatibility = 1.7
targetCompatibility = 1.7

dependencies {
compile project(':archaius-core')
compile 'io.fastjson:etcd-client:0.33'
testCompile 'junit:junit:4.11'
testCompile 'org.mockito:mockito-all:1.9.5'
testCompile 'org.slf4j:slf4j-simple:1.6.4'
}
}

project(':archaius-scala') {
apply plugin: 'scala'

Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Expand Up @@ -5,5 +5,6 @@ include 'archaius-aws'
//include 'archaius-jclouds'
include 'archaius-scala'
include 'archaius-zookeeper'
include 'archaius-etcd'
include 'archaius-typesafe'