Skip to content

Commit

Permalink
FLUME-3256: Adding RegisterService Component and Zookeeper registrati…
Browse files Browse the repository at this point in the history
…on for service discovery

1. Add a new component named RegisterService for agent discovery
2. Implement a Zookeeper based RegisterService to use zookeeper for service discovery
3. Unit tests
4. User guide change for Zookeeper RegisterService
  • Loading branch information
JohnZZGithub committed Jul 13, 2018
1 parent a76e2e9 commit 45a8b8b
Show file tree
Hide file tree
Showing 19 changed files with 889 additions and 9 deletions.
Expand Up @@ -36,6 +36,9 @@ public final class BasicConfigurationConstants {
public static final String CONFIG_CHANNELS = "channels";
public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";

public static final String CONFIG_REGISTER_SERVICES = "registerservices";
public static final String CONFIG_REGISTER_SERVICES_PREFIX = CONFIG_REGISTER_SERVICES + ".";

public static final String CONFIG_CONFIG = "config";
public static final String CONFIG_TYPE = "type";

Expand Down
Expand Up @@ -132,7 +132,8 @@ public enum ComponentType {
SINK_PROCESSOR("SinkProcessor"),
SINKGROUP("Sinkgroup"),
CHANNEL("Channel"),
CHANNELSELECTOR("ChannelSelector");
CHANNELSELECTOR("ChannelSelector"),
REGISTER_SERVICE("RegisterService");

private final String componentType;

Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType;
import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType;
import org.apache.flume.conf.configfilter.ConfigFilterConfiguration.ConfigFilterConfigurationType;
import org.apache.flume.conf.register.service.RegisterServiceConfiguration;
import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType;
import org.apache.flume.conf.sink.SinkGroupConfiguration;
import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfigurationType;
Expand Down Expand Up @@ -65,6 +66,8 @@ public static ComponentConfiguration create(String name, String type, ComponentT
.getConfiguration(name);
case SINKGROUP:
return new SinkGroupConfiguration(name);
case REGISTER_SERVICE:
return new RegisterServiceConfiguration(name);
default:
throw new ConfigurationException(
"Cannot create configuration. Unknown Type specified: " + type);
Expand Down
Expand Up @@ -21,6 +21,8 @@
import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
import org.apache.flume.conf.channel.ChannelConfiguration;
import org.apache.flume.conf.channel.ChannelType;
import org.apache.flume.conf.register.service.RegisterServiceConfiguration;
import org.apache.flume.conf.register.service.RegisterServiceType;
import org.apache.flume.configfilter.ConfigFilter;
import org.apache.flume.conf.configfilter.ConfigFilterConfiguration;
import org.apache.flume.conf.configfilter.ConfigFilterType;
Expand Down Expand Up @@ -54,6 +56,8 @@
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIG;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS;
Expand Down Expand Up @@ -214,8 +218,8 @@ private boolean addRawProperty(String rawName, String rawValue) {
agentConfigMap.put(agentName, aconf);
}

// Each configuration key must begin with one of the three prefixes:
// sources, sinks, or channels.
// Each configuration key must begin with one of the four prefixes:
// sources, sinks, channels or registerservices.
return aconf.addProperty(configKey, value);
}

Expand All @@ -233,6 +237,7 @@ public static class AgentConfiguration {
private String sinks;
private String channels;
private String sinkgroups;
private String registerServices;

private final Map<String, ComponentConfiguration> sourceConfigMap;
private final Map<String, ComponentConfiguration> sinkConfigMap;
Expand All @@ -245,12 +250,14 @@ public static class AgentConfiguration {
private Map<String, Context> sinkContextMap;
private Map<String, Context> channelContextMap;
private Map<String, Context> sinkGroupContextMap;
private Map<String, Context> registerServiceContextMap;

private Set<String> sinkSet;
private Set<String> configFilterSet;
private Set<String> sourceSet;
private Set<String> channelSet;
private Set<String> sinkgroupSet;
private Set<String> registerServiceSet;

private final List<FlumeConfigurationError> errorList;
private List<ConfigFilter> configFiltersInstances;
Expand All @@ -260,6 +267,7 @@ private AgentConfiguration(String agentName,
List<FlumeConfigurationError> errorList) {
this.agentName = agentName;
this.errorList = errorList;
this.registerServices = "";
configFilterConfigMap = new HashMap<>();
sourceConfigMap = new HashMap<>();
sinkConfigMap = new HashMap<>();
Expand All @@ -272,6 +280,7 @@ private AgentConfiguration(String agentName,
sinkGroupContextMap = new HashMap<>();
configFiltersInstances = new ArrayList<>();
configFilterPatternCache = new HashMap<>();
registerServiceContextMap = new HashMap<>();
}

public Map<String, ComponentConfiguration> getChannelConfigMap() {
Expand Down Expand Up @@ -310,6 +319,10 @@ public Map<String, Context> getChannelContext() {
return channelContextMap;
}

public Map<String, Context> getRegisterServiceContextMap() {
return registerServiceContextMap;
}

public Set<String> getSinkSet() {
return sinkSet;
}
Expand All @@ -330,6 +343,9 @@ public Set<String> getSinkgroupSet() {
return sinkgroupSet;
}

public Set<String> getRegisterServiceSet() {
return registerServiceSet;
}

/**
* <p>
Expand Down Expand Up @@ -383,6 +399,13 @@ private boolean isValid() {
sinkSet = validateSinks(channelSet);
sinkgroupSet = validateGroups(sinkSet);

if (registerServices.isEmpty()) {
registerServiceSet = new HashSet<>();
} else {
registerServiceSet = new HashSet<>(Arrays.asList(registerServices.split("\\s+")));
}
registerServiceSet = validateRegisterServices(registerServiceSet);

// If no sources or sinks are present, then this is invalid
if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
LOGGER.warn(
Expand Down Expand Up @@ -525,6 +548,9 @@ private <T extends ComponentWithClassName> T getKnownComponent(String type, T[]
return null;
}

private RegisterServiceType getKnownConfigRegisterService(String type) {
return getKnownComponent(type, RegisterServiceType.values());
}

/**
* If it is a known component it will do the full validation required for
Expand Down Expand Up @@ -1005,6 +1031,55 @@ private Set<String> validGroupSinks(Set<String> sinkSet,
return groupSinks;
}

private Set<String> validateRegisterServices(Set<String> registerServiceSet) {
Iterator<String> iter = registerServiceSet.iterator();
Map<String, Context> newContextMap = new HashMap<>();
while (iter.hasNext()) {
String registerServiceName = iter.next();
LOGGER.error("Register service name is :" + registerServiceName);
Context registerServiceContext = registerServiceContextMap.get(registerServiceName);
if (registerServiceContext == null) {
iter.remove();
LOGGER.error(registerServiceName + "'s context is not set");
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}
RegisterServiceType regSrvType = getKnownConfigRegisterService(
registerServiceContext.getString(BasicConfigurationConstants.CONFIG_TYPE));
if (regSrvType == null) {
iter.remove();
LOGGER.error(registerServiceName + " has wrong config type:" + regSrvType);
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}

RegisterServiceConfiguration conf = null;
String config = regSrvType.toString().toUpperCase(Locale.ENGLISH);
try {
conf = (RegisterServiceConfiguration) ComponentConfigurationFactory.create(
registerServiceName, config, ComponentType.REGISTER_SERVICE);

} catch (ConfigurationException e) {
LOGGER.error("Could not configure register service {} due to: {}",
new Object[]{registerServiceName, e.getMessage(), e});
}
if (conf == null) {
iter.remove();
LOGGER.error(registerServiceName + " is not well configured");
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}
conf.configure(registerServiceContext);
errorList.addAll(conf.getErrors());
newContextMap.put(registerServiceName, registerServiceContext);
}
registerServiceContextMap = newContextMap;
Set<String> tempRegisterServiceSet = new HashSet<String>();
tempRegisterServiceSet.addAll(registerServiceContextMap.keySet());
registerServiceSet.retainAll(tempRegisterServiceSet);
return registerServiceSet;
}

private String getSpaceDelimitedList(Set<String> entries) {
if (entries.isEmpty()) {
return null;
Expand Down Expand Up @@ -1140,7 +1215,6 @@ private boolean addProperty(String key, String value) {
if (CONFIG_SINKGROUPS.equals(key)) {
if (sinkgroups == null) {
sinkgroups = value;

return true;
} else {
LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
Expand All @@ -1149,11 +1223,24 @@ private boolean addProperty(String key, String value) {
}
}

// Check for register services
if (CONFIG_REGISTER_SERVICES.equals(key)) {
if (registerServices == null || registerServices.isEmpty()) {
registerServices = value;
return true;
} else {
LOGGER.warn("Duplicate register service list specfied for agent: {}", agentName);
addError(CONFIG_REGISTER_SERVICES, DUPLICATE_PROPERTY, ERROR);
return false;
}
}

if (addAsSourceConfig(key, value)
|| addAsChannelValue(key, value)
|| addAsSinkConfig(key, value)
|| addAsSinkGroupConfig(key, value)
|| addAsConfigFilterConfig(key, value)
|| addAsRegisterServiceConfig(key, value)
) {
return true;
}
Expand Down Expand Up @@ -1193,6 +1280,11 @@ private boolean addAsSourceConfig(String key, String value) {
);
}

private boolean addAsRegisterServiceConfig(String key, String value) {
return addComponentConfig(
key, value, CONFIG_REGISTER_SERVICES_PREFIX, registerServiceContextMap);
}

private boolean addComponentConfig(
String key, String value, String configPrefix, Map<String, Context> contextMap

Expand Down
Expand Up @@ -69,6 +69,147 @@ public void testFLUME1743() {
Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1"));
}

@Test
public void testGeneralRegisterServiceConfig() {
Properties properties = new Properties();
properties.put("agent1.channels", "ch0");
properties.put("agent1.channels.ch0.type", "memory");

properties.put("agent1.sources", "src0");
properties.put("agent1.sources.src0.type", "multiport_syslogtcp");
properties.put("agent1.sources.src0.channels", "ch0");
properties.put("agent1.sources.src0.host", "localhost");
properties.put("agent1.sources.src0.ports", "10001 10002 10003");
properties.put("agent1.sources.src0.portHeader", "port");

properties.put("agent1.sinks", "sink0");
properties.put("agent1.sinks.sink0.type", "null");
properties.put("agent1.sinks.sink0.channel", "ch0");

properties.put("agent1.registerservices", "zkreg0 zkreg1 ");
properties.put("agent1.registerservices.zkreg0.type", "zookeeper");
properties.put("agent1.registerservices.zkreg0.zkhost", "host1");
properties.put("agent1.registerservices.zkreg1.type", "zookeeper");
properties.put("agent1.registerservices.zkreg1.zkhost", "host1");

properties.put("agent1.configfilters", "f1");
properties.put("agent1.configfilters.f1.type", "env");

FlumeConfiguration conf = new FlumeConfiguration(properties);
AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1");
Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1,
agentConfiguration.getSourceSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1,
agentConfiguration.getChannelSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1,
agentConfiguration.getSinkSet().size());
Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0"));
Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0"));
Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0"));
Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1"));

// Register Service related
Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 2);
Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg0"));
Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg1"));
Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 2);
Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg0"));
Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg1"));
}

@Test
public void testRegisterServiceWithUselessConfig() {
Properties properties = new Properties();
properties.put("agent1.channels", "ch0");
properties.put("agent1.channels.ch0.type", "memory");

properties.put("agent1.sources", "src0");
properties.put("agent1.sources.src0.type", "multiport_syslogtcp");
properties.put("agent1.sources.src0.channels", "ch0");
properties.put("agent1.sources.src0.host", "localhost");
properties.put("agent1.sources.src0.ports", "10001 10002 10003");
properties.put("agent1.sources.src0.portHeader", "port");

properties.put("agent1.sinks", "sink0");
properties.put("agent1.sinks.sink0.type", "null");
properties.put("agent1.sinks.sink0.channel", "ch0");

properties.put("agent1.registerservices", "zkreg ");
properties.put("agent1.registerservices.zkreg.type", "zookeeper");
properties.put("agent1.registerservices.zkreg.zkhost", "host1");
properties.put("agent1.registerservices.zkreg1.type", "zookeeper");
properties.put("agent1.registerservices.zkreg1.zkhost", "host1");

properties.put("agent1.configfilters", "f1");
properties.put("agent1.configfilters.f1.type", "env");

FlumeConfiguration conf = new FlumeConfiguration(properties);
AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1");
Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1,
agentConfiguration.getSourceSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1,
agentConfiguration.getChannelSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1,
agentConfiguration.getSinkSet().size());
Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0"));
Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0"));
Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0"));
Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1"));

// Register Service related
Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 1);
Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg"));
Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 1);
Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg"));
}

@Test
public void testRegisterServiceWithUnkownType() {
Properties properties = new Properties();
properties.put("agent1.channels", "ch0");
properties.put("agent1.channels.ch0.type", "memory");

properties.put("agent1.sources", "src0");
properties.put("agent1.sources.src0.type", "multiport_syslogtcp");
properties.put("agent1.sources.src0.channels", "ch0");
properties.put("agent1.sources.src0.host", "localhost");
properties.put("agent1.sources.src0.ports", "10001 10002 10003");
properties.put("agent1.sources.src0.portHeader", "port");

properties.put("agent1.sinks", "sink0");
properties.put("agent1.sinks.sink0.type", "null");
properties.put("agent1.sinks.sink0.channel", "ch0");

properties.put("agent1.registerservices", "zkreg zkreg1");
properties.put("agent1.registerservices.zkreg.type", "zookeeper");
properties.put("agent1.registerservices.zkreg.zkhost", "host1");
properties.put("agent1.registerservices.zkreg1.type", "ddd");
properties.put("agent1.registerservices.zkreg1.zkhost", "host1");

properties.put("agent1.configfilters", "f1");
properties.put("agent1.configfilters.f1.type", "env");

FlumeConfiguration conf = new FlumeConfiguration(properties);
AgentConfiguration agentConfiguration = conf.getConfigurationFor("agent1");
Assert.assertEquals(String.valueOf(agentConfiguration.getSourceSet()), 1,
agentConfiguration.getSourceSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getChannelSet()), 1,
agentConfiguration.getChannelSet().size());
Assert.assertEquals(String.valueOf(agentConfiguration.getSinkSet()), 1,
agentConfiguration.getSinkSet().size());
Assert.assertTrue(agentConfiguration.getSourceSet().contains("src0"));
Assert.assertTrue(agentConfiguration.getChannelSet().contains("ch0"));
Assert.assertTrue(agentConfiguration.getSinkSet().contains("sink0"));
Assert.assertTrue(agentConfiguration.getConfigFilterSet().contains("f1"));

// Register Service related
Assert.assertEquals(agentConfiguration.getRegisterServiceSet().size(), 1);
Assert.assertTrue(agentConfiguration.getRegisterServiceSet().contains("zkreg"));
Assert.assertEquals(agentConfiguration.getRegisterServiceContextMap().size(), 1);
Assert.assertTrue(agentConfiguration.getRegisterServiceContextMap().containsKey("zkreg"));
Assert.assertEquals(conf.getConfigurationErrors().size(), 1);
}

@Test
public void testFlumeConfigAdsErrorOnNullName() {
HashMap<String, String> properties = new HashMap<>();
Expand Down

0 comments on commit 45a8b8b

Please sign in to comment.