New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Kafka record headers #1025
Add support for Kafka record headers #1025
Conversation
13c4584
to
b01041d
Compare
Not sure why the build is failing. It passes on my fork which has same |
14d1b35
to
8ad4e03
Compare
8ad4e03
to
57be599
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 4 files at r1.
Reviewable status: 1 of 4 files reviewed, 2 unresolved discussions (waiting on @bhavin192 and @life1347)
mqtrigger/messageQueue/kafka.go, line 32 at r1 (raw file):
"github.com/fission/fission" "github.com/fission/fission/crd" "os"
please sort the import
mqtrigger/messageQueue/kafka.go, line 52 at r1 (raw file):
kafkaVersion, err := sarama.ParseKafkaVersion(mqKafkaVersion) if err != nil { log.Warningf("error parsing version string %q: %v. Falling back to %q", mqKafkaVersion, err, kafkaVersion)
nitpick: Error parsing
(uppercase)
LGTM, just two minor comments. BTW, the PR description for changes is awesome. |
57be599
to
fe1d974
Compare
Hey @life1347 I have made changes according to your suggestions.
Hehe, thanks :) |
fe1d974
to
f3ee39c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 4 files reviewed, 2 unresolved discussions (waiting on @life1347)
mqtrigger/messageQueue/kafka.go, line 32 at r1 (raw file):
Previously, life1347 (Ta-Ching Chen) wrote…
please sort the import
Done.
mqtrigger/messageQueue/kafka.go, line 52 at r1 (raw file):
Previously, life1347 (Ta-Ching Chen) wrote…
nitpick: Error parsing
(uppercase)
Ah! right
LGTM. please rebase master and we can merge it when ci passed |
f3ee39c
to
31db92f
Compare
- This makes sure that the headers received from a function are converted to Kafka record headers - Also extracts the headers from a Kafka record and sends them to function - ref: https://issues.apache.org/jira/browse/KAFKA-4208 Signed-off-by: Bhavin Gandhi <bhavin7392@gmail.com>
- Adds version field to Kafka struct - Adds MESSAGE_QUEUE_KAFKA_VERSION environment variable - Drop the headers if Kafka version is < 0.11.0.0 Signed-off-by: Bhavin Gandhi <bhavin7392@gmail.com>
- Adds new field `kafka.version` - Sets the MESSAGE_QUEUE_KAFKA_VERSION environments variable to `kafka.version` Signed-off-by: Bhavin Gandhi <bhavin7392@gmail.com>
31db92f
to
18978bd
Compare
Here is the output of integration test of kafka Creating node-kafka environment
environment 'node-kafka' created
Creating go-kafka environment
environment 'go-kafka' created
Creating package for Kafka producer
...
Output of glide install
...
pkgName=kafka-zip-3lwb
~/work/go/src/github.com/fission/fission/test/tests/mqtrigger/kafka
Waiting for builder manager to finish the build
Waiting for build to finish
Waiting for build to finish
Waiting for build to finish
Waiting for build to finish
succeeded
Package kafka-zip-3lwb created
Creating function consumer-func
function 'consumer-func' created
Creating function consumer-func2
function 'consumer-func2' created
Creating function producer-func
function 'producer-func' created
Creating trigger kafkatest
trigger 'kafkatest' created
Creating trigger kafkatest2
trigger 'kafkatest2' created
Successfully sent to testtopicTesting pool manager function
Checking for valid response
level=info msg="Calling message handler with value {"name": "testvalue"}" time="2019-01-11T10:07:18Z" level=info msg="Making HTTP request to http://router.fission/fission-function/consumer-func"
Testing the headers values in consumer-func
Checking for valid function log
> fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 0.354236ms ::1 - - [11/Jan/2019:10:03:23 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func ::ffff:10.36.0.6 - - [11/Jan/2019:10:03:23 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1" > fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 3.618165ms ::1 - - [11/Jan/2019:10:07:18 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func ::ffff:10.36.0.6 - - [11/Jan/2019:10:07:18 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1"
Testing the header value in consumer-func2
Checking for valid function log
> fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 0.222792ms ::1 - - [11/Jan/2019:10:03:23 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func2 ::ffff:10.36.0.6 - - [11/Jan/2019:10:03:23 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1" > fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 0.407005ms ::1 - - [11/Jan/2019:10:07:19 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func2 ::ffff:10.36.0.6 - - [11/Jan/2019:10:07:19 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1"
Checking for valid function log
> fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 0.222792ms ::1 - - [11/Jan/2019:10:03:23 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func2 ::ffff:10.36.0.6 - - [11/Jan/2019:10:03:23 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1" > fission-nodejs-runtime@0.1.0 start /usr/src/app > node server.js user code loaded in 0sec 0.407005ms ::1 - - [11/Jan/2019:10:07:19 +0000] "POST /specialize HTTP/1.1" 202 - "-" "Go-http-client/1.1" { name: 'testvalue' } z-custom-name: Kafka-Header-test x-fission-function-name: consumer-func2 ::ffff:10.36.0.6 - - [11/Jan/2019:10:07:19 +0000] "POST / HTTP/1.1" 200 20 "-" "Go-http-client/1.1"
Cleaning up...
environment 'go-kafka' deleted
environment 'node-kafka' deleted
function 'producer-func' deleted
function 'consumer-func' deleted
function 'consumer-func2' deleted
trigger 'kafkatest' deleted
trigger 'kafkatest2' deleted |
- hellokafka.js: print header values and send the received headers as response - kafka-pub.go: Add header Z-Custom-Name while producing the message - test_kafka.sh: - Create another trigger which calls consumer-func2; consumer-func2 will print the headers it receives - Producer -(testtopic)-> MQT --> consumer-func --> MQT -(resptopic)-> MQT --> consumer-func2 - Use timeout instead of gtimout (available only on Mac OSX) - do glide install before creating package zip file - New function test_fnmessage which checks for a message in a function's log Signed-off-by: Bhavin Gandhi <bhavin7392@gmail.com>
18978bd
to
f3f0413
Compare
yes, looks great. We can merge it 🎉 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 4 files at r1, 2 of 2 files at r2, 3 of 3 files at r3.
Reviewable status: complete! all files reviewed, all discussions resolved
Build in #1052 is green for the same change. This change has been tested in a local setup with Kafka cluster and working as expected with test. Merging. Thanks a lot, @bhavin192 for your contribution! |
Add support for Kafka record headers
converted to Kafka record headers
function
Make Kafka version configurable
MESSAGE_QUEUE_KAFKA_VERSION
environment variable< 0.11.0.0
Update helm chart fission-all for Kafka record headers
kafka.version
MESSAGE_QUEUE_KAFKA_VERSION
environments variable tokafka.version
response
will print the headers it receives
This change is