From 7c48d6c4fcdcb066ed988ffba52acc588b7c05c4 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 5 Jun 2018 15:34:20 -0400 Subject: [PATCH 1/5] Updating kafka writer and test --- metron-platform/metron-parsers/README.md | 9 +++ .../metron/writer/kafka/KafkaWriter.java | 32 ++++++++-- .../metron/writer/kafka/KafkaWriterTest.java | 63 +++++++++++++++++++ 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 8254bafedf..6bd79d10c6 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -166,6 +166,15 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. + * The kafka writer can be configured within the parser config as well. (This is all configured a priori, but this is convenient for overriding the settings) : + * `kafka.brokerUrl` : The broker URL + * `kafka.keySerializer` : The key serializer (defaults to `StringSerializer`) + * `kafka.valueSerializer` : The key serializer (defaults to `StringSerializer`) + * `kafka.zkQuorum` : The zookeeper quorum + * `kafka.requiredAcks` : Whether to require acks. + * `kafka.topic` : The topic to write to + * `kafka.topicField` : The field to pull the topic from. If this is specified, then the producer will use this. If it is unspecified, then it will default to the `kafka.topic` property. If neither are specified, then an error will occur. + * `kafka.producerConfigs` : A map of kafka producer configs for advanced customization. 
* `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. * `spoutParallelism` : The kafka spout parallelism (default to `1`). This can be overridden on the command line. * `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line. diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index efb241803b..2c762b7fda 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -50,6 +50,7 @@ public enum Configurations { ,VALUE_SERIALIZER("kafka.valueSerializer") ,REQUIRED_ACKS("kafka.requiredAcks") ,TOPIC("kafka.topic") + ,TOPIC_FIELD("kafka.topicField") ,PRODUCER_CONFIGS("kafka.producerConfigs"); ; String key; @@ -81,6 +82,7 @@ public T getAndConvert(Optional configPrefix, Map co private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer"; private int requiredAcks = 1; private String kafkaTopic = Constants.ENRICHMENT_TOPIC; + private String kafkaTopicField = null; private KafkaProducer kafkaProducer; private String configPrefix = null; private String zkQuorum = null; @@ -120,6 +122,12 @@ public KafkaWriter withTopic(String topic) { this.kafkaTopic= topic; return this; } + + public KafkaWriter withTopicField(String topicField) { + this.kafkaTopicField = topicField; + return this; + } + public KafkaWriter withConfigPrefix(String prefix) { this.configPrefix = prefix; return this; @@ -166,6 +174,10 @@ public void configure(String sensorName, WriterConfiguration configuration) { if(topic != null) { withTopic(topic); } + String topicField = 
Configurations.TOPIC_FIELD.getAndConvert(getConfigPrefix(), configMap, String.class); + if(topicField != null) { + withTopicField(topicField); + } Map producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap); if(producerConfigs != null) { withProducerConfigs(producerConfigs); @@ -197,6 +209,15 @@ public Map createProducerConfigs() { return producerConfig; } + public Optional getKafkaTopic(JSONObject message) { + if(kafkaTopicField != null) { + return Optional.ofNullable((String)message.get(kafkaTopicField)); + } + else { + return Optional.ofNullable(kafkaTopic); + } + } + @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) { @@ -212,10 +233,13 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura writerResponse.addError(t, tuple); continue; } - Future future = kafkaProducer - .send(new ProducerRecord(kafkaTopic, jsonMessage)); - // we want to manage the batching - results.add(new AbstractMap.SimpleEntry<>(tuple, future)); + Optional topic = getKafkaTopic(message); + if(topic.isPresent()) { + Future future = kafkaProducer + .send(new ProducerRecord(kafkaTopic, jsonMessage)); + // we want to manage the batching + results.add(new AbstractMap.SimpleEntry<>(tuple, future)); + } } try { diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java index 1b9543055a..9d201b83ec 100644 --- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java @@ -19,10 +19,12 @@ package org.apache.metron.writer.kafka; import com.google.common.collect.ImmutableMap; +import org.apache.metron.common.Constants; import 
org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.writer.ParserWriterConfiguration; import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; @@ -85,4 +87,65 @@ public void testHappyPathGlobalConfigWithPrefix() throws Exception { Assert.assertEquals(producerConfigs.get("key1"), 1); Assert.assertEquals(producerConfigs.get("key2"), "value2"); } + + @Test + public void testTopicField_bothTopicAndFieldSpecified() throws Exception { + KafkaWriter writer = new KafkaWriter(); + WriterConfiguration configuration = createConfiguration( + new HashMap() {{ + put("kafka.brokerUrl" , "localhost:6667"); + put("kafka.topic" , SENSOR_TYPE); + put("kafka.topicField" , "kafka_topic"); + put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2")); + }} + ); + + writer.configure(SENSOR_TYPE, configuration); + Assert.assertEquals( "metron" + , writer.getKafkaTopic(new JSONObject() {{ + put("kafka_topic", "metron"); + }}).get() + ); + Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() ); + + } + + @Test + public void testTopicField_onlyFieldSpecified() throws Exception { + KafkaWriter writer = new KafkaWriter(); + WriterConfiguration configuration = createConfiguration( + new HashMap() {{ + put("kafka.brokerUrl" , "localhost:6667"); + put("kafka.topicField" , "kafka_topic"); + put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2")); + }} + ); + + writer.configure(SENSOR_TYPE, configuration); + Assert.assertEquals( "metron" + , writer.getKafkaTopic(new JSONObject() {{ + put("kafka_topic", "metron"); + }}).get() + ); + Assert.assertFalse( writer.getKafkaTopic(new JSONObject()).isPresent() ); + } + + @Test + public void testTopicField_neitherSpecified() throws Exception { + KafkaWriter writer = new KafkaWriter(); + 
WriterConfiguration configuration = createConfiguration( + new HashMap() {{ + put("kafka.brokerUrl" , "localhost:6667"); + put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2")); + }} + ); + + writer.configure(SENSOR_TYPE, configuration); + Assert.assertEquals(Constants.ENRICHMENT_TOPIC + , writer.getKafkaTopic(new JSONObject() {{ + put("kafka_topic", "metron"); + }}).get() + ); + Assert.assertTrue( writer.getKafkaTopic(new JSONObject()).isPresent() ); + } } From 5cffccb3bd7a5894982931a0548b00c3def8a2cc Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 8 Jun 2018 01:33:01 -0400 Subject: [PATCH 2/5] Typo. --- .../main/java/org/apache/metron/writer/kafka/KafkaWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index 2c762b7fda..83b5c9f36a 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -236,7 +236,7 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura Optional topic = getKafkaTopic(message); if(topic.isPresent()) { Future future = kafkaProducer - .send(new ProducerRecord(kafkaTopic, jsonMessage)); + .send(new ProducerRecord(topic.get(), jsonMessage)); // we want to manage the batching results.add(new AbstractMap.SimpleEntry<>(tuple, future)); } From bff12f1cc06a68354a28bfa1bcad57b9694812a8 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 2 Jul 2018 13:34:02 -0400 Subject: [PATCH 3/5] Updating docs --- metron-platform/metron-parsers/README.md | 10 +--------- metron-platform/metron-writer/README.md | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md index 00722eebdc..7ddfdeaf43 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -166,15 +166,7 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. - * The kafka writer can be configured within the parser config as well. (This is all configured a priori, but this is convenient for overriding the settings) : - * `kafka.brokerUrl` : The broker URL - * `kafka.keySerializer` : The key serializer (defaults to `StringSerializer`) - * `kafka.valueSerializer` : The key serializer (defaults to `StringSerializer`) - * `kafka.zkQuorum` : The zookeeper quorum - * `kafka.requiredAcks` : Whether to require acks. - * `kafka.topic` : The topic to write to - * `kafka.topicField` : The field to pull the topic from. If this is specified, then the producer will use this. If it is unspecified, then it will default to the `kafka.topic` property. If neither are specified, then an error will occur. - * `kafka.producerConfigs` : A map of kafka producer configs for advanced customization. + * The kafka writer can be configured within the parser config as well. (This is all configured a priori, but this is convenient for overriding the settings). See [here](../metron-writer/README.md#kafka-writer) * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. * `spoutParallelism` : The kafka spout parallelism (default to `1`). This can be overridden on the command line. 
* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line. diff --git a/metron-platform/metron-writer/README.md b/metron-platform/metron-writer/README.md index 16c66868c0..21cdbcac86 100644 --- a/metron-platform/metron-writer/README.md +++ b/metron-platform/metron-writer/README.md @@ -20,6 +20,30 @@ limitations under the License. ## Introduction The writer module provides some utilties for writing to outside components from within Storm. This includes managing bulk writing. An implemention is included for writing to HDFS in this module. Other writers can be found in their own modules. +## Kafka Writer +We have an implementation of a writer which will write batches of +messages to Kafka. An interesting aspect of this writer is that it can +be configured to allow users to specify a message field which contains +the topic for the message. + +The configuration for this writer is held in the individual Sensor +Configurations: +* [Enrichment](../metron-enrichment/README.md#sensor-enrichment-configuration) under the `config` element +* [Parsers](../metron-parsers/README.md#parser-configuration) in the `parserConfig` element +* Profiler - Unsupported currently + +In each of these, the kafka writer can be configured via a map which has +the following elements: +* `kafka.brokerUrl` : The broker URL +* `kafka.keySerializer` : The key serializer (defaults to `StringSerializer`) +* `kafka.valueSerializer` : The value serializer (defaults to `StringSerializer`) +* `kafka.zkQuorum` : The zookeeper quorum +* `kafka.requiredAcks` : Whether to require acks. +* `kafka.topic` : The topic to write to +* `kafka.topicField` : The field to pull the topic from. If this is specified, then the producer will use this. If it is unspecified, then it will default to the `kafka.topic` property. If neither are specified, then an error will occur. +* `kafka.producerConfigs` : A map of kafka producer configs for advanced customization. 
+ + ## HDFS Writer The HDFS writer included here expands on what Storm has in several ways. There's customization in syncing to HDFS, rotation policy, etc. In addition, the writer allows for users to define output paths based on the fields in the provided JSON message. This can be defined using Stellar. From 4f92105dadcdbdeb339cf052ae9dfbbb3d4e88c6 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 2 Jul 2018 15:52:15 -0400 Subject: [PATCH 4/5] Added integration test --- .../WriterBoltIntegrationTest.java | 75 ++++++++++++++++++- .../metron/writer/kafka/KafkaWriter.java | 12 ++- 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java index dfadfdc7fe..99506de894 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import java.io.IOException; import java.io.Serializable; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.Predicate; import javax.annotation.Nullable; import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.hbase.util.Bytes; @@ -66,7 +68,7 @@ public static class MockValidator implements FieldValidation { @Override public boolean isValid(Map input, Map validationConfig, Map globalConfig, Context context) { - if (input.get("action").equals("invalid")) { + if (input.get("action") != null && input.get("action").equals("invalid")) { return false; } return 
true; @@ -105,6 +107,69 @@ public void initialize(Map validationConfig, Map @Multiline public static String parserConfigJSON; + /** + * { + * "parserClassName" : "org.apache.metron.parsers.csv.CSVParser", + * "sensorTopic": "dummy", + * "outputTopic": "output", + * "errorTopic": "parser_error", + * "parserConfig": { + * "batchSize" : 1, + * "columns" : { + * "name" : 0, + * "dummy" : 1 + * }, + * "kafka.topicField" : "route_field" + * } + * ,"fieldTransformations" : [ + * { + * "transformation" : "STELLAR" + * ,"input" : ["name"] + * ,"output" : ["route_field"] + * ,"config" : { + * "route_field" : "match{ name == 'metron' => 'output', default => NULL}" + * } + * } + * ] + * } + */ + @Multiline + public static String parserConfigJSONKafkaRedirection; + + @Test + public void test_topic_redirection() throws Exception { + final String sensorType = "dummy"; + SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSONKafkaRedirection, SensorParserConfig.class); + final List inputMessages = new ArrayList() {{ + add(Bytes.toBytes("metron,foo")); + add(Bytes.toBytes("notmetron,foo")); + add(Bytes.toBytes("metron,bar")); + add(Bytes.toBytes("metron,baz")); + }}; + + final Properties topologyProperties = new Properties(); + ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation); + try { + runner.start(); + kafkaComponent.writeMessages(sensorType, inputMessages); + KafkaProcessor>> kafkaProcessor = getKafkaProcessor( + parserConfig.getOutputTopic(), parserConfig.getErrorTopic(), kafkaMessageSet -> kafkaMessageSet.getMessages().size() == 3 && kafkaMessageSet.getErrors().isEmpty()); + ProcessorResult>> result = runner.process(kafkaProcessor); + + // validate the output messages + Map> outputMessages = result.getResult(); + for(JSONObject j : outputMessages.get(Constants.ENRICHMENT_TOPIC)) { + Assert.assertEquals("metron", j.get("name")); + Assert.assertEquals("output", j.get("route_field")); + 
Assert.assertTrue(ImmutableSet.of("foo", "bar", "baz").contains(j.get("dummy"))); + } + } finally { + if(runner != null) { + runner.stop(); + } + } + } + @Test public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception { final String sensorType = "dummy"; @@ -192,9 +257,13 @@ public ComponentRunner setupTopologyComponents(Properties topologyProperties, St .build(); } - @SuppressWarnings("unchecked") private KafkaProcessor>> getKafkaProcessor(String outputTopic, String errorTopic) { + return getKafkaProcessor(outputTopic, errorTopic, messageSet -> (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2)); + } + @SuppressWarnings("unchecked") + private KafkaProcessor>> getKafkaProcessor(String outputTopic, + String errorTopic, Predicate predicate) { return new KafkaProcessor<>() .withKafkaComponentName("kafka") @@ -204,7 +273,7 @@ private KafkaProcessor>> getKafkaProcessor(String o @Nullable @Override public Boolean apply(@Nullable KafkaMessageSet messageSet) { - return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2); + return predicate.test(messageSet); } }) .withProvideResult(new Function>>() { diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index 83b5c9f36a..5a6313c25f 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -19,6 +19,7 @@ import com.google.common.base.Joiner; import java.io.Serializable; +import java.lang.invoke.MethodHandles; import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; @@ -41,8 +42,11 @@ import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaWriter extends AbstractWriter implements BulkMessageWriter, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public enum Configurations { BROKER("kafka.brokerUrl") ,KEY_SERIALIZER("kafka.keySerializer") @@ -210,12 +214,16 @@ public Map createProducerConfigs() { } public Optional getKafkaTopic(JSONObject message) { + String t = null; if(kafkaTopicField != null) { - return Optional.ofNullable((String)message.get(kafkaTopicField)); + t = (String)message.get(kafkaTopicField); + LOG.debug("Sending to topic: {} based on the field {}", t, kafkaTopicField); } else { - return Optional.ofNullable(kafkaTopic); + t = kafkaTopic; + LOG.debug("Sending to topic: {}", t); } + return Optional.ofNullable(t); } @Override From 107184ca2aa0c75f4ab52802f0fc091e0ad8f7d6 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 2 Jul 2018 16:30:56 -0400 Subject: [PATCH 5/5] One more debug log --- .../main/java/org/apache/metron/writer/kafka/KafkaWriter.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index 5a6313c25f..599ecbdc23 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -248,6 +248,9 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura // we want to manage the batching results.add(new AbstractMap.SimpleEntry<>(tuple, future)); } + else { + LOG.debug("Dropping {} because no topic is specified.", jsonMessage); + } } try {