Permalink
Browse files

FLUME-1630. Flume configuration code could be improved.

(Brock Noland via Hari Shreedharan)
  • Loading branch information...
1 parent 59d0329 commit 97ed09e6f8255ee99ebb27cd10ef11a90830db24 @harishreedharan harishreedharan committed Nov 30, 2012
Showing with 1,678 additions and 2,234 deletions.
  1. +2 −0 flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
  2. +2 −0 flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/JdbcChannel.java
  3. +2 −1 ...y-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
  4. +24 −6 flume-ng-configuration/src/main/java/org/apache/flume/conf/FlumeConfiguration.java
  5. +2 −1 flume-ng-core/src/main/java/org/apache/flume/ChannelFactory.java
  6. +5 −0 flume-ng-core/src/main/java/org/apache/flume/Constants.java
  7. +2 −2 flume-ng-core/src/main/java/org/apache/flume/SinkFactory.java
  8. +2 −1 flume-ng-core/src/main/java/org/apache/flume/SourceFactory.java
  9. +7 −15 ...meNodeApplication.java → flume-ng-core/src/main/java/org/apache/flume/annotations/Disposable.java
  10. +8 −17 .../node/NodeManager.java → flume-ng-core/src/main/java/org/apache/flume/annotations/Recyclable.java
  11. +13 −70 flume-ng-core/src/main/java/org/apache/flume/channel/DefaultChannelFactory.java
  12. +2 −0 flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
  13. +15 −72 flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
  14. +15 −78 flume-ng-core/src/main/java/org/apache/flume/source/DefaultSourceFactory.java
  15. +2 −21 flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
  16. +2 −20 flume-ng-core/src/test/java/org/apache/flume/source/TestDefaultSourceFactory.java
  17. +6 −0 flume-ng-node/pom.xml
  18. +0 −216 flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
  19. +0 −434 ...e-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
  20. +379 −0 flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
  21. +233 −100 flume-ng-node/src/main/java/org/apache/flume/node/Application.java
  22. +5 −2 flume-ng-node/src/main/java/org/apache/flume/node/ConfigurationProvider.java
  23. +0 −110 flume-ng-node/src/main/java/org/apache/flume/node/FlumeNode.java
  24. +12 −6 ...ode/src/main/java/org/apache/flume/node/{NodeConfiguration.java → MaterializedConfiguration.java}
  25. +155 −0 flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java
  26. +220 −0 flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
  27. +23 −20 ...apache/flume/{conf/file/SimpleNodeConfiguration.java → node/SimpleMaterializedConfiguration.java}
  28. +0 −69 flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
  29. +0 −263 flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
  30. +0 −37 flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/NodeConfigurationAware.java
  31. +0 −82 ...-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
  32. +193 −0 flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java
  33. +0 −233 flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
  34. +123 −0 flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java
  35. +0 −172 flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
  36. +0 −111 flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
  37. +101 −0 ...e-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java
  38. +111 −0 flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java
  39. +0 −42 flume-ng-node/src/test/java/org/apache/flume/source/FlakeySequenceGeneratorSource.java
  40. +12 −33 flume-ng-node/src/test/resources/flume-conf.properties
@@ -32,6 +32,7 @@
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.Log.Builder;
@@ -67,6 +68,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
+@Disposable
public class FileChannel extends BasicChannelSemantics {
private static final Logger LOG = LoggerFactory
@@ -23,6 +23,7 @@
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.AbstractChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
+@Disposable
public class JdbcChannel extends AbstractChannel {
private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);
@@ -33,6 +33,7 @@
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.MemoryChannel;
@@ -56,7 +57,7 @@
* {@link org.apache.flume.channel.file.FileChannel}, which gives better
* performance and is also durable.
*/
-
+@Disposable
@Deprecated
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -19,7 +19,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -66,18 +65,37 @@
public static final String NEWLINE = System.getProperty("line.separator",
"\n");
public static final String INDENTSTEP = " ";
-
/**
* Creates a populated Flume Configuration object.
+ * @deprecated please use the other constructor
*/
+ @Deprecated
public FlumeConfiguration(Properties properties) {
agentConfigMap = new HashMap<String, AgentConfiguration>();
errors = new LinkedList<FlumeConfigurationError>();
// Construct the in-memory component hierarchy
- Enumeration<?> propertyNames = properties.propertyNames();
- while (propertyNames.hasMoreElements()) {
- String name = (String) propertyNames.nextElement();
- String value = properties.getProperty(name);
+ for(Object name : properties.keySet()) {
+ Object value = properties.get(name);
+ if (!addRawProperty(name.toString(), value.toString())) {
+ logger.warn("Configuration property ignored: " + name + " = " + value);
+ }
+ }
+ // Now iterate thru the agentContext and create agent configs and add them
+ // to agentConfigMap
+
+ // validate and remove improperly configured components
+ validateConfiguration();
+
+ }
+ /**
+ * Creates a populated Flume Configuration object.
+ */
+ public FlumeConfiguration(Map<String, String> properties) {
+ agentConfigMap = new HashMap<String, AgentConfiguration>();
+ errors = new LinkedList<FlumeConfigurationError>();
+ // Construct the in-memory component hierarchy
+ for(String name : properties.keySet()) {
+ String value = properties.get(name);
if (!addRawProperty(name, value)) {
logger.warn("Configuration property ignored: " + name + " = " + value);
@@ -22,6 +22,7 @@
public Channel create(String name, String type) throws FlumeException;
- public boolean unregister(Channel channel);
+ public Class<? extends Channel> getClass(String type)
+ throws FlumeException;
}
@@ -20,6 +20,11 @@
public final class Constants {
+ /**
+ * Disables the fail-fast startup behavior. This would be used in the
+ * scenario where the agent is expected to start, but the config
+ * file be populated at a later point in time.
+ */
public static final String SYSPROP_CALLED_FROM_SERVICE
= "flume.called.from.service";
@@ -25,6 +25,6 @@
public Sink create(String name, String type)
throws FlumeException;
- public boolean unregister(Sink sink);
-
+ public Class<? extends Sink> getClass(String type)
+ throws FlumeException;
}
@@ -25,5 +25,6 @@
public Source create(String sourceName, String type)
throws FlumeException;
- public boolean unregister(Source source);
+ public Class<? extends Source> getClass(String type)
+ throws FlumeException;
}
@@ -17,19 +17,11 @@
* under the License.
*/
-package org.apache.flume.node;
+package org.apache.flume.annotations;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.TYPE;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("Causes blocking with no method for clean shutdown")
-public class TestFlumeNodeApplication {
-
- @Test
- public void testApplication() {
- String[] args = new String[] {};
-
- Application.main(args);
- }
-
-}
+@Target({ TYPE }) @Retention(RUNTIME)
+public @interface Disposable {}
@@ -17,20 +17,11 @@
* under the License.
*/
-package org.apache.flume.node;
-
-import java.util.Set;
-
-import org.apache.flume.lifecycle.LifecycleAware;
-
-public interface NodeManager extends LifecycleAware {
-
- public boolean add(LifecycleAware node);
-
- public boolean remove(LifecycleAware node);
-
- public Set<LifecycleAware> getNodes();
-
- public void setNodes(Set<LifecycleAware> nodes);
-
-}
+package org.apache.flume.annotations;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.TYPE;
+
+@Target({ TYPE }) @Retention(RUNTIME)
+public @interface Recyclable {}
@@ -19,9 +19,6 @@
package org.apache.flume.channel;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flume.Channel;
import org.apache.flume.ChannelFactory;
import org.apache.flume.FlumeException;
@@ -36,93 +33,39 @@
private static final Logger logger = LoggerFactory
.getLogger(DefaultChannelFactory.class);
- private Map<Class<?>, Map<String, Channel>> channels;
-
- public DefaultChannelFactory() {
- channels = new HashMap<Class<?>, Map<String, Channel>>();
- }
-
@Override
- public boolean unregister(Channel channel) {
- Preconditions.checkNotNull(channel);
- logger.info("Unregister channel {}", channel);
- boolean removed = false;
-
- Map<String, Channel> channelMap = channels.get(channel.getClass());
- if (channelMap != null) {
- removed = (channelMap.remove(channel.getName()) != null);
-
- if (channelMap.size() == 0) {
- channels.remove(channel.getClass());
- }
+ public Channel create(String name, String type) throws FlumeException {
+ Preconditions.checkNotNull(name, "name");
+ Preconditions.checkNotNull(type, "type");
+ logger.info("Creating instance of channel {} type {}", name, type);
+ Class<? extends Channel> channelClass = getClass(type);
+ try {
+ return channelClass.newInstance();
+ } catch (Exception ex) {
+ throw new FlumeException("Unable to create channel: " + name
+ + ", type: " + type + ", class: " + channelClass.getName(), ex);
}
-
- return removed;
}
@SuppressWarnings("unchecked")
@Override
- public Channel create(String name, String type) throws FlumeException {
- Preconditions.checkNotNull(name);
- Preconditions.checkNotNull(type);
- logger.debug("Creating instance of channel {} type {}", name, type);
-
+ public Class<? extends Channel> getClass(String type)
+ throws FlumeException {
String channelClassName = type;
-
ChannelType channelType = ChannelType.OTHER;
try {
channelType = ChannelType.valueOf(type.toUpperCase());
} catch (IllegalArgumentException ex) {
logger.debug("Channel type {} is a custom type", type);
}
-
if (!channelType.equals(ChannelType.OTHER)) {
channelClassName = channelType.getChannelClassName();
}
-
- Class<? extends Channel> channelClass = null;
try {
- channelClass = (Class<? extends Channel>) Class.forName(channelClassName);
+ return (Class<? extends Channel>) Class.forName(channelClassName);
} catch (Exception ex) {
throw new FlumeException("Unable to load channel type: " + type
+ ", class: " + channelClassName, ex);
}
-
- Map<String, Channel> channelMap = channels.get(channelClass);
- if (channelMap == null) {
- channelMap = new HashMap<String, Channel>();
- channels.put(channelClass, channelMap);
- }
-
- Channel channel = channelMap.get(name);
-
- if (channel == null) {
- try {
- channel = channelClass.newInstance();
- channel.setName(name);
- channelMap.put(name, channel);
- } catch (Exception ex) {
- // Clean up channel map
- channels.remove(channelClass);
- throw new FlumeException("Unable to create channel: " + name
- + ", type: " + type + ", class: " + channelClassName, ex);
- }
- }
-
- return channel;
- }
-
- public Map<Class<?>, Map<String, Channel>> getRegistryClone() {
- Map<Class<?>, Map<String, Channel>> result =
- new HashMap<Class<?>, Map<String, Channel>>();
-
- for (Class<?> klass : channels.keySet()) {
- Map<String, Channel> channelMap = channels.get(klass);
- Map<String, Channel> resultChannelMap = new HashMap<String, Channel>();
- resultChannelMap.putAll(channelMap);
- result.put(klass, resultChannelMap);
- }
-
- return result;
}
}
@@ -29,6 +29,7 @@
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.annotations.Recyclable;
import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
+@Recyclable
public class MemoryChannel extends BasicChannelSemantics {
private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
private static final Integer defaultCapacity = 100;
Oops, something went wrong.

0 comments on commit 97ed09e

Please sign in to comment.