-
Notifications
You must be signed in to change notification settings - Fork 723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Adding support to propagate unique ID to message for Kafka EventSource messages #1453
Conversation
Signed-off-by: Dillen Padhiar <dpadhiar@ucsd.edu>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGMT
@whynowy can you review it?
eventsources/common/common.go
Outdated
type Options func(*event.Event) error | ||
|
||
// Option to set different ID for event | ||
func SetCustomID(id string) Options { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A better function name would be WithID(id string)
.
eventsources/sources/kafka/start.go
Outdated
|
||
// Function can be passed as Option to generate unique id for kafka event | ||
// eventSourceName:eventName:kafka-url:topic:partition:offset | ||
func genUniqueID(eventSourceName string, eventName string, kafkaURL string, topic string, partition int32, offset int64) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor - func genUniqueID(eventSourceName, eventName, kafkaURL, topic string, partition int32, offset int64)
.
eventsources/sources/kafka/start.go
Outdated
// eventSourceName:eventName:kafka-url:topic:partition:offset | ||
func genUniqueID(eventSourceName string, eventName string, kafkaURL string, topic string, partition int32, offset int64) string { | ||
|
||
kafkaID := fmt.Sprintf("%s:%s:%s:%s:%d:%d", eventSourceName, eventName, kafkaURL, topic, partition, offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafkaURL
could be very long, which contains several URLs separated by comma. I would suggest to split the URL to only take the first one.
eventsources/sources/kafka/start.go
Outdated
@@ -375,9 +380,20 @@ func (consumer *Consumer) processOne(session sarama.ConsumerGroupSession, messag | |||
return errors.Wrap(err, "failed to marshal the event data, rejecting the event...") | |||
} | |||
|
|||
if err = consumer.dispatch(eventBody); err != nil { | |||
kafkaID := genUniqueID(consumer.eventSourceName, consumer.eventName, consumer.kafkaEventSource.URL, message.Topic, message.Partition, message.Offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name it messageID
or kafkaMessageID
, kafkaID
is very tricky.
Signed-off-by: Dillen Padhiar <dpadhiar@ucsd.edu>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @dpadhiar
Checklist:
closes: #1427
Logs from kafka-eventsource pod: