Skip to content
This repository has been archived by the owner on Mar 31, 2018. It is now read-only.

Commit

Permalink
Add the ability to extract JSON from extracted fields even if the fie…
Browse files Browse the repository at this point in the history
…ld starts with a non-JSON string
  • Loading branch information
Daniel Vassallo committed Aug 3, 2015
1 parent 39acfc7 commit 9eef3cd
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 36 deletions.
2 changes: 1 addition & 1 deletion configuration/cloudformation/cwl-elasticsearch.template
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"Mappings" : {
"Constants" : {
"S3DownloadPath" : { "Value": "aws-cloudwatch/downloads/cloudwatch-logs-subscription-consumer" },
"S3DownloadFile" : { "Value": "cloudwatch-logs-subscription-consumer-1.1.0" }
"S3DownloadFile" : { "Value": "cloudwatch-logs-subscription-consumer-1.2.0" }
},

"AWSInstanceType2Arch" : {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<artifactId>cloudwatch-logs-subscription-consumer</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
<name>CloudWatch Logs Subscription Consumer</name>
<description>The CloudWatch Logs Subscription Consumer helps Java developers consume a real-time feed of CloudWatch Logs data for custom processing, analysis, or loading to other systems.</description>
<url>https://aws.amazon.com/cloudwatch</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,30 @@ private JSONObject getFields(String message, Map<String, String> extractedFields
if (value == null) {
// nothing to add
continue;
} else if (ElasticsearchTransformerUtils.isMessageValidJson(value)) {
// the field value is valid json - put it as a nested object
extractedFieldsInJson.put(JSON_FIELD_NAME_PREFIX + fieldName, new JSONObject(value));
} else if (StringUtils.isNumeric(value)) {
}

if (StringUtils.isNumeric(value)) {
// the field value is a number - put it as a double
extractedFieldsInJson.put(fieldName, Double.parseDouble(value));
} else {
// else put the field as a string
extractedFieldsInJson.put(fieldName, value);
continue;
}

String jsonSubString = ElasticsearchTransformerUtils.extractJson(value);

if (jsonSubString != null) {
// the field value contains valid json - copy the json to a new Elasticsearch object field
extractedFieldsInJson.put(JSON_FIELD_NAME_PREFIX + fieldName, new JSONObject(jsonSubString));
}

// put the raw extracted field as a string
extractedFieldsInJson.put(fieldName, value);
}

return extractedFieldsInJson;
}

// if the message is valid JSON, use the message as is for Elasticsearch fields
if (ElasticsearchTransformerUtils.isMessageValidJson(message)) {
if (ElasticsearchTransformerUtils.extractJson(message) != null) {
return new JSONObject(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,27 @@ public class ElasticsearchTransformerUtils {

private ElasticsearchTransformerUtils() {}

public static boolean isMessageValidJson(String message) {
/**
* Checks if the substring from the first occurrence of { is valid json and
* returns the substring. Otherwise returns null.
*/
public static String extractJson(String message) {
int jsonStart = message.indexOf("{");

if (jsonStart < 0) {
return null;
}

String jsonSubString = message.substring(jsonStart);

try {
JsonNode rootNode = JSON_OBJECT_MAPPER.readTree(message);
JsonNode rootNode = JSON_OBJECT_MAPPER.readTree(jsonSubString);
if (rootNode.isValueNode()) {
return false;
return null;
}
} catch (IOException e) {
return false;
return null;
}
return true;
return jsonSubString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ public void transformLambdaLog() throws IOException {

assertEquals("2015-01-13T02:28:53.213Z", sourceNode.get("timestamp").asText());
assertEquals("c342155b-1ec0-11e5-b0e2-f317438eb2f6", sourceNode.get("request_id").asText());

assertEquals("{ \"key1\": 100, \"key2\": \"value\", \"key3\": { \"key4\": \"level2\" } }",
sourceNode.get("event").asText());
assertEquals(100, sourceNode.get("$event").get("key1").asLong());
assertEquals("value", sourceNode.get("$event").get("key2").asText());
assertEquals("level2", sourceNode.get("$event").get("key3").get("key4").asText());
Expand All @@ -219,5 +222,25 @@ public void transformLambdaLog() throws IOException {
assertEquals("2015-01-13T02:29:03.456Z", sourceNode.get("timestamp").asText());
assertEquals("c342155b-1ec0-11e5-b0e2-f317438eb2f6", sourceNode.get("request_id").asText());
assertEquals("Hello World", sourceNode.get("event").asText());

// event 3
sourceNode = JSON_OBJECT_MAPPER.readTree(new StringReader(elasticsearchDocuments.get(2).getSource()));

assertEquals("49545295115971876468408574808419866449919668408574808465", sourceNode.get("@id").asText());
assertEquals(1421200848954L, sourceNode.get("@timestamp").asLong());
assertEquals("123456789012", sourceNode.get("@owner").asText());
assertEquals("/aws/lambda/HelloWorld", sourceNode.get("@log_group").asText());
assertEquals("2015/06/30/1f77bc4743204b22b0d42cf3b85f40c7", sourceNode.get("@log_stream").asText());
assertEquals(
"2015-01-14T02:00:48.954Z c342155b-1ec0-11e5-b0e2-f317438eb2f6 Received event: { \"key1\": 100, \"key2\": \"value\", \"key3\": { \"key4\": \"level2\" } }",
sourceNode.get("@message").asText());

assertEquals("2015-01-14T02:00:48.954Z", sourceNode.get("timestamp").asText());
assertEquals("c342155b-1ec0-11e5-b0e2-f317438eb2f6", sourceNode.get("request_id").asText());
assertEquals("Received event: { \"key1\": 100, \"key2\": \"value\", \"key3\": { \"key4\": \"level2\" } }",
sourceNode.get("event").asText());
assertEquals(100, sourceNode.get("$event").get("key1").asLong());
assertEquals("value", sourceNode.get("$event").get("key2").asText());
assertEquals("level2", sourceNode.get("$event").get("key3").get("key4").asText());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,43 @@
*/
package com.amazonaws.services.logs.connectors.elasticsearch;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import org.junit.Test;

public class ElasticsearchTransformerUtilsTest {

@Test
public void isMessageValidJson() {
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("2"));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("\"value\""));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("100 \"value\""));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("{ \"key\": 100"));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("{ \"key\": \"100\""));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("{ \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } }"));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("outside { \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } }"));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("{ \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } } outside"));
assertFalse(ElasticsearchTransformerUtils.isMessageValidJson("{ \"array\": [ }"));
public void extractJson() {
assertNull(ElasticsearchTransformerUtils.extractJson("2"));
assertNull(ElasticsearchTransformerUtils.extractJson("\"value\""));
assertNull(ElasticsearchTransformerUtils.extractJson("100 \"value\""));
assertNull(ElasticsearchTransformerUtils.extractJson("{ \"key\": 100"));
assertNull(ElasticsearchTransformerUtils.extractJson("{ \"key\": \"100\""));
assertNull(ElasticsearchTransformerUtils.extractJson("{ \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } }"));
assertNull(ElasticsearchTransformerUtils.extractJson("outside { \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } }"));
assertNull(ElasticsearchTransformerUtils.extractJson("{ \"tree\": { \"key1\": \"100\" \"key2\": \"200\" } } outside"));
assertNull(ElasticsearchTransformerUtils.extractJson("{ \"array\": [ }"));

assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"key\": 100 }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"key\": \"100\" }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"key1\": \"100\", \"key2\": \"200\" }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"tree\": { \"key1\": \"100\", \"key2\": \"200\" } }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"tree\": \n{ \"key1\": \"100\", \"key2\": \"200\" } }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson(" { \"tree\":\n \n{ \"key1\": \"100\",\n \"key2\": \"200\" }\n }\n "));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"array\": [ { \"key1\": \"100\", \"key2\": \"200\" } ] }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"array\": [ { \"key1\": \"100\" }, { \"key2\": \"200\" } ] }"));
assertTrue(ElasticsearchTransformerUtils.isMessageValidJson("{ \"array\": [] }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"key\": 100 }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"key\": \"100\" }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"key1\": \"100\", \"key2\": \"200\" }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"tree\": { \"key1\": \"100\", \"key2\": \"200\" } }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"tree\": \n{ \"key1\": \"100\", \"key2\": \"200\" } }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson(" { \"tree\":\n \n{ \"key1\": \"100\",\n \"key2\": \"200\" }\n }\n "));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"array\": [ { \"key1\": \"100\", \"key2\": \"200\" } ] }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"array\": [ { \"key1\": \"100\" }, { \"key2\": \"200\" } ] }"));
assertNotNull(ElasticsearchTransformerUtils.extractJson("{ \"array\": [] }"));

assertEquals("{ \"key\": \"100\" }", ElasticsearchTransformerUtils.extractJson("{ \"key\": \"100\" }"));
assertEquals("{ \"key\": \"100\" }",
ElasticsearchTransformerUtils.extractJson("Received: { \"key\": \"100\" }"));
assertEquals("{ \"key\": \"100\" }",
ElasticsearchTransformerUtils.extractJson("Received event: { \"key\": \"100\" }"));
assertEquals(
"{ \"array\": [ { \"key1\": \"100\" }, { \"key2\": \"200\" } ] }",
ElasticsearchTransformerUtils.extractJson("Received event: { \"array\": [ { \"key1\": \"100\" }, { \"key2\": \"200\" } ] }"));
}
}
10 changes: 10 additions & 0 deletions src/test/resources/aws-lambda-log-example.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
"request_id": "c342155b-1ec0-11e5-b0e2-f317438eb2f6",
"event": "Hello World"
}
},
{
"id": "49545295115971876468408574808419866449919668408574808465",
"timestamp": "1421200848954",
"message": "2015-01-14T02:00:48.954Z c342155b-1ec0-11e5-b0e2-f317438eb2f6 Received event: { \"key1\": 100, \"key2\": \"value\", \"key3\": { \"key4\": \"level2\" } }",
"extractedFields": {
"timestamp": "2015-01-14T02:00:48.954Z",
"request_id": "c342155b-1ec0-11e5-b0e2-f317438eb2f6",
"event": "Received event: { \"key1\": 100, \"key2\": \"value\", \"key3\": { \"key4\": \"level2\" } }"
}
}
]
}

0 comments on commit 9eef3cd

Please sign in to comment.