From 83ec7e979522a6932f46839edf2bca0969ebf817 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 May 2017 16:04:34 -0400 Subject: [PATCH] NIFI-3949: Updated Grok Reader to allow for sub-patterns to be used when determining the schema --- .../java/org/apache/nifi/grok/GrokReader.java | 45 ++++++++++++++----- .../nifi/grok/TestGrokRecordReader.java | 40 +++++++++++++++++ 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index a874632e8909..dcf8b5a9aca6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -133,35 +133,56 @@ public void preCompile(final ConfigurationContext context) throws GrokException, appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()); - this.recordSchema = createRecordSchema(grok); + final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue(); + if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) { + this.recordSchema = createRecordSchema(grok); + } else { + this.recordSchema = null; + } } static RecordSchema createRecordSchema(final Grok grok) { final List fields = new ArrayList<>(); String grokExpression = grok.getOriginalGrokPattern(); + populateSchemaFieldNames(grok, grokExpression, fields); + + fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + return schema; + } + + private static void populateSchemaFieldNames(final Grok grok, String grokExpression, final List fields) { while (grokExpression.length() > 0) { final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression); if (matcher.find()) { - final Map namedGroups = GrokUtils.namedGroups(matcher, grokExpression); - final String fieldName = namedGroups.get("subname"); + final Map extractedGroups = GrokUtils.namedGroups(matcher, grokExpression); + final String subName = extractedGroups.get("subname"); - DataType dataType = RecordFieldType.STRING.getDataType(); - final RecordField recordField = new RecordField(fieldName, dataType); - fields.add(recordField); + if (subName == null) { + final String subPatternName = extractedGroups.get("pattern"); + if (subPatternName == null) { + continue; + } + + final String subExpression = grok.getPatterns().get(subPatternName); + populateSchemaFieldNames(grok, subExpression, fields); + } else { + DataType dataType = RecordFieldType.STRING.getDataType(); + final RecordField recordField = new RecordField(subName, dataType); + fields.add(recordField); + } if (grokExpression.length() > matcher.end() + 1) { - grokExpression = grokExpression.substring(matcher.end() + 1); + grokExpression = grokExpression.substring(matcher.end()); } else { break; } + } else { + break; } } - - fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType())); - - final RecordSchema schema = new SimpleRecordSchema(fields); - return schema; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index ae5d433effa3..1f9d57206e5c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -25,12 +25,15 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import io.thekraken.grok.api.Grok; @@ -186,4 +189,41 @@ public void testParseStackTrace() throws GrokException, IOException, MalformedRe } } + @Test + public void testInheritNamedParameters() throws FileNotFoundException, IOException, GrokException, MalformedRecordException { + final String syslogMsg = "May 22 15:58:23 my-host nifi[12345]:My Message"; + final byte[] msgBytes = syslogMsg.getBytes(); + + try (final InputStream in = new ByteArrayInputStream(msgBytes)) { + final Grok grok = new Grok(); + grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); + grok.compile("%{SYSLOGBASE}%{GREEDYDATA:message}"); + + final RecordSchema schema = GrokReader.createRecordSchema(grok); + final List fieldNames = schema.getFieldNames(); + assertEquals(8, fieldNames.size()); + assertTrue(fieldNames.contains("timestamp")); + assertTrue(fieldNames.contains("logsource")); + assertTrue(fieldNames.contains("facility")); + assertTrue(fieldNames.contains("priority")); + assertTrue(fieldNames.contains("program")); + assertTrue(fieldNames.contains("pid")); + assertTrue(fieldNames.contains("message")); + assertTrue(fieldNames.contains("stackTrace")); // always implicitly there + + final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, true); + final Record record = deserializer.nextRecord(); + + assertEquals("May 22 15:58:23", record.getValue("timestamp")); + assertEquals("my-host", record.getValue("logsource")); + assertNull(record.getValue("facility")); + assertNull(record.getValue("priority")); + assertEquals("nifi", record.getValue("program")); + assertEquals("12345", record.getValue("pid")); + assertEquals("My Message", record.getValue("message")); + + assertNull(deserializer.nextRecord()); + } + } + }