From 498313a8c12bcc15c8a179f5a97afff1d673d0b2 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Tue, 4 Sep 2018 06:50:38 -0400 Subject: [PATCH 1/7] METRON-1761, allow application of grok statement multiple times --- .../src/main/sample/patterns/test | 1 + metron-platform/metron-parsers/README.md | 1 + .../org/apache/metron/parsers/GrokParser.java | 82 +++++++------ .../parsers/MultiLineGrokParserTest.java | 112 ++++++++++++++++++ .../websphere/GrokWebSphereParserTest.java | 13 -- .../test/resources/logData/multi_elb_log.txt | 10 ++ 6 files changed, 170 insertions(+), 49 deletions(-) create mode 100644 metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java create mode 100644 metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt diff --git a/metron-platform/metron-integration-test/src/main/sample/patterns/test b/metron-platform/metron-integration-test/src/main/sample/patterns/test index a88a2559d0..ebbf9c4428 100644 --- a/metron-platform/metron-integration-test/src/main/sample/patterns/test +++ b/metron-platform/metron-integration-test/src/main/sample/patterns/test @@ -1,2 +1,3 @@ YAF_TIME_FORMAT %{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T ]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED} YAF_DELIMITED %{NUMBER:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:protocol}\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\|%{SPACE:UNWANTED}%{INT:ip_src_port}\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason} +ELBACCESSLOGS %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport} (?:(%{IP:backendip}:?:%{INT:backendport})|-) %{NUMBER:request_processing_time} %{NUMBER:backend_processing_time} %{NUMBER:response_processing_time} (?:-|%{INT:elb_status_code}) (?:-|%{INT:backend_status_code}) %{INT:received_bytes} %{INT:sent_bytes} \"(?:-|(?:%{WORD:verb} %{URIPROTO:proto}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST:urihost})?(?:%{URIPATH:path}(?:%{URIPARAM:params})?)?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest}))\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol}) diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index fd6b4708ee..41e3542acb 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -33,6 +33,7 @@ There are two general types types of parsers: * `timeFields` : A list of fields to be treated as time * `dateFormat` : The date format to use to parse the time fields * `timezone` : The timezone to use. `UTC` is default. + * The Grok parser supports either 1 line to parse per incoming message, or incoming messages with multiple log lines, and will produce a json message per line * CSV Parser: `org.apache.metron.parsers.csv.CSVParser` with possible `parserConfig` entries of * `timestampFormat` : The date format of the timestamp to use. If unspecified, the parser assumes the timestamp is ms since unix epoch. * `columns` : A map of column names you wish to extract from the CSV to their offsets (e.g. `{ 'name' : 1, 'profession' : 3}` would be a column map for extracting the 2nd and 4th columns from a CSV) diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index 99ac3905d6..db6421385b 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,15 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.metron.parsers; import com.google.common.base.Joiner; import com.google.common.base.Splitter; +import oi.thekraken.grok.api.Grok; +import oi.thekraken.grok.api.Match; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.metron.common.Constants; +import org.apache.metron.parsers.interfaces.MessageParser; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; +import java.io.StringReader; import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -31,16 +46,7 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import oi.thekraken.grok.api.Grok; -import oi.thekraken.grok.api.Match; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.metron.common.Constants; -import org.apache.metron.parsers.interfaces.MessageParser; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + public class GrokParser implements MessageParser, Serializable { @@ -55,6 +61,7 @@ public class GrokParser implements MessageParser, Serializable { protected String patternsCommonDir = "/patterns/common"; @Override + @SuppressWarnings("unchecked") public void configure(Map parserConfig) { this.grokPath = (String) parserConfig.get("grokPath"); this.patternLabel = (String) parserConfig.get("patternLabel"); @@ -132,33 +139,36 @@ public List parse(byte[] rawMessage) { } List messages = new ArrayList<>(); String originalMessage = null; - try { - originalMessage = new String(rawMessage, "UTF-8"); - LOG.debug("Grok parser parsing message: {}",originalMessage); - Match gm = grok.match(originalMessage); - gm.captures(); - JSONObject message = new JSONObject(); - message.putAll(gm.toMap()); - - if (message.size() == 0) - throw new RuntimeException("Grok statement produced a null message. Original message was: " - + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " - + grokPath); - - message.put("original_string", originalMessage); - for (String timeField : timeFields) { - String fieldValue = (String) message.get(timeField); - if (fieldValue != null) { - message.put(timeField, toEpoch(fieldValue)); + // read the incoming raw data as if it may have multiple lines of logs + // if there is only only one line, it will just get processed. + try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) { + while ((originalMessage = reader.readLine()) != null) { + LOG.debug("Grok parser parsing message: {}", originalMessage); + Match gm = grok.match(originalMessage); + gm.captures(); + JSONObject message = new JSONObject(); + message.putAll(gm.toMap()); + + if (message.size() == 0) + throw new RuntimeException("Grok statement produced a null message. Original message was: " + + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + + grokPath); + + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); } - if (timestampField != null) { - message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); - } - message.remove(patternLabel); - postParse(message); - messages.add(message); - LOG.debug("Grok parser parsed message: {}", message); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e); diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java new file mode 100644 index 0000000000..0ecf5e24df --- /dev/null +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.commons.io.IOUtils; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MultiLineGrokParserTest extends GrokParserTest { + + /** + * Test that if a byte[] with multiple lines of log is passed in + * it will be parsed into the correct number of messages. + * @throws IOException if we can't read from disk + * @throws ParseException if we can't parse + */ + @Override + @Test + public void test() throws IOException, ParseException { + + Map parserConfig = new HashMap<>(); + parserConfig.put("grokPath", getGrokPath()); + parserConfig.put("patternLabel", getGrokPatternLabel()); + parserConfig.put("timestampField", getTimestampField()); + parserConfig.put("dateFormat", getDateFormat()); + parserConfig.put("timeFields", getTimeFields()); + + GrokParser grokParser = new GrokParser(); + grokParser.configure(parserConfig); + grokParser.init(); + + JSONParser jsonParser = new JSONParser(); + Map testData = getTestData(); + for( Map.Entry e : testData.entrySet() ) { + byte[] rawMessage = e.getKey().getBytes(); + List parsedList = grokParser.parse(rawMessage); + Assert.assertEquals(10, parsedList.size()); + } + } + + @Override + public Map getTestData() { + + Map testData = new HashMap(); + String input; + try (FileInputStream stream = new FileInputStream(new File("src/test/resources/logData/multi_elb_log.txt"))) { + input = IOUtils.toString(stream); + } catch (IOException ioe) { + throw new IllegalStateException("failed to open file",ioe); + } + // not checking values, just that we get the right number of messages + testData.put(input,""); + return testData; + + } + + @Override + public String getGrokPath() { + return "../metron-integration-test/src/main/sample/patterns/test"; + } + + @Override + public String getGrokPatternLabel() { + return "ELBACCESSLOGS"; + } + + @Override + public List getTimeFields() { + return new ArrayList() {{ + add("timestamp"); + }}; + } + + @Override + public String getDateFormat() { + return "yyyy-MM-dd'T'HH:mm:ss.S'Z'"; + } + + @Override + public String getTimestampField() { + return "timestamp"; + } +} diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java index 230c14715e..2923a4f34c 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/websphere/GrokWebSphereParserTest.java @@ -244,17 +244,4 @@ public void tetsParseMalformedOtherLine() throws Exception { assertEquals("trans 191) admindefaultsystem*): ntp-service 'NTP Service' - Operational state down:", parsedJSON.get("message")); } - - @Test(expected=RuntimeException.class) - public void testParseEmptyLine() throws Exception { - - //Set up parser, attempt to parse malformed message - GrokWebSphereParser parser = new GrokWebSphereParser(); - parser.configure(parserConfig); - String testString = ""; - UnitTestHelper.setLog4jLevel(GrokParser.class, Level.FATAL); - List result = parser.parse(testString.getBytes()); - UnitTestHelper.setLog4jLevel(GrokParser.class, Level.ERROR); - } - } diff --git a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt new file mode 100644 index 0000000000..95d3fec8e3 --- /dev/null +++ b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_log.txt @@ -0,0 +1,10 @@ +2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - - +2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - - +2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - - +2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - - +2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - - +2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - - +2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - - From c2b3bb88d2a06e5cde39fd90a87f92207906eac4 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Fri, 28 Sep 2018 11:40:37 -0400 Subject: [PATCH 2/7] per review, do not require derivation --- .../parsers/MultiLineGrokParserTest.java | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java index 0ecf5e24df..5b719d2535 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,8 +17,6 @@ */ package org.apache.metron.parsers; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.adrianwalker.multilinestring.Multiline; import org.apache.commons.io.IOUtils; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -28,14 +26,13 @@ import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -public class MultiLineGrokParserTest extends GrokParserTest { +public class MultiLineGrokParserTest { /** * Test that if a byte[] with multiple lines of log is passed in @@ -43,8 +40,8 @@ public class MultiLineGrokParserTest extends GrokParserTest { * @throws IOException if we can't read from disk * @throws ParseException if we can't parse */ - @Override @Test + @SuppressWarnings("unchecked") public void test() throws IOException, ParseException { Map parserConfig = new HashMap<>(); @@ -59,53 +56,48 @@ public void test() throws IOException, ParseException { grokParser.init(); JSONParser jsonParser = new JSONParser(); - Map testData = getTestData(); - for( Map.Entry e : testData.entrySet() ) { + Map testData = getTestData(); + for (Map.Entry e : testData.entrySet()) { byte[] rawMessage = e.getKey().getBytes(); List parsedList = grokParser.parse(rawMessage); Assert.assertEquals(10, parsedList.size()); } } - @Override + @SuppressWarnings("unchecked") public Map getTestData() { - Map testData = new HashMap(); + Map testData = new HashMap(); String input; try (FileInputStream stream = new FileInputStream(new File("src/test/resources/logData/multi_elb_log.txt"))) { input = IOUtils.toString(stream); } catch (IOException ioe) { - throw new IllegalStateException("failed to open file",ioe); + throw new IllegalStateException("failed to open file", ioe); } // not checking values, just that we get the right number of messages - testData.put(input,""); + testData.put(input, ""); return testData; } - @Override public String getGrokPath() { return "../metron-integration-test/src/main/sample/patterns/test"; } - @Override public String getGrokPatternLabel() { return "ELBACCESSLOGS"; } - @Override public List getTimeFields() { return new ArrayList() {{ add("timestamp"); }}; } - @Override public String getDateFormat() { return "yyyy-MM-dd'T'HH:mm:ss.S'Z'"; } - @Override public String getTimestampField() { return "timestamp"; } From 7d88f471c064457e922ed4f53cf23e31f3cc4e0e Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Sun, 7 Oct 2018 10:12:29 -0400 Subject: [PATCH 3/7] Add support for handling a mix of successfull and unsuccessfull message results from the same byte[] --- .../parsers/DefaultMessageParserResult.java | 76 ++++++++++ .../org/apache/metron/parsers/GrokParser.java | 81 +++++++--- .../metron/parsers/bolt/ParserBolt.java | 59 +++++--- .../parsers/interfaces/MessageParser.java | 28 +++- .../interfaces/MessageParserResult.java | 48 ++++++ .../parsers/MultiLineGrokParserTest.java | 39 +++++ .../MultiLineWithErrorsGrokParserTest.java | 142 ++++++++++++++++++ .../metron/parsers/bolt/ParserBoltTest.java | 15 +- .../parsers/integration/ParserDriver.java | 6 +- .../logData/multi_elb_with_errors_log.txt | 13 ++ 10 files changed, 452 insertions(+), 55 deletions(-) create mode 100644 metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java create mode 100644 metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java create mode 100644 metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java create mode 100644 metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java new file mode 100644 index 0000000000..11d15eb8a8 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/DefaultMessageParserResult.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers; + +import org.apache.metron.parsers.interfaces.MessageParserResult; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class DefaultMessageParserResult implements MessageParserResult { + private List messages = new ArrayList<>(); + private Map errors = new HashMap<>(); + private Throwable masterThrowable; + + public DefaultMessageParserResult() { + } + + public DefaultMessageParserResult(Throwable masterThrowable) { + this.masterThrowable = masterThrowable; + } + + public DefaultMessageParserResult(List list) { + messages.addAll(list); + } + + public DefaultMessageParserResult(Map map) { + errors.putAll(map); + } + + public DefaultMessageParserResult(List list, Map map) { + messages.addAll(list); + errors.putAll(map); + } + + public void addMessage(T message) { + messages.add(message); + } + + public void addError(Object message, Throwable throwable) { + errors.put(message, throwable); + } + + @Override + public List getMessages() { + return messages; + } + + @Override + public Map getMessageThrowables() { + return errors; + } + + @Override + public Optional getMasterThrowable() { + return Optional.ofNullable(masterThrowable); + } +} diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index db6421385b..7737fdaf8a 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.metron.common.Constants; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +43,12 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; @@ -134,46 +138,73 @@ public void init() { @SuppressWarnings("unchecked") @Override public List parse(byte[] rawMessage) { + Optional> resultOptional = parseOptionalResult(rawMessage); + if (!resultOptional.isPresent()) { + return Collections.EMPTY_LIST; + } + Map errors = resultOptional.get().getMessageThrowables(); + if (!errors.isEmpty()) { + throw new RuntimeException(errors.entrySet().iterator().next().getValue()); + } + + return resultOptional.get().getMessages(); + } + + @SuppressWarnings("unchecked") + @Override + public Optional> parseOptionalResult(byte[] rawMessage) { if (grok == null) { init(); } List messages = new ArrayList<>(); + Map errors = new HashMap<>(); String originalMessage = null; // read the incoming raw data as if it may have multiple lines of logs // if there is only only one line, it will just get processed. try (BufferedReader reader = new BufferedReader(new StringReader(new String(rawMessage, StandardCharsets.UTF_8)))) { while ((originalMessage = reader.readLine()) != null) { LOG.debug("Grok parser parsing message: {}", originalMessage); - Match gm = grok.match(originalMessage); - gm.captures(); - JSONObject message = new JSONObject(); - message.putAll(gm.toMap()); - - if (message.size() == 0) - throw new RuntimeException("Grok statement produced a null message. Original message was: " - + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " - + grokPath); - - message.put("original_string", originalMessage); - for (String timeField : timeFields) { - String fieldValue = (String) message.get(timeField); - if (fieldValue != null) { - message.put(timeField, toEpoch(fieldValue)); + try { + Match gm = grok.match(originalMessage); + gm.captures(); + JSONObject message = new JSONObject(); + message.putAll(gm.toMap()); + + if (message.size() == 0) { + Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: " + + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + + grokPath); + errors.put(originalMessage, rte); + continue; } + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } + } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + errors.put(originalMessage, e); } - if (timestampField != null) { - message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); - } - message.remove(patternLabel); - postParse(message); - messages.add(message); - LOG.debug("Grok parser parsed message: {}", message); } - } catch (Exception e) { + } catch (IOException e) { LOG.error(e.getMessage(), e); - throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e); + Exception innerException = new IllegalStateException("Grok parser Error: " + + e.getMessage() + + " on " + + originalMessage, e); + return Optional.of(new DefaultMessageParserResult<>(innerException)); } - return messages; + return Optional.of(new DefaultMessageParserResult<>(messages, errors)); } @Override diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 213d02cf09..a7dd7591c8 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,23 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.metron.parsers.bolt; import com.github.benmanes.caffeine.cache.Cache; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; @@ -48,6 +36,7 @@ import org.apache.metron.parsers.filters.Filters; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.apache.metron.parsers.topology.ParserComponents; import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; @@ -66,6 +55,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + public class ParserBolt extends ConfiguredParserBolt implements Serializable { @@ -306,8 +309,17 @@ public void execute(Tuple tuple) { ); metadata = rawMessage.getMetadata(); - Optional> messages = parser.parseOptional(rawMessage.getMessage()); - for (JSONObject message : messages.orElse(Collections.emptyList())) { + Optional> results = parser.parseOptionalResult(rawMessage.getMessage()); + + // check if there is a master error + if (results.isPresent() && results.get().getMasterThrowable() != null) { + handleError(originalMessage, tuple, results.get().getMasterThrowable(), collector); + return; + } + + // Handle the message results + List messages = results.isPresent() ? results.get().getMessages() : Collections.EMPTY_LIST; + for (JSONObject message : messages) { //we want to ack the tuple in the situation where we have are not doing a bulk write //otherwise we want to defer to the writerComponent who will ack on bulk commit. WriterHandler writer = sensorToComponentMap.get(sensor).getWriter(); @@ -370,6 +382,19 @@ public void execute(Tuple tuple) { } } } + + // Handle the error results + Map messageErrors = results.isPresent() + ? results.get().getMessageThrowables() : Collections.EMPTY_MAP; + + for (Entry entry : messageErrors.entrySet()) { + MetronError error = new MetronError() + .withErrorType(Constants.ErrorType.PARSER_ERROR) + .withThrowable(entry.getValue()) + .withSensorType(sensorToComponentMap.keySet()) + .addRawMessage(originalMessage); + ErrorUtils.handleError(collector, error); + } } //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer //(meaning that none of the messages are valid either globally or locally) @@ -383,7 +408,7 @@ public void execute(Tuple tuple) { } } - protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + protected void handleError(Object originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_ERROR) .withThrowable(ex) diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java index e3b903ee07..f8243b9c05 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java @@ -17,7 +17,11 @@ */ package org.apache.metron.parsers.interfaces; +import org.apache.metron.parsers.DefaultMessageParserResult; + import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,23 +35,41 @@ public interface MessageParser extends Configurable { /** * Take raw data and convert it to a list of messages. * - * @param rawMessage + * @param rawMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ List parse(byte[] rawMessage); /** * Take raw data and convert it to an optional list of messages. - * @param parseMessage + * @param parseMessage the raw bytes of the message * @return If null is returned, this is treated as an empty list. */ default Optional> parseOptional(byte[] parseMessage) { return Optional.ofNullable(parse(parseMessage)); } + /** + * Take raw data and convert it to messages. Each raw message may produce multiple messages and therefore + * multiple errors. A {@link MessageParserResult} is returned, which will have both the messages produced + * and the errors. + * @param parseMessage the raw bytes of the message + * @return Optional of {@link MessageParserResult} + */ + default Optional> parseOptionalResult(byte[] parseMessage) { + List list = new ArrayList<>(); + try { + Optional> optionalMessages = parseOptional(parseMessage); + optionalMessages.ifPresent(list::addAll); + } catch (Throwable t) { + return Optional.of(new DefaultMessageParserResult<>(t)); + } + return Optional.of(new DefaultMessageParserResult(list)); + } + /** * Validate the message to ensure that it's correct. - * @param message + * @param message the message to validate * @return true if the message is valid, false if not */ boolean validate(T message); diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java new file mode 100644 index 0000000000..891e94ffa6 --- /dev/null +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParserResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.parsers.interfaces; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Result object MessageParser calls. + * @param + */ +public interface MessageParserResult { + /** + * Returns the Message objects of {@code T} + * @return {@code List} + */ + List getMessages(); + + /** + * Returns a map of raw message objects to the {@code Throwable} they triggered. + * @return {@code Map} + */ + Map getMessageThrowables(); + + /** + * Returns a master {@code Throwable} for a parse call. This represents a complete + * call failure, as opposed to one associated with a message. + * @return {@code Optional}{@code Throwable} + */ + Optional getMasterThrowable(); +} diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java index 5b719d2535..aeb46c1e77 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -18,6 +18,7 @@ package org.apache.metron.parsers; import org.apache.commons.io.IOUtils; +import org.apache.metron.parsers.interfaces.MessageParserResult; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class MultiLineGrokParserTest { @@ -64,6 +66,43 @@ public void test() throws IOException, ParseException { } } + /** + * Test that if a byte[] with multiple lines of log is passed in + * it will be parsed into the correct number of messages using the + * parseOptionalResult call. + * @throws IOException if we can't read from disk + * @throws ParseException if we can't parse + */ + @Test + @SuppressWarnings("unchecked") + public void testOptionalResult() throws IOException, ParseException { + + Map parserConfig = new HashMap<>(); + parserConfig.put("grokPath", getGrokPath()); + parserConfig.put("patternLabel", getGrokPatternLabel()); + parserConfig.put("timestampField", getTimestampField()); + parserConfig.put("dateFormat", getDateFormat()); + parserConfig.put("timeFields", getTimeFields()); + + GrokParser grokParser = new GrokParser(); + grokParser.configure(parserConfig); + grokParser.init(); + + JSONParser jsonParser = new JSONParser(); + Map testData = getTestData(); + for (Map.Entry e : testData.entrySet()) { + byte[] rawMessage = e.getKey().getBytes(); + Optional> resultOptional = grokParser.parseOptionalResult(rawMessage); + Assert.assertTrue(resultOptional.isPresent()); + Optional throwableOptional = resultOptional.get().getMasterThrowable(); + List resultList = resultOptional.get().getMessages(); + Map errorMap = resultOptional.get().getMessageThrowables(); + Assert.assertFalse(throwableOptional.isPresent()); + Assert.assertEquals(0, errorMap.size()); + Assert.assertEquals(10, resultList.size()); + } + } + @SuppressWarnings("unchecked") public Map getTestData() { diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java new file mode 100644 index 0000000000..c391da544f --- /dev/null +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.parsers; + +import org.apache.commons.io.IOUtils; +import org.apache.metron.parsers.interfaces.MessageParserResult; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class MultiLineWithErrorsGrokParserTest { + + /** + * Test that if a byte[] with multiple lines of log is passed in + * it will be parsed into the correct number of messages. + * @throws IOException if we can't read from disk + * @throws ParseException if we can't parse + */ + @Test(expected = RuntimeException.class) + @SuppressWarnings("unchecked") + public void test() throws IOException, ParseException { + + Map parserConfig = new HashMap<>(); + parserConfig.put("grokPath", getGrokPath()); + parserConfig.put("patternLabel", getGrokPatternLabel()); + parserConfig.put("timestampField", getTimestampField()); + parserConfig.put("dateFormat", getDateFormat()); + parserConfig.put("timeFields", getTimeFields()); + + GrokParser grokParser = new GrokParser(); + grokParser.configure(parserConfig); + grokParser.init(); + + JSONParser jsonParser = new JSONParser(); + Map testData = getTestData(); + for (Map.Entry e : testData.entrySet()) { + byte[] rawMessage = e.getKey().getBytes(); + List parsedList = grokParser.parse(rawMessage); + } + } + + /** + * Test that if a byte[] with multiple lines of log is passed in + * it will be parsed into the correct number of messages using the + * parseOptionalResult call. + * @throws IOException if we can't read from disk + * @throws ParseException if we can't parse + */ + @Test + @SuppressWarnings("unchecked") + public void testOptionalResult() throws IOException, ParseException { + + Map parserConfig = new HashMap<>(); + parserConfig.put("grokPath", getGrokPath()); + parserConfig.put("patternLabel", getGrokPatternLabel()); + parserConfig.put("timestampField", getTimestampField()); + parserConfig.put("dateFormat", getDateFormat()); + parserConfig.put("timeFields", getTimeFields()); + + GrokParser grokParser = new GrokParser(); + grokParser.configure(parserConfig); + grokParser.init(); + + JSONParser jsonParser = new JSONParser(); + Map testData = getTestData(); + for (Map.Entry e : testData.entrySet()) { + byte[] rawMessage = e.getKey().getBytes(); + Optional> resultOptional = grokParser.parseOptionalResult(rawMessage); + Assert.assertTrue(resultOptional.isPresent()); + Optional throwableOptional = resultOptional.get().getMasterThrowable(); + List resultList = resultOptional.get().getMessages(); + Map errorMap = resultOptional.get().getMessageThrowables(); + Assert.assertFalse(throwableOptional.isPresent()); + Assert.assertEquals(3, errorMap.size()); + Assert.assertEquals(10, resultList.size()); + } + } + + @SuppressWarnings("unchecked") + public Map getTestData() { + + Map testData = new HashMap(); + String input; + try (FileInputStream stream = new FileInputStream(new File("src/test/resources/logData/multi_elb_with_errors_log.txt"))) { + input = IOUtils.toString(stream); + } catch (IOException ioe) { + throw new IllegalStateException("failed to open file", ioe); + } + // not checking values, just that we get the right number of messages + testData.put(input, ""); + return testData; + + } + + public String getGrokPath() { + return "../metron-integration-test/src/main/sample/patterns/test"; + } + + public String getGrokPatternLabel() { + return "ELBACCESSLOGS"; + } + + public List getTimeFields() { + return new ArrayList() {{ + add("timestamp"); + }}; + } + + public String getDateFormat() { + return "yyyy-MM-dd'T'HH:mm:ss.S'Z'"; + } + + public String getTimestampField() { + return "timestamp"; + } +} diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 06f4cecac8..0ae2817261 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -56,6 +56,7 @@ import org.apache.metron.common.writer.MessageWriter; import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; import org.apache.metron.parsers.BasicParser; +import org.apache.metron.parsers.DefaultMessageParserResult; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.parsers.topology.ParserComponents; @@ -254,7 +255,7 @@ protected ConfigurationsUpdater createUpdater() { parsedMessage.put("guid", "this-is-unique-identifier-for-tuple"); List messageList = new ArrayList<>(); messageList.add(parsedMessage); - when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messageList)); + when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messageList))); when(parser.validate(parsedMessage)).thenReturn(true); parserBolt.execute(tuple); @@ -303,7 +304,7 @@ protected ConfigurationsUpdater createUpdater() { final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); final JSONObject finalMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\", \"source.type\":\"" + sensorType + "\", \"guid\": \"this-is-unique-identifier-for-tuple\" }"); when(tuple.getBinary(0)).thenReturn(sampleBinary); - when(parser.parseOptional(sampleBinary)).thenReturn(Optional.of(messages)); + when(parser.parseOptionalResult(sampleBinary)).thenReturn(Optional.of(new DefaultMessageParserResult<>(messages))); when(parser.validate(eq(messages.get(0)))).thenReturn(true); when(parser.validate(eq(messages.get(1)))).thenReturn(false); parserBolt.execute(tuple); @@ -358,9 +359,9 @@ public void testFilterSuccess() throws Exception { when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1)); when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap() {{ + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject(new HashMap() {{ put("field1", "blah"); - }})))); + }}))))); parserBolt.execute(t1); verify(batchWriter, times(1)).write(any(), any(), any(), any()); verify(outputCollector, times(1)).ack(t1); @@ -543,7 +544,7 @@ protected ConfigurationsUpdater createUpdater() { verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); int oneLessThanDefaultBatchSize = ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE - 1; BulkWriterResponse response = new BulkWriterResponse(); @@ -624,7 +625,7 @@ protected ConfigurationsUpdater createUpdater() { verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); Set tuples = Stream.of(t1, t2, t3, t4, t5).collect(Collectors.toSet()); BulkWriterResponse response = new BulkWriterResponse(); @@ -671,7 +672,7 @@ protected ConfigurationsUpdater createUpdater() { doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any()); when(parser.validate(any())).thenReturn(true); - when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject()))); + when(parser.parseOptionalResult(any())).thenReturn(Optional.of(new DefaultMessageParserResult<>(ImmutableList.of(new JSONObject())))); when(filter.emitTuple(any(), any(Context.class))).thenReturn(true); parserBolt.execute(t1); parserBolt.execute(t2); diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java index 2cba40a886..76dc2b3e87 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java @@ -125,16 +125,16 @@ protected void prepCache() { } @Override - protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { - errors.add(originalMessage); + protected void handleError(Object originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + errors.add((byte[])originalMessage); LOG.error("Error parsing message: " + ex.getMessage(), ex); } + @SuppressWarnings("unchecked") public ProcessorResult> getResults() { return new ProcessorResult.Builder>().withProcessErrors(errors) .withResult(output) .build(); - } } diff --git a/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt new file mode 100644 index 0000000000..3525fc40ec --- /dev/null +++ b/metron-platform/metron-parsers/src/test/resources/logData/multi_elb_with_errors_log.txt @@ -0,0 +1,13 @@ +2018-08-30T21:02:32.047266Z vault-development 192.168.113.53:60002 192.168.82.130:8200 0.000695 0.000017 0.000015 - - 607 3078 "- - - " "-" - - +2018-08-30T21:02:22.119595Z vault-development 192.168.113.29:57322 192.168.72.75:8200 0.00051 0.00001 0.000013 - - 607 3079 "- - - " "-" - - +2018-08-30T21:05:58.275961Z vault-production 192.168.205.159:58390 192.168.68.196:8200 0.000767 0.000009 0.000012 - - 673 3210 "- - - " "-" - - +2018-08-30T21:05:59.222277Z vault-production 192.168.228.182:26358 192.168.81.224:8200 0.000882 0.000014 0.000024 - - 519 3920 "- - - " "-" - - +BOOM +BLAM +BOP +2018-08-30T21:05:59.234471Z vault-production 192.168.228.182:35506 192.168.79.6:8200 0.000377 0.000011 0.000009 - - 519 3919 "- - - " "-" - - +2018-08-30T21:05:59.237375Z vault-production 192.168.228.182:52516 192.168.68.196:8200 0.000628 0.000007 0.00001 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:05.235460Z vault-production 192.168.228.182:41783 192.168.79.6:8200 0.000309 0.000006 0.00001 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:05.226698Z vault-production 192.168.228.182:40008 192.168.81.224:8200 0.000955 0.000014 0.000013 - - 519 3919 "- - - " "-" - - +2018-08-30T21:06:05.237946Z vault-production 192.168.228.182:19261 192.168.68.196:8200 0.000661 0.000006 0.000009 - - 519 3918 "- - - " "-" - - +2018-08-30T21:06:11.229542Z vault-production 192.168.228.182:44082 192.168.81.224:8200 0.000912 0.000009 0.000014 - - 519 3919 "- - - " "-" - - From a833bdac35fc677be471547c9fe1cb993286e9f3 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Sun, 7 Oct 2018 12:41:31 -0400 Subject: [PATCH 4/7] documentation and test fixes --- metron-platform/metron-parsers/README.md | 1 + .../org/apache/metron/parsers/GrokParser.java | 59 +++++++++++++++++++ .../metron/parsers/bolt/ParserBolt.java | 4 +- .../apache/metron/parsers/GrokParserTest.java | 1 + .../parsers/MultiLineGrokParserTest.java | 5 +- .../MultiLineWithErrorsGrokParserTest.java | 4 ++ .../metron/parsers/SampleGrokParserTest.java | 3 + .../metron/parsers/SquidParserTest.java | 2 + .../apache/metron/parsers/YafParserTest.java | 3 + 9 files changed, 79 insertions(+), 3 deletions(-) diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index f71e6070e4..0732a1ef7a 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -29,6 +29,7 @@ There are two general types types of parsers: * Grok parser: `org.apache.metron.parsers.GrokParser` with possible `parserConfig` entries of * `grokPath` : The path in HDFS (or in the Jar) to the grok statement * `patternLabel` : The pattern label to use from the grok statement + * `multiLine` : The raw data passed in should be handled as a long with multiple lines, with each line to be parsed separately. This setting's valid values are 'true' or 'false'. The default if unset is 'false'. When set the parser will handle multiple lines with successfully processed lines emitted normally, and lines with errors sent to the error topic. * `timestampField` : The field to use for timestamp * `timeFields` : A list of fields to be treated as time * `dateFormat` : The date format to use to parse the time fields diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java index 7737fdaf8a..bead477d0c 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java @@ -22,6 +22,7 @@ import com.google.common.base.Splitter; import oi.thekraken.grok.api.Grok; import oi.thekraken.grok.api.Match; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,6 +59,7 @@ public class GrokParser implements MessageParser, Serializable { protected transient Grok grok; protected String grokPath; + protected boolean multiLine = false; protected String patternLabel; protected List timeFields = new ArrayList<>(); protected String timestampField; @@ -68,6 +70,10 @@ public class GrokParser implements MessageParser, Serializable { @SuppressWarnings("unchecked") public void configure(Map parserConfig) { this.grokPath = (String) parserConfig.get("grokPath"); + String multiLineString = (String) parserConfig.get("multiLine"); + if (!StringUtils.isBlank(multiLineString)) { + multiLine = Boolean.parseBoolean(multiLineString); + } this.patternLabel = (String) parserConfig.get("patternLabel"); this.timestampField = (String) parserConfig.get("timestampField"); List timeFieldsParam = (List) parserConfig.get("timeFields"); @@ -156,6 +162,14 @@ public Optional> parseOptionalResult(byte[] rawM if (grok == null) { init(); } + if (multiLine) { + return parseMultiLine(rawMessage); + } + return parseSingleLine(rawMessage); + } + + @SuppressWarnings("unchecked") + private Optional> parseMultiLine(byte[] rawMessage) { List messages = new ArrayList<>(); Map errors = new HashMap<>(); String originalMessage = null; @@ -207,6 +221,51 @@ public Optional> parseOptionalResult(byte[] rawM return Optional.of(new DefaultMessageParserResult<>(messages, errors)); } + @SuppressWarnings("unchecked") + private Optional> parseSingleLine(byte[] rawMessage) { + List messages = new ArrayList<>(); + Map errors = new HashMap<>(); + String originalMessage = null; + try { + originalMessage = new String(rawMessage, "UTF-8"); + LOG.debug("Grok parser parsing message: {}",originalMessage); + Match gm = grok.match(originalMessage); + gm.captures(); + JSONObject message = new JSONObject(); + message.putAll(gm.toMap()); + + if (message.size() == 0) { + Throwable rte = new RuntimeException("Grok statement produced a null message. Original message was: " + + originalMessage + " and the parsed message was: " + message + " . Check the pattern at: " + + grokPath); + errors.put(originalMessage, rte); + } else { + message.put("original_string", originalMessage); + for (String timeField : timeFields) { + String fieldValue = (String) message.get(timeField); + if (fieldValue != null) { + message.put(timeField, toEpoch(fieldValue)); + } + } + if (timestampField != null) { + message.put(Constants.Fields.TIMESTAMP.getName(), formatTimestamp(message.get(timestampField))); + } + message.remove(patternLabel); + postParse(message); + messages.add(message); + LOG.debug("Grok parser parsed message: {}", message); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + Exception innerException = new IllegalStateException("Grok parser Error: " + + e.getMessage() + + " on " + + originalMessage, e); + return Optional.of(new DefaultMessageParserResult<>(innerException)); + } + return Optional.of(new DefaultMessageParserResult(messages, errors)); + } + @Override public boolean validate(JSONObject message) { LOG.debug("Grok parser validating message: {}", message); diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index a7dd7591c8..8ff0dbdc8f 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -312,8 +312,8 @@ public void execute(Tuple tuple) { Optional> results = parser.parseOptionalResult(rawMessage.getMessage()); // check if there is a master error - if (results.isPresent() && results.get().getMasterThrowable() != null) { - handleError(originalMessage, tuple, results.get().getMasterThrowable(), collector); + if (results.isPresent() && results.get().getMasterThrowable().isPresent()) { + handleError(originalMessage, tuple, results.get().getMasterThrowable().get(), collector); return; } diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java index 1a50deaedc..7fa6a318d0 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/GrokParserTest.java @@ -92,4 +92,5 @@ public boolean compare(JSONObject expected, JSONObject actual) { public abstract List getTimeFields(); public abstract String getDateFormat(); public abstract String getTimestampField(); + public abstract String getMultiLine(); } diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java index aeb46c1e77..566c572ac2 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -52,7 +52,7 @@ public void test() throws IOException, ParseException { parserConfig.put("timestampField", getTimestampField()); parserConfig.put("dateFormat", getDateFormat()); parserConfig.put("timeFields", getTimeFields()); - + parserConfig.put("multiLine", getMultiLine()); GrokParser grokParser = new GrokParser(); grokParser.configure(parserConfig); grokParser.init(); @@ -83,6 +83,7 @@ public void testOptionalResult() throws IOException, ParseException { parserConfig.put("timestampField", getTimestampField()); parserConfig.put("dateFormat", getDateFormat()); parserConfig.put("timeFields", getTimeFields()); + parserConfig.put("multiLine", getMultiLine()); GrokParser grokParser = new GrokParser(); grokParser.configure(parserConfig); @@ -119,6 +120,8 @@ public Map getTestData() { } + public String getMultiLine() { return "true";} + public String getGrokPath() { return "../metron-integration-test/src/main/sample/patterns/test"; } diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java index c391da544f..fb5db7b7d9 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java @@ -52,6 +52,7 @@ public void test() throws IOException, ParseException { parserConfig.put("timestampField", getTimestampField()); parserConfig.put("dateFormat", getDateFormat()); parserConfig.put("timeFields", getTimeFields()); + parserConfig.put("multiLine",getMultiLine()); GrokParser grokParser = new GrokParser(); grokParser.configure(parserConfig); @@ -82,6 +83,7 @@ public void testOptionalResult() throws IOException, ParseException { parserConfig.put("timestampField", getTimestampField()); parserConfig.put("dateFormat", getDateFormat()); parserConfig.put("timeFields", getTimeFields()); + parserConfig.put("multiLine",getMultiLine()); GrokParser grokParser = new GrokParser(); grokParser.configure(parserConfig); @@ -132,6 +134,8 @@ public List getTimeFields() { }}; } + public String getMultiLine() { return "true"; } + public String getDateFormat() { return "yyyy-MM-dd'T'HH:mm:ss.S'Z'"; } diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java index e060559d3a..35e07f8cca 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SampleGrokParserTest.java @@ -68,6 +68,9 @@ public Map getTestData() { } + @Override + public String getMultiLine() { return "false"; } + @Override public String getGrokPath() { return "../metron-integration-test/src/main/sample/patterns/test"; diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java index 93c8276a2b..cb424fbd4a 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/SquidParserTest.java @@ -73,6 +73,8 @@ public Map getTestData() { } + @Override + public String getMultiLine() { return "false"; } @Override public String getGrokPath() { diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java index 8dd75a02d5..15ce19f65f 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/YafParserTest.java @@ -68,6 +68,9 @@ public Map getTestData() { } + @Override + public String getMultiLine() { return "false"; } + @Override public String getGrokPath() { return "../metron-parsers/src/main/resources/patterns/yaf"; From d0e15988e7c5d5408a459d7ae752f8e5197fb2bd Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Tue, 9 Oct 2018 15:41:59 -0400 Subject: [PATCH 5/7] per review, better test names, revert function parameter back to original --- .../main/java/org/apache/metron/parsers/bolt/ParserBolt.java | 2 +- .../org/apache/metron/parsers/MultiLineGrokParserTest.java | 4 ++-- .../metron/parsers/MultiLineWithErrorsGrokParserTest.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 8ff0dbdc8f..ff5c1d49f8 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -408,7 +408,7 @@ public void execute(Tuple tuple) { } } - protected void handleError(Object originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { MetronError error = new MetronError() .withErrorType(Constants.ErrorType.PARSER_ERROR) .withThrowable(ex) diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java index 566c572ac2..cc4d20fb4e 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineGrokParserTest.java @@ -44,7 +44,7 @@ public class MultiLineGrokParserTest { */ @Test @SuppressWarnings("unchecked") - public void test() throws IOException, ParseException { + public void testLegacyInterfaceReturnsMultiline() throws IOException, ParseException { Map parserConfig = new HashMap<>(); parserConfig.put("grokPath", getGrokPath()); @@ -75,7 +75,7 @@ public void test() throws IOException, ParseException { */ @Test @SuppressWarnings("unchecked") - public void testOptionalResult() throws IOException, ParseException { + public void testOptionalResultReturnsMultiline() throws IOException, ParseException { Map parserConfig = new HashMap<>(); parserConfig.put("grokPath", getGrokPath()); diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java index fb5db7b7d9..8ab82464eb 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/MultiLineWithErrorsGrokParserTest.java @@ -44,7 +44,7 @@ public class MultiLineWithErrorsGrokParserTest { */ @Test(expected = RuntimeException.class) @SuppressWarnings("unchecked") - public void test() throws IOException, ParseException { + public void testLegacyInterfaceThrowsOneExceptionWithMultiline() throws IOException, ParseException { Map parserConfig = new HashMap<>(); parserConfig.put("grokPath", getGrokPath()); @@ -75,7 +75,7 @@ public void test() throws IOException, ParseException { */ @Test @SuppressWarnings("unchecked") - public void testOptionalResult() throws IOException, ParseException { + public void testResultInterfaceReturnsErrorsAndMessagesWithMultiline() throws IOException, ParseException { Map parserConfig = new HashMap<>(); parserConfig.put("grokPath", getGrokPath()); From 823c7bc732d87e7d9179a10c419ad4197ec5ec85 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Tue, 9 Oct 2018 15:57:14 -0400 Subject: [PATCH 6/7] oops --- .../org/apache/metron/parsers/integration/ParserDriver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java index 76dc2b3e87..0d6eef88e8 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java @@ -125,8 +125,8 @@ protected void prepCache() { } @Override - protected void handleError(Object originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { - errors.add((byte[])originalMessage); + protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) { + errors.add(originalMessage); LOG.error("Error parsing message: " + ex.getMessage(), ex); } From 967096b4bc14883a4ad8ab461eb45b2e6927ef03 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Wed, 10 Oct 2018 11:12:26 -0400 Subject: [PATCH 7/7] Add new implemenation note readme for parsers Fix some spelling that you wouldn't see in markdown mode because of no line wraps reformat the GrokWebSphereParser --- metron-platform/metron-parsers/README.md | 8 +- .../message-parser-implementation-notes.md | 57 +++++ .../websphere/GrokWebSphereParser.java | 217 +++++++++--------- 3 files changed, 170 insertions(+), 112 deletions(-) create mode 100644 metron-platform/metron-parsers/message-parser-implementation-notes.md diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 0732a1ef7a..cfcf6ed818 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -515,9 +515,13 @@ Java parser adapters are intended for higher-velocity topologies and are not eas * org.apache.metron.parsers.syslog.Syslog5424Parser : Parse Syslog RFC 5424 messages ### Grok Parser Adapters -Grok parser adapters are designed primarly for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies. Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topplogy does not need to be recombiled in order to make changes to them. An example of a Grok perser is: +Grok parser adapters are designed primarily for someone who is not a Java coder for quickly standing up a parser adapter for lower velocity topologies. Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topplogy does not need to be recompiled in order to make changes to them. Example of a Grok parsers are: -* org.apache.metron.parsers.GrokParser +* org.apache.metron.parsers.GrokParser and org.apache.metron.parsers.websphere.GrokWebSphereParser + +Parsers that derive from GrokParser typically allow the GrokParser to parse the messages, and then override the methods for postParse to do further parsing. +When this is the case, and the Parser has not overridden `parse(byte[])` or `parseResultOptional(byte[])` these parsers will gain support for treating byte[] input as multiple lines, with each line parsed as a separate message ( and returned as such). +This is enabled by using the `"multiline":"true"` Parser configuration option. For more information on the Grok project please refer to the following link: diff --git a/metron-platform/metron-parsers/message-parser-implementation-notes.md b/metron-platform/metron-parsers/message-parser-implementation-notes.md new file mode 100644 index 0000000000..b8afe04a9a --- /dev/null +++ b/metron-platform/metron-parsers/message-parser-implementation-notes.md @@ -0,0 +1,57 @@ + +# `MessageParser` implementation notes + + +1. Supporting multiple JSONObject returns from a single byte[] +The original `MessageParser` interface supported parsing a message and returning a `List`. Therefore explicitly supporting multiple messages from one input. +While this is fine, it only allows for the complete failure of a message for any reason. There can only be one exception thrown. This means that if there _are_ multiple messages in the buffer, any one failure will necessarily fail all of them. +To improve on this situation, a new method was added to the `MessageParser` interface (with a default implementation), that introduces a return type to provide not only the JSONObjects produced, but also a `Map` of messages -> throwable. + +To support this in your parser, you should: + +- Implement the new method + +```java + @Override + public Optional> parseOptionalResult(byte[] rawMessage) +``` + +- Implement the original `List parse(byte[] message)` to delegate to that method such as below: + +```java + @Override + public List parse(byte[] rawMessage) { + Optional> resultOptional = parseOptionalResult(rawMessage); + if (!resultOptional.isPresent()) { + return Collections.EMPTY_LIST; + } + Map errors = resultOptional.get().getMessageThrowables(); + if (!errors.isEmpty()) { + throw new RuntimeException(errors.entrySet().iterator().next().getValue()); + } + + return resultOptional.get().getMessages(); + } +``` + +- You *may* want to govern treating the incoming buffer as multiline or not by adding a configuration option for your parser, such as `"multiline":"true"|"false"` + +- See the org.apache.metron.parsers.GrokParser for an example of this implementation. + +The Metron system itself will call the new `parseOptionalResult` method during processing. The default implementation in the interface handles backwards compatability with previous implementations. diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java index 178719b601..a58e0c9efd 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/websphere/GrokWebSphereParser.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,117 +27,114 @@ public class GrokWebSphereParser extends GrokParser { - private static final long serialVersionUID = 4860439408055777358L; + private static final long serialVersionUID = 4860439408055777358L; - @Override - protected long formatTimestamp(Object value) { - long epochTimestamp = System.currentTimeMillis(); - if (value != null) { - try { - epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value); - } catch (ParseException e) { - //default to current time - } - } - return epochTimestamp; - } + @Override + protected long formatTimestamp(Object value) { + long epochTimestamp = System.currentTimeMillis(); + if (value != null) { + try { + epochTimestamp = toEpoch(Calendar.getInstance().get(Calendar.YEAR) + " " + value); + } catch (ParseException e) { + //default to current time + } + } + return epochTimestamp; + } - @Override - protected void postParse(JSONObject message) { - removeEmptyFields(message); - message.remove("timestamp_string"); - if (message.containsKey("message")) { - String messageValue = (String) message.get("message"); - if (messageValue.contains("logged into")) { - parseLoginMessage(message); - } - else if (messageValue.contains("logged out")) { - parseLogoutMessage(message); - } - else if (messageValue.contains("rbm(")) { - parseRBMMessage(message); - } - else { - parseOtherMessage(message); - } - } - } + @Override + protected void postParse(JSONObject message) { + removeEmptyFields(message); + message.remove("timestamp_string"); + if (message.containsKey("message")) { + String messageValue = (String) message.get("message"); + if (messageValue.contains("logged into")) { + parseLoginMessage(message); + } else if (messageValue.contains("logged out")) { + parseLogoutMessage(message); + } else if (messageValue.contains("rbm(")) { + parseRBMMessage(message); + } else { + parseOtherMessage(message); + } + } + } - @SuppressWarnings("unchecked") - private void removeEmptyFields(JSONObject json) { - Iterator keyIter = json.keySet().iterator(); - while (keyIter.hasNext()) { - Object key = keyIter.next(); - Object value = json.get(key); - if (null == value || "".equals(value.toString())) { - keyIter.remove(); - } - } - } + @SuppressWarnings("unchecked") + private void removeEmptyFields(JSONObject json) { + Iterator keyIter = json.keySet().iterator(); + while (keyIter.hasNext()) { + Object key = keyIter.next(); + Object value = json.get(key); + if (null == value || "".equals(value.toString())) { + keyIter.remove(); + } + } + } - //Extracts the appropriate fields from login messages - @SuppressWarnings("unchecked") - private void parseLoginMessage(JSONObject json) { - json.put("event_subtype", "login"); - String message = (String) json.get("message"); - if (message.contains(":")){ - String parts[] = message.split(":"); - String user = parts[0]; - String ip_src_addr = parts[1]; - if (user.contains("user(") && user.contains(")")) { - user = user.substring(user.indexOf("user(") + "user(".length()); - user = user.substring(0, user.indexOf(")")); - json.put("username", user); - } - if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { - ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); - ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); - json.put("ip_src_addr", ip_src_addr); - } - json.remove("message"); - } - } + //Extracts the appropriate fields from login messages + @SuppressWarnings("unchecked") + private void parseLoginMessage(JSONObject json) { + json.put("event_subtype", "login"); + String message = (String) json.get("message"); + if (message.contains(":")) { + String[] parts = message.split(":"); + String user = parts[0]; + String ip_src_addr = parts[1]; + if (user.contains("user(") && user.contains(")")) { + user = user.substring(user.indexOf("user(") + "user(".length()); + user = user.substring(0, user.indexOf(")")); + json.put("username", user); + } + if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { + ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); + ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); + json.put("ip_src_addr", ip_src_addr); + } + json.remove("message"); + } + } - //Extracts the appropriate fields from logout messages - @SuppressWarnings("unchecked") - private void parseLogoutMessage(JSONObject json) { - json.put("event_subtype", "logout"); - String message = (String) json.get("message"); - if (message.matches(".*'.*'.*'.*'.*")) { - String parts[] = message.split("'"); - String ip_src_addr = parts[0]; - if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { - ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); - ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); - json.put("ip_src_addr", ip_src_addr); - } - json.put("username", parts[1]); - json.put("security_domain", parts[3]); - json.remove("message"); - } - } + //Extracts the appropriate fields from logout messages + @SuppressWarnings("unchecked") + private void parseLogoutMessage(JSONObject json) { + json.put("event_subtype", "logout"); + String message = (String) json.get("message"); + if (message.matches(".*'.*'.*'.*'.*")) { + String parts[] = message.split("'"); + String ip_src_addr = parts[0]; + if (ip_src_addr.contains("[") && ip_src_addr.contains("]")) { + ip_src_addr = ip_src_addr.substring(ip_src_addr.indexOf("[") + 1); + ip_src_addr = ip_src_addr.substring(0, ip_src_addr.indexOf("]")); + json.put("ip_src_addr", ip_src_addr); + } + json.put("username", parts[1]); + json.put("security_domain", parts[3]); + json.remove("message"); + } + } - //Extracts the appropriate fields from RBM messages - @SuppressWarnings("unchecked") - private void parseRBMMessage(JSONObject json) { - String message = (String) json.get("message"); - if (message.contains("(")) { - json.put("process", message.substring(0, message.indexOf("("))); - if (message.contains(":")) { - json.put("message", message.substring(message.indexOf(":") + 2)); - } - } - } + //Extracts the appropriate fields from RBM messages + @SuppressWarnings("unchecked") + private void parseRBMMessage(JSONObject json) { + String message = (String) json.get("message"); + if (message.contains("(")) { + json.put("process", message.substring(0, message.indexOf("("))); + if (message.contains(":")) { + json.put("message", message.substring(message.indexOf(":") + 2)); + } + } + } - //Extracts the appropriate fields from other messages - @SuppressWarnings("unchecked") - private void parseOtherMessage(JSONObject json) { - String message = (String) json.get("message"); - if (message.contains("(")) { - json.put("process", message.substring(0, message.indexOf("("))); - if (message.contains(":")) { - json.put("message", message.substring(message.indexOf(":") + 2)); - } - } - } + //Extracts the appropriate fields from other messages + @SuppressWarnings("unchecked") + private void parseOtherMessage(JSONObject json) { + String message = (String) json.get("message"); + if (message.contains("(")) { + json.put("process", message.substring(0, message.indexOf("("))); + if (message.contains(":")) { + json.put("message", message.substring(message.indexOf(":") + 2)); + } + } + } }