-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(out_rdkafka2): adds use_default_for_unknown_topic
configuration
#490
Conversation
This lifts the same parameter from the `out_kafka2` plugin and behaves similarly - it first attempts to write to a given topic, and if it fails because the topic does not exist, it will attempt to write to the `default_topic` Signed-off-by: Ray Tung <code@raytung.net>
e5bf831
to
6222730
Compare
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC | ||
when :unknown_topic | ||
if @use_default_for_unknown_topic && actual_topic != @default_topic | ||
log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is one major difference to the out_kafka2
plugin's implementation - we are debugging logging instead of warn logging. This is because it is an intended behavior if we do get into this code path (since use_default_for_unknown_topic
is set!). We should consider publishing librdkafka's own internal metrics though, but this should be discussed in a separate issue if we're not already doing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because it is an intended behavior if we do get into this code path (since use_default_for_unknown_topic is set!).
I agree with you, the log level should be debug.
if attempt <= @max_enqueue_retries | ||
log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s" | ||
sleep @enqueue_retry_backoff | ||
attempt += 1 | ||
else | ||
raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times" | ||
end | ||
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41 | ||
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC | ||
when :unknown_topic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: another major difference to the out_kafka2
's implementation - the out_kafka2
plugin also writes to the default_topic
if we receive an UNKNOWN_PARTITION
error, which I have left out in this implementation since I'd like to consider a bit more on whether we should publish to default_topic
or whether we should have a separate configuration to handle that error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note:
fluent-plugin-kafka/lib/fluent/plugin/out_kafka2.rb
Lines 403 to 408 in c21236f
rescue Kafka::UnknownTopicOrPartition | |
if @use_default_for_unknown_topic && topic != @default_topic | |
log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic" | |
topic = @default_topic | |
retry | |
end |
if attempt <= @max_enqueue_retries | ||
log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s" | ||
sleep @enqueue_retry_backoff | ||
attempt += 1 | ||
else | ||
raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times" | ||
end | ||
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41 | ||
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC | ||
when :unknown_topic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note:
fluent-plugin-kafka/lib/fluent/plugin/out_kafka2.rb
Lines 403 to 408 in c21236f
rescue Kafka::UnknownTopicOrPartition | |
if @use_default_for_unknown_topic && topic != @default_topic | |
log.warn "'#{topic}' topic not found. Retry with '#{default_topic}' topic" | |
topic = @default_topic | |
retry | |
end |
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC | ||
when :unknown_topic | ||
if @use_default_for_unknown_topic && actual_topic != @default_topic | ||
log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because it is an intended behavior if we do get into this code path (since use_default_for_unknown_topic is set!).
I agree with you, the log level should be debug.
Thanks! |
Addresses #489
This lifts the same parameter from the
out_kafka2
plugin and behaves similarly - it first attempts to write to a given topic, and if it fails because the topic does not exist, it will attempt to write to thedefault_topic