Skip to content

Commit

Permalink
nifi-2565: Add Grok parser
Browse files Browse the repository at this point in the history
  • Loading branch information
selim-namsi committed Oct 25, 2016
1 parent 94f8a96 commit e7a2833
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 168 deletions.
11 changes: 11 additions & 0 deletions nifi-commons/nifi-processor-utilities/pom.xml
Expand Up @@ -45,5 +45,16 @@
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
<version>0.1.4</version>
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.exception.GrokException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
Expand Down Expand Up @@ -367,6 +369,35 @@ public ValidationResult validate(final String subject, final String input, final

public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true);

/**
 * Validates that a property value is a compilable Grok expression.
 * <p>
 * Compilation is attempted against a fresh, pattern-less {@link Grok} instance, so
 * only the syntactic form of the expression is checked here; named patterns loaded
 * from a pattern file at schedule time are not available during validation.
 */
public static final Validator GROK_EXPRESSION_VALIDATOR = new Validator() {

    @Override
    public ValidationResult validate(String subject, String input, ValidationContext context) {
        Grok grok = new Grok();
        try {
            grok.compile(input);
        } catch (GrokException | java.util.regex.PatternSyntaxException e) {
            // Both failure modes mean the same thing to the user: the expression is
            // not compilable. Surface the underlying message so the user can see why,
            // instead of a bare "not valid" with no diagnostic detail.
            return new ValidationResult.Builder()
                    .subject(subject)
                    .input(input)
                    .valid(false)
                    .explanation("Not a valid Grok Expression: " + e.getMessage())
                    .build();
        }

        return new ValidationResult.Builder()
                .subject(subject)
                .input(input)
                .valid(true)
                .build();
    }
};

//
//
// FACTORY METHODS FOR VALIDATORS
Expand Down
Expand Up @@ -390,9 +390,9 @@ language governing permissions and limitations under the License. -->
<exclude>src/test/resources/TestEncryptContent/salted_128_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/unsalted_raw.enc</exclude>
<exclude>src/test/resources/TestEncryptContent/unsalted_128_raw.enc</exclude>
<exclude>src/test/resources/TestGrokParser/apache.log</exclude>
<exclude>src/test/resources/TestGrokParser/simple_text.log</exclude>
<exclude>src/test/resources/TestGrokParser/patterns</exclude>
<exclude>src/test/resources/TestExtractGrok/apache.log</exclude>
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
<exclude>src/test/resources/TestExtractGrok/patterns</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
<exclude>src/main/java/org/apache/nifi/processors/standard/util/crypto/bcrypt/BCrypt.java</exclude>
</excludes>
Expand Down
Expand Up @@ -21,12 +21,9 @@
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.Match;
import oi.thekraken.grok.api.exception.GrokException;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
Expand All @@ -35,7 +32,6 @@

import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
Expand All @@ -45,6 +41,8 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -56,42 +54,47 @@
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;


@Tags({"Grok Processor"})
@CapabilityDescription("Use Grok expression ,a la logstash, to parse data.")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class GrokParser extends AbstractProcessor {
@Tags({"Grok Processor","grok", "log", "text", "parse", "delimit", "extract"})
@CapabilityDescription("Evaluates one or more Grok Expressions against the content of a FlowFile, "+
"adding the results as attributes or replacing the content of the FlowFile with a JSON "+
"notation of the matched content")
@WritesAttributes({
@WritesAttribute(attribute = "grok.XXX", description ="Each of the Grok identifier that is matched in the flowfile will be added as an attribute, prefixed with \"grok.\" For example,"+
"if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")})
public class ExtractGrok extends AbstractProcessor {


public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute";
public static final String FLOWFILE_CONTENT = "flowfile-content";
private static final String APPLICATION_JSON = "application/json";

public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor
.Builder().name("Grok Expression")
.description("Grok expression")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.GROK_EXPRESSION_VALIDATOR)
.build();

public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor
.Builder().name("Grok Pattern file")
.description("Grok Pattern file definition")
.required(false)
.required(true)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();

public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("Destination")
.description("Control if Grok output value is written as a new flowfile attribute " +
"or written in the flowfile content. Writing to flowfile content will overwrite any " +
"existing flowfile content.")
.description("Control if Grok output value is written as a new flowfile attributes, in this case "+
"each of the Grok identifier that is matched in the flowfile will be added as an attribute, "+
"prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content "+
"will overwrite any existing flowfile content.")

.required(true)
.allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
.defaultValue(DESTINATION_ATTRIBUTE)
.allowableValues(FLOWFILE_ATTRIBUTE, FLOWFILE_CONTENT)
.defaultValue(FLOWFILE_ATTRIBUTE)
.build();

public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor
Expand Down Expand Up @@ -121,35 +124,33 @@ public class GrokParser extends AbstractProcessor {
.description("FlowFiles are routed to this relationship when no provided Grok Expression matches the content of the FlowFile")
.build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;
private final static List<PropertyDescriptor> descriptors;
private final static Set<Relationship> relationships;

private static final ObjectMapper objectMapper = new ObjectMapper();

private Grok grok;
private final static Grok grok = Grok.EMPTY;
private byte[] buffer;


@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(GROK_EXPRESSION);
descriptors.add(GROK_PATTERN_FILE);
descriptors.add(DESTINATION);
descriptors.add(CHARACTER_SET);
descriptors.add(MAX_BUFFER_SIZE);
this.descriptors = Collections.unmodifiableList(descriptors);

final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_MATCH);
relationships.add(REL_NO_MATCH);
this.relationships = Collections.unmodifiableSet(relationships);
static {
    // Supported properties, in the order they should appear in the UI.
    final List<PropertyDescriptor> props = new ArrayList<>();
    props.add(GROK_EXPRESSION);
    props.add(GROK_PATTERN_FILE);
    props.add(DESTINATION);
    props.add(CHARACTER_SET);
    props.add(MAX_BUFFER_SIZE);
    descriptors = Collections.unmodifiableList(props);

    // Relationships exposed by this processor: matched vs. unmatched content.
    final Set<Relationship> rels = new HashSet<>();
    rels.add(REL_MATCH);
    rels.add(REL_NO_MATCH);
    relationships = Collections.unmodifiableSet(rels);
}


@Override
public Set<Relationship> getRelationships() {
return this.relationships;
return relationships;
}

@Override
Expand All @@ -158,18 +159,13 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
}

@OnScheduled
public void onScheduled(final ProcessContext context) {
public void onScheduled(final ProcessContext context) throws GrokException{

final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
buffer = new byte[maxBufferSize];

try{
grok = Grok.create(context.getProperty(GROK_PATTERN_FILE).getValue());
grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
}catch (GrokException e){
getLogger().error("Failed to initialize ExtractGrok due to: ", e);
}

grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue());
grok.compile(context.getProperty(GROK_EXPRESSION).getValue());

}

Expand All @@ -179,9 +175,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if ( flowFile == null ) {
return;
}

final StopWatch stopWatch = new StopWatch(true);
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final Map<String, String> grokResults = new HashMap<>();
final byte[] byteBuffer = buffer;
session.read(flowFile, new InputStreamCallback() {
@Override
Expand All @@ -195,40 +190,42 @@ public void process(InputStream in) throws IOException {

final Match gm = grok.match(contentString);
gm.captures();
for(Map.Entry<String,Object> entry: gm.toMap().entrySet()){
if(null != entry.getValue() ) {
grokResults.put(entry.getKey(), entry.getValue().toString());
}
}

if (grokResults.isEmpty()) {

if (gm.toMap().isEmpty()) {
session.transfer(flowFile, REL_NO_MATCH);
getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
return ;
}
final ObjectMapper objectMapper = new ObjectMapper();
switch (context.getProperty(DESTINATION).getValue()){
case DESTINATION_ATTRIBUTE:
case FLOWFILE_ATTRIBUTE:

Map<String, String> grokResults = new HashMap<>();
for(Map.Entry<String,Object> entry: gm.toMap().entrySet()){
if(null != entry.getValue() ) {
grokResults.put("grok."+entry.getKey(), entry.getValue().toString());
}
}

flowFile = session.putAllAttributes(flowFile, grokResults);

session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH);
getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});

break;
case DESTINATION_CONTENT:
case FLOWFILE_CONTENT:

FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
outputStream.write(objectMapper.writeValueAsBytes(grokResults));
outputStream.write(objectMapper.writeValueAsBytes(gm.toMap()));
}
}
});
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.getProvenanceReporter().modifyContent(conFlowfile,"Replaced content with parsed Grok fields and values");
session.getProvenanceReporter().modifyContent(conFlowfile,"Replaced content with parsed Grok fields and values",stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(conFlowfile, REL_MATCH);

break;
Expand Down
Expand Up @@ -46,7 +46,7 @@ org.apache.nifi.processors.standard.JoltTransformJSON
org.apache.nifi.processors.standard.GenerateTableFetch
org.apache.nifi.processors.standard.GetJMSQueue
org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.GrokParser
org.apache.nifi.processors.standard.ExtractGrok
org.apache.nifi.processors.standard.ListDatabaseTables
org.apache.nifi.processors.standard.ListFile
org.apache.nifi.processors.standard.ListenHTTP
Expand Down

0 comments on commit e7a2833

Please sign in to comment.