This repository was archived by the owner on Aug 20, 2025. It is now read-only.

METRON-1642: KafkaWriter should be able to choose the topic from a field in addition to topology construction time #1082

Closed
cestella wants to merge 6 commits into apache:master from cestella:kafkaWriterFromField2


Conversation

@cestella
Member

@cestella cestella commented Jun 27, 2018

Contributor Comments

Currently, we choose the Kafka topic via the kafka.topic configuration at topology construction time. It would be useful to allow people to specify the topic via a message field. This would enable multi-stage (or chained) parsing, among other use cases.

Manual Test

Ensure that ZOOKEEPER, BROKERLIST and METRON_HOME are set

  • Create topics for the 2 sensors:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic 1642_pre --partitions 1 --replication-factor 1
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic 1642_second --partitions 1 --replication-factor 1
  • Open $METRON_HOME/config/zookeeper/parsers/1642_pre.json and save the following:
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
  ,"sensorTopic" : "1642_pre"
  , "parserConfig": {
     "columns" : { "name": 0
                 , "first_field": 1
                 , "timestamp" : 2
                 },
     "kafka.topicField" : "route_field"
   }
  ,"fieldTransformations" : [
    {
     "transformation" : "STELLAR"
    ,"input" :  ["name"]
    ,"output" :  ["route_field"]
    ,"config" : {
      "route_field" : "match{ name == 'metron' => '1642_second', default => 'enrichments'}"
                }
    }
                           ]

}
  • Open $METRON_HOME/config/zookeeper/parsers/1642_second.json and save the following:
{
  "parserClassName" : "org.apache.metron.parsers.json.JSONMapParser"
  ,"sensorTopic" : "1642_second"
  , "parserConfig": {
   }
  ,"fieldTransformations" : [
    {
     "transformation" : "STELLAR"
    ,"input" :  ["first_field"]
    ,"output" :  ["new_field"]
    ,"config" : {
      "new_field" : "TO_UPPER(first_field)"
                }
    }
                           ]
}
  • Push configs via $METRON_HOME/bin/zk_load_configs.sh --mode PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper/
  • Stop the parsers in Ambari
  • Start the 1642_pre and 1642_second parsers
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s 1642_pre
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s 1642_second
  • Open ~/data.json and save the following:
notmetron,f1,1
metron,f2,2
metron,f3,3
metron,f4,4
metron,f5,5
metron,f6,6
metron,f7,7
notmetroneither,f8,8
  • Send data into the 1642_pre topic via
cat data.json | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic 1642_pre

From here you should see data flowing into the 1642_pre and 1642_second indices depending on the name field. When name is metron, the message should go into the 1642_second index and a new field, new_field, should be created. Otherwise, it will go into the 1642_pre index as-is.
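As an aside for reviewers, the routing exercised by this manual test can be illustrated in plain Java. This is only a sketch of the Stellar match expression's expected behavior (the class and method names are hypothetical, and it is not Metron code):

```java
// Sketch only: a plain-Java rendering of the Stellar expression
// match{ name == 'metron' => '1642_second', default => 'enrichments' }
// that the 1642_pre parser uses to populate route_field.
public class RouteFieldSketch {
  public static String routeFor(String name) {
    // Rows whose name column is "metron" are re-routed to 1642_second;
    // everything else flows on to the enrichments topic.
    return "metron".equals(name) ? "1642_second" : "enrichments";
  }
}
```

So for the data.json above, the seven metron rows land in 1642_second and the two others continue to enrichments.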

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution we ask you follow these guidelines and ask you to double check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:

    mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and or integration tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not then run the following commands and the verify changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci is set up for your personal repository such that your branches are built there before submitting a pull request.

@cestella
Member Author

Ok, the manual test script is ready for this and can be reviewed.

Contributor

@nickwallen nickwallen left a comment


I like the flexibility this provides for the user. Good functionality.

// we want to manage the batching
results.add(new AbstractMap.SimpleEntry<>(tuple, future));
Optional<String> topic = getKafkaTopic(message);
if(topic.isPresent()) {
Contributor


In what cases would a topic not be present? If that's an unexpected condition, we should probably log something. Or can a user choose to not route a message by returning an empty topic value?

Member Author


If the topic is not present, then the message is dropped because we don't know where to send it. It's not necessarily unexpected.

* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional.
If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching.
* The kafka writer can be configured within the parser config as well. (This is all configurable a priori, but this is convenient for overriding the settings):
Contributor


The ability to route messages based on a message field applies to all topologies that use a KafkaWriter, not just the parsers, right? That would include Enrichment and Profiler?

This is documented under the parsers. Would it be worth mentioning that the same settings would impact any topology that uses a KafkaWriter (although it may not be advised)?

It might make sense to put these docs in metron-writer (with the KafkaWriter class) and then link to them from the parser docs here. Then in metron-writer it would make sense to mention that this functionality could be used by any topology that uses a KafkaWriter.

Member Author


That's a very good point, I can move this documentation into metron-writer and link to it.

return producerConfig;
}

public Optional<String> getKafkaTopic(JSONObject message) {
Contributor


I am assuming this logic applies to any topology that uses a KafkaWriter. Would it be easy enough for a user to run into an infinite loop scenario if they have any two sequential topologies both using a KafkaWriter?

Parser -> Stage1 -> Stage2 -> ...

  • In the Parser, it ingests a message where the "outputTopic" = "stage1".
  • This sends the message to my Stage1 processing
  • If the Stage1 logic does not change the value of that field for whatever reason, then the message will go right back to Stage1 and be reprocessed.
  • Wash, rinse, repeat and you've got a mess on your hands that is difficult to debug.

Maybe I am thinking too hard about this. There may be nothing we can really do about that. With power comes responsibility. If an advanced user wants to customize routing, then they need to own this risk.

Member Author


They absolutely could create loops in Kafka, which wouldn't be ideal. I don't know how I could prevent it, sadly. I think this falls into the "with great power comes great responsibility" category. Should we call this a spiderman bug (a spiderbug)?

Contributor

@nickwallen nickwallen Jul 2, 2018


What if we unset the value of the topic field after the redirect occurs? This would force the user to set the field again, if they really want another redirect.

It seems like on day 1 of a user attempting to use this functionality, they are going to fall into this trap. To really use this, you need to set the value in one step and then unset or change it in the next step. If you don't, it's going to loop.

Member Author


It's unclear to me that we really want to do that. It could be that users are going to have logic that depends on that field downstream. It seems wrong to me to remove a field that they're adding or have in the message.

I mean, in order to create a loop, one of two things would have to happen:

  • the user specifies the input topic as the output topic (i.e., a simple loop); removing the kafka topic field wouldn't help here because the field is likely to be computed (e.g., as in the manual test script)
  • the user creates a non-simple loop whereby sensor A -> B -> ... -> C -> A; but it's exceedingly unlikely that they're all going to be parsers of the same type, so messages from C will most likely fail to parse in A. Even if they do parse, the kafka topic is likely computed in the first parser, so it would be recomputed and removing the field after the first parse wouldn't have helped.

TL;DR
Ultimately, I think removing the field won't appreciably help the situation and puts us into the state of removing data, which makes me uncomfortable.

Member Author


I think I see your point. I think we're ok, though, because we're not defaulting kafka.topicField. If it's not specified AND kafka.topic isn't specified, then we don't send the message anywhere. I did this specifically so people don't end up in a loop by accidentally forgetting to unset a field. You have to go out of your way (and set kafka.topicField) to make the mistake.
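The resolution order described in this thread (prefer the field named by kafka.topicField when configured, otherwise fall back to the static kafka.topic, and drop the message when neither yields a topic) can be sketched as follows. This is a simplified illustration using a plain Map in place of the writer's JSONObject, with hypothetical names; it is not the actual KafkaWriter code:

```java
import java.util.Map;
import java.util.Optional;

public class TopicResolutionSketch {
  // topicField is the value of kafka.topicField (null if unconfigured);
  // staticTopic is the value of kafka.topic (null if unconfigured).
  // An empty result means the message is dropped.
  public static Optional<String> resolveTopic(Map<String, Object> message,
                                              String topicField,
                                              String staticTopic) {
    if (topicField != null) {
      // Field-based routing: a missing field value drops the message.
      return Optional.ofNullable(message.get(topicField)).map(Object::toString);
    }
    // No topicField configured: fall back to the statically configured topic.
    return Optional.ofNullable(staticTopic);
  }
}
```

Under this sketch, an accidental loop requires opting into kafka.topicField and then feeding a message whose routing field points back at its own input topic.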

Contributor


Ok, that makes sense to me. A user would have to take additional steps to really shoot themselves in the foot here. If the topic field were a global setting this might happen, but it's not.

I agree with you. I don't think there is anything we need to change here.

}
else {
return Optional.ofNullable(kafkaTopic);
}
Contributor


It would make sense to add some debug statements showing which topic was chosen and what the field value was. This would make debugging a routing issue much simpler for the user.
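For illustration, such debug output might look like the sketch below. It uses java.util.logging purely for self-containment (the project's actual logging framework may differ), and the class and method names are hypothetical:

```java
import java.util.Optional;
import java.util.logging.Logger;

public class TopicChoiceLoggingSketch {
  private static final Logger LOG =
      Logger.getLogger(TopicChoiceLoggingSketch.class.getName());

  // Logs the routing decision at debug level before returning it unchanged.
  public static Optional<String> logChoice(String topicField, Optional<String> topic) {
    if (topic.isPresent()) {
      LOG.fine(() -> String.format("Routing message to topic '%s' (resolved from field '%s')",
                                   topic.get(), topicField));
    } else {
      LOG.fine(() -> String.format("Dropping message: field '%s' yielded no topic", topicField));
    }
    return topic;
  }
}
```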

@cestella
Member Author

cestella commented Jul 2, 2018

Ok, I think I addressed the issues here, and I also added an integration test that exercises this particular scenario. Let me know what you think, @nickwallen et al.

@nickwallen
Contributor

+1 Nice bit of functionality. Thanks!

@asfgit asfgit closed this in 097ce95 Jul 3, 2018