This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-954: Create ability to change output topic of parsers from the CLI #588

Closed · cestella wants to merge 2 commits
Changes shown are from 1 of the 2 commits.

File: ParserTopologyBuilder.java
@@ -73,7 +73,8 @@ public static TopologyBuilder build(String zookeeperUrl,
int errorWriterParallelism,
int errorWriterNumTasks,
Map<String, Object> kafkaSpoutConfig,
Optional<String> securityProtocol
Optional<String> securityProtocol,
Optional<String> outputTopic
) throws Exception {

// fetch configuration from zookeeper
@@ -87,7 +88,7 @@ public static TopologyBuilder build(String zookeeperUrl,
.setNumTasks(spoutNumTasks);

// create the parser bolt
ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
builder.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks)
.shuffleGrouping("kafkaSpout");
@@ -169,6 +170,7 @@ private static ParserBolt createParserBolt( String zookeeperUrl
, Optional<String> securityProtocol
, ParserConfigurations configs
, SensorParserConfig parserConfig
, Optional<String> outputTopic
)
{

@@ -181,7 +183,7 @@ private static ParserBolt createParserBolt( String zookeeperUrl
createKafkaWriter( brokerUrl
, zookeeperUrl
, securityProtocol
).withTopic(Constants.ENRICHMENT_TOPIC) :
).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
ReflectionUtils.createInstance(parserConfig.getWriterClassName());
writer.configure(sensorType, new ParserWriterConfiguration(configs));

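A note on the writer change above: the Kafka writer's topic is now outputTopic.orElse(Constants.ENRICHMENT_TOPIC), so leaving the new option unset preserves the existing behavior of writing to the enrichment topic. Below is a minimal, self-contained sketch of that fallback pattern; it is illustrative only, and the literal "enrichments" value is an assumed stand-in for Constants.ENRICHMENT_TOPIC.

import java.util.Optional;

public class OutputTopicFallbackSketch {
  // Assumed stand-in for Constants.ENRICHMENT_TOPIC.
  private static final String ENRICHMENT_TOPIC = "enrichments";

  // Mirrors withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) in createParserBolt.
  static String resolveTopic(Optional<String> outputTopic) {
    return outputTopic.orElse(ENRICHMENT_TOPIC);
  }

  public static void main(String[] args) {
    System.out.println(resolveTopic(Optional.empty()));        // enrichments
    System.out.println(resolveTopic(Optional.of("my_topic"))); // my_topic
  }
}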
File: ParserTopologyCLI.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.parsers.topology;

import org.apache.metron.common.Constants;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
@@ -188,6 +189,18 @@ public enum ParserOptions {
return o;
}
)
,OUTPUT_TOPIC("ot", code -> {
Option o = new Option(code
, "output_topic"
, true
, "The output kafka topic for the parser. If unset, the default is " + Constants.ENRICHMENT_TOPIC
);
o.setArgName("KAFKA_TOPIC");
o.setRequired(false);
o.setType(String.class);
return o;
}
)
,TEST("t", code ->
{
Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -296,6 +309,7 @@ public static void main(String[] args) {
if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
}
Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
@@ -308,7 +322,8 @@ public static void main(String[] args) {
errorParallelism,
errorNumTasks,
spoutConfig,
securityProtocol
securityProtocol,
outputTopic
);
Config stormConf = ParserOptions.getConfig(cmd);
if (ParserOptions.TEST.has(cmd)) {
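To make the new flag concrete: OUTPUT_TOPIC is registered as -ot/--output_topic taking a KAFKA_TOPIC argument, and main(...) wraps its value into an Optional<String> for the builder. The following standalone sketch shows that wiring with Apache Commons CLI; it deliberately bypasses the ParserOptions enum, and the use of DefaultParser is an assumption (any Commons CLI parser would do).

import java.util.Optional;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class OutputTopicOptionSketch {
  public static void main(String[] args) throws ParseException {
    Options options = new Options();
    // Same shape as the OUTPUT_TOPIC option added to ParserOptions above.
    Option ot = new Option("ot", "output_topic", true, "The output kafka topic for the parser");
    ot.setArgName("KAFKA_TOPIC");
    ot.setRequired(false);
    options.addOption(ot);

    CommandLine cmd = new DefaultParser().parse(options, args);

    // Mirrors the Optional wrapping added to ParserTopologyCLI.main(...).
    Optional<String> outputTopic = cmd.hasOption("ot")
        ? Optional.of(cmd.getOptionValue("ot"))
        : Optional.empty();
    System.out.println("output topic: " + outputTopic.orElse("<default enrichment topic>"));
  }
}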
File: Parser integration test base class
@@ -47,7 +47,7 @@ public void test() throws Exception {
final Properties topologyProperties = new Properties();
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(sensorType, 1));
add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
add(new KafkaComponent.Topic(getOutputTopic(), 1));
add(new KafkaComponent.Topic(ERROR_TOPIC,1));
}});
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -62,6 +62,7 @@ public void test() throws Exception {
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
.withSensorType(sensorType)
.withTopologyProperties(topologyProperties)
.withOutputTopic(getOutputTopic())
.withBrokerUrl(kafkaComponent.getBrokerList()).build();

//UnitTestHelper.verboseLogging();
@@ -114,7 +115,7 @@ private KafkaProcessor<List<byte[]>> getProcessor(){

return new KafkaProcessor<>()
.withKafkaComponentName("kafka")
.withReadTopic(Constants.ENRICHMENT_TOPIC)
.withReadTopic(getOutputTopic())
.withErrorTopic(ERROR_TOPIC)
.withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
@Nullable
@@ -133,5 +134,8 @@ public List<byte[]> apply(@Nullable KafkaMessageSet messageSet) {
}
abstract String getSensorType();
abstract List<ParserValidation> getValidations();
String getOutputTopic() {
return Constants.ENRICHMENT_TOPIC;
}

}
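
The getOutputTopic() hook added above is a small template method: the base integration test creates and reads the topic it returns, defaulting to the enrichment topic, and a concrete parser test may override it (as the squid test below does). A tiny self-contained illustration of that mechanism follows; the class names and the "enrichments" default are assumed stand-ins, not Metron classes.

public class OutputTopicHookSketch {

  // Stand-in for the abstract parser integration test.
  abstract static class BaseParserTest {
    abstract String getSensorType();
    String getOutputTopic() {
      return "enrichments"; // default mirrors Constants.ENRICHMENT_TOPIC
    }
    void run() {
      // The real base test creates this Kafka topic and reads parsed messages from it.
      System.out.println(getSensorType() + " output read from: " + getOutputTopic());
    }
  }

  // Stand-in for a concrete test that overrides the hook, as the squid test does.
  static class SquidLikeTest extends BaseParserTest {
    @Override String getSensorType()  { return "squid"; }
    @Override String getOutputTopic() { return "squid_out"; }
  }

  public static void main(String[] args) {
    new BaseParserTest() { String getSensorType() { return "someSensor"; } }.run(); // someSensor output read from: enrichments
    new SquidLikeTest().run();                                                      // squid output read from: squid_out
  }
}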
File: Squid parser integration test
@@ -28,6 +28,11 @@ String getSensorType() {
return "squid";
}

@Override

Review comments on this change:

ottobackwards (Contributor) commented:

Is this required? Could we write a test just for testing the setting?

cestella (Member, Author) commented on May 15, 2017:

We could spin up a new integration test, but I thought this did the trick and didn't add the cost of another integration test to the build time.

I do have a test ensuring the value gets passed through to the parser. This just completed the loop. If we don't feel that an integration test is necessary here, then I can remove it too.

Thoughts?

ottobackwards (Contributor) commented:

I have a couple of things that come to mind, not all relevant:

  • With 777 and 942, squid is a real parser extension, installed and usable, etc. So throwing a test of something outside the parser into its integration test, and making it different than the others, seems not right to me.

  • Same for any parser, really. The parser tests are for testing the parsers. You guys did a lot of work making the tests alike and understandable; we should maintain that.

  • If you don't change that, I don't think I'll have to do conflict work in 777, which I continue to have to maintain ;) (disregard this)

cestella (Member, Author) commented:

haha ok, you got it. I'll adjust it tomorrow to be a different type of test and not piggyback on squid. You're right, the parser tests are for testing the parsers, not testing the ancillary stuff, like kafka writers. Thanks for keeping me honest, @ottobackwards

String getOutputTopic() {
return "squid_out";
}

@Override
List<ParserValidation> getValidations() {
return new ArrayList<ParserValidation>() {{
File: ParserTopologyComponent.java
@@ -45,11 +45,13 @@ public class ParserTopologyComponent implements InMemoryComponent {
private String brokerUrl;
private String sensorType;
private LocalCluster stormCluster;
private String outputTopic;

public static class Builder {
Properties topologyProperties;
String brokerUrl;
String sensorType;
String outputTopic;
public Builder withTopologyProperties(Properties topologyProperties) {
this.topologyProperties = topologyProperties;
return this;
@@ -63,15 +65,21 @@ public Builder withSensorType(String sensorType) {
return this;
}

public Builder withOutputTopic(String topic) {
this.outputTopic = topic;
return this;
}

public ParserTopologyComponent build() {
return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType);
return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
}
}

public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType) {
public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
this.topologyProperties = topologyProperties;
this.brokerUrl = brokerUrl;
this.sensorType = sensorType;
this.outputTopic = outputTopic;
}


@@ -89,6 +97,7 @@ public void start() throws UnableToStartException {
, 1
, null
, Optional.empty()
, Optional.ofNullable(outputTopic)
);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_DEBUG, true);
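The component follows the existing builder pattern: outputTopic is an optional builder field, threaded through the constructor, and finally handed to the topology as Optional.ofNullable(outputTopic), so a test that never calls withOutputTopic(...) still falls back to the default enrichment topic. A compact, self-contained sketch of that shape follows (hypothetical class names, not the real ParserTopologyComponent):

import java.util.Optional;

public class ComponentBuilderSketch {
  // Hypothetical stand-in for ParserTopologyComponent.
  static class TopologyComponent {
    private final String sensorType;
    private final String outputTopic; // may be null when withOutputTopic(...) was never called

    TopologyComponent(String sensorType, String outputTopic) {
      this.sensorType = sensorType;
      this.outputTopic = outputTopic;
    }

    Optional<String> outputTopic() {
      // What gets passed to the topology builder: Optional.ofNullable(outputTopic).
      return Optional.ofNullable(outputTopic);
    }

    static class Builder {
      private String sensorType;
      private String outputTopic;

      Builder withSensorType(String sensorType) { this.sensorType = sensorType; return this; }
      Builder withOutputTopic(String topic)     { this.outputTopic = topic;     return this; }
      TopologyComponent build()                 { return new TopologyComponent(sensorType, outputTopic); }
    }
  }

  public static void main(String[] args) {
    TopologyComponent defaulted  = new TopologyComponent.Builder().withSensorType("squid").build();
    TopologyComponent overridden = new TopologyComponent.Builder()
        .withSensorType("squid").withOutputTopic("squid_out").build();
    System.out.println(defaulted.outputTopic());  // Optional.empty -> builder uses the enrichment topic
    System.out.println(overridden.outputTopic()); // Optional[squid_out]
  }
}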
File: ParserTopologyCLI unit tests
@@ -116,6 +116,7 @@ public void testCLI_insufficientArg() throws ParseException {
.build(true);
UnitTestHelper.setLog4jLevel(Parser.class, Level.ERROR);
}

public void happyPath(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
@@ -147,6 +148,22 @@ public void testConfig_noExtra(boolean longOpt) throws ParseException {
Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}

@Test
public void testOutputTopic() throws Exception {
testOutputTopic(true);
testOutputTopic(false);
}

public void testOutputTopic(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
.with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
.build(longOpt);
Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
}

/**
{
"string" : "foo"