Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLUME-3256: Adding RegisterService Component and Zookeeper registrati… #217

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -36,6 +36,9 @@ public final class BasicConfigurationConstants {
public static final String CONFIG_CHANNELS = "channels";
public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";

public static final String CONFIG_REGISTER_SERVICES = "registerservices";
public static final String CONFIG_REGISTER_SERVICES_PREFIX = CONFIG_REGISTER_SERVICES + ".";

public static final String CONFIG_CONFIG = "config";
public static final String CONFIG_TYPE = "type";

Expand Down
Expand Up @@ -132,7 +132,8 @@ public enum ComponentType {
SINK_PROCESSOR("SinkProcessor"),
SINKGROUP("Sinkgroup"),
CHANNEL("Channel"),
CHANNELSELECTOR("ChannelSelector");
CHANNELSELECTOR("ChannelSelector"),
REGISTER_SERVICE("RegisterService");

private final String componentType;

Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flume.conf.channel.ChannelConfiguration.ChannelConfigurationType;
import org.apache.flume.conf.channel.ChannelSelectorConfiguration.ChannelSelectorConfigurationType;
import org.apache.flume.conf.configfilter.ConfigFilterConfiguration.ConfigFilterConfigurationType;
import org.apache.flume.conf.register.service.RegisterServiceConfiguration;
import org.apache.flume.conf.sink.SinkConfiguration.SinkConfigurationType;
import org.apache.flume.conf.sink.SinkGroupConfiguration;
import org.apache.flume.conf.sink.SinkProcessorConfiguration.SinkProcessorConfigurationType;
Expand Down Expand Up @@ -65,6 +66,8 @@ public static ComponentConfiguration create(String name, String type, ComponentT
.getConfiguration(name);
case SINKGROUP:
return new SinkGroupConfiguration(name);
case REGISTER_SERVICE:
return new RegisterServiceConfiguration(name);
default:
throw new ConfigurationException(
"Cannot create configuration. Unknown Type specified: " + type);
Expand Down
Expand Up @@ -21,6 +21,8 @@
import org.apache.flume.conf.FlumeConfigurationError.ErrorOrWarning;
import org.apache.flume.conf.channel.ChannelConfiguration;
import org.apache.flume.conf.channel.ChannelType;
import org.apache.flume.conf.register.service.RegisterServiceConfiguration;
import org.apache.flume.conf.register.service.RegisterServiceType;
import org.apache.flume.configfilter.ConfigFilter;
import org.apache.flume.conf.configfilter.ConfigFilterConfiguration;
import org.apache.flume.conf.configfilter.ConfigFilterType;
Expand Down Expand Up @@ -54,6 +56,8 @@
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIG;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_CONFIGFILTERS_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_REGISTER_SERVICES_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX;
import static org.apache.flume.conf.BasicConfigurationConstants.CONFIG_SINKS;
Expand Down Expand Up @@ -214,8 +218,8 @@ private boolean addRawProperty(String rawName, String rawValue) {
agentConfigMap.put(agentName, aconf);
}

// Each configuration key must begin with one of the three prefixes:
// sources, sinks, or channels.
// Each configuration key must begin with one of the four prefixes:
// sources, sinks, channels or registerservices.
return aconf.addProperty(configKey, value);
}

Expand All @@ -233,6 +237,7 @@ public static class AgentConfiguration {
private String sinks;
private String channels;
private String sinkgroups;
private String registerServices;

private final Map<String, ComponentConfiguration> sourceConfigMap;
private final Map<String, ComponentConfiguration> sinkConfigMap;
Expand All @@ -245,12 +250,14 @@ public static class AgentConfiguration {
private Map<String, Context> sinkContextMap;
private Map<String, Context> channelContextMap;
private Map<String, Context> sinkGroupContextMap;
private Map<String, Context> registerServiceContextMap;

private Set<String> sinkSet;
private Set<String> configFilterSet;
private Set<String> sourceSet;
private Set<String> channelSet;
private Set<String> sinkgroupSet;
private Set<String> registerServiceSet;

private final List<FlumeConfigurationError> errorList;
private List<ConfigFilter> configFiltersInstances;
Expand All @@ -260,6 +267,7 @@ private AgentConfiguration(String agentName,
List<FlumeConfigurationError> errorList) {
this.agentName = agentName;
this.errorList = errorList;
this.registerServices = "";
configFilterConfigMap = new HashMap<>();
sourceConfigMap = new HashMap<>();
sinkConfigMap = new HashMap<>();
Expand All @@ -272,6 +280,7 @@ private AgentConfiguration(String agentName,
sinkGroupContextMap = new HashMap<>();
configFiltersInstances = new ArrayList<>();
configFilterPatternCache = new HashMap<>();
registerServiceContextMap = new HashMap<>();
}

public Map<String, ComponentConfiguration> getChannelConfigMap() {
Expand Down Expand Up @@ -310,6 +319,10 @@ public Map<String, Context> getChannelContext() {
return channelContextMap;
}

public Map<String, Context> getRegisterServiceContextMap() {
return registerServiceContextMap;
}

public Set<String> getSinkSet() {
return sinkSet;
}
Expand All @@ -330,6 +343,9 @@ public Set<String> getSinkgroupSet() {
return sinkgroupSet;
}

public Set<String> getRegisterServiceSet() {
return registerServiceSet;
}

/**
* <p>
Expand Down Expand Up @@ -383,6 +399,13 @@ private boolean isValid() {
sinkSet = validateSinks(channelSet);
sinkgroupSet = validateGroups(sinkSet);

if (registerServices.isEmpty()) {
registerServiceSet = new HashSet<>();
} else {
registerServiceSet = new HashSet<>(Arrays.asList(registerServices.split("\\s+")));
}
registerServiceSet = validateRegisterServices(registerServiceSet);

// If no sources or sinks are present, then this is invalid
if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
LOGGER.warn(
Expand Down Expand Up @@ -525,6 +548,9 @@ private <T extends ComponentWithClassName> T getKnownComponent(String type, T[]
return null;
}

private RegisterServiceType getKnownConfigRegisterService(String type) {
return getKnownComponent(type, RegisterServiceType.values());
}

/**
* If it is a known component it will do the full validation required for
Expand Down Expand Up @@ -1005,6 +1031,55 @@ private Set<String> validGroupSinks(Set<String> sinkSet,
return groupSinks;
}

private Set<String> validateRegisterServices(Set<String> registerServiceSet) {
Iterator<String> iter = registerServiceSet.iterator();
Map<String, Context> newContextMap = new HashMap<>();
while (iter.hasNext()) {
String registerServiceName = iter.next();
LOGGER.error("Register service name is :" + registerServiceName);
Context registerServiceContext = registerServiceContextMap.get(registerServiceName);
if (registerServiceContext == null) {
iter.remove();
LOGGER.error(registerServiceName + "'s context is not set");
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}
RegisterServiceType regSrvType = getKnownConfigRegisterService(
registerServiceContext.getString(BasicConfigurationConstants.CONFIG_TYPE));
if (regSrvType == null) {
iter.remove();
LOGGER.error(registerServiceName + " has wrong config type:" + regSrvType);
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}

RegisterServiceConfiguration conf = null;
String config = regSrvType.toString().toUpperCase(Locale.ENGLISH);
try {
conf = (RegisterServiceConfiguration) ComponentConfigurationFactory.create(
registerServiceName, config, ComponentType.REGISTER_SERVICE);

} catch (ConfigurationException e) {
LOGGER.error("Could not configure register service {} due to: {}",
new Object[]{registerServiceName, e.getMessage(), e});
}
if (conf == null) {
iter.remove();
LOGGER.error(registerServiceName + " is not well configured");
addError(registerServiceName, CONFIG_ERROR, ERROR);
continue;
}
conf.configure(registerServiceContext);
errorList.addAll(conf.getErrors());
newContextMap.put(registerServiceName, registerServiceContext);
}
registerServiceContextMap = newContextMap;
Set<String> tempRegisterServiceSet = new HashSet<String>();
tempRegisterServiceSet.addAll(registerServiceContextMap.keySet());
registerServiceSet.retainAll(tempRegisterServiceSet);
return registerServiceSet;
}

private String getSpaceDelimitedList(Set<String> entries) {
if (entries.isEmpty()) {
return null;
Expand Down Expand Up @@ -1140,7 +1215,6 @@ private boolean addProperty(String key, String value) {
if (CONFIG_SINKGROUPS.equals(key)) {
if (sinkgroups == null) {
sinkgroups = value;

return true;
} else {
LOGGER.warn("Duplicate sinkgroup list specfied for agent: {}", agentName);
Expand All @@ -1149,11 +1223,24 @@ private boolean addProperty(String key, String value) {
}
}

// Check for register services
if (CONFIG_REGISTER_SERVICES.equals(key)) {
if (registerServices == null || registerServices.isEmpty()) {
registerServices = value;
return true;
} else {
LOGGER.warn("Duplicate register service list specfied for agent: {}", agentName);
addError(CONFIG_REGISTER_SERVICES, DUPLICATE_PROPERTY, ERROR);
return false;
}
}

if (addAsSourceConfig(key, value)
|| addAsChannelValue(key, value)
|| addAsSinkConfig(key, value)
|| addAsSinkGroupConfig(key, value)
|| addAsConfigFilterConfig(key, value)
|| addAsRegisterServiceConfig(key, value)
) {
return true;
}
Expand Down Expand Up @@ -1193,6 +1280,11 @@ private boolean addAsSourceConfig(String key, String value) {
);
}

private boolean addAsRegisterServiceConfig(String key, String value) {
return addComponentConfig(
key, value, CONFIG_REGISTER_SERVICES_PREFIX, registerServiceContextMap);
}

private boolean addComponentConfig(
String key, String value, String configPrefix, Map<String, Context> contextMap

Expand Down
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.flume.conf.register.service;

import org.apache.flume.Context;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.conf.ConfigurationException;

public class RegisterServiceConfiguration extends ComponentConfiguration {
public RegisterServiceConfiguration(String componentName) {
super(componentName);
}

@Override
public void configure(Context context) throws ConfigurationException {
super.configure(context);
try {
RegisterServiceType.valueOf(getType().trim().toUpperCase());
} catch (Exception ex) {
throw new ConfigurationException(
"RegisterServiceConfiguration validation failed due to type" + getType(), ex);
}
}
}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.flume.conf.register.service;

import org.apache.flume.conf.ComponentWithClassName;

public enum RegisterServiceType implements ComponentWithClassName {
/**
* Null register service
*/
OTHER(null),

/**
* Replicating channel selector.
*/
ZOOKEEPER("org.apache.flume.register.service.ZooKeeperRegisterService");

private RegisterServiceType(String registerServiceClassName) {
this.registerServiceClassName = registerServiceClassName;
}

private final String registerServiceClassName;

@Override
public String getClassName() {
return registerServiceClassName;
}
}