Permalink
Browse files

output plugin system now available

relates #SERVER-131
  • Loading branch information...
1 parent d58ffc3 commit 9a498a278afa1918983c3f81295d47a6c8e6b304 @lennartkoopmann lennartkoopmann committed Oct 30, 2012
View
@@ -15,7 +15,7 @@ nbactions.xml
misc/elasticsearch.yml.2
misc/convert_rsyslog_db_to_mongo.rb
-plugin/filters/*
+plugin/*/*.jar
build_script/builds/*
build_script/logs/*
No changes.
No changes.
View
@@ -131,7 +131,7 @@
<dependency>
<groupId>org.graylog2</groupId>
<artifactId>graylog2-plugin</artifactId>
- <version>0.9.7.4</version>
+ <version>0.9.7.6</version>
</dependency>
<dependency>
<groupId>com.googlecode.disruptor</groupId>
@@ -33,7 +33,7 @@
import org.graylog2.initializers.Initializer;
import org.graylog2.inputs.MessageInput;
import org.graylog2.gelf.GELFChunkManager;
-import org.graylog2.outputs.MessageOutput;
+import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.streams.StreamCache;
import com.google.common.collect.Lists;
@@ -86,7 +86,7 @@
private List<Initializer> initializers = Lists.newArrayList();
private List<MessageInput> inputs = Lists.newArrayList();
private List<MessageFilter> filters = Lists.newArrayList();
- private List<Class<? extends MessageOutput>> outputs = Lists.newArrayList();
+ private List<MessageOutput> outputs = Lists.newArrayList();
private int loadedFilterPlugins = 0;
@@ -168,8 +168,8 @@ public void registerFilter(MessageFilter filter) {
this.filters.add(filter);
}
- public <T extends MessageOutput> void registerOutput(Class<T> klazz) {
- this.outputs.add(klazz);
+ public void registerOutput(MessageOutput output) {
+ this.outputs.add(output);
}
@Override
@@ -204,7 +204,9 @@ public void run() {
.build()
);
- loadPlugins();
+ // Load and register plugins.
+ loadPlugins(MessageFilter.class, "filters");
+ loadPlugins(MessageOutput.class, "outputs");
// Call all registered initializers.
for (Initializer initializer : this.initializers) {
@@ -227,12 +229,18 @@ public void run() {
}
- private void loadPlugins() {
- PluginLoader pl = new PluginLoader(configuration.getPluginDir());
- for (MessageFilter filter : pl.loadFilterPlugins()) {
- LOG.info("Registering plugin filter [" + filter.getClass().getSimpleName() + "].");
- registerFilter(filter);
- this.loadedFilterPlugins += 1;
+ private <A> void loadPlugins(Class<A> type, String subDirectory) {
+ PluginLoader<A> pl = new PluginLoader(configuration.getPluginDir(), subDirectory, type);
+ for (A plugin : pl.getPlugins()) {
+ LOG.info("Registering <" + type.getSimpleName() + "> plugin [" + plugin.getClass().getCanonicalName() + "].");
+
+ if (plugin instanceof MessageFilter) {
+ registerFilter((MessageFilter) plugin);
+ } else if (plugin instanceof MessageOutput) {
+ registerOutput((MessageOutput) plugin);
+ } else {
+ LOG.error("Could not load plugin [" + plugin.getClass().getCanonicalName() + "] - Not supported type.");
+ }
}
}
@@ -282,20 +290,22 @@ public Buffer getOutputBuffer() {
return this.outputBuffer;
}
+ public List<MessageInput> getInputs() {
+ return this.inputs;
+ }
+
public List<MessageFilter> getFilters() {
return this.filters;
}
- public List<Class<? extends MessageOutput>> getOutputs() {
+ public List<MessageOutput> getOutputs() {
return this.outputs;
}
-
- @Override
+
public MessageCounterManagerImpl getMessageCounterManager() {
return this.messageCounterManager;
}
- @Override
public HostCounterCacheImpl getHostCounterCache() {
return this.hostCounterCache;
}
@@ -322,10 +332,6 @@ public String getServerId() {
return this.serverId;
}
- public int getLoadedFilterPlugins() {
- return this.loadedFilterPlugins;
- }
-
public void setLocalMode(boolean mode) {
this.localMode = mode;
}
@@ -172,7 +172,7 @@ public static void main(String[] args) {
server.registerFilter(new CounterUpdateFilter());
// Register outputs.
- server.registerOutput(ElasticSearchOutput.class);
+ server.registerOutput(new ElasticSearchOutput());
// Blocks until we shut down.
server.run();
@@ -28,7 +28,7 @@
import org.apache.log4j.Logger;
import org.graylog2.Core;
import org.graylog2.buffers.LogMessageEvent;
-import org.graylog2.outputs.MessageOutput;
+import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.logmessage.LogMessage;
import java.util.List;
@@ -73,23 +73,20 @@ public void onEvent(LogMessageEvent event, long sequence, boolean endOfBatch) th
buffer.add(msg);
if (endOfBatch || buffer.size() >= server.getConfiguration().getOutputBatchSize()) {
- for (Class<? extends MessageOutput> outputType : server.getOutputs()) {
+ for (MessageOutput output : server.getOutputs()) {
try {
- // Always create a new instance of this filter.
- MessageOutput output = outputType.newInstance();
-
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing message batch to [" + outputType.getSimpleName() + "]. Size <" + buffer.size() + ">");
+ LOG.debug("Writing message batch to [" + output.getName() + "]. Size <" + buffer.size() + ">");
}
batchSize.update(buffer.size());
output.write(buffer, server);
} catch (Exception e) {
- LOG.error("Could not write message batch to output [" + outputType.getSimpleName() +"].", e);
- } finally {
- buffer.clear();
+ LOG.error("Could not write message batch to output [" + output.getName() +"].", e);
}
}
+
+ buffer.clear();
}
if (LOG.isDebugEnabled()) {
@@ -30,6 +30,7 @@
import org.graylog2.plugin.streams.Stream;
import java.util.concurrent.TimeUnit;
+import org.graylog2.Core;
/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
@@ -39,10 +40,11 @@
@Override
public boolean filter(LogMessage msg, GraylogServer server) {
+ Core serverImpl = (Core) server;
TimerContext tcx = processTime.time();
// Increment all registered message counters.
- for (MessageCounter counter : server.getMessageCounterManager().getAllCounters().values()) {
+ for (MessageCounter counter : serverImpl.getMessageCounterManager().getAllCounters().values()) {
// Five second throughput for health page.
counter.incrementThroughput();
@@ -59,7 +61,7 @@ public boolean filter(LogMessage msg, GraylogServer server) {
}
// Update hostcounters. Used to build hosts connection.
- server.getHostCounterCache().increment(msg.getHost());
+ serverImpl.getHostCounterCache().increment(msg.getHost());
tcx.stop();
@@ -29,6 +29,7 @@
import org.graylog2.plugin.logmessage.LogMessage;
import java.util.concurrent.TimeUnit;
+import org.graylog2.Core;
/**
* @author Lennart Koopmann <lennart@socketfeed.com>
@@ -40,10 +41,11 @@
@Override
public boolean filter(LogMessage msg, GraylogServer server) {
+ Core serverImpl = (Core) server;
TimerContext tcx = processTime.time();
- if (server.getRulesEngine() != null) {
- server.getRulesEngine().evaluate(msg);
+ if (serverImpl.getRulesEngine() != null) {
+ serverImpl.getRulesEngine().evaluate(msg);
}
tcx.stop();
@@ -20,15 +20,17 @@
package org.graylog2.outputs;
+import org.graylog2.plugin.outputs.MessageOutput;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
-import org.graylog2.Core;
import org.graylog2.plugin.logmessage.LogMessage;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.graylog2.Core;
+import org.graylog2.plugin.GraylogServer;
/**
* @author Lennart Koopmann <lennart@socketfeed.com>
@@ -38,15 +40,21 @@
private final Meter writes = Metrics.newMeter(ElasticSearchOutput.class, "Writes", "messages", TimeUnit.SECONDS);
private final Timer processTime = Metrics.newTimer(ElasticSearchOutput.class, "ProcessTimeMilliseconds", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+ private static final String NAME = "ElasticSearchOutput";
+
@Override
- public void write(List<LogMessage> messages, Core server) throws Exception {
+ public void write(List<LogMessage> messages, GraylogServer server) throws Exception {
+ Core serverImpl = (Core) server;
+
writes.mark();
TimerContext tcx = processTime.time();
- server.getIndexer().bulkIndex(messages);
+ serverImpl.getIndexer().bulkIndex(messages);
tcx.stop();
}
-
+ public String getName() {
+ return NAME;
+ }
}
@@ -1,34 +0,0 @@
-/**
- * Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
- *
- * This file is part of Graylog2.
- *
- * Graylog2 is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Graylog2 is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-package org.graylog2.outputs;
-
-import java.util.List;
-import org.graylog2.Core;
-import org.graylog2.plugin.logmessage.LogMessage;
-
-/**
- * @author Lennart Koopmann <lennart@socketfeed.com>
- */
-public interface MessageOutput {
-
- void write(List<LogMessage> msg, Core server) throws Exception;
-
-}
@@ -1,13 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.graylog2.plugins;
-
-/**
- *
- * @author lennart.koopmann
- */
-public class FilterPluginLoader {
-
-}
@@ -28,38 +28,37 @@
import java.util.Arrays;
import java.util.List;
import org.apache.log4j.Logger;
-import org.graylog2.Core;
-import org.graylog2.Main;
-import org.graylog2.plugin.filters.MessageFilter;
/**
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
-public class PluginLoader {
+public class PluginLoader<A> {
private static final Logger LOG = Logger.getLogger(PluginLoader.class);
+
+ private final String baseDirectory;
+ private final String subDirectory;
+ private final Class<A> type;
- public static final String FILTER_DIR = "filters";
-
- private String baseDirectory;
-
- public PluginLoader(String baseDirectory) {
+ public PluginLoader(String baseDirectory, String subDirectory, Class<A> clazz) {
this.baseDirectory = baseDirectory;
+ this.subDirectory = subDirectory;
+ this.type = clazz;
}
- public List<MessageFilter> loadFilterPlugins() {
- List<MessageFilter> filters = Lists.newArrayList();
+ public List<A> getPlugins() {
+ List<A> filters = Lists.newArrayList();
// Load all plugin jars.
- for (File jarPath : getAllJars(FILTER_DIR)) {
+ for (File jarPath : getAllJars(subDirectory)) {
try {
ClassLoader loader = URLClassLoader.newInstance(
new URL[] { jarPath.toURI().toURL() },
getClass().getClassLoader()
);
Class<?> clazz = Class.forName(getClassNameFromJarName(jarPath.getName()), true, loader);
- filters.add(clazz.asSubclass(MessageFilter.class).newInstance());
+ filters.add(clazz.asSubclass(type).newInstance());
} catch (Exception ex) {
LOG.error("Could not load plugin <" + jarPath.getAbsolutePath() + ">", ex);
continue;
@@ -62,7 +62,9 @@ public AnonymousInformationCollector(Core server) {
private Map<String, Integer> numberOfLoadedPlugins() {
try {
Map<String, Integer> plugins = Maps.newHashMap();
- plugins.put("filters", server.getLoadedFilterPlugins());
+ plugins.put("inputs", server.getInputs().size());
+ plugins.put("filters", server.getFilters().size());
+ plugins.put("outputs", server.getOutputs().size());
return plugins;
} catch (Exception e) {

0 comments on commit 9a498a2

Please sign in to comment.