Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

FLUME-1852. Issues with EmbeddedAgentConfiguration.

(Brock Noland via Mike Percy)
  • Loading branch information...
commit 3df65e12c8d480cd46f190a0bb4addfee4272062 1 parent d45af17
@mpercy mpercy authored
View
73 ...edded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
@@ -33,6 +33,7 @@
import org.apache.flume.conf.sink.SinkType;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
@@ -65,7 +66,7 @@
public static final String SINKS_PREFIX = join(SINKS, "");
/**
- * Source type, choices are `embedded' or `avro'
+ * Source type, choices are `embedded'
*/
public static final String SOURCE_TYPE = join(SOURCE, TYPE);
/**
@@ -81,7 +82,7 @@
*/
public static final String CHANNEL_PREFIX = join(CHANNEL, "");
/**
- * Sink processor type, choices are `default' (failover) or `load_balance'
+ * Sink processor type, choices are `default', `failover' or `load_balance'
*/
public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE);
/**
@@ -90,10 +91,11 @@
public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, "");
/**
* Embedded source which provides simple in-memory transfer to channel.
- * Use this source via the put,pulAll methods on the EmbeddedAgent. This
- * is the recommended source to use for Embedded Agents.
+ * Use this source via the put,putAll methods on the EmbeddedAgent. This
+ * is the only supported source to use for Embedded Agents.
*/
public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName();
+ private static final String SOURCE_TYPE_EMBEDDED_ALIAS = "EMBEDDED";
/**
* Memory channel which stores events in heap. See Flume User Guide for
* configuration information. This is the recommended channel to use for
@@ -101,8 +103,8 @@
*/
public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name();
/**
- * File based channel which stores events in heap. See Flume User Guide for
- * configuration information.
+ * File based channel which stores events in on local disk. See Flume User
+ * Guide for configuration information.
*/
public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name();
@@ -129,6 +131,7 @@
private static final String[] ALLOWED_SOURCES = {
+ SOURCE_TYPE_EMBEDDED_ALIAS,
SOURCE_TYPE_EMBEDDED,
};
@@ -147,6 +150,9 @@
SINK_PROCESSOR_TYPE_LOAD_BALANCE
};
+ private static final ImmutableList<String> DISALLOWED_SINK_NAMES =
+ ImmutableList.of("source", "channel", "processor");
+
private static void validate(String name,
Map<String, String> properties) throws FlumeException {
@@ -158,6 +164,10 @@ private static void validate(String name,
checkRequired(properties, SINKS);
String sinkNames = properties.get(SINKS);
for(String sink : sinkNames.split("\\s+")) {
+ if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase())) {
+ throw new FlumeException("Sink name " + sink + " is one of the" +
+ " disallowed sink names: " + DISALLOWED_SINK_NAMES);
+ }
String key = join(sink, TYPE);
checkRequired(properties, key);
checkAllowed(ALLOWED_SINKS, properties.get(key));
@@ -182,7 +192,8 @@ private static void validate(String name,
// we are going to modify the properties as we parse the config
properties = new HashMap<String, String>(properties);
- if(!properties.containsKey(SOURCE_TYPE)) {
+ if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS.
+ equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
}
String sinkNames = properties.remove(SINKS);
@@ -199,7 +210,6 @@ private static void validate(String name,
// user supplied config -> agent configuration
Map<String, String> result = Maps.newHashMap();
- Joiner joiner = Joiner.on(SEPERATOR);
// properties will be modified during iteration so we need a
// copy of the keys
Set<String> userProvidedKeys;
@@ -209,42 +219,40 @@ private static void validate(String name,
* source at the channel.
*/
// point agent at source
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SOURCES), sourceName);
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES),
+ sourceName);
// point agent at channel
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
- // point agent at source
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKS), sinkNames);
+ result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS),
+ channelName);
+ // point agent at sinks
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS),
+ sinkNames);
// points the agent at the sinkgroup
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
sinkGroupName);
// points the sinkgroup at the sinks
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
sinkGroupName, SINKS), sinkNames);
// points the source at the channel
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
/*
- * Second process the the sink configuration and point the sinks
+ * Second process the sink configuration and point the sinks
* at the channel.
*/
userProvidedKeys = new HashSet<String>(properties.keySet());
for(String sink : sinkNames.split("\\s+")) {
for(String key : userProvidedKeys) {
String value = properties.get(key);
- if(key.startsWith(sink)) {
+ if(key.startsWith(sink + SEPERATOR)) {
properties.remove(key);
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SINKS, key), value);
}
}
// point the sink at the channel
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SINKS, sink,
BasicConfigurationConstants.CONFIG_CHANNEL), channelName);
}
@@ -255,20 +263,19 @@ private static void validate(String name,
userProvidedKeys = new HashSet<String>(properties.keySet());
for(String key : userProvidedKeys) {
String value = properties.get(key);
- if(key.startsWith(SOURCE)) {
+ if(key.startsWith(SOURCE_PREFIX)) {
// users use `source' but agent needs the actual source name
- key = key.replace(SOURCE, sourceName);
- result.put(joiner.join(name,
+ key = key.replaceFirst(SOURCE, sourceName);
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SOURCES, key), value);
- } else if(key.startsWith(CHANNEL)) {
+ } else if(key.startsWith(CHANNEL_PREFIX)) {
// users use `channel' but agent needs the actual channel name
- key = key.replace(CHANNEL, channelName);
- result.put(joiner.join(name,
+ key = key.replaceFirst(CHANNEL, channelName);
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
- } else if(key.startsWith(SINK_PROCESSOR)) {
+ } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) {
// agent.sinkgroups.sinkgroup.processor.*
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
sinkGroupName, key), value);
} else {
// XXX should we simply ignore this?
View
2  flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
@@ -19,6 +19,6 @@
/**
* This package provides Flume users the ability to embed simple agents
* in applications. For specific and up to date information, please see
- * the Flume User Guide.
+ * the Flume Developer Guide.
*/
package org.apache.flume.agent.embedded;
View
42 ...d-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
@@ -47,10 +47,30 @@ public void setUp() throws Exception {
properties.put("sink2.port", "2");
properties.put("processor.type", "load_balance");
}
+
+
+ @Test
+ public void testFullSourceType() throws Exception {
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
@Test
- public void testBasic() throws Exception {
- Map<String, String> actual = EmbeddedAgentConfiguration.
- configure("test1", properties);
+ public void testMissingSourceType() throws Exception {
+ Assert.assertNotNull(properties.remove("source.type"));
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
+ @Test
+ public void testShortSourceType() throws Exception {
+ properties.put("source.type", "EMBEDDED");
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
+
+ public void doTestExcepted(Map<String, String> actual) throws Exception {
Map<String, String> expected = Maps.newHashMap();
expected.put("test1.channels", "channel-test1");
expected.put("test1.channels.channel-test1.capacity", "200");
@@ -71,7 +91,6 @@ public void testBasic() throws Exception {
expected.put("test1.sources.source-test1.channels", "channel-test1");
expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration.
SOURCE_TYPE_EMBEDDED);
-
Assert.assertEquals(expected, actual);
}
@@ -116,4 +135,19 @@ public void testBadKey() throws Exception {
properties.put("bad.key.name", "bad");
EmbeddedAgentConfiguration.configure("test1", properties);
}
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeSource() throws Exception {
+ properties.put("sinks", "source");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeChannel() throws Exception {
+ properties.put("sinks", "channel");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeProcessor() throws Exception {
+ properties.put("sinks", "processor");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
}
View
1  ...-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
@@ -28,7 +28,6 @@
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
import org.apache.flume.SinkRunner;
import org.apache.flume.SourceRunner;
import org.apache.flume.event.SimpleEvent;
Please sign in to comment.
Something went wrong with that request. Please try again.