Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1573 Enhance KAFKA_* functions to return partition and offset details #1030

Closed
wants to merge 15 commits into from

Conversation

nickwallen
Copy link
Contributor

@nickwallen nickwallen commented May 24, 2018

This PR is dependent on #1025 and should not be merged until that PR is addressed.

Changes

  • Implemented multiple 'views' of the messages fetched from Kafka. The "rich" view contains not only the message value, but Kafka metadata like the topic, partition, offset, key, and timestamp.

  • Updated KAFKA_GET, KAFKA_TAIL, and KAFKA_FIND so that each can optionally return a "rich" view of the messages retrieved from Kafka.

  • Updated KAFKA_PUT so that it can optionally returns a "rich" view describing the topic, partition, offset, and timestamp of the message that was just put/sent to Kafka.

  • The change is backwards compatible. By default, only the value (the "simple" view) is returned, just like before.

Demo

By default, only the message value is returned.

[Stellar]>>> KAFKA_GET("bro")
[{"http": {"ts":1527122184.853329,"uid":"CUrRne3iLIxXavQtci","id.orig_h":"192.168.66.1","id.orig_p":50451,"id.resp_h":"192.168.66.121","id.resp_p":8080,"trans_depth":177,"method":"GET","host":"node1","uri":"/api/v1/clusters?fields=Clusters/provisioning_state\u0026_=1484169117876","referrer":"http://node1:8080/","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36","request_body_len":0,"response_body_len":0,"tags":[]}}]

Turn on the "rich" view.

[Stellar]>>> %define stellar.kafka.message.view := "rich"
rich

Fetch the next message.

[Stellar]>>> KAFKA_GET("bro")
[{partition=0, offset=2101, topic=bro, value={"http": {"ts":1527122184.674819,"uid":"CUrRne3iLIxXavQtci","id.orig_h":"192.168.66.1","id.orig_p":50451,"id.resp_h":"192.168.66.121","id.resp_p":8080,"trans_depth":24,"method":"GET","host":"node1","uri":"/api/v1/clusters/metron_cluster?fields=Clusters/desired_configs/cluster-env\u0026_=1484168423682","referrer":"http://node1:8080/","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36","request_body_len":0,"response_body_len":0,"tags":[]}}, key=null, timestamp=1527122185741}]

Fetch from the tail end of the topic. Notice the offset change.

[Stellar]>>> KAFKA_TAIL("bro")
[{partition=0, offset=2600, topic=bro, value={"http": {"ts":1527122376.060672,"uid":"CUrRne3iLIxXavQtci","id.orig_h":"192.168.66.1","id.orig_p":50451,"id.resp_h":"192.168.66.121","id.resp_p":8080,"trans_depth":236,"method":"GET","host":"node1","uri":"/api/v1/clusters/metron_cluster/requests?to=end\u0026page_size=10\u0026fields=Requests\u0026_=1484169369084","referrer":"http://node1:8080/","user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36","request_body_len":0,"response_body_len":0,"tags":[]}}, key=null, timestamp=1527122377498}]

Put a message onto a topic.

[Stellar]>>> KAFKA_PUT("mytopic", "{ message! }")
[{topic=mytopic, partition=0, offset=1, timestamp=1527122377498}]

Testing

  1. Launch the dev environment.

  2. Run the Stellar REPL.

  3. Try both the "simple" and "rich" views for each of the functions; KAFKA_GET, KAFKA_TAIL and KAFKA_FIND.

Pull Request Checklist

  • Is there a JIRA ticket associated with this PR? If not one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?
  • Have you included steps or a guide to how the change may be verified and tested manually?
  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
  • Have you written or updated unit tests and or integration tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

@james-sirota
Copy link

When I try:

[Stellar]>>> KAFKA_PUT("mytopic", "{ message! }")
[{partition=0, offset=0, topic=mytopic, timestamp=1528428506817}]
[Stellar]>>> KAFKA_PUT("mytopic", "{ message! }")
[{partition=0, offset=1, topic=mytopic, timestamp=1528428552606}]
[Stellar]>>> KAFKA_TAIL("mytopic")
[]
[Stellar]>>> KAFKA_GET("mytopic")
[]

So nothing is pulled back. But the topic and the data does exist:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --topic mytopic --zookeeper localhost:2181 --from-beginning

{ message! }
{ message! }

@nickwallen
Copy link
Contributor Author

nickwallen commented Jun 8, 2018

[Stellar]>>> KAFKA_TAIL("mytopic")
[]

KAFKA_TAIL always seeks to the end of the topic. So the consumer offset was set to the end of the topic (offset=2) after you call to KAFKA_TAIL. No more messages arrived on this topic, so it timed out and returned no data.

If you had run this on a topic actively receiving data, you would have seen the new messages arriving.

[Stellar]>>> KAFKA_GET("mytopic")
[]

KAFKA_GET will always use the existing consumer offset. The offset is at 2 based on the previous call to KAFKA_TAIL. Since there are no messages at offset 2, It didn't return anything.

Use KAFKA_TAIL on one of the sensor topics in the dev environment. You will be able to see the latest messages arriving.

KAFKA_TAIL("bro")

You can also manually change your group.id so that your KAFKA_GET call doesn't pick-up the existing consumer offsets, if that is what you want.

KAFKA_GET("mytopic",{ 'group.id':'new.group' })

@nickwallen nickwallen force-pushed the METRON-1573 branch 3 times, most recently from 5836fbd to 530b707 Compare June 8, 2018 15:37
@james-sirota
Copy link

+1 great work

@asfgit asfgit closed this in 0bb3580 Jun 18, 2018
@nickwallen nickwallen deleted the METRON-1573 branch September 17, 2018 19:35
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
2 participants