From 20b4e036d9884c5c17b41ac5f1e34469864a7a5f Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Wed, 9 Aug 2017 19:48:30 +0200 Subject: [PATCH 01/14] Refactor CloudWatch related code - Create a generic Kinesis transport and reader that can be used by different inputs - Create an abstract CloudWatch log data codec - Rename some flow log classes to more generic names --- .../aws/cloudwatch/CloudWatchLogData.java | 10 ++ .../aws/cloudwatch/CloudWatchLogEvent.java | 20 +++ .../FlowLogMessage.java | 31 +++-- .../CloudWatchFlowLogCodec.java} | 34 ++--- .../inputs/codecs/CloudWatchLogDataCodec.java | 60 +++++++++ .../aws/inputs/flowlogs/FlowLogReader.java | 117 ------------------ .../aws/inputs/flowlogs/FlowLogsInput.java | 10 +- .../flowlogs/json/FlowLogKinesisEvent.java | 9 -- .../aws/inputs/flowlogs/json/RawFlowLog.java | 8 -- .../KinesisTransport.java} | 52 +++++--- .../graylog/aws/kinesis/KinesisConsumer.java | 117 ++++++++++++++++++ .../org/graylog/aws/plugin/AWSModule.java | 14 +-- .../inputs/flowlogs/FlowLogMessageTest.java | 79 ++++++------ 13 files changed, 325 insertions(+), 236 deletions(-) create mode 100644 src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java create mode 100644 src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java rename src/main/java/org/graylog/aws/{inputs/flowlogs => cloudwatch}/FlowLogMessage.java (84%) rename src/main/java/org/graylog/aws/inputs/{flowlogs/FlowLogCodec.java => codecs/CloudWatchFlowLogCodec.java} (77%) create mode 100644 src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java delete mode 100644 src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java delete mode 100644 src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java delete mode 100644 src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java rename src/main/java/org/graylog/aws/inputs/{flowlogs/FlowLogTransport.java => transports/KinesisTransport.java} (73%) create mode 100644 src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java new file mode 100644 index 00000000..9ae3abe2 --- /dev/null +++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java @@ -0,0 +1,10 @@ +package org.graylog.aws.cloudwatch; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class CloudWatchLogData { + @JsonProperty("logEvents") + public List logEvents; +} diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java new file mode 100644 index 00000000..31f68d6d --- /dev/null +++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java @@ -0,0 +1,20 @@ +package org.graylog.aws.cloudwatch; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; + +public class CloudWatchLogEvent { + @JsonProperty("timestamp") + public long timestamp; + + @JsonProperty("message") + public String message; + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("timestamp", timestamp) + .add("message", message) + .toString(); + } +} diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java b/src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java similarity index 84% rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java rename to src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java index d1a7b79b..bcf873a1 100644 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java +++ b/src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java @@ -1,9 +1,11 @@ -package org.graylog.aws.inputs.flowlogs; +package org.graylog.aws.cloudwatch; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + public class FlowLogMessage { private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class); @@ -55,26 +57,31 @@ public FlowLogMessage(DateTime timestamp, this.logStatus = logStatus; } - public static FlowLogMessage fromParts(String[] parts) { - if(parts == null || parts.length != 15) { - throw new IllegalArgumentException("Message parts were null or not length of 15"); + @Nullable + public static FlowLogMessage fromLogEvent(final CloudWatchLogEvent logEvent) { + final String[] parts = logEvent.message.split(" "); + + if (parts.length != 14) { + LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message); + return null; } + return new FlowLogMessage( - new DateTime(Long.valueOf(parts[0])), - safeInteger(parts[1]), + new DateTime(Long.valueOf(logEvent.timestamp)), + safeInteger(parts[0]), + parts[1], parts[2], parts[3], parts[4], - parts[5], + safeInteger(parts[5]), safeInteger(parts[6]), safeInteger(parts[7]), - safeInteger(parts[8]), + safeLong(parts[8]), safeLong(parts[9]), - safeLong(parts[10]), + new DateTime(Long.valueOf(parts[10])*1000), new DateTime(Long.valueOf(parts[11])*1000), - new DateTime(Long.valueOf(parts[12])*1000), - parts[13], - parts[14] + parts[12], + parts[13] ); } diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java similarity index 77% rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java rename to src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java index 23034edb..37f258ac 100644 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java @@ -1,8 +1,11 @@ -package org.graylog.aws.inputs.flowlogs; +package org.graylog.aws.inputs.codecs; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.assistedinject.Assisted; import org.graylog.aws.AWS; +import org.graylog.aws.cloudwatch.CloudWatchLogEvent; +import org.graylog.aws.cloudwatch.FlowLogMessage; +import org.graylog.aws.inputs.flowlogs.IANAProtocolNumbers; import org.graylog2.plugin.Message; import org.graylog2.plugin.configuration.Configuration; import org.graylog2.plugin.configuration.ConfigurationRequest; @@ -10,10 +13,7 @@ import org.graylog2.plugin.inputs.annotations.FactoryClass; import org.graylog2.plugin.inputs.codecs.Codec; import org.graylog2.plugin.inputs.codecs.CodecAggregator; -import org.graylog2.plugin.journal.RawMessage; import org.joda.time.Seconds; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -21,38 +21,30 @@ import java.util.HashMap; import java.util.Map; -public class FlowLogCodec implements Codec { - private static final Logger LOG = LoggerFactory.getLogger(FlowLogCodec.class); +public class CloudWatchFlowLogCodec extends CloudWatchLogDataCodec { public static final String NAME = "AWSFlowLog"; private final Configuration configuration; - private final ObjectMapper objectMapper; - private final IANAProtocolNumbers protocolNumbers; @Inject - public FlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) { + public CloudWatchFlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) { + super(objectMapper); this.configuration = configuration; - this.objectMapper = objectMapper; - this.protocolNumbers = new IANAProtocolNumbers(); } @Nullable @Override - public Message decode(@Nonnull RawMessage rawMessage) { + public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) { try { - String rawString = new String(rawMessage.getPayload()); - String[] parts = rawString.split(" "); + final FlowLogMessage flowLogMessage = FlowLogMessage.fromLogEvent(logEvent); - if (parts.length != 15) { - LOG.warn("Received FlowLog message with not exactly 15 fields. Skipping. Message was: [{}]", rawString); + if (flowLogMessage == null) { return null; } - FlowLogMessage flowLogMessage = FlowLogMessage.fromParts(parts); - - Message result = new Message( + final Message result = new Message( buildSummary(flowLogMessage), "aws-flowlogs", flowLogMessage.getTimestamp() @@ -113,9 +105,9 @@ public String getName() { } @FactoryClass - public interface Factory extends Codec.Factory { + public interface Factory extends Codec.Factory { @Override - FlowLogCodec create(Configuration configuration); + CloudWatchFlowLogCodec create(Configuration configuration); @Override Config getConfig(); diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java new file mode 100644 index 00000000..b7691650 --- /dev/null +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java @@ -0,0 +1,60 @@ +package org.graylog.aws.inputs.codecs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.graylog.aws.cloudwatch.CloudWatchLogData; +import org.graylog.aws.cloudwatch.CloudWatchLogEvent; +import org.graylog2.plugin.Message; +import org.graylog2.plugin.inputs.codecs.MultiMessageCodec; +import org.graylog2.plugin.journal.RawMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public abstract class CloudWatchLogDataCodec implements MultiMessageCodec { + private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogDataCodec.class); + + private final ObjectMapper objectMapper; + + public CloudWatchLogDataCodec(final ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + @Nullable + @Override + public Message decode(@Nonnull RawMessage rawMessage) { + throw new UnsupportedOperationException("MultiMessageCodec " + getClass() + " does not support decode()"); + } + + @Nullable + @Override + public Collection decodeMessages(@Nonnull RawMessage rawMessage) { + try { + final CloudWatchLogData data = objectMapper.readValue(rawMessage.getPayload(), CloudWatchLogData.class); + final List messages = new ArrayList<>(data.logEvents.size()); + + for (final CloudWatchLogEvent logEvent : data.logEvents) { + try { + final Message message = decodeLogData(logEvent); + if (message != null) { + messages.add(message); + } + } catch (Exception e) { + LOG.error("Couldn't decode log event <{}>", logEvent); + } + } + + return messages; + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize log data", e); + } + } + + @Nullable + protected abstract Message decodeLogData(@Nonnull final CloudWatchLogEvent event); +} diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java deleted file mode 100644 index 17c6545f..00000000 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.graylog.aws.inputs.flowlogs; - -import com.amazonaws.regions.Region; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; -import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.model.Record; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import okhttp3.HttpUrl; -import org.graylog.aws.auth.AWSAuthProvider; -import org.graylog.aws.config.AWSPluginConfiguration; -import org.graylog.aws.config.Proxy; -import org.graylog.aws.inputs.flowlogs.json.FlowLogKinesisEvent; -import org.graylog.aws.inputs.flowlogs.json.RawFlowLog; -import org.graylog2.plugin.Tools; -import org.graylog2.plugin.cluster.ClusterConfigService; -import org.graylog2.plugin.inputs.MessageInput; -import org.graylog2.plugin.journal.RawMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FlowLogReader implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(FlowLogReader.class); - - private final Region region; - private final ObjectMapper objectMapper; - private final MessageInput sourceInput; - private final String kinesisStreamName; - private final AWSAuthProvider authProvider; - private final HttpUrl proxyUrl; - private final AWSPluginConfiguration awsConfig; - - - private Worker worker; - - public FlowLogReader(String kinesisStreamName, - Region region, - MessageInput input, - ClusterConfigService configService, - AWSAuthProvider authProvider, - HttpUrl proxyUrl) { - this.kinesisStreamName = kinesisStreamName; - this.region = region; - this.sourceInput = input; - this.objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - this.authProvider = authProvider; - this.proxyUrl = proxyUrl; - - awsConfig = configService.get(AWSPluginConfiguration.class); - - LOG.info("Starting AWS FlowLog reader."); - } - - // TODO metrics - public void run() { - final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( - "graylog-aws-plugin", - kinesisStreamName, - this.authProvider, - "graylog-server-master") - .withRegionName(region.getName()); - - // Optional HTTP proxy - if(awsConfig.proxyEnabled() && this.proxyUrl != null) { - config.withCommonClientConfig(Proxy.forAWS(this.proxyUrl)); - } - - final IRecordProcessorFactory recordProcessorFactory = () -> new IRecordProcessor() { - @Override - public void initialize(InitializationInput initializationInput) { - LOG.info("Initializing Kinesis worker."); - } - - @Override - public void processRecords(ProcessRecordsInput processRecordsInput) { - for (Record record : processRecordsInput.getRecords()) { - try { - LOG.debug("Received FlowLog events via Kinesis."); - - FlowLogKinesisEvent event = objectMapper.readValue(Tools.decompressGzip(record.getData().array()), FlowLogKinesisEvent.class); - for(RawFlowLog flowlog : event.logEvents) { - String fullMessage = flowlog.timestamp + " " + flowlog.message; - sourceInput.processRawMessage(new RawMessage(fullMessage.getBytes())); - } - } catch(Exception e) { - LOG.error("Could not read FlowLog record from Kinesis stream.", e); - } - } - } - - @Override - public void shutdown(ShutdownInput shutdownInput) { - LOG.info("Shutting down Kinesis worker."); - } - }; - - this.worker = new Worker.Builder() - .recordProcessorFactory(recordProcessorFactory) - .config(config) - .build(); - - worker.run(); - } - - public void stop() { - if (worker != null) { - worker.shutdown(); - } - } - -} diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java index 08014c4e..62554835 100644 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java +++ b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java @@ -2,6 +2,8 @@ import com.codahale.metrics.MetricRegistry; import com.google.inject.assistedinject.Assisted; +import org.graylog.aws.inputs.codecs.CloudWatchFlowLogCodec; +import org.graylog.aws.inputs.transports.KinesisTransport; import org.graylog2.plugin.LocalMetricRegistry; import org.graylog2.plugin.ServerStatus; import org.graylog2.plugin.configuration.Configuration; @@ -12,14 +14,14 @@ import javax.inject.Inject; public class FlowLogsInput extends MessageInput { - private static final String NAME = "AWS FlowLogs Input"; + private static final String NAME = "AWS CloudWatch Flow Logs"; @Inject public FlowLogsInput(@Assisted Configuration configuration, MetricRegistry metricRegistry, - FlowLogTransport.Factory transport, + KinesisTransport.Factory transport, LocalMetricRegistry localRegistry, - FlowLogCodec.Factory codec, + CloudWatchFlowLogCodec.Factory codec, Config config, Descriptor descriptor, ServerStatus serverStatus) { @@ -56,7 +58,7 @@ public Descriptor() { @ConfigClass public static class Config extends MessageInput.Config { @Inject - public Config(FlowLogTransport.Factory transport, FlowLogCodec.Factory codec) { + public Config(KinesisTransport.Factory transport, CloudWatchFlowLogCodec.Factory codec) { super(transport.getConfig(), codec.getConfig()); } } diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java b/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java deleted file mode 100644 index fc10ea61..00000000 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.graylog.aws.inputs.flowlogs.json; - -import java.util.List; - -public class FlowLogKinesisEvent { - - public List logEvents; - -} diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java b/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java deleted file mode 100644 index e206bb79..00000000 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.graylog.aws.inputs.flowlogs.json; - -public class RawFlowLog { - - public long timestamp; - public String message; - -} diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java b/src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java similarity index 73% rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java rename to src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java index 109aa925..4667bdfc 100644 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java +++ b/src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java @@ -1,4 +1,4 @@ -package org.graylog.aws.inputs.flowlogs; +package org.graylog.aws.inputs.transports; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; @@ -9,6 +9,7 @@ import okhttp3.HttpUrl; import org.graylog.aws.auth.AWSAuthProvider; import org.graylog.aws.config.AWSPluginConfiguration; +import org.graylog.aws.kinesis.KinesisConsumer; import org.graylog2.plugin.LocalMetricRegistry; import org.graylog2.plugin.cluster.ClusterConfigService; import org.graylog2.plugin.configuration.Configuration; @@ -23,6 +24,8 @@ import org.graylog2.plugin.inputs.codecs.CodecAggregator; import org.graylog2.plugin.inputs.transports.ThrottleableTransport; import org.graylog2.plugin.inputs.transports.Transport; +import org.graylog2.plugin.journal.RawMessage; +import org.graylog2.plugin.system.NodeId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +33,11 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; -public class FlowLogTransport implements Transport { - private static final Logger LOG = LoggerFactory.getLogger(FlowLogTransport.class); - public static final String NAME = "flowlog"; +public class KinesisTransport implements Transport { + private static final Logger LOG = LoggerFactory.getLogger(KinesisTransport.class); + public static final String NAME = "awskinesis"; private static final String CK_AWS_REGION = "aws_region"; private static final String CK_ACCESS_KEY = "aws_access_key"; @@ -42,19 +46,22 @@ public class FlowLogTransport implements Transport { private final Configuration configuration; private final org.graylog2.Configuration graylogConfiguration; + private final NodeId nodeId; private final LocalMetricRegistry localRegistry; private final ClusterConfigService clusterConfigService; - private FlowLogReader reader; + private KinesisConsumer reader; @Inject - public FlowLogTransport(@Assisted final Configuration configuration, + public KinesisTransport(@Assisted final Configuration configuration, org.graylog2.Configuration graylogConfiguration, final ClusterConfigService clusterConfigService, + final NodeId nodeId, LocalMetricRegistry localRegistry) { this.clusterConfigService = clusterConfigService; this.configuration = configuration; this.graylogConfiguration = graylogConfiguration; + this.nodeId = nodeId; this.localRegistry = localRegistry; } @@ -62,28 +69,33 @@ public FlowLogTransport(@Assisted final Configuration configuration, public void launch(MessageInput input) throws MisfireException { ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("aws-flowlog-reader-%d") - .setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in AWS FlowLogs reader.", e)) + .setNameFormat("aws-kinesis-reader-%d") + .setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in AWS Kinesis reader.", e)) .build()); final AWSPluginConfiguration awsConfig = clusterConfigService.getOrDefault(AWSPluginConfiguration.class, AWSPluginConfiguration.createDefault()); - AWSAuthProvider authProvider = new AWSAuthProvider(awsConfig, this.configuration.getString(CK_ACCESS_KEY), this.configuration.getString(CK_SECRET_KEY)); + AWSAuthProvider authProvider = new AWSAuthProvider(awsConfig, configuration.getString(CK_ACCESS_KEY), configuration.getString(CK_SECRET_KEY)); - this.reader = new FlowLogReader( - this.configuration.getString(CK_KINESIS_STREAM_NAME), - Region.getRegion(Regions.fromName(this.configuration.getString(CK_AWS_REGION))), - input, - clusterConfigService, + this.reader = new KinesisConsumer( + configuration.getString(CK_KINESIS_STREAM_NAME), + Region.getRegion(Regions.fromName(configuration.getString(CK_AWS_REGION))), + kinesisCallback(input), + awsConfig, authProvider, - this.graylogConfiguration.getHttpProxyUri() == null ? null : HttpUrl.get(this.graylogConfiguration.getHttpProxyUri()) + nodeId, + graylogConfiguration.getHttpProxyUri() == null ? null : HttpUrl.get(graylogConfiguration.getHttpProxyUri()) ); - LOG.info("Starting FlowLogs Kinesis reader thread."); + LOG.info("Starting Kinesis reader thread for input [{}/{}]", input.getName(), input.getId()); executor.submit(this.reader); } + private Consumer kinesisCallback(final MessageInput input) { + return (data) -> input.processRawMessage(new RawMessage(data)); + } + @Override public void stop() { if(this.reader != null) { @@ -102,9 +114,9 @@ public MetricSet getMetricSet() { } @FactoryClass - public interface Factory extends Transport.Factory { + public interface Factory extends Transport.Factory { @Override - FlowLogTransport create(Configuration configuration); + KinesisTransport create(Configuration configuration); @Override Config getConfig(); @@ -126,7 +138,7 @@ public ConfigurationRequest getRequestedConfiguration() { "AWS Region", Regions.US_EAST_1.getName(), regions, - "The AWS region the FlowLogs are stored in.", + "The AWS region the Kinesis stream is running in.", ConfigurationField.Optional.NOT_OPTIONAL )); @@ -151,7 +163,7 @@ public ConfigurationRequest getRequestedConfiguration() { CK_KINESIS_STREAM_NAME, "Kinesis Stream name", "", - "The name of the Kinesis Stream that receives your FlowLog messages. See README for instructions on how to connect FlowLogs to a Kinesis Stream.", + "The name of the Kinesis stream that receives your messages. See README for instructions on how to connect messages to a Kinesis Stream.", ConfigurationField.Optional.NOT_OPTIONAL )); diff --git a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java new file mode 100644 index 00000000..b96519bd --- /dev/null +++ b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java @@ -0,0 +1,117 @@ +package org.graylog.aws.kinesis; + +import com.amazonaws.regions.Region; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.model.Record; +import okhttp3.HttpUrl; +import org.graylog.aws.auth.AWSAuthProvider; +import org.graylog.aws.config.AWSPluginConfiguration; +import org.graylog.aws.config.Proxy; +import org.graylog2.plugin.Tools; +import org.graylog2.plugin.system.NodeId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Locale; +import java.util.function.Consumer; + +import static java.util.Objects.requireNonNull; + +public class KinesisConsumer implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); + + private final Region region; + private final String kinesisStreamName; + private final AWSAuthProvider authProvider; + private final NodeId nodeId; + private final HttpUrl proxyUrl; + private final AWSPluginConfiguration awsConfig; + private final Consumer dataHandler; + + private Worker worker; + + public KinesisConsumer(String kinesisStreamName, + Region region, + Consumer dataHandler, + AWSPluginConfiguration awsConfig, + AWSAuthProvider authProvider, + NodeId nodeId, + @Nullable HttpUrl proxyUrl) { + this.kinesisStreamName = requireNonNull(kinesisStreamName, "kinesisStreamName"); + this.region = requireNonNull(region, "region"); + this.dataHandler = requireNonNull(dataHandler, "dataHandler"); + this.awsConfig = requireNonNull(awsConfig, "awsConfig"); + this.authProvider = requireNonNull(authProvider, "authProvider"); + this.nodeId = requireNonNull(nodeId, "nodeId"); + this.proxyUrl = proxyUrl; + } + + // TODO metrics + public void run() { + final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize()); + final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( + "graylog-aws-plugin", + kinesisStreamName, + authProvider, + workerId + ).withRegionName(region.getName()); + + // Optional HTTP proxy + if (awsConfig.proxyEnabled() && proxyUrl != null) { + config.withCommonClientConfig(Proxy.forAWS(proxyUrl)); + } + + final IRecordProcessorFactory recordProcessorFactory = () -> new IRecordProcessor() { + @Override + public void initialize(InitializationInput initializationInput) { + LOG.info("Initializing Kinesis worker for stream <{}>", kinesisStreamName); + } + + @Override + public void processRecords(ProcessRecordsInput processRecordsInput) { + LOG.info("Received {} Kinesis events", processRecordsInput.getRecords().size()); + + for (Record record : processRecordsInput.getRecords()) { + try { + // Create a read-only view of the data and use a safe method to convert it to a byte array + // as documented in Record#getData(). (using ByteBuffer#array() can fail) + final ByteBuffer dataBuffer = record.getData().asReadOnlyBuffer(); + final byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes); + + dataHandler.accept(Tools.decompressGzip(dataBytes).getBytes()); + } catch (Exception e) { + LOG.error("Couldn't read Kinesis record from stream <{}>", kinesisStreamName, e); + } + } + } + + @Override + public void shutdown(ShutdownInput shutdownInput) { + LOG.info("Shutting down Kinesis worker for stream <{}>", kinesisStreamName); + } + }; + + this.worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + + worker.run(); + } + + public void stop() { + if (worker != null) { + worker.shutdown(); + } + } + +} diff --git a/src/main/java/org/graylog/aws/plugin/AWSModule.java b/src/main/java/org/graylog/aws/plugin/AWSModule.java index b51eebe6..555a4b43 100644 --- a/src/main/java/org/graylog/aws/plugin/AWSModule.java +++ b/src/main/java/org/graylog/aws/plugin/AWSModule.java @@ -1,25 +1,25 @@ package org.graylog.aws.plugin; -import org.graylog.aws.inputs.cloudtrail.CloudTrailInput; import org.graylog.aws.inputs.cloudtrail.CloudTrailCodec; +import org.graylog.aws.inputs.cloudtrail.CloudTrailInput; import org.graylog.aws.inputs.cloudtrail.CloudTrailTransport; -import org.graylog.aws.inputs.flowlogs.FlowLogCodec; -import org.graylog.aws.inputs.flowlogs.FlowLogTransport; +import org.graylog.aws.inputs.codecs.CloudWatchFlowLogCodec; import org.graylog.aws.inputs.flowlogs.FlowLogsInput; +import org.graylog.aws.inputs.transports.KinesisTransport; import org.graylog.aws.processors.instancelookup.AWSInstanceNameLookupProcessor; import org.graylog2.plugin.PluginModule; public class AWSModule extends PluginModule { @Override protected void configure() { - // CloudTrail input + // CloudTrail addCodec(CloudTrailCodec.NAME, CloudTrailCodec.class); addTransport(CloudTrailTransport.NAME, CloudTrailTransport.class); addMessageInput(CloudTrailInput.class); - // FlowLog input - addCodec(FlowLogCodec.NAME, FlowLogCodec.class); - addTransport(FlowLogTransport.NAME, FlowLogTransport.class); + // CloudWatch + addCodec(CloudWatchFlowLogCodec.NAME, CloudWatchFlowLogCodec.class); + addTransport(KinesisTransport.NAME, KinesisTransport.class); addMessageInput(FlowLogsInput.class); // Instance name lookup diff --git a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java index 44f59639..540ba2e8 100644 --- a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java +++ b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java @@ -1,5 +1,7 @@ package org.graylog.aws.inputs.flowlogs; +import org.graylog.aws.cloudwatch.CloudWatchLogEvent; +import org.graylog.aws.cloudwatch.FlowLogMessage; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -9,25 +11,26 @@ public class FlowLogMessageTest { @Test public void testFromPartsDoesNotFailWithMissingIntegerFields() throws Exception { - FlowLogMessage m = FlowLogMessage.fromParts( - new String[]{ - "0", - "-", - "foo", - "eth0", - "127.0.0.1", - "127.0.0.1", - "-", - "-", - "-", - "100", - "100", - "0", - "0", - "ACCEPT", - "OK" - } - ); + final CloudWatchLogEvent logEvent = new CloudWatchLogEvent(); + final String[] strings = { + "-", + "foo", + "eth0", + "127.0.0.1", + "127.0.0.1", + "-", + "-", + "-", + "100", + "100", + "0", + "0", + "ACCEPT", + "OK" + }; + logEvent.message = String.join(" ", strings); + + FlowLogMessage m = FlowLogMessage.fromLogEvent(logEvent); assertEquals(m.getDestinationPort(), 0); assertEquals(m.getSourcePort(), 0); @@ -37,25 +40,25 @@ public void testFromPartsDoesNotFailWithMissingIntegerFields() throws Exception @Test public void testFromPartsDoesNotFailWithMissingLongFields() throws Exception { - FlowLogMessage m = FlowLogMessage.fromParts( - new String[]{ - "0", - "1", - "foo", - "eth0", - "127.0.0.1", - "127.0.0.1", - "80", - "80", - "1", - "-", - "-", - "0", - "0", - "ACCEPT", - "OK" - } - ); + final String[] strings = { + "1", + "foo", + "eth0", + "127.0.0.1", + "127.0.0.1", + "80", + "80", + "1", + "-", + "-", + "0", + "0", + "ACCEPT", + "OK" + }; + final CloudWatchLogEvent logEvent = new CloudWatchLogEvent(); + logEvent.message = String.join(" ", strings); + FlowLogMessage m = FlowLogMessage.fromLogEvent(logEvent); assertEquals(m.getBytes(), 0); assertEquals(m.getPackets(), 0); From eef764be12748005ab7e7b259b3182228ced416e Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Wed, 9 Aug 2017 19:56:00 +0200 Subject: [PATCH 02/14] Add CloudWatch input and codec to read raw logs --- .../cloudwatch/CloudWatchLogsInput.java | 65 ++++++++++++++++ .../inputs/codecs/CloudWatchRawLogCodec.java | 77 +++++++++++++++++++ .../org/graylog/aws/plugin/AWSModule.java | 8 +- 3 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java create mode 100644 src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java diff --git a/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java new file mode 100644 index 00000000..38afb981 --- /dev/null +++ b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java @@ -0,0 +1,65 @@ +package org.graylog.aws.inputs.cloudwatch; + +import com.codahale.metrics.MetricRegistry; +import com.google.inject.assistedinject.Assisted; +import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec; +import org.graylog.aws.inputs.transports.KinesisTransport; +import org.graylog2.plugin.LocalMetricRegistry; +import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.MessageInput; +import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.annotations.FactoryClass; + +import javax.inject.Inject; + +public class CloudWatchLogsInput extends MessageInput { + private static final String NAME = "AWS CloudWatch Logs"; + + @Inject + public CloudWatchLogsInput(@Assisted Configuration configuration, + MetricRegistry metricRegistry, + KinesisTransport.Factory transport, + LocalMetricRegistry localRegistry, + CloudWatchRawLogCodec.Factory codec, + Config config, + Descriptor descriptor, + ServerStatus serverStatus) { + super( + metricRegistry, + configuration, + transport.create(configuration), + localRegistry, + codec.create(configuration), + config, + descriptor, + serverStatus + ); + } + + @FactoryClass + public interface Factory extends MessageInput.Factory { + @Override + CloudWatchLogsInput create(Configuration configuration); + + @Override + Config getConfig(); + + @Override + Descriptor getDescriptor(); + } + + public static class Descriptor extends MessageInput.Descriptor { + public Descriptor() { + super(NAME, false, ""); + } + } + + @ConfigClass + public static class Config extends MessageInput.Config { + @Inject + public Config(KinesisTransport.Factory transport, CloudWatchRawLogCodec.Factory codec) { + super(transport.getConfig(), codec.getConfig()); + } + } +} diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java new file mode 100644 index 00000000..b7012e08 --- /dev/null +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java @@ -0,0 +1,77 @@ +package org.graylog.aws.inputs.codecs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.assistedinject.Assisted; +import org.graylog.aws.cloudwatch.CloudWatchLogEvent; +import org.graylog2.plugin.Message; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.configuration.ConfigurationRequest; +import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.annotations.FactoryClass; +import org.graylog2.plugin.inputs.codecs.Codec; +import org.graylog2.plugin.inputs.codecs.CodecAggregator; +import org.joda.time.DateTime; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.inject.Inject; + +public class CloudWatchRawLogCodec extends CloudWatchLogDataCodec { + public static final String NAME = "AWSCloudWatchRawLog"; + + private final Configuration configuration; + + @Inject + public CloudWatchRawLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) { + super(objectMapper); + this.configuration = configuration; + } + + @Nullable + @Override + public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) { + try { + return new Message(logEvent.message, "aws-raw-logs", new DateTime(logEvent.timestamp)); + } catch (Exception e) { + throw new RuntimeException("Could not deserialize AWS FlowLog record.", e); + } + } + + @Nonnull + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Nullable + @Override + public CodecAggregator getAggregator() { + return null; + } + + @Override + public String getName() { + return NAME; + } + + @FactoryClass + public interface Factory extends Codec.Factory { + @Override + CloudWatchRawLogCodec create(Configuration configuration); + + @Override + Config getConfig(); + } + + @ConfigClass + public static class Config implements Codec.Config { + @Override + public ConfigurationRequest getRequestedConfiguration() { + return new ConfigurationRequest(); + } + + @Override + public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) { + } + } +} diff --git a/src/main/java/org/graylog/aws/plugin/AWSModule.java b/src/main/java/org/graylog/aws/plugin/AWSModule.java index 555a4b43..e9d3b1f0 100644 --- a/src/main/java/org/graylog/aws/plugin/AWSModule.java +++ b/src/main/java/org/graylog/aws/plugin/AWSModule.java @@ -1,11 +1,13 @@ package org.graylog.aws.plugin; -import org.graylog.aws.inputs.cloudtrail.CloudTrailCodec; import org.graylog.aws.inputs.cloudtrail.CloudTrailInput; +import org.graylog.aws.inputs.cloudtrail.CloudTrailCodec; import org.graylog.aws.inputs.cloudtrail.CloudTrailTransport; +import org.graylog.aws.inputs.cloudwatch.CloudWatchLogsInput; +import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec; import org.graylog.aws.inputs.codecs.CloudWatchFlowLogCodec; -import org.graylog.aws.inputs.flowlogs.FlowLogsInput; import org.graylog.aws.inputs.transports.KinesisTransport; +import org.graylog.aws.inputs.flowlogs.FlowLogsInput; import org.graylog.aws.processors.instancelookup.AWSInstanceNameLookupProcessor; import org.graylog2.plugin.PluginModule; @@ -19,8 +21,10 @@ protected void configure() { // CloudWatch addCodec(CloudWatchFlowLogCodec.NAME, CloudWatchFlowLogCodec.class); + addCodec(CloudWatchRawLogCodec.NAME, CloudWatchRawLogCodec.class); addTransport(KinesisTransport.NAME, KinesisTransport.class); addMessageInput(FlowLogsInput.class); + addMessageInput(CloudWatchLogsInput.class); // Instance name lookup addMessageProcessor(AWSInstanceNameLookupProcessor.class, AWSInstanceNameLookupProcessor.Descriptor.class); From 494182db39c63175345aeb2737815f6434c45b41 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 12:45:04 +0200 Subject: [PATCH 03/14] Use custom object mapper as we did in the old code --- .../aws/inputs/codecs/CloudWatchFlowLogCodec.java | 4 +--- .../aws/inputs/codecs/CloudWatchLogDataCodec.java | 11 ++++------- .../aws/inputs/codecs/CloudWatchRawLogCodec.java | 4 +--- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java index 37f258ac..8c7254db 100644 --- a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java @@ -1,6 +1,5 @@ package org.graylog.aws.inputs.codecs; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.assistedinject.Assisted; import org.graylog.aws.AWS; import org.graylog.aws.cloudwatch.CloudWatchLogEvent; @@ -28,8 +27,7 @@ public class CloudWatchFlowLogCodec extends CloudWatchLogDataCodec { private final IANAProtocolNumbers protocolNumbers; @Inject - public CloudWatchFlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) { - super(objectMapper); + public CloudWatchFlowLogCodec(@Assisted Configuration configuration) { this.configuration = configuration; this.protocolNumbers = new IANAProtocolNumbers(); } diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java index b7691650..549a40b7 100644 --- a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java @@ -16,14 +16,11 @@ import java.util.Collection; import java.util.List; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + public abstract class CloudWatchLogDataCodec implements MultiMessageCodec { private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogDataCodec.class); - - private final ObjectMapper objectMapper; - - public CloudWatchLogDataCodec(final ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); @Nullable @Override @@ -35,7 +32,7 @@ public Message decode(@Nonnull RawMessage rawMessage) { @Override public Collection decodeMessages(@Nonnull RawMessage rawMessage) { try { - final CloudWatchLogData data = objectMapper.readValue(rawMessage.getPayload(), CloudWatchLogData.class); + final CloudWatchLogData data = OBJECT_MAPPER.readValue(rawMessage.getPayload(), CloudWatchLogData.class); final List messages = new ArrayList<>(data.logEvents.size()); for (final CloudWatchLogEvent logEvent : data.logEvents) { diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java index b7012e08..1ca800db 100644 --- a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java +++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java @@ -1,6 +1,5 @@ package org.graylog.aws.inputs.codecs; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.assistedinject.Assisted; import org.graylog.aws.cloudwatch.CloudWatchLogEvent; import org.graylog2.plugin.Message; @@ -22,8 +21,7 @@ public class CloudWatchRawLogCodec extends CloudWatchLogDataCodec { private final Configuration configuration; @Inject - public CloudWatchRawLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) { - super(objectMapper); + public CloudWatchRawLogCodec(@Assisted Configuration configuration) { this.configuration = configuration; } From 5c4c36ab6f3e2cf24ad73fe258ac82cac085c7cd Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 13:13:28 +0200 Subject: [PATCH 04/14] Add payload examples and ignore unknwon properties --- .../aws/cloudwatch/CloudWatchLogData.java | 28 +++++++++++++++++++ .../aws/cloudwatch/CloudWatchLogEvent.java | 14 ++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java index 9ae3abe2..5f0dcae0 100644 --- a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java +++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java @@ -1,9 +1,37 @@ package org.graylog.aws.cloudwatch; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +/** + * A collection of CloudWatch log events. + *

+ * Example payload: + *

+ * {
+ *   "messageType": "DATA_MESSAGE",
+ *   "owner": "123456789",
+ *   "logGroup": "aws-plugin-test-flows",
+ *   "logStream": "eni-aaaaaaaa-all",
+ *   "subscriptionFilters": ["match-all"],
+ *   "logEvents": [
+ *     {
+ *       "id": "33503748002479370955346306650196094071913271643270021120",
+ *       "timestamp": 1502360020000,
+ *       "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
+ *     },
+ *     {
+ *       "id": "33503748002479370955346306650196094071913271643270021127",
+ *       "timestamp": 1502360020000,
+ *       "message": "2 123456789 eni-aaaaaaaa 10.0.34.113 10.42.96.199 53421 17720 6 1 48 1502360020 1502360079 REJECT OK"
+ *     }
+ *   ]
+ * }
+ * 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) public class CloudWatchLogData { @JsonProperty("logEvents") public List logEvents; diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java index 31f68d6d..8c6cb3b3 100644 --- a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java +++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java @@ -1,8 +1,22 @@ package org.graylog.aws.cloudwatch; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects; +/** + * A single CloudWatch log event. + *

+ * Example payload: + *

+ * {
+ *   "id": "33503748002479370955346306650196094071913271643270021120",
+ *   "timestamp": 1502360020000,
+ *   "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
+ * }
+ * 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) public class CloudWatchLogEvent { @JsonProperty("timestamp") public long timestamp; From dc02d1780efca25d48688a390b52c7b03556eb99 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 13:33:09 +0200 Subject: [PATCH 05/14] Make kinesis consumer less noisy --- src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java index b96519bd..1e4e9621 100644 --- a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java +++ b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java @@ -77,7 +77,7 @@ public void initialize(InitializationInput initializationInput) { @Override public void processRecords(ProcessRecordsInput processRecordsInput) { - LOG.info("Received {} Kinesis events", processRecordsInput.getRecords().size()); + LOG.debug("Received {} Kinesis events", processRecordsInput.getRecords().size()); for (Record record : processRecordsInput.getRecords()) { try { From d421fb1c109ac44a5ed4d648eaf1f93178f0e4e2 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 16:27:37 +0200 Subject: [PATCH 06/14] Update to the latest AWS SDK and Kinesis library --- pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index adaeaddc..a65ff3cf 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,8 @@ true true 2.3.0 - 1.11.98 + 1.11.174 + 1.8.1 @@ -114,7 +115,7 @@ com.amazonaws amazon-kinesis-client - 1.7.4 + ${aws-kinesis-client.version} commons-logging From b5318affe82da5138c90f1711e195660d47c01c1 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 17:11:26 +0200 Subject: [PATCH 07/14] Use unique application name per Kinesis stream The Kinesis client is using a DynamoDB table under the hood to track shard offsets. When a consumer is using the same application name for two different Kinesis streams, checkpointing will fail. --- src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java index 1e4e9621..e8ced96e 100644 --- a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java +++ b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java @@ -58,7 +58,10 @@ public KinesisConsumer(String kinesisStreamName, public void run() { final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize()); final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( - "graylog-aws-plugin", + // The application name needs to be unique per input. Using the same name for two different Kinesis + // streams will cause trouble with state handling in DynamoDB. (used by the Kinesis client under the + // hood to keep state) + String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName), kinesisStreamName, authProvider, workerId From f9c8e3457c7d18b8bad558fd3015e7086235a190 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 17:14:00 +0200 Subject: [PATCH 08/14] Periodically checkpoint the Kinesis stream This ensures that we are getting old records after a Graylog server restart instead of starting with LATEST on each restart. --- .../graylog/aws/kinesis/KinesisConsumer.java | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java index e8ced96e..bd4ce1ad 100644 --- a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java +++ b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java @@ -1,6 +1,9 @@ package org.graylog.aws.kinesis; import com.amazonaws.regions.Region; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; @@ -9,18 +12,28 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.model.Record; +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import okhttp3.HttpUrl; import org.graylog.aws.auth.AWSAuthProvider; import org.graylog.aws.config.AWSPluginConfiguration; import org.graylog.aws.config.Proxy; import org.graylog2.plugin.Tools; import org.graylog2.plugin.system.NodeId; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Locale; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static java.util.Objects.requireNonNull; @@ -73,6 +86,8 @@ public void run() { } final IRecordProcessorFactory recordProcessorFactory = () -> new IRecordProcessor() { + private DateTime lastCheckpoint = DateTime.now(); + @Override public void initialize(InitializationInput initializationInput) { LOG.info("Initializing Kinesis worker for stream <{}>", kinesisStreamName); @@ -95,6 +110,48 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { LOG.error("Couldn't read Kinesis record from stream <{}>", kinesisStreamName, e); } } + + // According to the Kinesis client documentation, we should not checkpoint for every record but + // rather periodically. + // TODO: Make interval configurable (global) + if (lastCheckpoint.plusMinutes(1).isBeforeNow()) { + lastCheckpoint = DateTime.now(); + LOG.debug("Checkpointing stream <{}>", kinesisStreamName); + checkpoint(processRecordsInput); + } + } + + private void checkpoint(ProcessRecordsInput processRecordsInput) { + final Retryer retryer = RetryerBuilder.newBuilder() + .retryIfExceptionOfType(ThrottlingException.class) + .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS)) + .withStopStrategy(StopStrategies.stopAfterDelay(10, TimeUnit.MINUTES)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + if (attempt.hasException()) { + LOG.warn("Checkpointing stream <{}> failed, retrying. (attempt {})", kinesisStreamName, attempt.getAttemptNumber()); + } + } + }) + .build(); + + try { + retryer.call(() -> { + try { + processRecordsInput.getCheckpointer().checkpoint(); + } catch (InvalidStateException e) { + LOG.error("Couldn't save checkpoint to DynamoDB table used by the Kinesis client library - check database table", e); + } catch (ShutdownException e) { + LOG.debug("Processor is shutting down, skipping checkpoint"); + } + return null; + }); + } catch (ExecutionException e) { + LOG.error("Couldn't checkpoint stream <{}>", kinesisStreamName, e); + } catch (RetryException e) { + LOG.error("Checkpoint retry for stream <{}> finally failed", kinesisStreamName, e); + } } @Override From f9b45ba2e21443485b9e5061f8d0e5dd93cf40ba Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 17:54:28 +0200 Subject: [PATCH 09/14] Add second constructor to AWSAuthProvider and simplify conditions --- .../org/graylog/aws/auth/AWSAuthProvider.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java b/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java index ffb7fe63..af51b73f 100644 --- a/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java +++ b/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java @@ -10,18 +10,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import static com.google.common.base.Strings.isNullOrEmpty; + public class AWSAuthProvider implements AWSCredentialsProvider { private static final Logger LOG = LoggerFactory.getLogger(AWSAuthProvider.class); private AWSCredentialsProvider credentials; - public AWSAuthProvider(AWSPluginConfiguration config, String accessKey, String secretKey) { - if (accessKey != null && secretKey != null - && !accessKey.isEmpty() && !secretKey.isEmpty()) { + public AWSAuthProvider(AWSPluginConfiguration config) { + this(config, null, null); + } + + public AWSAuthProvider(AWSPluginConfiguration config, @Nullable String accessKey, @Nullable String secretKey) { + if (!isNullOrEmpty(accessKey) && !isNullOrEmpty(secretKey)) { this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); LOG.debug("Using input specific config"); - } else if (config.accessKey() != null && config.secretKey() != null - && !config.accessKey().isEmpty() && !config.secretKey().isEmpty()) { + } else if (!isNullOrEmpty(config.accessKey()) && !isNullOrEmpty(config.secretKey())) { this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(config.accessKey(), config.secretKey())); LOG.debug("Using AWS Plugin config"); } else { From ddcd3187e00adf4f9473af677f090f4b95aba72f Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 18:06:33 +0200 Subject: [PATCH 10/14] Pass auth provider into InstanceLookupTable to allow different methods Also use AmazonEC2Client builder instead of deprecated constructor. --- .../AWSInstanceNameLookupProcessor.java | 6 ++++-- .../instancelookup/InstanceLookupTable.java | 20 ++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java index a67caf8a..2cf7be43 100644 --- a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java +++ b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java @@ -1,11 +1,11 @@ package org.graylog.aws.processors.instancelookup; -import com.amazonaws.auth.BasicAWSCredentials; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; import okhttp3.HttpUrl; import org.graylog.aws.AWS; +import org.graylog.aws.auth.AWSAuthProvider; import org.graylog.aws.config.AWSPluginConfiguration; import org.graylog2.Configuration; import org.graylog2.plugin.Message; @@ -70,6 +70,8 @@ public void run() { return; } + final AWSAuthProvider awsAuthProvider = new AWSAuthProvider(config); + LOG.debug("Refreshing AWS instance lookup table."); final HttpUrl proxyUrl = config.proxyEnabled() && configuration.getHttpProxyUri() != null @@ -77,7 +79,7 @@ public void run() { table.reload( config.getLookupRegions(), - new BasicAWSCredentials(config.accessKey(), config.secretKey()), + awsAuthProvider, proxyUrl ); } catch (Exception e) { diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java index a1960a8e..6e329efe 100644 --- a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java +++ b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java @@ -1,7 +1,7 @@ package org.graylog.aws.processors.instancelookup; -import com.amazonaws.auth.AWSCredentials; import com.amazonaws.regions.Regions; +import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.AmazonEC2Client; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.DescribeNetworkInterfacesResult; @@ -14,6 +14,7 @@ import com.amazonaws.services.ec2.model.Tag; import com.google.common.collect.ImmutableMap; import okhttp3.HttpUrl; +import org.graylog.aws.auth.AWSAuthProvider; import org.graylog.aws.config.Proxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,7 @@ public static InstanceLookupTable getInstance() { private InstanceLookupTable() { /* nope */ } - public void reload(List regions, AWSCredentials credentials, HttpUrl proxyUrl) { + public void reload(List regions, AWSAuthProvider awsAuthProvider, HttpUrl proxyUrl) { LOG.info("Reloading AWS instance lookup table."); ImmutableMap.Builder ec2InstancesBuilder = ImmutableMap.builder(); @@ -53,16 +54,21 @@ public void reload(List regions, AWSCredentials credentials, HttpUrl pr for (Regions region : regions) { try { - AmazonEC2Client ec2Client; + AmazonEC2 ec2Client; if(proxyUrl != null) { - ec2Client = new AmazonEC2Client(credentials, Proxy.forAWS(proxyUrl)); + ec2Client = AmazonEC2Client.builder() + .withCredentials(awsAuthProvider) + .withRegion(region) + .withClientConfiguration(Proxy.forAWS(proxyUrl)) + .build(); } else { - ec2Client = new AmazonEC2Client(credentials); + ec2Client = AmazonEC2Client.builder() + .withCredentials(awsAuthProvider) + .withRegion(region) + .build(); } - ec2Client.configureRegion(region); - // Load network interfaces LOG.debug("Requesting AWS network interface descriptions in [{}].", region.getName()); DescribeNetworkInterfacesResult interfaces = ec2Client.describeNetworkInterfaces(); From bcbda91b88dc22d36da328e83f47766930a5f6e4 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Thu, 10 Aug 2017 18:14:00 +0200 Subject: [PATCH 11/14] Inject InstanceLookupTable and bind it as singleton --- src/main/java/org/graylog/aws/plugin/AWSModule.java | 3 +++ .../instancelookup/AWSInstanceNameLookupProcessor.java | 3 ++- .../processors/instancelookup/InstanceLookupTable.java | 10 ++-------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/graylog/aws/plugin/AWSModule.java b/src/main/java/org/graylog/aws/plugin/AWSModule.java index e9d3b1f0..af3bf85d 100644 --- a/src/main/java/org/graylog/aws/plugin/AWSModule.java +++ b/src/main/java/org/graylog/aws/plugin/AWSModule.java @@ -9,6 +9,7 @@ import org.graylog.aws.inputs.transports.KinesisTransport; import org.graylog.aws.inputs.flowlogs.FlowLogsInput; import org.graylog.aws.processors.instancelookup.AWSInstanceNameLookupProcessor; +import org.graylog.aws.processors.instancelookup.InstanceLookupTable; import org.graylog2.plugin.PluginModule; public class AWSModule extends PluginModule { @@ -28,5 +29,7 @@ protected void configure() { // Instance name lookup addMessageProcessor(AWSInstanceNameLookupProcessor.class, AWSInstanceNameLookupProcessor.Descriptor.class); + + bind(InstanceLookupTable.class).asEagerSingleton(); } } diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java index 2cf7be43..00e52341 100644 --- a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java +++ b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java @@ -49,10 +49,11 @@ public String className() { @Inject public AWSInstanceNameLookupProcessor(ClusterConfigService clusterConfigService, + InstanceLookupTable instanceLookupTable, MetricRegistry metricRegistry, Configuration configuration) { this.metricRegistry = metricRegistry; - this.table = InstanceLookupTable.getInstance(); + this.table = instanceLookupTable; Runnable refresh = new Runnable() { @Override diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java index 6e329efe..fc8da8e8 100644 --- a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java +++ b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java @@ -19,13 +19,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Singleton; import java.util.List; +@Singleton public class InstanceLookupTable { private static final Logger LOG = LoggerFactory.getLogger(InstanceLookupTable.class); - private static InstanceLookupTable INSTANCE = new InstanceLookupTable(); - enum InstanceType { RDS, EC2, @@ -40,12 +40,6 @@ enum InstanceType { // TODO METRICS - public static InstanceLookupTable getInstance() { - return INSTANCE; - } - - private InstanceLookupTable() { /* nope */ } - public void reload(List regions, AWSAuthProvider awsAuthProvider, HttpUrl proxyUrl) { LOG.info("Reloading AWS instance lookup table."); From 592c7917d4ac85d76a4f2b343d03f0a46544fc33 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Mon, 14 Aug 2017 14:52:24 +0200 Subject: [PATCH 12/14] Switch tests to junit --- pom.xml | 6 ------ .../notifications/CloudtrailSNSNotificationParserTest.java | 4 ++-- .../org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java | 4 ++-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index a65ff3cf..c63a4b30 100644 --- a/pom.xml +++ b/pom.xml @@ -135,12 +135,6 @@ ${auto-service.version} provided - - org.testng - testng - 6.9.10 - test - diff --git a/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java b/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java index 9fa0387e..ee32028f 100644 --- a/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java +++ b/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java @@ -1,11 +1,11 @@ package org.graylog.aws.inputs.cloudtrail.notifications; import com.amazonaws.services.sqs.model.Message; -import org.testng.annotations.Test; +import org.junit.Test; import java.util.List; -import static org.testng.Assert.assertEquals; +import static org.junit.Assert.assertEquals; public class CloudtrailSNSNotificationParserTest { diff --git a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java index 540ba2e8..94087f2b 100644 --- a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java +++ b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java @@ -2,9 +2,9 @@ import org.graylog.aws.cloudwatch.CloudWatchLogEvent; import org.graylog.aws.cloudwatch.FlowLogMessage; -import org.testng.annotations.Test; +import org.junit.Test; -import static org.testng.Assert.*; +import static org.junit.Assert.assertEquals; public class FlowLogMessageTest { From f3bf4dd27a295ee1f4628c51266bb9ea0e374950 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Mon, 14 Aug 2017 14:54:23 +0200 Subject: [PATCH 13/14] Use default config in instance name lookup processor - Remove now unused "AWSPluginConfiguration#isComplete()" - Log a warning if the lookup processor is enabled but no regions are configured --- pom.xml | 5 ++++ .../aws/config/AWSPluginConfiguration.java | 8 +---- .../AWSInstanceNameLookupProcessor.java | 11 +++---- .../config/AWSPluginConfigurationTest.java | 29 +++++++++++++++++++ 4 files changed, 41 insertions(+), 12 deletions(-) create mode 100644 src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java diff --git a/pom.xml b/pom.xml index c63a4b30..8c46397a 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,11 @@ ${auto-service.version} provided + + org.assertj + assertj-core + ${assertj-core.version} + diff --git a/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java b/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java index ee5839bf..eac17718 100644 --- a/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java +++ b/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java @@ -64,16 +64,10 @@ public static Builder builder() { return new AutoValue_AWSPluginConfiguration.Builder(); } - @JsonIgnore - public boolean isComplete() { - return accessKey() != null && secretKey() != null - && !accessKey().isEmpty() && !secretKey().isEmpty(); - } - @JsonIgnore public List getLookupRegions() { if (lookupRegions() == null || lookupRegions().isEmpty()) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } ImmutableList.Builder builder = ImmutableList.builder(); diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java index 00e52341..9181fd2d 100644 --- a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java +++ b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java @@ -59,15 +59,16 @@ public AWSInstanceNameLookupProcessor(ClusterConfigService clusterConfigService, @Override public void run() { try { - config = clusterConfigService.get(AWSPluginConfiguration.class); + config = clusterConfigService.getOrDefault(AWSPluginConfiguration.class, + AWSPluginConfiguration.createDefault()); - if(config == null || !config.isComplete()) { - LOG.warn("AWS plugin is not fully configured. No instance lookups will happen."); + if (!config.lookupsEnabled()) { + LOG.debug("AWS instance name lookups are disabled."); return; } - if (!config.lookupsEnabled()) { - LOG.debug("AWS instance name lookups are disabled."); + if (config.lookupsEnabled() && config.getLookupRegions().isEmpty()) { + LOG.warn("AWS region configuration is not complete. No instance lookups will happen."); return; } diff --git a/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java b/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java new file mode 100644 index 00000000..239f4778 --- /dev/null +++ b/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java @@ -0,0 +1,29 @@ +package org.graylog.aws.config; + +import com.amazonaws.regions.Regions; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.graylog.aws.config.AWSPluginConfiguration.createDefault; + +public class AWSPluginConfigurationTest { + @Test + public void lookupRegions() throws Exception { + final AWSPluginConfiguration config = createDefault() + .toBuilder() + .lookupRegions("us-west-1,eu-west-1 , us-east-1 ") + .build(); + + assertThat(config.getLookupRegions()).containsExactly(Regions.US_WEST_1, Regions.EU_WEST_1, Regions.US_EAST_1); + } + + @Test + public void lookupRegionsWithEmptyValue() throws Exception { + final AWSPluginConfiguration config = createDefault() + .toBuilder() + .lookupRegions("") + .build(); + + assertThat(config.getLookupRegions()).isEmpty(); + } +} \ No newline at end of file From 5e35da91122db4353a20afadd2c1d48db5d11f86 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Wed, 16 Aug 2017 10:54:49 +0200 Subject: [PATCH 14/14] Rename inputs to remove "CloudWatch" from the names --- .../org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java | 2 +- .../java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java index 38afb981..12d3d3ad 100644 --- a/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java +++ b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java @@ -14,7 +14,7 @@ import javax.inject.Inject; public class CloudWatchLogsInput extends MessageInput { - private static final String NAME = "AWS CloudWatch Logs"; + private static final String NAME = "AWS Logs"; @Inject public CloudWatchLogsInput(@Assisted Configuration configuration, diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java index 62554835..782baa32 100644 --- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java +++ b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java @@ -14,7 +14,7 @@ import javax.inject.Inject; public class FlowLogsInput extends MessageInput { - private static final String NAME = "AWS CloudWatch Flow Logs"; + private static final String NAME = "AWS Flow Logs"; @Inject public FlowLogsInput(@Assisted Configuration configuration,