Skip to content

Commit

Permalink
Split raw CEF and syslog CEF inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Sep 18, 2017
1 parent f4fd32b commit 85239c6
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 74 deletions.
25 changes: 10 additions & 15 deletions src/main/java/org/graylog/plugins/cef/CEFInputModule.java
Expand Up @@ -4,45 +4,40 @@
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import org.graylog.plugins.cef.codec.CEFCodec; import org.graylog.plugins.cef.codec.CEFCodec;
import org.graylog.plugins.cef.codec.CEFSyslogCodec;
import org.graylog.plugins.cef.input.CEFSyslogTCPInput;
import org.graylog.plugins.cef.input.CEFSyslogUDPInput;
import org.graylog.plugins.cef.input.CEFTCPInput; import org.graylog.plugins.cef.input.CEFTCPInput;
import org.graylog.plugins.cef.input.CEFUDPInput; import org.graylog.plugins.cef.input.CEFUDPInput;
import org.graylog.plugins.cef.pipelines.rules.CEFParserFunction; import org.graylog.plugins.cef.pipelines.rules.CEFParserFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.Function; import org.graylog.plugins.pipelineprocessor.ast.functions.Function;
import org.graylog2.plugin.PluginConfigBean;
import org.graylog2.plugin.PluginModule; import org.graylog2.plugin.PluginModule;


import java.util.Collections;
import java.util.Set;

public class CEFInputModule extends PluginModule { public class CEFInputModule extends PluginModule {

@Override
public Set<? extends PluginConfigBean> getConfigBeans() {
return Collections.emptySet();
}

@Override @Override
protected void configure() { protected void configure() {
// Register message input. // Register message input.
addCodec(CEFCodec.NAME, CEFCodec.class); addCodec(CEFCodec.NAME, CEFCodec.class);
addCodec(CEFSyslogCodec.NAME, CEFSyslogCodec.class);

addMessageInput(CEFUDPInput.class); addMessageInput(CEFUDPInput.class);
addMessageInput(CEFTCPInput.class); addMessageInput(CEFTCPInput.class);
addMessageInput(CEFSyslogUDPInput.class);
addMessageInput(CEFSyslogTCPInput.class);


// Register pipeline function. // Register pipeline function.
addMessageProcessorFunction(CEFParserFunction.NAME, CEFParserFunction.class); addMessageProcessorFunction(CEFParserFunction.NAME, CEFParserFunction.class);
} }


protected void addMessageProcessorFunction(String name, Class<? extends Function<?>> functionClass) { private void addMessageProcessorFunction(String name, Class<? extends Function<?>> functionClass) {
addMessageProcessorFunction(binder(), name, functionClass); addMessageProcessorFunction(binder(), name, functionClass);
} }


public static MapBinder<String, Function<?>> processorFunctionBinder(Binder binder) { private MapBinder<String, Function<?>> processorFunctionBinder(Binder binder) {
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), new TypeLiteral<Function<?>>() {}); return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), new TypeLiteral<Function<?>>() {});
} }


public static void addMessageProcessorFunction(Binder binder, String name, Class<? extends Function<?>> functionClass) { private void addMessageProcessorFunction(Binder binder, String name, Class<? extends Function<?>> functionClass) {
processorFunctionBinder(binder).addBinding(name).to(functionClass); processorFunctionBinder(binder).addBinding(name).to(functionClass);

} }

} }
7 changes: 2 additions & 5 deletions src/main/java/org/graylog/plugins/cef/CEFInputPlugin.java
Expand Up @@ -7,17 +7,14 @@
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;


/**
* Implement the Plugin interface here.
*/
public class CEFInputPlugin implements Plugin { public class CEFInputPlugin implements Plugin {
@Override @Override
public PluginMetaData metadata() { public PluginMetaData metadata() {
return new CEFInputMetaData(); return new CEFInputMetaData();
} }


@Override @Override
public Collection<PluginModule> modules () { public Collection<PluginModule> modules() {
return Collections.<PluginModule>singletonList(new CEFInputModule()); return Collections.singleton(new CEFInputModule());
} }
} }
94 changes: 94 additions & 0 deletions src/main/java/org/graylog/plugins/cef/codec/BaseCEFCodec.java
@@ -0,0 +1,94 @@
package org.graylog.plugins.cef.codec;

import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog.plugins.cef.parser.CEFMessage;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.InetSocketAddress;

public abstract class BaseCEFCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(BaseCEFCodec.class);

private static final String CK_TIMEZONE = "timezone";

protected final Configuration configuration;

@AssistedInject
public BaseCEFCodec(@Assisted Configuration configuration) {
this.configuration = configuration;

DateTimeZone timezone;
try {
timezone = DateTimeZone.forID(configuration.getString(CK_TIMEZONE));
} catch (Exception e) {
LOG.warn("Could not configure CEF input timezone. Falling back to local default. Please check the error message:", e);
timezone = DateTimeZone.getDefault();
}
}

protected String buildMessageSummary(CEFMessage cef) {
return cef.deviceProduct() + ": [" + cef.deviceEventClassId() + ", " + cef.humanReadableSeverity() + "] " + cef.name();
}

protected String decideSource(CEFMessage cef, RawMessage raw) {
if (cef.fields() != null && cef.fields().containsKey("dvc")) {
String dvc = (String) cef.fields().get("dvc");
if (!dvc.isEmpty()) {
return dvc;
}
}

// Use raw message source information if we were not able to parse a source from the CEF extensions.
final ResolvableInetSocketAddress address = raw.getRemoteAddress();
final InetSocketAddress remoteAddress;
if (address == null) {
remoteAddress = null;
} else {
remoteAddress = address.getInetSocketAddress();
}

return remoteAddress == null ? "unknown" : remoteAddress.getAddress().toString();
}

@Nullable
@Override
public CodecAggregator getAggregator() {
return null;
}

@ConfigClass
public static class Config implements Codec.Config {
@Override
public ConfigurationRequest getRequestedConfiguration() {
ConfigurationRequest cr = new ConfigurationRequest();

cr.addField(new TextField(
CK_TIMEZONE,
"Timezone",
DateTimeZone.getDefault().getID(),
"Timezone of the timestamps in the CEF messages we'l receive. Set this to the local timezone if in doubt. (CEF messages do not include timezone information) Format example: +01:00 or America/Chicago",
ConfigurationField.Optional.NOT_OPTIONAL
));

return cr;
}

@Override
public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
}
}
}
51 changes: 8 additions & 43 deletions src/main/java/org/graylog/plugins/cef/codec/CEFCodec.java
Expand Up @@ -4,9 +4,8 @@
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject; import com.google.inject.assistedinject.AssistedInject;
import org.graylog.plugins.cef.parser.CEFMessage; import org.graylog.plugins.cef.parser.CEFMessage;
import org.graylog.plugins.cef.parser.SyslogCEFParser; import org.graylog.plugins.cef.parser.CEFParser;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.configuration.Configuration; import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest; import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField; import org.graylog2.plugin.configuration.fields.ConfigurationField;
Expand All @@ -22,32 +21,28 @@


import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.net.InetSocketAddress;

public class CEFCodec implements Codec {


public class CEFCodec extends BaseCEFCodec {
public static final String NAME = "CEF"; public static final String NAME = "CEF";


private static final Logger LOG = LoggerFactory.getLogger(CEFCodec.class); private static final Logger LOG = LoggerFactory.getLogger(CEFCodec.class);

private static final String CK_TIMEZONE = "timezone"; private static final String CK_TIMEZONE = "timezone";


private final Configuration configuration; private final CEFParser parser;
private final SyslogCEFParser parser;


@AssistedInject @AssistedInject
public CEFCodec(@Assisted Configuration configuration) { public CEFCodec(@Assisted Configuration configuration) {
this.configuration = configuration; super(configuration);


DateTimeZone timezone; DateTimeZone timezone;
try { try {
timezone = DateTimeZone.forID(configuration.getString(CK_TIMEZONE)); timezone = DateTimeZone.forID(configuration.getString(CK_TIMEZONE));
} catch(Exception e) { } catch (Exception e) {
LOG.warn("Could not configure CEF input timezone. Falling back to local default. Please check the error message:", e); LOG.warn("Could not configure CEF input timezone. Falling back to local default. Please check the error message:", e);
timezone = DateTimeZone.getDefault(); timezone = DateTimeZone.getDefault();
} }


this.parser = new SyslogCEFParser(timezone); this.parser = new CEFParser();
} }


@Nullable @Nullable
Expand All @@ -56,7 +51,7 @@ public Message decode(@Nonnull RawMessage rawMessage) {
try { try {
// CEF standard says all messages are UTF-8 so I trust that. // CEF standard says all messages are UTF-8 so I trust that.
String s = new String(rawMessage.getPayload(), Charsets.UTF_8); String s = new String(rawMessage.getPayload(), Charsets.UTF_8);
CEFMessage cef = parser.parse(s); CEFMessage cef = parser.parse(s).build();


// Build standard message. // Build standard message.
Message result = new Message(buildMessageSummary(cef), decideSource(cef, rawMessage), cef.timestamp()); Message result = new Message(buildMessageSummary(cef), decideSource(cef, rawMessage), cef.timestamp());
Expand All @@ -77,41 +72,11 @@ public Message decode(@Nonnull RawMessage rawMessage) {
result.addField("msg", cef.message()); result.addField("msg", cef.message());


return result; return result;
} catch(Exception e) { } catch (Exception e) {
throw new RuntimeException("Could not decode CEF message.", e); throw new RuntimeException("Could not decode CEF message.", e);
} }
} }


private String buildMessageSummary(CEFMessage cef) {
return new StringBuilder()
.append(cef.deviceProduct())
.append(": ")
.append("[").append(cef.deviceEventClassId()).append(", ")
.append(cef.humanReadableSeverity()).append("] ")
.append(cef.name())
.toString();
}

private String decideSource(CEFMessage cef, RawMessage raw) {
if(cef.fields() != null && cef.fields().containsKey("dvc")) {
String dvc = (String) cef.fields().get("dvc");
if (!dvc.isEmpty()) {
return dvc;
}
}

// Use raw message source information if we were not able to parse a source from the CEF extensions.
final ResolvableInetSocketAddress address = raw.getRemoteAddress();
final InetSocketAddress remoteAddress;
if (address == null) {
remoteAddress = null;
} else {
remoteAddress = address.getInetSocketAddress();
}

return remoteAddress == null ? "unknown" : remoteAddress.getAddress().toString();
}

@Nullable @Nullable
@Override @Override
public CodecAggregator getAggregator() { public CodecAggregator getAggregator() {
Expand Down

0 comments on commit 85239c6

Please sign in to comment.