From 87775596150479875d601a36775da5e538e26876 Mon Sep 17 00:00:00 2001 From: JohnZZGithub Date: Fri, 13 Jul 2018 16:26:49 -0700 Subject: [PATCH] FLUME-3256: Adding Register Service Component and Zookeeper registration for service discovery 1. Add a new component named RegisterService for agent discovery 2. Implement a Zookeeper based RegisterService to use zookeeper for service discovery 3. Unit tests 4. User guide change for Zookeeper RegisterService --- .../conf/BasicConfigurationConstants.java | 3 + .../flume/conf/ComponentConfiguration.java | 3 +- .../conf/ComponentConfigurationFactory.java | 3 + .../apache/flume/conf/FlumeConfiguration.java | 98 +++++++- .../service/RegisterServiceConfiguration.java | 38 ++++ .../register/service/RegisterServiceType.java | 42 ++++ .../flume/conf/TestFlumeConfiguration.java | 141 ++++++++++++ .../TestFlumeConfigurationConfigFilter.java | 4 + flume-ng-core/pom.xml | 21 ++ .../InterceptorBuilderFactory.java | 1 - .../service/AbstractRegisterService.java | 55 +++++ .../register/service/RegisterService.java | 35 +++ .../service/RegisterServiceFactory.java | 56 +++++ .../service/ZooKeeperRegisterService.java | 214 ++++++++++++++++++ .../service/TestZooKeeperRegisterService.java | 128 +++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 51 +++++ .../TestEmbeddedAgentEmbeddedSource.java | 11 + .../node/AbstractConfigurationProvider.java | 27 ++- .../org/apache/flume/node/Application.java | 29 ++- .../flume/node/MaterializedConfiguration.java | 4 + .../node/SimpleMaterializedConfiguration.java | 15 +- 21 files changed, 970 insertions(+), 9 deletions(-) create mode 100644 flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceConfiguration.java create mode 100644 flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceType.java create mode 100644 flume-ng-core/src/main/java/org/apache/flume/register/service/AbstractRegisterService.java create mode 100644 flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterService.java create mode 100644 flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterServiceFactory.java create mode 100644 flume-ng-core/src/main/java/org/apache/flume/register/service/ZooKeeperRegisterService.java create mode 100644 flume-ng-core/src/test/java/org/apache/flume/register/service/TestZooKeeperRegisterService.java diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java index 3f23e622bc..5c7b34baa4 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/BasicConfigurationConstants.java @@ -36,6 +36,9 @@ public final class BasicConfigurationConstants { public static final String CONFIG_CHANNELS = "channels"; public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + "."; + public static final String CONFIG_REGISTER_SERVICES = "registerservices"; + public static final String CONFIG_REGISTER_SERVICES_PREFIX = CONFIG_REGISTER_SERVICES + "."; + public static final String CONFIG_CONFIG = "config"; public static final String CONFIG_TYPE = "type"; diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java index b7bab15373..231f20bfd8 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfiguration.java @@ -132,7 +132,8 @@ public enum ComponentType { SINK_PROCESSOR("SinkProcessor"), SINKGROUP("Sinkgroup"), CHANNEL("Channel"), - CHANNELSELECTOR("ChannelSelector"); + CHANNELSELECTOR("ChannelSelector"), + REGISTER_SERVICE("RegisterService"); private final String componentType; diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java index f1e3a8c14c..66ad9afc22 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/ComponentConfigurationFactory.java @@ -22,6 +22,7 @@ import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType; import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType; import org.apache.flume.conf.configfilter.ConfigFilterConfiguration.ConfigFilterConfigurationType; +import org.apache.flume.conf.register.service.RegisterServiceConfiguration; import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType; import org.apache.flume.conf.sink.SinkGroupConfiguration; import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfigurationType; @@ -65,6 +66,8 @@ public static ComponentConfiguration create(String name, String type, ComponentT .getConfiguration(name); case SINKGROUP: return new SinkGroupConfiguration(name); + case REGISTER_SERVICE: + return new RegisterServiceConfiguration(name); default: throw new ConfigurationException( "Cannot create configuration. Unknown Type specified: " + type); diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java index e6c7875d9a..7a1ca36297 100644 --- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java @@ -21,6 +21,8 @@ import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning; import org.apache.flume.conf.channel.ChannelConfiguration; import org.apache.flume.conf.channel.ChannelType; +import org.apache.flume.conf.register.service.RegisterServiceConfiguration; +import org.apache.flume.conf.register.service.RegisterServiceType; import org.apache.flume.configfilter.ConfigFilter; import org.apache.flume.conf.configfilter.ConfigFilterConfiguration; import org.apache.flume.conf.configfilter.ConfigFilterType; @@ -54,6 +56,8 @@ import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIG; import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS; import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS_PREFIX; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES; +import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES_PREFIX; import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS; import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX; import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS; @@ -214,8 +218,8 @@ private boolean addRawProperty(String rawName, String rawValue) { agentConfigMap.put(agentName, aconf); } - // Each configuration key must begin with one of the three prefixes: - // sources, sinks, or channels. + // Each configuration key must begin with one of the four prefixes: + // sources, sinks, channels or registerservices. return aconf.addProperty(configKey, value); } @@ -233,6 +237,7 @@ public static class AgentConfiguration { private String sinks; private String channels; private String sinkgroups; + private String registerServices; private final Map sourceConfigMap; private final Map sinkConfigMap; @@ -245,12 +250,14 @@ public static class AgentConfiguration { private Map sinkContextMap; private Map channelContextMap; private Map sinkGroupContextMap; + private Map registerServiceContextMap; private Set sinkSet; private Set configFilterSet; private Set sourceSet; private Set channelSet; private Set sinkgroupSet; + private Set registerServiceSet; private final List errorList; private List configFiltersInstances; @@ -260,6 +267,7 @@ private AgentConfiguration(String agentName, List errorList) { this.agentName = agentName; this.errorList = errorList; + this.registerServices = ""; configFilterConfigMap = new HashMap<>(); sourceConfigMap = new HashMap<>(); sinkConfigMap = new HashMap<>(); @@ -272,6 +280,7 @@ private AgentConfiguration(String agentName, sinkGroupContextMap = new HashMap<>(); configFiltersInstances = new ArrayList<>(); configFilterPatternCache = new HashMap<>(); + registerServiceContextMap = new HashMap<>(); } public Map getChannelConfigMap() { @@ -310,6 +319,10 @@ public Map getChannelContext() { return channelContextMap; } + public Map getRegisterServiceContextMap() { + return registerServiceContextMap; + } + public Set getSinkSet() { return sinkSet; } @@ -330,6 +343,9 @@ public Set getSinkgroupSet() { return sinkgroupSet; } + public Set getRegisterServiceSet() { + return registerServiceSet; + } /** *

@@ -383,6 +399,13 @@ private boolean isValid() { sinkSet = validateSinks(channelSet); sinkgroupSet = validateGroups(sinkSet); + if (registerServices.isEmpty()) { + registerServiceSet = new HashSet<>(); + } else { + registerServiceSet = new HashSet<>(Arrays.asList(registerServices.split("\\s+"))); + } + registerServiceSet = validateRegisterServices(registerServiceSet); + // If no sources or sinks are present, then this is invalid if (sourceSet.isEmpty() && sinkSet.isEmpty()) { LOGGER.warn( @@ -525,6 +548,9 @@ private T getKnownComponent(String type, T[] return null; } + private RegisterServiceType getKnownConfigRegisterService(String type) { + return getKnownComponent(type, RegisterServiceType.values()); + } /** * If it is a known component it will do the full validation required for @@ -1005,6 +1031,55 @@ private Set validGroupSinks(Set sinkSet, return groupSinks; } + private Set validateRegisterServices(Set registerServiceSet) { + Iterator iter = registerServiceSet.iterator(); + Map newContextMap = new HashMap<>(); + while (iter.hasNext()) { + String registerServiceName = iter.next(); + LOGGER.error("Register service name is :" + registerServiceName); + Context registerServiceContext = registerServiceContextMap.get(registerServiceName); + if (registerServiceContext == null) { + iter.remove(); + LOGGER.error(registerServiceName + "'s context is not set"); + addError(registerServiceName, CONFIG_ERROR, ERROR); + continue; + } + RegisterServiceType regSrvType = getKnownConfigRegisterService( + registerServiceContext.getString(BasicConfigurationConstants.CONFIG_TYPE)); + if (regSrvType == null) { + iter.remove(); + LOGGER.error(registerServiceName + " has wrong config type:" + regSrvType); + addError(registerServiceName, CONFIG_ERROR, ERROR); + continue; + } + + RegisterServiceConfiguration conf = null; + String config = regSrvType.toString().toUpperCase(Locale.ENGLISH); + try { + conf = (RegisterServiceConfiguration) ComponentConfigurationFactory.create( + registerServiceName, config, ComponentType.REGISTER_SERVICE); + + } catch (ConfigurationException e) { + LOGGER.error("Could not configure register service {} due to: {}", + new Object[]{registerServiceName, e.getMessage(), e}); + } + if (conf == null) { + iter.remove(); + LOGGER.error(registerServiceName + " is not well configured"); + addError(registerServiceName, CONFIG_ERROR, ERROR); + continue; + } + conf.configure(registerServiceContext); + errorList.addAll(conf.getErrors()); + newContextMap.put(registerServiceName, registerServiceContext); + } + registerServiceContextMap = newContextMap; + Set tempRegisterServiceSet = new HashSet(); + tempRegisterServiceSet.addAll(registerServiceContextMap.keySet()); + registerServiceSet.retainAll(tempRegisterServiceSet); + return registerServiceSet; + } + private String getSpaceDelimitedList(Set entries) { if (entries.isEmpty()) { return null; @@ -1140,7 +1215,6 @@ private boolean addProperty(String key, String value) { if (CONFIG_SINKGROUPS.equals(key)) { if (sinkgroups == null) { sinkgroups = value; - return true; } else { LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName); @@ -1149,11 +1223,24 @@ private boolean addProperty(String key, String value) { } } + // Check for register services + if (CONFIG_REGISTER_SERVICES.equals(key)) { + if (registerServices == null || registerServices.isEmpty()) { + registerServices = value; + return true; + } else { + LOGGER.warn("Duplicate register service list specfied for agent: {}", agentName); + addError(CONFIG_REGISTER_SERVICES, DUPLICATE_PROPERTY, ERROR); + return false; + } + } + if (addAsSourceConfig(key, value) || addAsChannelValue(key, value) || addAsSinkConfig(key, value) || addAsSinkGroupConfig(key, value) || addAsConfigFilterConfig(key, value) + || addAsRegisterServiceConfig(key, value) ) { return true; } @@ -1193,6 +1280,11 @@ private boolean addAsSourceConfig(String key, String value) { ); } + private boolean addAsRegisterServiceConfig(String key, String value) { + return addComponentConfig( + key, value, CONFIG_REGISTER_SERVICES_PREFIX, registerServiceContextMap); + } + private boolean addComponentConfig( String key, String value, String configPrefix, Map contextMap diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceConfiguration.java new file mode 100644 index 0000000000..83f5eccd4e --- /dev/null +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceConfiguration.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.register.service; + +import org.apache.flume.Context; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.ConfigurationException; + +public class RegisterServiceConfiguration extends ComponentConfiguration { + public RegisterServiceConfiguration(String componentName) { + super(componentName); + } + + @Override + public void configure(Context context) throws ConfigurationException { + super.configure(context); + try { + RegisterServiceType.valueOf(getType().trim().toUpperCase()); + } catch (Exception ex) { + throw new ConfigurationException( + "RegisterServiceConfiguration validation failed due to type" + getType(), ex); + } + } +} diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceType.java new file mode 100644 index 0000000000..2d1b53a4e2 --- /dev/null +++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/register/service/RegisterServiceType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.conf.register.service; + +import org.apache.flume.conf.ComponentWithClassName; + +public enum RegisterServiceType implements ComponentWithClassName { + /** + * Null register service + */ + OTHER(null), + + /** + * Replicating channel selector. + */ + ZOOKEEPER("org.apache.flume.register.service.ZooKeeperRegisterService"); + + private RegisterServiceType(String registerServiceClassName) { + this.registerServiceClassName = registerServiceClassName; + } + + private final String registerServiceClassName; + + @Override + public String getClassName() { + return registerServiceClassName; + } +} diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java index a881d00c43..2b20689fc2 100644 --- a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfiguration.java @@ -69,6 +69,147 @@ public void testFLUME1743() { Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1")); } + @Test + public void testGeneralRegisterServiceConfig() { + Properties properties = new Properties(); + properties.put("agent1.channels", "ch0"); + properties.put("agent1.channels.ch0.type", "memory"); + + properties.put("agent1.sources", "src0"); + properties.put("agent1.sources.src0.type", "multiport_syslogtcp"); + properties.put("agent1.sources.src0.channels", "ch0"); + properties.put("agent1.sources.src0.host", "localhost"); + properties.put("agent1.sources.src0.ports", "10001 10002 10003"); + properties.put("agent1.sources.src0.portHeader", "port"); + + properties.put("agent1.sinks", "sink0"); + properties.put("agent1.sinks.sink0.type", "null"); + properties.put("agent1.sinks.sink0.channel", "ch0"); + + properties.put("agent1.registerservices", "zkreg0 zkreg1 "); + properties.put("agent1.registerservices.zkreg0.type", "zookeeper"); + properties.put("agent1.registerservices.zkreg0.zkhost", "host1"); + properties.put("agent1.registerservices.zkreg1.type", "zookeeper"); + properties.put("agent1.registerservices.zkreg1.zkhost", "host1"); + + properties.put("agent1.configfilters", "f1"); + properties.put("agent1.configfilters.f1.type", "env"); + + FlumeConfiguration conf = new FlumeConfiguration(properties); + AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); + Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1, + agentConfiguration.getSourceSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1, + agentConfiguration.getChannelSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1, + agentConfiguration.getSinkSet().size()); + Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0")); + Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0")); + Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0")); + Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1")); + + // Register Service related + Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 2); + Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg0")); + Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg1")); + Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 2); + Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg0")); + Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg1")); + } + + @Test + public void testRegisterServiceWithUselessConfig() { + Properties properties = new Properties(); + properties.put("agent1.channels", "ch0"); + properties.put("agent1.channels.ch0.type", "memory"); + + properties.put("agent1.sources", "src0"); + properties.put("agent1.sources.src0.type", "multiport_syslogtcp"); + properties.put("agent1.sources.src0.channels", "ch0"); + properties.put("agent1.sources.src0.host", "localhost"); + properties.put("agent1.sources.src0.ports", "10001 10002 10003"); + properties.put("agent1.sources.src0.portHeader", "port"); + + properties.put("agent1.sinks", "sink0"); + properties.put("agent1.sinks.sink0.type", "null"); + properties.put("agent1.sinks.sink0.channel", "ch0"); + + properties.put("agent1.registerservices", "zkreg "); + properties.put("agent1.registerservices.zkreg.type", "zookeeper"); + properties.put("agent1.registerservices.zkreg.zkhost", "host1"); + properties.put("agent1.registerservices.zkreg1.type", "zookeeper"); + properties.put("agent1.registerservices.zkreg1.zkhost", "host1"); + + properties.put("agent1.configfilters", "f1"); + properties.put("agent1.configfilters.f1.type", "env"); + + FlumeConfiguration conf = new FlumeConfiguration(properties); + AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); + Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1, + agentConfiguration.getSourceSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1, + agentConfiguration.getChannelSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1, + agentConfiguration.getSinkSet().size()); + Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0")); + Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0")); + Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0")); + Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1")); + + // Register Service related + Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 1); + Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg")); + Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 1); + Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg")); + } + + @Test + public void testRegisterServiceWithUnkownType() { + Properties properties = new Properties(); + properties.put("agent1.channels", "ch0"); + properties.put("agent1.channels.ch0.type", "memory"); + + properties.put("agent1.sources", "src0"); + properties.put("agent1.sources.src0.type", "multiport_syslogtcp"); + properties.put("agent1.sources.src0.channels", "ch0"); + properties.put("agent1.sources.src0.host", "localhost"); + properties.put("agent1.sources.src0.ports", "10001 10002 10003"); + properties.put("agent1.sources.src0.portHeader", "port"); + + properties.put("agent1.sinks", "sink0"); + properties.put("agent1.sinks.sink0.type", "null"); + properties.put("agent1.sinks.sink0.channel", "ch0"); + + properties.put("agent1.registerservices", "zkreg zkreg1"); + properties.put("agent1.registerservices.zkreg.type", "zookeeper"); + properties.put("agent1.registerservices.zkreg.zkhost", "host1"); + properties.put("agent1.registerservices.zkreg1.type", "ddd"); + properties.put("agent1.registerservices.zkreg1.zkhost", "host1"); + + properties.put("agent1.configfilters", "f1"); + properties.put("agent1.configfilters.f1.type", "env"); + + FlumeConfiguration conf = new FlumeConfiguration(properties); + AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1"); + Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1, + agentConfiguration.getSourceSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1, + agentConfiguration.getChannelSet().size()); + Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1, + agentConfiguration.getSinkSet().size()); + Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0")); + Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0")); + Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0")); + Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1")); + + // Register Service related + Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 1); + Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg")); + Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 1); + Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg")); + Assert.assertEquals(conf.getConfigurationErrors().size(), 1); + } + @Test public void testFlumeConfigAdsErrorOnNullName() { HashMap properties = new HashMap<>(); diff --git a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java index ada2d8fa79..b7333f1bbd 100644 --- a/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java +++ b/flume-ng-configuration/src/test/java/org/apache/flume/conf/TestFlumeConfigurationConfigFilter.java @@ -19,12 +19,16 @@ import org.apache.flume.Context; import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Properties; import static org.junit.Assert.assertEquals; public class TestFlumeConfigurationConfigFilter { + private static final Logger LOGGER = + LoggerFactory.getLogger(TestFlumeConfigurationConfigFilter.class); @Test public void testFlumeConfigFilterWorks() { diff --git a/flume-ng-core/pom.xml b/flume-ng-core/pom.xml index b2a90db43b..0602e1db51 100644 --- a/flume-ng-core/pom.xml +++ b/flume-ng-core/pom.xml @@ -390,6 +390,27 @@ limitations under the License. mina-core + + org.apache.curator + curator-framework + + + + org.apache.curator + curator-recipes + + + + org.apache.curator + curator-test + + + + org.apache.zookeeper + zookeeper + compile + + diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorBuilderFactory.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorBuilderFactory.java index 7c64613d71..b5dbc00d6b 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorBuilderFactory.java +++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorBuilderFactory.java @@ -49,5 +49,4 @@ public static Builder newInstance(String name) } return clazz.newInstance(); } - } diff --git a/flume-ng-core/src/main/java/org/apache/flume/register/service/AbstractRegisterService.java b/flume-ng-core/src/main/java/org/apache/flume/register/service/AbstractRegisterService.java new file mode 100644 index 0000000000..f20b57be7e --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/register/service/AbstractRegisterService.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.register.service; + +import org.apache.flume.Context; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.conf.Configurable; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class AbstractRegisterService implements RegisterService, Configurable { + private String name; + + public AbstractRegisterService() { } + + @Override + public synchronized void setName(String name) { + this.name = name; + } + + @Override + public synchronized void start() throws Exception { } + + @Override + public synchronized void stop() { } + + @Override + public synchronized String getName() { + return name; + } + + @Override + public synchronized void configure(Context context) { } + + public String toString() { + return this.getClass().getName() + "{name: " + name + "}"; + } +} diff --git a/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterService.java b/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterService.java new file mode 100644 index 0000000000..8d52d50165 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.register.service; + +import org.apache.flume.Context; +import org.apache.flume.NamedComponent; + +/** + * Interface to register service. Implementing service should register + * service and start and un-register on stop + */ +public interface RegisterService extends NamedComponent { + + public abstract void start() throws Exception; + + public abstract void stop(); + + public abstract void configure(Context context); +} diff --git a/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterServiceFactory.java b/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterServiceFactory.java new file mode 100644 index 0000000000..24dfc1048a --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/register/service/RegisterServiceFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.register.service; + +import java.util.Locale; + +import org.apache.flume.FlumeException; +import org.apache.flume.conf.register.service.RegisterServiceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RegisterServiceFactory { + + private static final Logger logger = LoggerFactory + .getLogger(RegisterServiceFactory.class); + + /** + * Get register service via name. + */ + public static RegisterService newInstance(String type) + throws FlumeException { + RegisterServiceType registerServiceType = RegisterServiceType.OTHER; + try { + registerServiceType = RegisterServiceType.valueOf(type.toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException ex) { + String errMsg = "Unable to load channel type: " + type; + logger.error(errMsg); + throw new FlumeException(errMsg); + } + try { + return ((Class) + Class.forName(registerServiceType.getClassName())).newInstance(); + } catch (Exception ex) { + String errMsg = "Unable to load channel type: " + type + ", class: " + type + + ", ex:" + ex.toString(); + logger.error(errMsg); + throw new FlumeException(errMsg); + } + } +} diff --git a/flume-ng-core/src/main/java/org/apache/flume/register/service/ZooKeeperRegisterService.java b/flume-ng-core/src/main/java/org/apache/flume/register/service/ZooKeeperRegisterService.java new file mode 100644 index 0000000000..fec9bcdda6 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/register/service/ZooKeeperRegisterService.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.register.service; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.HashSet; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.flume.Context; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Registering Flume Agent with ZooKeeper for service discovery + * All nodes registered should be ephemeral and and reload usually + * recreates zookeeper client only if address and properties are changed. + */ +public class ZooKeeperRegisterService extends AbstractRegisterService { + + private static final Logger LOGGER = LoggerFactory + .getLogger(ZooKeeperRegisterService.class); + + public static final String TIER_KEY = "tier"; + public static final String ZK_HOST_KEY = "zkhost"; + public static final String ZK_PORT_KEY = "zkport"; + public static final String ZK_PATH_PREFIX_KEY = "zkpathprefix"; + public static final String ZK_ENDPOINT_PORT_KEY = "zkendpointport"; + public static final String ZK_AUTH_KEY = "zkauth"; + public static final String ZK_MAX_RETRY = "zkmaxretry"; + public static final String ZK_BASE_SLEEP_MS = "zkbasesleepms"; + + private static final int DEFAULT_ZK_MAX_RETRY = 3; + private static final int DEFAULT_ZK_BASE_SLEEP_TIME_MS = 1000; + + private Context context; + private CuratorFramework client; + private String zkString; + private String zkPathPrefix; + private String zkEndpointPort; + private String zkAuth; + private Set tiersPaths; + private Set previousTiersPaths; + private boolean overwritePreviousEphemeralNode = true; + private boolean configured = false; + + @Override + public void start() throws Exception { + client.start(); + registerZNodes(); + } + + private void registerZNodes() throws Exception { + String tiersString = context.getString(TIER_KEY); + previousTiersPaths = (previousTiersPaths == null) ? + new HashSet() : tiersPaths; + + tiersPaths = new HashSet(); + for (String tier : tiersString.split("\\s+")) { + String zNodePath = zkPathPrefix + "/" + tier; + tiersPaths.add(zNodePath); + } + + // Before creating zkPath make sure one does not + // exist already + List newTiersPaths = new ArrayList(); + Iterator iter = tiersPaths.iterator(); + while (iter.hasNext()) { + String tierZNodePath = iter.next(); + if (!previousTiersPaths.contains(tierZNodePath)) { + newTiersPaths.add(tierZNodePath); + } + } + + Set toDeleteTiersPaths = new HashSet(); + for (String path : previousTiersPaths) { + if (!tiersPaths.contains(path)) { + toDeleteTiersPaths.add(path); + } + } + + String hostname = InetAddress.getLocalHost().getHostName(); + String ephemeralZNodeName = hostname + ":" + zkEndpointPort; + + List aclList = null; + if (zkAuth != null) { + byte[] d = DigestUtils.sha1(zkAuth); + String[] userPass = zkAuth.split(":"); + ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("digest", userPass[0] + ":" + + Base64.encodeBase64(d).toString())); + aclList = Lists.newArrayList(acl); + aclList.add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS)); + aclList.add(new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE)); + } else { + aclList = ZooDefs.Ids.OPEN_ACL_UNSAFE; + } + + // Create ephemeral ZNodes for these paths. + for (String zNode : newTiersPaths) { + LOGGER.info("Creating peristent parent zNode: " + zNode); + try { + client.create().withMode(CreateMode.PERSISTENT).withACL(aclList).forPath(zNode); + } catch (KeeperException exists) { + Log.info("zNode exists: " + zNode); + } + } + + for (String zNode : tiersPaths) { + StringBuffer str = new StringBuffer(); + str.append("{\"serviceEndpoint\":{\"host\":\""); + str.append(hostname); + str.append("\",\"port\":"); + str.append(zkEndpointPort); + str.append("},\"additionalEndpoints\":{},\"status\":\"ALIVE\"}"); + + String nodePath = zNode + "/" + ephemeralZNodeName; + LOGGER.info("Creating ephemeral node under zNode: " + zNode + + " with name: " + nodePath + ", overwrite:" + overwritePreviousEphemeralNode); + createNode( + nodePath, str.toString().getBytes(), + CreateMode.EPHEMERAL, aclList, overwritePreviousEphemeralNode); + } + + // Delete ephemeral ZNodes for paths which have been removed + for (String zNode : toDeleteTiersPaths) { + String nodePath = zNode + "/" + ephemeralZNodeName; + LOGGER.info("Deleting ephemeral node under zNode: " + zNode + + " with name: " + nodePath); + tryToDeleteNode(nodePath); + } + } + + private void tryToDeleteNode(String nodePath) { + try { + client.delete().guaranteed().forPath(nodePath); + LOGGER.info("Delete node:" + nodePath); + } catch (KeeperException.NoNodeException ex) { + LOGGER.info("Node doesn't exist:" + nodePath); + } catch (Exception ex) { + LOGGER.warn("Delete node throws exception, treated as succeeded", ex); + } + } + + private void createNode( + String path, byte[] data, CreateMode mode, List aclList, boolean overwrite) + throws Exception { + if (overwrite) { + LOGGER.info("Delete node if exits" + path); + tryToDeleteNode(path); + } + client.create().creatingParentsIfNeeded() + .withMode(mode) + .withACL(aclList) + .forPath(path, data); + } + + @Override + public void stop() { + configured = false; + client.close(); + } + + @Override + public synchronized void configure(Context context) { + this.context = context; + if (!configured) { + zkString = context.getString(ZK_HOST_KEY) + + ":" + context.getString(ZK_PORT_KEY); + zkPathPrefix = context.getString(ZK_PATH_PREFIX_KEY); + zkEndpointPort = context.getString(ZK_ENDPOINT_PORT_KEY); + zkAuth = context.getString(ZK_AUTH_KEY); + + CuratorFrameworkFactory.Builder curatorFactory = CuratorFrameworkFactory.builder(); + if (zkAuth != null) { + curatorFactory.authorization("digest", zkAuth.getBytes()); + } + curatorFactory.connectString(zkString); + int baseSleepTimeMs = context.getInteger(ZK_BASE_SLEEP_MS, DEFAULT_ZK_BASE_SLEEP_TIME_MS); + int maxRetry = context.getInteger(ZK_MAX_RETRY, DEFAULT_ZK_MAX_RETRY); + curatorFactory.retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetry)); + client = curatorFactory.build(); + configured = true; + } + } +} diff --git a/flume-ng-core/src/test/java/org/apache/flume/register/service/TestZooKeeperRegisterService.java b/flume-ng-core/src/test/java/org/apache/flume/register/service/TestZooKeeperRegisterService.java new file mode 100644 index 0000000000..69a2f11a49 --- /dev/null +++ b/flume-ng-core/src/test/java/org/apache/flume/register/service/TestZooKeeperRegisterService.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.flume.register.service; + +import static org.junit.Assert.assertEquals; + +import java.net.InetAddress; +import java.util.List; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.apache.flume.Context; +import org.apache.flume.register.service.RegisterService; +import org.apache.flume.register.service.ZooKeeperRegisterService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestZooKeeperRegisterService { + + private TestingServer zkServer; + private CuratorFramework client; + + @Before + public void setUp() throws Exception { + zkServer = new TestingServer(); + client = CuratorFrameworkFactory + .newClient("localhost:" + zkServer.getPort(), + new ExponentialBackoffRetry(1000, 3)); + client.start(); + } + + @Test + public void testZooKeeperRegistration() throws Exception { + Context ctx = new Context(); + String[] hostPort = zkServer.getConnectString().split(":"); + ctx.put(ZooKeeperRegisterService.ZK_HOST_KEY, hostPort[0]); + ctx.put(ZooKeeperRegisterService.ZK_PORT_KEY, hostPort[1]); + ctx.put(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY, "/testzk"); + ctx.put(ZooKeeperRegisterService.ZK_ENDPOINT_PORT_KEY, "1463"); + ctx.put(ZooKeeperRegisterService.TIER_KEY, "tier1 tier2"); + + RegisterService registerService = new ZooKeeperRegisterService(); + registerService.configure(ctx); + registerService.start(); + + GetChildrenBuilder builder = client.getChildren(); + List paths = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY)); + assertEquals(paths.size(), 2); + + for (String str : paths) { + List ephemeralNodes = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY) + "/" + str); + assertEquals(ephemeralNodes.size(), 1); + String expectedZNodeName = InetAddress.getLocalHost().getHostName() + + ":" + ctx.getString(ZooKeeperRegisterService.ZK_ENDPOINT_PORT_KEY); + assertEquals(expectedZNodeName, ephemeralNodes.get(0)); + } + + registerService.stop(); + // Add new tier and check + ctx.put(ZooKeeperRegisterService.TIER_KEY, "tier1 tier2 tier3"); + registerService.configure(ctx); + registerService.start(); + + builder = client.getChildren(); + paths = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY)); + assertEquals(paths.size(), 3); + for (String str : paths) { + List ephemeralNodes = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY) + "/" + str); + assertEquals(ephemeralNodes.size(), 1); + String expectedZNodeName = InetAddress.getLocalHost().getHostName() + + ":" + ctx.getString(ZooKeeperRegisterService.ZK_ENDPOINT_PORT_KEY); + assertEquals(expectedZNodeName, ephemeralNodes.get(0)); + } + + registerService.stop(); + // Delete tier and check + ctx.put(ZooKeeperRegisterService.TIER_KEY, "tier1"); + registerService.configure(ctx); + registerService.start(); + + builder = client.getChildren(); + paths = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY)); + assertEquals(paths.size(), 3); + for (String str : paths) { + List ephemeralNodes = builder.forPath( + ctx.getString(ZooKeeperRegisterService.ZK_PATH_PREFIX_KEY) + "/" + str); + if (str.contains("tier1")) { + assertEquals(ephemeralNodes.size(), 1); + String expectedZNodeName = InetAddress.getLocalHost().getHostName() + + ":" + ctx.getString(ZooKeeperRegisterService.ZK_ENDPOINT_PORT_KEY); + assertEquals(expectedZNodeName, ephemeralNodes.get(0)); + } else { + assertEquals(ephemeralNodes.size(), 0); + } + } + } + + @After + public void tearDown() throws Exception { + client.close(); + zkServer.stop(); + } + + +} diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 816550ae38..8f98aaf0dc 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -308,6 +308,57 @@ Argument Name Default Description **p** /flume Base Path in Zookeeper to store Agent configurations ================== ================ ========================================================================= + +Register agent to Zookeeper +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Flume supports Agent registeration to Zookeeper with hostname and server port. *This is an experimental feature.* +We introduce a new component called register service to implement the feature. The process will register a Ephemeral node in Zookeeper while the process is alive. Only digest ACL is supported now. + +Here's an example configuration: + +.. code-block:: properties + +agent.agent1.registerservices= scribeSource kafkaSource +agent.agent1.registerservices.scribeSource.tier=scribeSourceGroup1 scribeSourceGroup2 +agent.agent1.registerservices.scribeSource.type=zookeeper +agent.agent1.registerservices.scribeSource.zkhost=zookeeper.myhost.com +agent.agent1.registerservices.scribeSource.zkport=2181 +agent.agent1.registerservices.scribeSource.zkendpointport=1477 +agent.agent1.registerservices.scribeSource.zkpathprefix=/flumecluster1 +agent.agent1.registerservices.scribeSource.zkauth=flumeuser:passwd +agent.agent1.registerservices.kafkaSource.type=zookeeper +agent.agent1.registerservices.kafkaSource.tier=kafkaTier +agent.agent1.registerservices.kafkaSource.zkhost=zookeeper.myhost.com +agent.agent1.registerservices.kafkaSource.zkport=2181 +agent.agent1.registerservices.kafkaSource.zkendpointport=2477 +agent.agent1.registerservices.kafkaSource.zkpathprefix=/flumecluster1 +agent.agent1.registerservices.kafkaSource.zkauth=flumeuser:passwd + + +The zk path will be $zkpathprefix/$tier/$HOSTNAME:$zkendpointport +.. code-block:: properties +/flumecluster1/scribeSourceGroup1/$HOSTNAME:1477 +/flumecluster1/scribeSourceGroup1/$HOSTNAME:1477 +/flumecluster1/kafkaTier/$HOSTNAME:2477 + +Here're the configuration items needed to be configured to use this feature. + +================== ================ ========================================================================= +Property Name Default Description +================== ================ ========================================================================= +type -- Should always be zookeeper +zkhost -- Zookeeper host name +zkport -- Zookeeper server host +zkendpointport -- The port of flume agent to serve. The name of Zookeeper node will be $HOSTNAME:zkendpointport +zkpathprefix -- Zk prefix path to register the ephemeral node. +tier -- The parent folder of the ZooKeeper node, multiple args are supported. +zkauth -- Auth to connect to the Zookeeper, only digest ACL is supported now. +zkmaxretry 3 Zookeeper rpc max retry times +zkbasesleepms 1000 Zookeeper base sleep time used in curator lib. +================== ================ ========================================================================= + + Installing third-party plugins ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java index ae594440b3..eb0dadb22d 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java @@ -33,6 +33,7 @@ import org.apache.flume.event.SimpleEvent; import org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.node.MaterializedConfiguration; +import org.apache.flume.register.service.RegisterService; import org.junit.Before; import org.junit.Test; @@ -109,6 +110,16 @@ public void addSinkRunner(String name, SinkRunner sinkRunner) { public void addChannel(String name, Channel channel) { throw new UnsupportedOperationException(); } + + @Override + public void addRegisterService(String name, RegisterService service) { + throw new UnsupportedOperationException(); + } + + @Override + public Map getRegisterServices() { + throw new UnsupportedOperationException(); + } }; agent = new EmbeddedAgent(new MaterializedConfigurationProvider() { public MaterializedConfiguration get(String name, Map properties) { diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java index caf452215c..8c8031d959 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java @@ -56,6 +56,8 @@ import org.apache.flume.conf.sink.SinkConfiguration; import org.apache.flume.conf.sink.SinkGroupConfiguration; import org.apache.flume.conf.source.SourceConfiguration; +import org.apache.flume.register.service.RegisterService; +import org.apache.flume.register.service.RegisterServiceFactory; import org.apache.flume.sink.DefaultSinkFactory; import org.apache.flume.sink.DefaultSinkProcessor; import org.apache.flume.sink.SinkGroup; @@ -100,6 +102,7 @@ public MaterializedConfiguration getConfiguration() { Map channelComponentMap = Maps.newHashMap(); Map sourceRunnerMap = Maps.newHashMap(); Map sinkRunnerMap = Maps.newHashMap(); + Map registerServiceMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); @@ -128,6 +131,10 @@ public MaterializedConfiguration getConfiguration() { for (Map.Entry entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } + loadRegisterServices(agentConf, registerServiceMap); + for (Entry entry : registerServiceMap.entrySet()) { + conf.addRegisterService(entry.getKey(), entry.getValue()); + } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { @@ -534,6 +541,24 @@ private void loadSinkGroups(AgentConfiguration agentConf, } } } + + private void loadRegisterServices( + AgentConfiguration agentConf, Map registerServiceMap) { + Map registerServiceContextMap = agentConf.getRegisterServiceContextMap(); + for (Entry entry :registerServiceContextMap.entrySet()) { + String registerServiceName = entry.getKey(); + Context context = entry.getValue(); + String registerServiceType = context.getString(BasicConfigurationConstants.CONFIG_TYPE); + RegisterService registerService = + RegisterServiceFactory.newInstance(registerServiceType); + Configurables.configure(registerService, context); + registerService.setName(registerServiceName); + LOGGER.info("Created register service " + registerServiceName + + ", type:" + registerServiceType); + registerServiceMap.put(registerServiceName, registerService); + } + } + private static class ChannelComponent { final Channel channel; final List components; @@ -554,4 +579,4 @@ protected Map toMap(Properties properties) { } return result; } -} \ No newline at end of file +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 406bb7de61..c02e12b54f 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -37,6 +37,7 @@ import org.apache.flume.SourceRunner; import org.apache.flume.instrumentation.MonitorService; import org.apache.flume.instrumentation.MonitoringType; +import org.apache.flume.register.service.RegisterService; import org.apache.flume.lifecycle.LifecycleAware; import org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.lifecycle.LifecycleSupervisor; @@ -50,6 +51,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -152,6 +154,12 @@ private void stopAllComponents() { logger.error("Error while stopping {}", entry.getValue(), e); } } + + for (Entry entry: + this.materializedConfiguration.getRegisterServices().entrySet()) { + logger.info("Stopping RegisterService" + entry.getKey()); + entry.getValue().stop(); + } } if (monitorServer != null) { monitorServer.stop(); @@ -211,10 +219,27 @@ private void startAllComponents(MaterializedConfiguration materializedConfigurat logger.error("Error while starting {}", entry.getValue(), e); } } - + this.loadRegisterService(materializedConfiguration); this.loadMonitoring(); } + private void loadRegisterService( + MaterializedConfiguration materializedConfiguration) { + logger.info("Loading register serviece"); + Map registerServiceMap = + materializedConfiguration.getRegisterServices(); + for (Entry entry : registerServiceMap.entrySet()) { + String registerServiceName = entry.getKey(); + RegisterService registerService = entry.getValue(); + try { + registerService.start(); + logger.info("Started register service:" + registerServiceName); + } catch (Exception ex) { + logger.error("Start register service failed:" + registerServiceName, ex); + } + } + } + @SuppressWarnings("unchecked") private void loadMonitoring() { Properties systemProps = System.getProperties(); @@ -374,4 +399,4 @@ public void run() { logger.error("A fatal error occurred while running. Exception follows.", e); } } -} \ No newline at end of file +} diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java index fa3ef55549..4610614f7b 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/MaterializedConfiguration.java @@ -22,6 +22,7 @@ import org.apache.flume.Channel; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; +import org.apache.flume.register.service.RegisterService; import java.util.Map; @@ -36,6 +37,8 @@ public interface MaterializedConfiguration { public void addSinkRunner(String name, SinkRunner sinkRunner); + public void addRegisterService(String name, RegisterService service); + public void addChannel(String name, Channel channel); public Map getSourceRunners(); @@ -44,4 +47,5 @@ public interface MaterializedConfiguration { public Map getChannels(); + public Map getRegisterServices(); } diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java index fa58dff1a6..332fffa2b4 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/SimpleMaterializedConfiguration.java @@ -25,6 +25,7 @@ import org.apache.flume.Channel; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; +import org.apache.flume.register.service.RegisterService; import com.google.common.collect.ImmutableMap; @@ -33,17 +34,19 @@ public class SimpleMaterializedConfiguration implements MaterializedConfiguratio private final Map channels; private final Map sourceRunners; private final Map sinkRunners; + private final Map registerServiceMap; public SimpleMaterializedConfiguration() { channels = new HashMap(); sourceRunners = new HashMap(); sinkRunners = new HashMap(); + registerServiceMap = new HashMap(); } @Override public String toString() { return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners - + " channels:" + channels + " }"; + + " channels:" + channels + " registerService: " + registerServiceMap + " }"; } @Override public void addSourceRunner(String name, SourceRunner sourceRunner) { @@ -60,6 +63,16 @@ public void addChannel(String name, Channel channel) { channels.put(name, channel); } + @Override + public void addRegisterService(String name, RegisterService value) { + registerServiceMap.put(name, value); + } + + @Override + public ImmutableMap getRegisterServices() { + return ImmutableMap.copyOf(registerServiceMap); + } + @Override public Map getChannels() { return ImmutableMap.copyOf(channels);