@@ -114,7 +115,7 @@
com.amazonaws
amazon-kinesis-client
- 1.7.4
+ ${aws-kinesis-client.version}
commons-logging
@@ -135,10 +136,9 @@
provided
- org.testng
- testng
- 6.9.10
- test
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
diff --git a/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java b/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java
index ffb7fe63..af51b73f 100644
--- a/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java
+++ b/src/main/java/org/graylog/aws/auth/AWSAuthProvider.java
@@ -10,18 +10,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
public class AWSAuthProvider implements AWSCredentialsProvider {
private static final Logger LOG = LoggerFactory.getLogger(AWSAuthProvider.class);
private AWSCredentialsProvider credentials;
- public AWSAuthProvider(AWSPluginConfiguration config, String accessKey, String secretKey) {
- if (accessKey != null && secretKey != null
- && !accessKey.isEmpty() && !secretKey.isEmpty()) {
+ public AWSAuthProvider(AWSPluginConfiguration config) {
+ this(config, null, null);
+ }
+
+ public AWSAuthProvider(AWSPluginConfiguration config, @Nullable String accessKey, @Nullable String secretKey) {
+ if (!isNullOrEmpty(accessKey) && !isNullOrEmpty(secretKey)) {
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
LOG.debug("Using input specific config");
- } else if (config.accessKey() != null && config.secretKey() != null
- && !config.accessKey().isEmpty() && !config.secretKey().isEmpty()) {
+ } else if (!isNullOrEmpty(config.accessKey()) && !isNullOrEmpty(config.secretKey())) {
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(config.accessKey(), config.secretKey()));
LOG.debug("Using AWS Plugin config");
} else {
diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java
new file mode 100644
index 00000000..5f0dcae0
--- /dev/null
+++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java
@@ -0,0 +1,38 @@
+package org.graylog.aws.cloudwatch;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * A collection of CloudWatch log events.
+ *
+ * Example payload:
+ *
+ * {
+ * "messageType": "DATA_MESSAGE",
+ * "owner": "123456789",
+ * "logGroup": "aws-plugin-test-flows",
+ * "logStream": "eni-aaaaaaaa-all",
+ * "subscriptionFilters": ["match-all"],
+ * "logEvents": [
+ * {
+ * "id": "33503748002479370955346306650196094071913271643270021120",
+ * "timestamp": 1502360020000,
+ * "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
+ * },
+ * {
+ * "id": "33503748002479370955346306650196094071913271643270021127",
+ * "timestamp": 1502360020000,
+ * "message": "2 123456789 eni-aaaaaaaa 10.0.34.113 10.42.96.199 53421 17720 6 1 48 1502360020 1502360079 REJECT OK"
+ * }
+ * ]
+ * }
+ *
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CloudWatchLogData {
+ @JsonProperty("logEvents")
+ public List logEvents;
+}
diff --git a/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java
new file mode 100644
index 00000000..8c6cb3b3
--- /dev/null
+++ b/src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java
@@ -0,0 +1,34 @@
+package org.graylog.aws.cloudwatch;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.MoreObjects;
+
+/**
+ * A single CloudWatch log event.
+ *
+ * Example payload:
+ *
+ * {
+ * "id": "33503748002479370955346306650196094071913271643270021120",
+ * "timestamp": 1502360020000,
+ * "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
+ * }
+ *
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CloudWatchLogEvent {
+ @JsonProperty("timestamp")
+ public long timestamp;
+
+ @JsonProperty("message")
+ public String message;
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("timestamp", timestamp)
+ .add("message", message)
+ .toString();
+ }
+}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java b/src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java
similarity index 84%
rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java
rename to src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java
index d1a7b79b..bcf873a1 100644
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogMessage.java
+++ b/src/main/java/org/graylog/aws/cloudwatch/FlowLogMessage.java
@@ -1,9 +1,11 @@
-package org.graylog.aws.inputs.flowlogs;
+package org.graylog.aws.cloudwatch;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
public class FlowLogMessage {
private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class);
@@ -55,26 +57,31 @@ public FlowLogMessage(DateTime timestamp,
this.logStatus = logStatus;
}
- public static FlowLogMessage fromParts(String[] parts) {
- if(parts == null || parts.length != 15) {
- throw new IllegalArgumentException("Message parts were null or not length of 15");
+ @Nullable
+ public static FlowLogMessage fromLogEvent(final CloudWatchLogEvent logEvent) {
+ final String[] parts = logEvent.message.split(" ");
+
+ if (parts.length != 14) {
+ LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message);
+ return null;
}
+
return new FlowLogMessage(
- new DateTime(Long.valueOf(parts[0])),
- safeInteger(parts[1]),
+ new DateTime(Long.valueOf(logEvent.timestamp)),
+ safeInteger(parts[0]),
+ parts[1],
parts[2],
parts[3],
parts[4],
- parts[5],
+ safeInteger(parts[5]),
safeInteger(parts[6]),
safeInteger(parts[7]),
- safeInteger(parts[8]),
+ safeLong(parts[8]),
safeLong(parts[9]),
- safeLong(parts[10]),
+ new DateTime(Long.valueOf(parts[10])*1000),
new DateTime(Long.valueOf(parts[11])*1000),
- new DateTime(Long.valueOf(parts[12])*1000),
- parts[13],
- parts[14]
+ parts[12],
+ parts[13]
);
}
diff --git a/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java b/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java
index ee5839bf..eac17718 100644
--- a/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java
+++ b/src/main/java/org/graylog/aws/config/AWSPluginConfiguration.java
@@ -64,16 +64,10 @@ public static Builder builder() {
return new AutoValue_AWSPluginConfiguration.Builder();
}
- @JsonIgnore
- public boolean isComplete() {
- return accessKey() != null && secretKey() != null
- && !accessKey().isEmpty() && !secretKey().isEmpty();
- }
-
@JsonIgnore
public List getLookupRegions() {
if (lookupRegions() == null || lookupRegions().isEmpty()) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
ImmutableList.Builder builder = ImmutableList.builder();
diff --git a/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java
new file mode 100644
index 00000000..12d3d3ad
--- /dev/null
+++ b/src/main/java/org/graylog/aws/inputs/cloudwatch/CloudWatchLogsInput.java
@@ -0,0 +1,65 @@
+package org.graylog.aws.inputs.cloudwatch;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.assistedinject.Assisted;
+import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec;
+import org.graylog.aws.inputs.transports.KinesisTransport;
+import org.graylog2.plugin.LocalMetricRegistry;
+import org.graylog2.plugin.ServerStatus;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.inputs.MessageInput;
+import org.graylog2.plugin.inputs.annotations.ConfigClass;
+import org.graylog2.plugin.inputs.annotations.FactoryClass;
+
+import javax.inject.Inject;
+
+public class CloudWatchLogsInput extends MessageInput {
+ private static final String NAME = "AWS Logs";
+
+ @Inject
+ public CloudWatchLogsInput(@Assisted Configuration configuration,
+ MetricRegistry metricRegistry,
+ KinesisTransport.Factory transport,
+ LocalMetricRegistry localRegistry,
+ CloudWatchRawLogCodec.Factory codec,
+ Config config,
+ Descriptor descriptor,
+ ServerStatus serverStatus) {
+ super(
+ metricRegistry,
+ configuration,
+ transport.create(configuration),
+ localRegistry,
+ codec.create(configuration),
+ config,
+ descriptor,
+ serverStatus
+ );
+ }
+
+ @FactoryClass
+ public interface Factory extends MessageInput.Factory {
+ @Override
+ CloudWatchLogsInput create(Configuration configuration);
+
+ @Override
+ Config getConfig();
+
+ @Override
+ Descriptor getDescriptor();
+ }
+
+ public static class Descriptor extends MessageInput.Descriptor {
+ public Descriptor() {
+ super(NAME, false, "");
+ }
+ }
+
+ @ConfigClass
+ public static class Config extends MessageInput.Config {
+ @Inject
+ public Config(KinesisTransport.Factory transport, CloudWatchRawLogCodec.Factory codec) {
+ super(transport.getConfig(), codec.getConfig());
+ }
+ }
+}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java
similarity index 76%
rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java
rename to src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java
index 23034edb..8c7254db 100644
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogCodec.java
+++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchFlowLogCodec.java
@@ -1,8 +1,10 @@
-package org.graylog.aws.inputs.flowlogs;
+package org.graylog.aws.inputs.codecs;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.assistedinject.Assisted;
import org.graylog.aws.AWS;
+import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
+import org.graylog.aws.cloudwatch.FlowLogMessage;
+import org.graylog.aws.inputs.flowlogs.IANAProtocolNumbers;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
@@ -10,10 +12,7 @@
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
-import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.Seconds;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -21,38 +20,29 @@
import java.util.HashMap;
import java.util.Map;
-public class FlowLogCodec implements Codec {
- private static final Logger LOG = LoggerFactory.getLogger(FlowLogCodec.class);
+public class CloudWatchFlowLogCodec extends CloudWatchLogDataCodec {
public static final String NAME = "AWSFlowLog";
private final Configuration configuration;
- private final ObjectMapper objectMapper;
-
private final IANAProtocolNumbers protocolNumbers;
@Inject
- public FlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) {
+ public CloudWatchFlowLogCodec(@Assisted Configuration configuration) {
this.configuration = configuration;
- this.objectMapper = objectMapper;
-
this.protocolNumbers = new IANAProtocolNumbers();
}
@Nullable
@Override
- public Message decode(@Nonnull RawMessage rawMessage) {
+ public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) {
try {
- String rawString = new String(rawMessage.getPayload());
- String[] parts = rawString.split(" ");
+ final FlowLogMessage flowLogMessage = FlowLogMessage.fromLogEvent(logEvent);
- if (parts.length != 15) {
- LOG.warn("Received FlowLog message with not exactly 15 fields. Skipping. Message was: [{}]", rawString);
+ if (flowLogMessage == null) {
return null;
}
- FlowLogMessage flowLogMessage = FlowLogMessage.fromParts(parts);
-
- Message result = new Message(
+ final Message result = new Message(
buildSummary(flowLogMessage),
"aws-flowlogs",
flowLogMessage.getTimestamp()
@@ -113,9 +103,9 @@ public String getName() {
}
@FactoryClass
- public interface Factory extends Codec.Factory {
+ public interface Factory extends Codec.Factory {
@Override
- FlowLogCodec create(Configuration configuration);
+ CloudWatchFlowLogCodec create(Configuration configuration);
@Override
Config getConfig();
diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java
new file mode 100644
index 00000000..549a40b7
--- /dev/null
+++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchLogDataCodec.java
@@ -0,0 +1,57 @@
+package org.graylog.aws.inputs.codecs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.graylog.aws.cloudwatch.CloudWatchLogData;
+import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
+import org.graylog2.plugin.Message;
+import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
+import org.graylog2.plugin.journal.RawMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+
+public abstract class CloudWatchLogDataCodec implements MultiMessageCodec {
+ private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogDataCodec.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ @Nullable
+ @Override
+ public Message decode(@Nonnull RawMessage rawMessage) {
+ throw new UnsupportedOperationException("MultiMessageCodec " + getClass() + " does not support decode()");
+ }
+
+ @Nullable
+ @Override
+ public Collection decodeMessages(@Nonnull RawMessage rawMessage) {
+ try {
+ final CloudWatchLogData data = OBJECT_MAPPER.readValue(rawMessage.getPayload(), CloudWatchLogData.class);
+ final List messages = new ArrayList<>(data.logEvents.size());
+
+ for (final CloudWatchLogEvent logEvent : data.logEvents) {
+ try {
+ final Message message = decodeLogData(logEvent);
+ if (message != null) {
+ messages.add(message);
+ }
+ } catch (Exception e) {
+ LOG.error("Couldn't decode log event <{}>", logEvent);
+ }
+ }
+
+ return messages;
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't deserialize log data", e);
+ }
+ }
+
+ @Nullable
+ protected abstract Message decodeLogData(@Nonnull final CloudWatchLogEvent event);
+}
diff --git a/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java
new file mode 100644
index 00000000..1ca800db
--- /dev/null
+++ b/src/main/java/org/graylog/aws/inputs/codecs/CloudWatchRawLogCodec.java
@@ -0,0 +1,75 @@
+package org.graylog.aws.inputs.codecs;
+
+import com.google.inject.assistedinject.Assisted;
+import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
+import org.graylog2.plugin.Message;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.configuration.ConfigurationRequest;
+import org.graylog2.plugin.inputs.annotations.ConfigClass;
+import org.graylog2.plugin.inputs.annotations.FactoryClass;
+import org.graylog2.plugin.inputs.codecs.Codec;
+import org.graylog2.plugin.inputs.codecs.CodecAggregator;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+public class CloudWatchRawLogCodec extends CloudWatchLogDataCodec {
+ public static final String NAME = "AWSCloudWatchRawLog";
+
+ private final Configuration configuration;
+
+ @Inject
+ public CloudWatchRawLogCodec(@Assisted Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Nullable
+ @Override
+ public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) {
+ try {
+ return new Message(logEvent.message, "aws-raw-logs", new DateTime(logEvent.timestamp));
+ } catch (Exception e) {
+ throw new RuntimeException("Could not deserialize AWS FlowLog record.", e);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Nullable
+ @Override
+ public CodecAggregator getAggregator() {
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @FactoryClass
+ public interface Factory extends Codec.Factory {
+ @Override
+ CloudWatchRawLogCodec create(Configuration configuration);
+
+ @Override
+ Config getConfig();
+ }
+
+ @ConfigClass
+ public static class Config implements Codec.Config {
+ @Override
+ public ConfigurationRequest getRequestedConfiguration() {
+ return new ConfigurationRequest();
+ }
+
+ @Override
+ public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
+ }
+ }
+}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java
deleted file mode 100644
index 17c6545f..00000000
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogReader.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.graylog.aws.inputs.flowlogs;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
-import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
-import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
-import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
-import com.amazonaws.services.kinesis.model.Record;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import okhttp3.HttpUrl;
-import org.graylog.aws.auth.AWSAuthProvider;
-import org.graylog.aws.config.AWSPluginConfiguration;
-import org.graylog.aws.config.Proxy;
-import org.graylog.aws.inputs.flowlogs.json.FlowLogKinesisEvent;
-import org.graylog.aws.inputs.flowlogs.json.RawFlowLog;
-import org.graylog2.plugin.Tools;
-import org.graylog2.plugin.cluster.ClusterConfigService;
-import org.graylog2.plugin.inputs.MessageInput;
-import org.graylog2.plugin.journal.RawMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlowLogReader implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(FlowLogReader.class);
-
- private final Region region;
- private final ObjectMapper objectMapper;
- private final MessageInput sourceInput;
- private final String kinesisStreamName;
- private final AWSAuthProvider authProvider;
- private final HttpUrl proxyUrl;
- private final AWSPluginConfiguration awsConfig;
-
-
- private Worker worker;
-
- public FlowLogReader(String kinesisStreamName,
- Region region,
- MessageInput input,
- ClusterConfigService configService,
- AWSAuthProvider authProvider,
- HttpUrl proxyUrl) {
- this.kinesisStreamName = kinesisStreamName;
- this.region = region;
- this.sourceInput = input;
- this.objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- this.authProvider = authProvider;
- this.proxyUrl = proxyUrl;
-
- awsConfig = configService.get(AWSPluginConfiguration.class);
-
- LOG.info("Starting AWS FlowLog reader.");
- }
-
- // TODO metrics
- public void run() {
- final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
- "graylog-aws-plugin",
- kinesisStreamName,
- this.authProvider,
- "graylog-server-master")
- .withRegionName(region.getName());
-
- // Optional HTTP proxy
- if(awsConfig.proxyEnabled() && this.proxyUrl != null) {
- config.withCommonClientConfig(Proxy.forAWS(this.proxyUrl));
- }
-
- final IRecordProcessorFactory recordProcessorFactory = () -> new IRecordProcessor() {
- @Override
- public void initialize(InitializationInput initializationInput) {
- LOG.info("Initializing Kinesis worker.");
- }
-
- @Override
- public void processRecords(ProcessRecordsInput processRecordsInput) {
- for (Record record : processRecordsInput.getRecords()) {
- try {
- LOG.debug("Received FlowLog events via Kinesis.");
-
- FlowLogKinesisEvent event = objectMapper.readValue(Tools.decompressGzip(record.getData().array()), FlowLogKinesisEvent.class);
- for(RawFlowLog flowlog : event.logEvents) {
- String fullMessage = flowlog.timestamp + " " + flowlog.message;
- sourceInput.processRawMessage(new RawMessage(fullMessage.getBytes()));
- }
- } catch(Exception e) {
- LOG.error("Could not read FlowLog record from Kinesis stream.", e);
- }
- }
- }
-
- @Override
- public void shutdown(ShutdownInput shutdownInput) {
- LOG.info("Shutting down Kinesis worker.");
- }
- };
-
- this.worker = new Worker.Builder()
- .recordProcessorFactory(recordProcessorFactory)
- .config(config)
- .build();
-
- worker.run();
- }
-
- public void stop() {
- if (worker != null) {
- worker.shutdown();
- }
- }
-
-}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java
index 08014c4e..782baa32 100644
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java
+++ b/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogsInput.java
@@ -2,6 +2,8 @@
import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
+import org.graylog.aws.inputs.codecs.CloudWatchFlowLogCodec;
+import org.graylog.aws.inputs.transports.KinesisTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
@@ -12,14 +14,14 @@
import javax.inject.Inject;
public class FlowLogsInput extends MessageInput {
- private static final String NAME = "AWS FlowLogs Input";
+ private static final String NAME = "AWS Flow Logs";
@Inject
public FlowLogsInput(@Assisted Configuration configuration,
MetricRegistry metricRegistry,
- FlowLogTransport.Factory transport,
+ KinesisTransport.Factory transport,
LocalMetricRegistry localRegistry,
- FlowLogCodec.Factory codec,
+ CloudWatchFlowLogCodec.Factory codec,
Config config,
Descriptor descriptor,
ServerStatus serverStatus) {
@@ -56,7 +58,7 @@ public Descriptor() {
@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
- public Config(FlowLogTransport.Factory transport, FlowLogCodec.Factory codec) {
+ public Config(KinesisTransport.Factory transport, CloudWatchFlowLogCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java b/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java
deleted file mode 100644
index fc10ea61..00000000
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/json/FlowLogKinesisEvent.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.graylog.aws.inputs.flowlogs.json;
-
-import java.util.List;
-
-public class FlowLogKinesisEvent {
-
- public List logEvents;
-
-}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java b/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java
deleted file mode 100644
index e206bb79..00000000
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/json/RawFlowLog.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.graylog.aws.inputs.flowlogs.json;
-
-public class RawFlowLog {
-
- public long timestamp;
- public String message;
-
-}
diff --git a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java b/src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java
similarity index 73%
rename from src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java
rename to src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java
index 109aa925..4667bdfc 100644
--- a/src/main/java/org/graylog/aws/inputs/flowlogs/FlowLogTransport.java
+++ b/src/main/java/org/graylog/aws/inputs/transports/KinesisTransport.java
@@ -1,4 +1,4 @@
-package org.graylog.aws.inputs.flowlogs;
+package org.graylog.aws.inputs.transports;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
@@ -9,6 +9,7 @@
import okhttp3.HttpUrl;
import org.graylog.aws.auth.AWSAuthProvider;
import org.graylog.aws.config.AWSPluginConfiguration;
+import org.graylog.aws.kinesis.KinesisConsumer;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.plugin.configuration.Configuration;
@@ -23,6 +24,8 @@
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
+import org.graylog2.plugin.journal.RawMessage;
+import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,10 +33,11 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Consumer;
-public class FlowLogTransport implements Transport {
- private static final Logger LOG = LoggerFactory.getLogger(FlowLogTransport.class);
- public static final String NAME = "flowlog";
+public class KinesisTransport implements Transport {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisTransport.class);
+ public static final String NAME = "awskinesis";
private static final String CK_AWS_REGION = "aws_region";
private static final String CK_ACCESS_KEY = "aws_access_key";
@@ -42,19 +46,22 @@ public class FlowLogTransport implements Transport {
private final Configuration configuration;
private final org.graylog2.Configuration graylogConfiguration;
+ private final NodeId nodeId;
private final LocalMetricRegistry localRegistry;
private final ClusterConfigService clusterConfigService;
- private FlowLogReader reader;
+ private KinesisConsumer reader;
@Inject
- public FlowLogTransport(@Assisted final Configuration configuration,
+ public KinesisTransport(@Assisted final Configuration configuration,
org.graylog2.Configuration graylogConfiguration,
final ClusterConfigService clusterConfigService,
+ final NodeId nodeId,
LocalMetricRegistry localRegistry) {
this.clusterConfigService = clusterConfigService;
this.configuration = configuration;
this.graylogConfiguration = graylogConfiguration;
+ this.nodeId = nodeId;
this.localRegistry = localRegistry;
}
@@ -62,28 +69,33 @@ public FlowLogTransport(@Assisted final Configuration configuration,
public void launch(MessageInput input) throws MisfireException {
ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("aws-flowlog-reader-%d")
- .setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in AWS FlowLogs reader.", e))
+ .setNameFormat("aws-kinesis-reader-%d")
+ .setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in AWS Kinesis reader.", e))
.build());
final AWSPluginConfiguration awsConfig = clusterConfigService.getOrDefault(AWSPluginConfiguration.class,
AWSPluginConfiguration.createDefault());
- AWSAuthProvider authProvider = new AWSAuthProvider(awsConfig, this.configuration.getString(CK_ACCESS_KEY), this.configuration.getString(CK_SECRET_KEY));
+ AWSAuthProvider authProvider = new AWSAuthProvider(awsConfig, configuration.getString(CK_ACCESS_KEY), configuration.getString(CK_SECRET_KEY));
- this.reader = new FlowLogReader(
- this.configuration.getString(CK_KINESIS_STREAM_NAME),
- Region.getRegion(Regions.fromName(this.configuration.getString(CK_AWS_REGION))),
- input,
- clusterConfigService,
+ this.reader = new KinesisConsumer(
+ configuration.getString(CK_KINESIS_STREAM_NAME),
+ Region.getRegion(Regions.fromName(configuration.getString(CK_AWS_REGION))),
+ kinesisCallback(input),
+ awsConfig,
authProvider,
- this.graylogConfiguration.getHttpProxyUri() == null ? null : HttpUrl.get(this.graylogConfiguration.getHttpProxyUri())
+ nodeId,
+ graylogConfiguration.getHttpProxyUri() == null ? null : HttpUrl.get(graylogConfiguration.getHttpProxyUri())
);
- LOG.info("Starting FlowLogs Kinesis reader thread.");
+ LOG.info("Starting Kinesis reader thread for input [{}/{}]", input.getName(), input.getId());
executor.submit(this.reader);
}
+ private Consumer kinesisCallback(final MessageInput input) {
+ return (data) -> input.processRawMessage(new RawMessage(data));
+ }
+
@Override
public void stop() {
if(this.reader != null) {
@@ -102,9 +114,9 @@ public MetricSet getMetricSet() {
}
@FactoryClass
- public interface Factory extends Transport.Factory {
+ public interface Factory extends Transport.Factory {
@Override
- FlowLogTransport create(Configuration configuration);
+ KinesisTransport create(Configuration configuration);
@Override
Config getConfig();
@@ -126,7 +138,7 @@ public ConfigurationRequest getRequestedConfiguration() {
"AWS Region",
Regions.US_EAST_1.getName(),
regions,
- "The AWS region the FlowLogs are stored in.",
+ "The AWS region the Kinesis stream is running in.",
ConfigurationField.Optional.NOT_OPTIONAL
));
@@ -151,7 +163,7 @@ public ConfigurationRequest getRequestedConfiguration() {
CK_KINESIS_STREAM_NAME,
"Kinesis Stream name",
"",
- "The name of the Kinesis Stream that receives your FlowLog messages. See README for instructions on how to connect FlowLogs to a Kinesis Stream.",
+ "The name of the Kinesis stream that receives your messages. See README for instructions on how to connect messages to a Kinesis Stream.",
ConfigurationField.Optional.NOT_OPTIONAL
));
diff --git a/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java
new file mode 100644
index 00000000..bd4ce1ad
--- /dev/null
+++ b/src/main/java/org/graylog/aws/kinesis/KinesisConsumer.java
@@ -0,0 +1,177 @@
+package org.graylog.aws.kinesis;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import okhttp3.HttpUrl;
+import org.graylog.aws.auth.AWSAuthProvider;
+import org.graylog.aws.config.AWSPluginConfiguration;
+import org.graylog.aws.config.Proxy;
+import org.graylog2.plugin.Tools;
+import org.graylog2.plugin.system.NodeId;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static java.util.Objects.requireNonNull;
+
+public class KinesisConsumer implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
+
+ private final Region region;
+ private final String kinesisStreamName;
+ private final AWSAuthProvider authProvider;
+ private final NodeId nodeId;
+ private final HttpUrl proxyUrl;
+ private final AWSPluginConfiguration awsConfig;
+ private final Consumer dataHandler;
+
+ private Worker worker;
+
+ public KinesisConsumer(String kinesisStreamName,
+ Region region,
+ Consumer dataHandler,
+ AWSPluginConfiguration awsConfig,
+ AWSAuthProvider authProvider,
+ NodeId nodeId,
+ @Nullable HttpUrl proxyUrl) {
+ this.kinesisStreamName = requireNonNull(kinesisStreamName, "kinesisStreamName");
+ this.region = requireNonNull(region, "region");
+ this.dataHandler = requireNonNull(dataHandler, "dataHandler");
+ this.awsConfig = requireNonNull(awsConfig, "awsConfig");
+ this.authProvider = requireNonNull(authProvider, "authProvider");
+ this.nodeId = requireNonNull(nodeId, "nodeId");
+ this.proxyUrl = proxyUrl;
+ }
+
+ // TODO metrics
+ public void run() {
+ final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize());
+ final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
+ // The application name needs to be unique per input. Using the same name for two different Kinesis
+ // streams will cause trouble with state handling in DynamoDB. (used by the Kinesis client under the
+ // hood to keep state)
+ String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName),
+ kinesisStreamName,
+ authProvider,
+ workerId
+ ).withRegionName(region.getName());
+
+ // Optional HTTP proxy
+ if (awsConfig.proxyEnabled() && proxyUrl != null) {
+ config.withCommonClientConfig(Proxy.forAWS(proxyUrl));
+ }
+
+ final IRecordProcessorFactory recordProcessorFactory = () -> new IRecordProcessor() {
+ private DateTime lastCheckpoint = DateTime.now();
+
+ @Override
+ public void initialize(InitializationInput initializationInput) {
+ LOG.info("Initializing Kinesis worker for stream <{}>", kinesisStreamName);
+ }
+
+ @Override
+ public void processRecords(ProcessRecordsInput processRecordsInput) {
+ LOG.debug("Received {} Kinesis events", processRecordsInput.getRecords().size());
+
+ for (Record record : processRecordsInput.getRecords()) {
+ try {
+ // Create a read-only view of the data and use a safe method to convert it to a byte array
+ // as documented in Record#getData(). (using ByteBuffer#array() can fail)
+ final ByteBuffer dataBuffer = record.getData().asReadOnlyBuffer();
+ final byte[] dataBytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(dataBytes);
+
+ dataHandler.accept(Tools.decompressGzip(dataBytes).getBytes());
+ } catch (Exception e) {
+ LOG.error("Couldn't read Kinesis record from stream <{}>", kinesisStreamName, e);
+ }
+ }
+
+ // According to the Kinesis client documentation, we should not checkpoint for every record but
+ // rather periodically.
+ // TODO: Make interval configurable (global)
+ if (lastCheckpoint.plusMinutes(1).isBeforeNow()) {
+ lastCheckpoint = DateTime.now();
+ LOG.debug("Checkpointing stream <{}>", kinesisStreamName);
+ checkpoint(processRecordsInput);
+ }
+ }
+
+ private void checkpoint(ProcessRecordsInput processRecordsInput) {
+ final Retryer retryer = RetryerBuilder.newBuilder()
+ .retryIfExceptionOfType(ThrottlingException.class)
+ .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
+ .withStopStrategy(StopStrategies.stopAfterDelay(10, TimeUnit.MINUTES))
+ .withRetryListener(new RetryListener() {
+ @Override
+ public void onRetry(Attempt attempt) {
+ if (attempt.hasException()) {
+ LOG.warn("Checkpointing stream <{}> failed, retrying. (attempt {})", kinesisStreamName, attempt.getAttemptNumber());
+ }
+ }
+ })
+ .build();
+
+ try {
+ retryer.call(() -> {
+ try {
+ processRecordsInput.getCheckpointer().checkpoint();
+ } catch (InvalidStateException e) {
+ LOG.error("Couldn't save checkpoint to DynamoDB table used by the Kinesis client library - check database table", e);
+ } catch (ShutdownException e) {
+ LOG.debug("Processor is shutting down, skipping checkpoint");
+ }
+ return null;
+ });
+ } catch (ExecutionException e) {
+ LOG.error("Couldn't checkpoint stream <{}>", kinesisStreamName, e);
+ } catch (RetryException e) {
+ LOG.error("Checkpoint retry for stream <{}> finally failed", kinesisStreamName, e);
+ }
+ }
+
+ @Override
+ public void shutdown(ShutdownInput shutdownInput) {
+ LOG.info("Shutting down Kinesis worker for stream <{}>", kinesisStreamName);
+ }
+ };
+
+ this.worker = new Worker.Builder()
+ .recordProcessorFactory(recordProcessorFactory)
+ .config(config)
+ .build();
+
+ worker.run();
+ }
+
+ public void stop() {
+ if (worker != null) {
+ worker.shutdown();
+ }
+ }
+
+}
diff --git a/src/main/java/org/graylog/aws/plugin/AWSModule.java b/src/main/java/org/graylog/aws/plugin/AWSModule.java
index b51eebe6..af3bf85d 100644
--- a/src/main/java/org/graylog/aws/plugin/AWSModule.java
+++ b/src/main/java/org/graylog/aws/plugin/AWSModule.java
@@ -3,26 +3,33 @@
import org.graylog.aws.inputs.cloudtrail.CloudTrailInput;
import org.graylog.aws.inputs.cloudtrail.CloudTrailCodec;
import org.graylog.aws.inputs.cloudtrail.CloudTrailTransport;
-import org.graylog.aws.inputs.flowlogs.FlowLogCodec;
-import org.graylog.aws.inputs.flowlogs.FlowLogTransport;
+import org.graylog.aws.inputs.cloudwatch.CloudWatchLogsInput;
+import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec;
+import org.graylog.aws.inputs.codecs.CloudWatchFlowLogCodec;
+import org.graylog.aws.inputs.transports.KinesisTransport;
import org.graylog.aws.inputs.flowlogs.FlowLogsInput;
import org.graylog.aws.processors.instancelookup.AWSInstanceNameLookupProcessor;
+import org.graylog.aws.processors.instancelookup.InstanceLookupTable;
import org.graylog2.plugin.PluginModule;
public class AWSModule extends PluginModule {
@Override
protected void configure() {
- // CloudTrail input
+ // CloudTrail
addCodec(CloudTrailCodec.NAME, CloudTrailCodec.class);
addTransport(CloudTrailTransport.NAME, CloudTrailTransport.class);
addMessageInput(CloudTrailInput.class);
- // FlowLog input
- addCodec(FlowLogCodec.NAME, FlowLogCodec.class);
- addTransport(FlowLogTransport.NAME, FlowLogTransport.class);
+ // CloudWatch
+ addCodec(CloudWatchFlowLogCodec.NAME, CloudWatchFlowLogCodec.class);
+ addCodec(CloudWatchRawLogCodec.NAME, CloudWatchRawLogCodec.class);
+ addTransport(KinesisTransport.NAME, KinesisTransport.class);
addMessageInput(FlowLogsInput.class);
+ addMessageInput(CloudWatchLogsInput.class);
// Instance name lookup
addMessageProcessor(AWSInstanceNameLookupProcessor.class, AWSInstanceNameLookupProcessor.Descriptor.class);
+
+ bind(InstanceLookupTable.class).asEagerSingleton();
}
}
diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java
index a67caf8a..9181fd2d 100644
--- a/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java
+++ b/src/main/java/org/graylog/aws/processors/instancelookup/AWSInstanceNameLookupProcessor.java
@@ -1,11 +1,11 @@
package org.graylog.aws.processors.instancelookup;
-import com.amazonaws.auth.BasicAWSCredentials;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import okhttp3.HttpUrl;
import org.graylog.aws.AWS;
+import org.graylog.aws.auth.AWSAuthProvider;
import org.graylog.aws.config.AWSPluginConfiguration;
import org.graylog2.Configuration;
import org.graylog2.plugin.Message;
@@ -49,27 +49,31 @@ public String className() {
@Inject
public AWSInstanceNameLookupProcessor(ClusterConfigService clusterConfigService,
+ InstanceLookupTable instanceLookupTable,
MetricRegistry metricRegistry,
Configuration configuration) {
this.metricRegistry = metricRegistry;
- this.table = InstanceLookupTable.getInstance();
+ this.table = instanceLookupTable;
Runnable refresh = new Runnable() {
@Override
public void run() {
try {
- config = clusterConfigService.get(AWSPluginConfiguration.class);
+ config = clusterConfigService.getOrDefault(AWSPluginConfiguration.class,
+ AWSPluginConfiguration.createDefault());
- if(config == null || !config.isComplete()) {
- LOG.warn("AWS plugin is not fully configured. No instance lookups will happen.");
+ if (!config.lookupsEnabled()) {
+ LOG.debug("AWS instance name lookups are disabled.");
return;
}
- if (!config.lookupsEnabled()) {
- LOG.debug("AWS instance name lookups are disabled.");
+ if (config.lookupsEnabled() && config.getLookupRegions().isEmpty()) {
+ LOG.warn("AWS region configuration is not complete. No instance lookups will happen.");
return;
}
+ final AWSAuthProvider awsAuthProvider = new AWSAuthProvider(config);
+
LOG.debug("Refreshing AWS instance lookup table.");
final HttpUrl proxyUrl = config.proxyEnabled() && configuration.getHttpProxyUri() != null
@@ -77,7 +81,7 @@ public void run() {
table.reload(
config.getLookupRegions(),
- new BasicAWSCredentials(config.accessKey(), config.secretKey()),
+ awsAuthProvider,
proxyUrl
);
} catch (Exception e) {
diff --git a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java
index a1960a8e..fc8da8e8 100644
--- a/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java
+++ b/src/main/java/org/graylog/aws/processors/instancelookup/InstanceLookupTable.java
@@ -1,7 +1,7 @@
package org.graylog.aws.processors.instancelookup;
-import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.regions.Regions;
+import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.DescribeNetworkInterfacesResult;
@@ -14,17 +14,18 @@
import com.amazonaws.services.ec2.model.Tag;
import com.google.common.collect.ImmutableMap;
import okhttp3.HttpUrl;
+import org.graylog.aws.auth.AWSAuthProvider;
import org.graylog.aws.config.Proxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Singleton;
import java.util.List;
+@Singleton
public class InstanceLookupTable {
private static final Logger LOG = LoggerFactory.getLogger(InstanceLookupTable.class);
- private static InstanceLookupTable INSTANCE = new InstanceLookupTable();
-
enum InstanceType {
RDS,
EC2,
@@ -39,13 +40,7 @@ enum InstanceType {
// TODO METRICS
- public static InstanceLookupTable getInstance() {
- return INSTANCE;
- }
-
- private InstanceLookupTable() { /* nope */ }
-
- public void reload(List regions, AWSCredentials credentials, HttpUrl proxyUrl) {
+ public void reload(List regions, AWSAuthProvider awsAuthProvider, HttpUrl proxyUrl) {
LOG.info("Reloading AWS instance lookup table.");
ImmutableMap.Builder ec2InstancesBuilder = ImmutableMap.builder();
@@ -53,16 +48,21 @@ public void reload(List regions, AWSCredentials credentials, HttpUrl pr
for (Regions region : regions) {
try {
- AmazonEC2Client ec2Client;
+ AmazonEC2 ec2Client;
if(proxyUrl != null) {
- ec2Client = new AmazonEC2Client(credentials, Proxy.forAWS(proxyUrl));
+ ec2Client = AmazonEC2Client.builder()
+ .withCredentials(awsAuthProvider)
+ .withRegion(region)
+ .withClientConfiguration(Proxy.forAWS(proxyUrl))
+ .build();
} else {
- ec2Client = new AmazonEC2Client(credentials);
+ ec2Client = AmazonEC2Client.builder()
+ .withCredentials(awsAuthProvider)
+ .withRegion(region)
+ .build();
}
- ec2Client.configureRegion(region);
-
// Load network interfaces
LOG.debug("Requesting AWS network interface descriptions in [{}].", region.getName());
DescribeNetworkInterfacesResult interfaces = ec2Client.describeNetworkInterfaces();
diff --git a/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java b/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java
new file mode 100644
index 00000000..239f4778
--- /dev/null
+++ b/src/test/java/org/graylog/aws/config/AWSPluginConfigurationTest.java
@@ -0,0 +1,29 @@
+package org.graylog.aws.config;
+
+import com.amazonaws.regions.Regions;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.graylog.aws.config.AWSPluginConfiguration.createDefault;
+
+public class AWSPluginConfigurationTest {
+ @Test
+ public void lookupRegions() throws Exception {
+ final AWSPluginConfiguration config = createDefault()
+ .toBuilder()
+ .lookupRegions("us-west-1,eu-west-1 , us-east-1 ")
+ .build();
+
+ assertThat(config.getLookupRegions()).containsExactly(Regions.US_WEST_1, Regions.EU_WEST_1, Regions.US_EAST_1);
+ }
+
+ @Test
+ public void lookupRegionsWithEmptyValue() throws Exception {
+ final AWSPluginConfiguration config = createDefault()
+ .toBuilder()
+ .lookupRegions("")
+ .build();
+
+ assertThat(config.getLookupRegions()).isEmpty();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java b/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java
index 9fa0387e..ee32028f 100644
--- a/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java
+++ b/src/test/java/org/graylog/aws/inputs/cloudtrail/notifications/CloudtrailSNSNotificationParserTest.java
@@ -1,11 +1,11 @@
package org.graylog.aws.inputs.cloudtrail.notifications;
import com.amazonaws.services.sqs.model.Message;
-import org.testng.annotations.Test;
+import org.junit.Test;
import java.util.List;
-import static org.testng.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
public class CloudtrailSNSNotificationParserTest {
diff --git a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java
index 44f59639..94087f2b 100644
--- a/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java
+++ b/src/test/java/org/graylog/aws/inputs/flowlogs/FlowLogMessageTest.java
@@ -1,33 +1,36 @@
package org.graylog.aws.inputs.flowlogs;
-import org.testng.annotations.Test;
+import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
+import org.graylog.aws.cloudwatch.FlowLogMessage;
+import org.junit.Test;
-import static org.testng.Assert.*;
+import static org.junit.Assert.assertEquals;
public class FlowLogMessageTest {
@Test
public void testFromPartsDoesNotFailWithMissingIntegerFields() throws Exception {
- FlowLogMessage m = FlowLogMessage.fromParts(
- new String[]{
- "0",
- "-",
- "foo",
- "eth0",
- "127.0.0.1",
- "127.0.0.1",
- "-",
- "-",
- "-",
- "100",
- "100",
- "0",
- "0",
- "ACCEPT",
- "OK"
- }
- );
+ final CloudWatchLogEvent logEvent = new CloudWatchLogEvent();
+ final String[] strings = {
+ "-",
+ "foo",
+ "eth0",
+ "127.0.0.1",
+ "127.0.0.1",
+ "-",
+ "-",
+ "-",
+ "100",
+ "100",
+ "0",
+ "0",
+ "ACCEPT",
+ "OK"
+ };
+ logEvent.message = String.join(" ", strings);
+
+ FlowLogMessage m = FlowLogMessage.fromLogEvent(logEvent);
assertEquals(m.getDestinationPort(), 0);
assertEquals(m.getSourcePort(), 0);
@@ -37,25 +40,25 @@ public void testFromPartsDoesNotFailWithMissingIntegerFields() throws Exception
@Test
public void testFromPartsDoesNotFailWithMissingLongFields() throws Exception {
- FlowLogMessage m = FlowLogMessage.fromParts(
- new String[]{
- "0",
- "1",
- "foo",
- "eth0",
- "127.0.0.1",
- "127.0.0.1",
- "80",
- "80",
- "1",
- "-",
- "-",
- "0",
- "0",
- "ACCEPT",
- "OK"
- }
- );
+ final String[] strings = {
+ "1",
+ "foo",
+ "eth0",
+ "127.0.0.1",
+ "127.0.0.1",
+ "80",
+ "80",
+ "1",
+ "-",
+ "-",
+ "0",
+ "0",
+ "ACCEPT",
+ "OK"
+ };
+ final CloudWatchLogEvent logEvent = new CloudWatchLogEvent();
+ logEvent.message = String.join(" ", strings);
+ FlowLogMessage m = FlowLogMessage.fromLogEvent(logEvent);
assertEquals(m.getBytes(), 0);
assertEquals(m.getPackets(), 0);