Skip to content

Commit

Permalink
Improved kafka plugin (#182)
Browse files Browse the repository at this point in the history
Signed-off-by: chengdehao <dehaocheng@yunify.com>
  • Loading branch information
wenchajun committed Dec 8, 2021
1 parent 91a9589 commit 09bfa63
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 6 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions api/fluentbitoperator/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ type Kafka struct {
TopicKey string `json:"topicKey,omitempty"`
// {property} can be any librdkafka properties
Rdkafka map[string]string `json:"rdkafka,omitempty"`
//adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured
DynamicTopic *bool `json:"dynamicTopic,omitempty"`
//Fluent Bit queues data into rdkafka library,
//if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records.
//The queue_full_retries option set the number of local retries to enqueue the data.
//The default value is 10 times, the interval between each retry is 1 second.
//Setting the queue_full_retries value to 0 set's an unlimited number of retries.
QueueFullRetries *int64 `json:"queueFullRetries,omitempty"`
}

func (*Kafka) Name() string {
Expand Down Expand Up @@ -67,6 +75,12 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if k.TopicKey != "" {
kvs.Insert("Topic_Key", k.TopicKey)
}
if k.DynamicTopic != nil {
kvs.Insert("Dynamic_topic", fmt.Sprint(*k.DynamicTopic))
}
if k.QueueFullRetries != nil {
kvs.Insert("queue_full_retries", fmt.Sprint(*k.QueueFullRetries))
}

kvs.InsertStringMap(k.Rdkafka, func(k, v string) (string, string) {
return fmt.Sprintf("rdkafka.%s", k), v
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion api/fluentbitoperator/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/crd/bases/logging.kubesphere.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,10 @@ spec:
description: 'Single of multiple list of Kafka Brokers, e.g: 192.168.1.3:9092,
192.168.1.4:9092.'
type: string
dynamicTopic:
description: adds unknown topics (found in Topic_Key) to Topics.
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
type: string
Expand All @@ -818,6 +822,16 @@ spec:
will indicate the message key. If not set nor found in the record,
Message_Key will be used (if set).
type: string
queueFullRetries:
description: Fluent Bit queues data into rdkafka library, if for
some reason the underlying library cannot flush the records
the queue might fills up blocking new addition of records. The
queue_full_retries option set the number of local retries to
enqueue the data. The default value is 10 times, the interval
between each retry is 1 second. Setting the queue_full_retries
value to 0 set's an unlimited number of retries.
format: int64
type: integer
rdkafka:
additionalProperties:
type: string
Expand Down
2 changes: 2 additions & 0 deletions docs/plugins/output/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@
| brokers | Single of multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | string |
| topics | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic_Key will be used. | string |
| topicKey | If multiple Topics exists, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is router and the record is {\"key1\": 123, \"router\": \"route_2\"}, Fluent Bit will use topic route_2. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | string |
| dynamic_topic | adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured | bool |
| queue_full_retries | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The queue_full_retries option set the number of local retries to enqueue the data. The default value is 10 times, the interval between each retry is 1 second. Setting the queue_full_retries value to 0 set's an unlimited number of retries. | int |
| rdkafka | {property} can be any librdkafka properties | map[string]string |
14 changes: 14 additions & 0 deletions manifests/setup/fluentbit-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4249,6 +4249,10 @@ spec:
description: 'Single of multiple list of Kafka Brokers, e.g: 192.168.1.3:9092,
192.168.1.4:9092.'
type: string
dynamicTopic:
description: adds unknown topics (found in Topic_Key) to Topics.
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
type: string
Expand All @@ -4260,6 +4264,16 @@ spec:
will indicate the message key. If not set nor found in the record,
Message_Key will be used (if set).
type: string
queueFullRetries:
description: Fluent Bit queues data into rdkafka library, if for
some reason the underlying library cannot flush the records
the queue might fills up blocking new addition of records. The
queue_full_retries option set the number of local retries to
enqueue the data. The default value is 10 times, the interval
between each retry is 1 second. Setting the queue_full_retries
value to 0 set's an unlimited number of retries.
format: int64
type: integer
rdkafka:
additionalProperties:
type: string
Expand Down
14 changes: 14 additions & 0 deletions manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4254,6 +4254,10 @@ spec:
description: 'Single of multiple list of Kafka Brokers, e.g: 192.168.1.3:9092,
192.168.1.4:9092.'
type: string
dynamicTopic:
description: adds unknown topics (found in Topic_Key) to Topics.
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
type: string
Expand All @@ -4265,6 +4269,16 @@ spec:
will indicate the message key. If not set nor found in the record,
Message_Key will be used (if set).
type: string
queueFullRetries:
description: Fluent Bit queues data into rdkafka library, if for
some reason the underlying library cannot flush the records
the queue might fills up blocking new addition of records. The
queue_full_retries option set the number of local retries to
enqueue the data. The default value is 10 times, the interval
between each retry is 1 second. Setting the queue_full_retries
value to 0 set's an unlimited number of retries.
format: int64
type: integer
rdkafka:
additionalProperties:
type: string
Expand Down

0 comments on commit 09bfa63

Please sign in to comment.