Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CloudWatch related code and add logs input #43

Merged
merged 14 commits into from
Aug 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
<maven.deploy.skip>true</maven.deploy.skip>
<maven.site.skip>true</maven.site.skip>
<graylog.version>2.3.0</graylog.version>
<aws-java-sdk.version>1.11.98</aws-java-sdk.version>
<aws-java-sdk.version>1.11.174</aws-java-sdk.version>
<aws-kinesis-client.version>1.8.1</aws-kinesis-client.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -114,7 +115,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.7.4</version>
<version>${aws-kinesis-client.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
Expand All @@ -135,10 +136,9 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
<scope>test</scope>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
</dependency>
</dependencies>

Expand Down
16 changes: 11 additions & 5 deletions src/main/java/org/graylog/aws/auth/AWSAuthProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static com.google.common.base.Strings.isNullOrEmpty;

public class AWSAuthProvider implements AWSCredentialsProvider {
private static final Logger LOG = LoggerFactory.getLogger(AWSAuthProvider.class);

private AWSCredentialsProvider credentials;

public AWSAuthProvider(AWSPluginConfiguration config, String accessKey, String secretKey) {
if (accessKey != null && secretKey != null
&& !accessKey.isEmpty() && !secretKey.isEmpty()) {
public AWSAuthProvider(AWSPluginConfiguration config) {
this(config, null, null);
}

public AWSAuthProvider(AWSPluginConfiguration config, @Nullable String accessKey, @Nullable String secretKey) {
if (!isNullOrEmpty(accessKey) && !isNullOrEmpty(secretKey)) {
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
LOG.debug("Using input specific config");
} else if (config.accessKey() != null && config.secretKey() != null
&& !config.accessKey().isEmpty() && !config.secretKey().isEmpty()) {
} else if (!isNullOrEmpty(config.accessKey()) && !isNullOrEmpty(config.secretKey())) {
this.credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(config.accessKey(), config.secretKey()));
LOG.debug("Using AWS Plugin config");
} else {
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.graylog.aws.cloudwatch;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
* A collection of CloudWatch log events.
* <p/>
* Example payload:
* <pre>
* {
* "messageType": "DATA_MESSAGE",
* "owner": "123456789",
* "logGroup": "aws-plugin-test-flows",
* "logStream": "eni-aaaaaaaa-all",
* "subscriptionFilters": ["match-all"],
* "logEvents": [
* {
* "id": "33503748002479370955346306650196094071913271643270021120",
* "timestamp": 1502360020000,
* "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
* },
* {
* "id": "33503748002479370955346306650196094071913271643270021127",
* "timestamp": 1502360020000,
* "message": "2 123456789 eni-aaaaaaaa 10.0.34.113 10.42.96.199 53421 17720 6 1 48 1502360020 1502360079 REJECT OK"
* }
* ]
* }
* </pre>
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CloudWatchLogData {
@JsonProperty("logEvents")
public List<CloudWatchLogEvent> logEvents;
}
34 changes: 34 additions & 0 deletions src/main/java/org/graylog/aws/cloudwatch/CloudWatchLogEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.graylog.aws.cloudwatch;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;

/**
* A single CloudWatch log event.
* <p/>
* Example payload:
* <pre>
* {
* "id": "33503748002479370955346306650196094071913271643270021120",
* "timestamp": 1502360020000,
* "message": "2 123456789 eni-aaaaaaaa 10.0.27.226 10.42.96.199 3604 17720 17 1 132 1502360020 1502360079 REJECT OK"
* }
* </pre>
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CloudWatchLogEvent {
@JsonProperty("timestamp")
public long timestamp;

@JsonProperty("message")
public String message;

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("timestamp", timestamp)
.add("message", message)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.graylog.aws.inputs.flowlogs;
package org.graylog.aws.cloudwatch;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

public class FlowLogMessage {
private static final Logger LOG = LoggerFactory.getLogger(FlowLogMessage.class);

Expand Down Expand Up @@ -55,26 +57,31 @@ public FlowLogMessage(DateTime timestamp,
this.logStatus = logStatus;
}

public static FlowLogMessage fromParts(String[] parts) {
if(parts == null || parts.length != 15) {
throw new IllegalArgumentException("Message parts were null or not length of 15");
@Nullable
public static FlowLogMessage fromLogEvent(final CloudWatchLogEvent logEvent) {
final String[] parts = logEvent.message.split(" ");

if (parts.length != 14) {
LOG.warn("Received FlowLog message with not exactly 14 fields. Skipping. Message was: [{}]", logEvent.message);
return null;
}

return new FlowLogMessage(
new DateTime(Long.valueOf(parts[0])),
safeInteger(parts[1]),
new DateTime(Long.valueOf(logEvent.timestamp)),
safeInteger(parts[0]),
parts[1],
parts[2],
parts[3],
parts[4],
parts[5],
safeInteger(parts[5]),
safeInteger(parts[6]),
safeInteger(parts[7]),
safeInteger(parts[8]),
safeLong(parts[8]),
safeLong(parts[9]),
safeLong(parts[10]),
new DateTime(Long.valueOf(parts[10])*1000),
new DateTime(Long.valueOf(parts[11])*1000),
new DateTime(Long.valueOf(parts[12])*1000),
parts[13],
parts[14]
parts[12],
parts[13]
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,10 @@ public static Builder builder() {
return new AutoValue_AWSPluginConfiguration.Builder();
}

@JsonIgnore
public boolean isComplete() {
return accessKey() != null && secretKey() != null
&& !accessKey().isEmpty() && !secretKey().isEmpty();
}

@JsonIgnore
public List<Regions> getLookupRegions() {
if (lookupRegions() == null || lookupRegions().isEmpty()) {
return Collections.EMPTY_LIST;
return Collections.emptyList();
}

ImmutableList.Builder<Regions> builder = ImmutableList.<Regions>builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.graylog.aws.inputs.cloudwatch;

import com.codahale.metrics.MetricRegistry;
import com.google.inject.assistedinject.Assisted;
import org.graylog.aws.inputs.codecs.CloudWatchRawLogCodec;
import org.graylog.aws.inputs.transports.KinesisTransport;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;

import javax.inject.Inject;

public class CloudWatchLogsInput extends MessageInput {
private static final String NAME = "AWS Logs";

@Inject
public CloudWatchLogsInput(@Assisted Configuration configuration,
MetricRegistry metricRegistry,
KinesisTransport.Factory transport,
LocalMetricRegistry localRegistry,
CloudWatchRawLogCodec.Factory codec,
Config config,
Descriptor descriptor,
ServerStatus serverStatus) {
super(
metricRegistry,
configuration,
transport.create(configuration),
localRegistry,
codec.create(configuration),
config,
descriptor,
serverStatus
);
}

@FactoryClass
public interface Factory extends MessageInput.Factory<CloudWatchLogsInput> {
@Override
CloudWatchLogsInput create(Configuration configuration);

@Override
Config getConfig();

@Override
Descriptor getDescriptor();
}

public static class Descriptor extends MessageInput.Descriptor {
public Descriptor() {
super(NAME, false, "");
}
}

@ConfigClass
public static class Config extends MessageInput.Config {
@Inject
public Config(KinesisTransport.Factory transport, CloudWatchRawLogCodec.Factory codec) {
super(transport.getConfig(), codec.getConfig());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,58 +1,48 @@
package org.graylog.aws.inputs.flowlogs;
package org.graylog.aws.inputs.codecs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.assistedinject.Assisted;
import org.graylog.aws.AWS;
import org.graylog.aws.cloudwatch.CloudWatchLogEvent;
import org.graylog.aws.cloudwatch.FlowLogMessage;
import org.graylog.aws.inputs.flowlogs.IANAProtocolNumbers;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.Seconds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

public class FlowLogCodec implements Codec {
private static final Logger LOG = LoggerFactory.getLogger(FlowLogCodec.class);
public class CloudWatchFlowLogCodec extends CloudWatchLogDataCodec {
public static final String NAME = "AWSFlowLog";

private final Configuration configuration;
private final ObjectMapper objectMapper;

private final IANAProtocolNumbers protocolNumbers;

@Inject
public FlowLogCodec(@Assisted Configuration configuration, ObjectMapper objectMapper) {
public CloudWatchFlowLogCodec(@Assisted Configuration configuration) {
this.configuration = configuration;
this.objectMapper = objectMapper;

this.protocolNumbers = new IANAProtocolNumbers();
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
public Message decodeLogData(@Nonnull final CloudWatchLogEvent logEvent) {
try {
String rawString = new String(rawMessage.getPayload());
String[] parts = rawString.split(" ");
final FlowLogMessage flowLogMessage = FlowLogMessage.fromLogEvent(logEvent);

if (parts.length != 15) {
LOG.warn("Received FlowLog message with not exactly 15 fields. Skipping. Message was: [{}]", rawString);
if (flowLogMessage == null) {
return null;
}

FlowLogMessage flowLogMessage = FlowLogMessage.fromParts(parts);

Message result = new Message(
final Message result = new Message(
buildSummary(flowLogMessage),
"aws-flowlogs",
flowLogMessage.getTimestamp()
Expand Down Expand Up @@ -113,9 +103,9 @@ public String getName() {
}

@FactoryClass
public interface Factory extends Codec.Factory<FlowLogCodec> {
public interface Factory extends Codec.Factory<CloudWatchFlowLogCodec> {
@Override
FlowLogCodec create(Configuration configuration);
CloudWatchFlowLogCodec create(Configuration configuration);

@Override
Config getConfig();
Expand Down
Loading