
Add placeholder support for stream names #174

Merged

Conversation

cosmo0920
Contributor

@cosmo0920 cosmo0920 commented Feb 22, 2019

Related to #165.

We can now send events to different stream names with the extract_placeholders API, which is a brand-new Fluentd v1 API.

BTW, how should I write test code for this feature?
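For illustration, the placeholder expansion this PR relies on can be sketched as a standalone snippet. This is a simplified stand-in, not the actual extract_placeholders implementation (which resolves against chunk metadata rather than a raw record):

```ruby
# Simplified stand-in for Fluentd's placeholder expansion: resolve
# record-accessor style ${$.a.b} placeholders against a single record.
def expand_placeholders(template, record)
  template.gsub(/\$\{\$\.([^}]+)\}/) do
    keys = Regexp.last_match(1).split('.')
    keys.reduce(record) { |acc, k| acc && acc[k] }.to_s
  end
end

record = { "key" => { "nested" => "cosmo0920-fluentd-test1" } }
puts expand_placeholders("${$.key.nested}", record)
# => cosmo0920-fluentd-test1
```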

@cosmo0920 cosmo0920 force-pushed the add-placeholer-support-for-stream-names branch from 2b706af to 19cce69 Compare February 25, 2019 02:12
@cosmo0920
Contributor Author

cosmo0920 commented Feb 25, 2019

We can send events into different kinesis_streams with the following configuration:

  <source>
    @type tail
    path "data.json"
    tag "in.json"
    read_from_head true
    <parse>
      @type "json"
    </parse>
  </source>
  <match in.json>
    @type kinesis_streams
    stream_name "${$.key.nested}"
    aws_key_id xxxxxx
    aws_sec_key xxxxxx
    <buffer $.key.nested>
      @type "memory"
    </buffer>
  </match>

Sending data:

{"key":{"nested":"cosmo0920-fluentd-test1","message":"hoge"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"huga"}}
{"key":{"nested":"cosmo0920-fluentd-test1","message":"hoo"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"bar"}}
{"key":{"nested":"cosmo0920-fluentd-test2","message":"baz"}}

result:

cosmo0920-fluentd-test1 #=> 2 events
cosmo0920-fluentd-test2 #=> 3 events
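The routing above can be mimicked by grouping the sample records on the nested key, which is what the `<buffer $.key.nested>` chunk key does (an illustrative sketch, not plugin code):

```ruby
require 'json'

lines = <<~DATA
  {"key":{"nested":"cosmo0920-fluentd-test1","message":"hoge"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"huga"}}
  {"key":{"nested":"cosmo0920-fluentd-test1","message":"hoo"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"bar"}}
  {"key":{"nested":"cosmo0920-fluentd-test2","message":"baz"}}
DATA

# Group records by the value of $.key.nested (the buffer chunk key),
# then count events per resulting stream name.
counts = lines.each_line
              .map { |l| JSON.parse(l) }
              .group_by { |r| r.dig("key", "nested") }
              .transform_values(&:size)
counts.each { |stream, n| puts "#{stream} #=> #{n} events" }
```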

@cosmo0920 cosmo0920 force-pushed the add-placeholer-support-for-stream-names branch from 4e1437d to 09f1acc Compare February 25, 2019 03:56
@cosmo0920
Contributor Author

I've confirmed that this PR works in a real AWS environment (ap-northeast-1) and added test cases for the built-in placeholders.

@riywo @simukappu Could you kindly take a look?

@mathetake

any update on this feature? 🤔

@cosmo0920
Contributor Author

@mathetake Could you try this PR's patch in your AWS environment?
I guess the plugin developers are waiting for users' feedback.

@roscoecairney

Just built this and gave it a try. Got this...

2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/plugins/raise_response_errors.rb:15:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/jsonvalue_converter.rb:20:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/idempotency_token.rb:17:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/param_converter.rb:24:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/plugins/response_target.rb:23:in `call'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-core-3.67.0/lib/seahorse/client/request.rb:70:in `send_request'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/aws-sdk-kinesis-1.19.0/lib/aws-sdk-kinesis/client.rb:1731:in `put_records'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/out_kinesis_streams.rb:49:in `block in write'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis_helper/api.rb:91:in `batch_request_with_retry'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:142:in `block (2 levels) in write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis_helper/api.rb:86:in `split_to_batches'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:140:in `block in write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/buffer/memory_chunk.rb:81:in `open'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/kinesis.rb:138:in `write_records_batch'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluent-plugin-kinesis-3.0.0.rc.2.0/lib/fluent/plugin/out_kinesis_streams.rb:45:in `write'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:1122:in `try_flush'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:1428:in `flush_thread_run'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin/output.rb:458:in `block (2 levels) in start'
2019-10-01 13:55:42 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.7.1/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2019-10-01 13:55:42 +0000 [warn]: #0 [out_kinesis_streams] failed to flush the buffer. retry_time=12 next_retry_seconds=2019-10-01 14:26:44 +0000 chunk="593d9a91629510cc781f68360c8c5119" error_class=Aws::Kinesis::Errors::ValidationException error="1 validation error detected: Value '${$.kubernetes.annotations.kinesis_stream}' at 'streamName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+"

@cosmo0920
Contributor Author

@roscoecairney Could you share your configuration?
I guess the built-in placeholder is not working as expected.

@roscoecairney

Hi, sure thing. I'm pulling in docker logs, using the kubernetes filter to enrich them, grepping out the ones without a given annotation and then using that annotation for the kinesis stream name.

    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      annotation_match [".*"]
    </filter>

    <filter kubernetes.**>
      @type grep
      <regexp>
        key $.kubernetes.annotations.kinesis_stream
        pattern /.+/
      </regexp>
    </filter>

    <match kubernetes.**>
      @type kinesis_streams
      @id out_kinesis_streams
      region "#{ENV['FLUENT_KINESIS_STREAMS_REGION'] || nil}"
      aws_key_id "#{ENV['FLUENT_KINESIS_AWS_KEY_ID'] || nil}"
      aws_sec_key "#{ENV['FLUENT_KINESIS_AWS_SEC_KEY'] || nil}"
      stream_name "${$.kubernetes.annotations.kinesis_stream}"
      <buffer>
        flush_interval 1
        chunk_limit_size "#{ENV['FLUENT_KINESIS_STREAMS_CHUNK_LIMIT_SIZE'] || '1m'}"
        flush_thread_interval 0.1
        flush_thread_burst_interval 0.01
        flush_thread_count 15
      </buffer>
    </match>

@cosmo0920
Contributor Author

Aha! Got it. Could you try the following buffer configuration?

      <buffer $.kubernetes.annotations.kinesis_stream>
        flush_interval 1
        chunk_limit_size "#{ENV['FLUENT_KINESIS_STREAMS_CHUNK_LIMIT_SIZE'] || '1m'}"
        flush_thread_interval 0.1
        flush_thread_burst_interval 0.01
        flush_thread_count 15
      </buffer>

Fluentd's built-in placeholders require the target key to be included in the buffer section's chunk keys.
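The reason the chunk key matters can be sketched as follows (an illustrative stand-in, not Fluentd internals): buffer chunk keys decide which record values are copied into chunk metadata, and only those values are available to `${...}` placeholders at flush time.

```ruby
# Resolve ${...} placeholders against the variables captured in chunk
# metadata; keys not captured by a buffer chunk key stay unresolved.
def resolve(template, chunk_metadata_variables)
  template.gsub(/\$\{([^}]+)\}/) do
    key = Regexp.last_match(1)
    chunk_metadata_variables.fetch(key) { "${#{key}}" }
  end
end

# With <buffer $.kubernetes.annotations.kinesis_stream>, the value is captured:
with_key = { "$.kubernetes.annotations.kinesis_stream" => "my-stream" }
puts resolve("${$.kubernetes.annotations.kinesis_stream}", with_key)
# => my-stream

# Without the chunk key, the placeholder stays as the literal "${...}"
# string, which is exactly what Kinesis rejected in the log above:
puts resolve("${$.kubernetes.annotations.kinesis_stream}", {})
# => ${$.kubernetes.annotations.kinesis_stream}
```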

@roscoecairney

Thanks @cosmo0920 - that's working perfectly. Hope this gets merged in soon.

@simukappu
Contributor

@roscoecairney Thank you for your testing!
@cosmo0920 Could you add these configuration examples to the README to avoid such configuration issues?

@cosmo0920
Contributor Author

Thanks for pointing that out.
I've added notes on built-in placeholder usage.

@cosmo0920 cosmo0920 force-pushed the add-placeholer-support-for-stream-names branch from f5f53a1 to 5a52502 Compare October 2, 2019 07:17
@simukappu simukappu self-assigned this Oct 2, 2019
@simukappu simukappu self-requested a review October 2, 2019 10:27
Contributor

@simukappu simukappu left a comment


I have added some very small comments. Can you check them?

@@ -30,7 +30,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = '>= 2.1'

-spec.add_dependency "fluentd", ">= 0.14.10", "< 2"
+spec.add_dependency "fluentd", ">= 0.14.22", "< 2"
Contributor

We have to update the dependency in the README (Fluentd 0.14.10+) too.

Contributor Author

Fixed in 8d2ac48.

@simukappu
Contributor

@cosmo0920 Could you confirm the formal statement below?

"By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license."

We appreciate your understanding and strong contribution!

@cosmo0920
Contributor Author

@cosmo0920 Could you confirm the formal statement below?

"By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license."

Yep. I've written these patches under the terms of the Apache License 2.0.
These patches are my original work and do not reference any GPL code,
so they can be licensed under the Apache License as well.
I make no claim to my patches under the GPL 3.0 license.

@simukappu
Contributor

Great, thank you!
Finally, could you squash your commits into one or a few?

@cosmo0920
Contributor Author

Sure.
But this documentation on squash merging may help you:
https://help.github.com/en/articles/configuring-commit-squashing-for-pull-requests

@cosmo0920 cosmo0920 force-pushed the add-placeholer-support-for-stream-names branch from 8d2ac48 to af9acb7 Compare October 3, 2019 01:02
@cosmo0920
Contributor Author

I've squashed this PR into one commit.

@simukappu
Contributor

Thank you! We'll proceed with our review. Please wait a moment.

@simukappu simukappu merged commit 03d8232 into awslabs:master Oct 10, 2019
@cosmo0920 cosmo0920 deleted the add-placeholer-support-for-stream-names branch October 10, 2019 03:23
@simukappu simukappu added this to the v3.2.0 milestone Oct 10, 2019
simukappu added a commit to simukappu/aws-fluent-plugin-kinesis that referenced this pull request Oct 12, 2019
@simukappu simukappu mentioned this pull request Oct 12, 2019