Use ruby-kafka #54

Merged
merged 7 commits into from Jun 18, 2016

5 participants
@repeatedly
Member

repeatedly commented Jun 8, 2016

Use ruby-kafka for kafka v0.9 or v0.10 support.

  • Make plugin thread-safe with num_threads
  • Add new features like TLS
  • Check performance

In this PR, I plan to update only kafka_buffered.
We need a stable consumer API before replacing the input plugins.

@repeatedly repeatedly referenced this pull request Jun 8, 2016

Closed

Kafka .9 plugin #44

repeatedly added some commits Jun 9, 2016

Create new producer for each flush thread
ruby-kafka's producer is not thread-safe so
we can't share one producer between flush threads
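
The constraint described in this commit message can be illustrated with a minimal sketch in which each flush thread lazily gets its own producer instead of sharing one. `StubProducer` and `PerThreadProducers` are hypothetical names, not part of the plugin or of ruby-kafka; the stub only stands in for a real producer so the sketch runs without a broker.

```ruby
# Hypothetical stand-in for a producer that is NOT safe to share across threads.
class StubProducer
  def initialize
    @buffer = []
  end

  def produce(value, topic:)
    @buffer << [topic, value]
  end

  # Pretend to send the buffered messages; returns how many were "delivered".
  def deliver_messages
    sent = @buffer.size
    @buffer.clear
    sent
  end
end

# Hands out one producer per calling thread, created on first use.
class PerThreadProducers
  def initialize(&factory)
    @factory = factory
    @producers = {}
    @mutex = Mutex.new # guards the registry only, not the producers themselves
  end

  # Returns the producer owned by the current thread.
  def get
    @mutex.synchronize { @producers[Thread.current] ||= @factory.call }
  end

  def size
    @mutex.synchronize { @producers.size }
  end
end
```

In a real output plugin the factory block would presumably build an actual producer, and the flush path would call `get` before producing; that wiring is an assumption and is not taken from this PR's diff.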
@repeatedly

Member

repeatedly commented Jun 9, 2016

The basic replacement is done.
Could anyone test this branch in a real environment, e.g. Kafka 0.8 / Kafka 0.9 / Kafka 0.10?

@fire

fire commented Jun 9, 2016

I can test TLS if that's in. I'll try it on http://www.cloudkafka.com/ since it has a free developer plan.

@fire

fire commented Jun 10, 2016

How can I test this? /me looks up the process of installing gems from git.

@repeatedly

Member

repeatedly commented Jun 10, 2016

There are several approaches.

  • If you have a Gemfile, specify my branch
  • Copy my patched code into /etc/fluent/plugin (in td-agent, /etc/td-agent/plugin)
  • Or download the patched code into any directory and specify it with -p
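
For the first option, a Gemfile entry along these lines should work. This is only a sketch: the repository URL and branch name are the ones that appear in the Chef recipe posted later in this thread, and the `fluentd` line assumes fluentd itself is managed by the same Gemfile.

```ruby
# Gemfile (sketch): pull the plugin from the PR branch instead of rubygems.org
source 'https://rubygems.org'

gem 'fluentd'
gem 'fluent-plugin-kafka',
    git: 'https://github.com/repeatedly/fluent-plugin-kafka.git',
    branch: 'use-ruby-kafka'
```

After `bundle install`, fluentd would be started through `bundle exec` so that the branch version of the plugin is the one that gets loaded.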
@drughi

drughi commented Jun 10, 2016

I have a running MapR cluster that implements Kafka 0.9 (MapR Event Streams), so I can test this without a problem. Since I use Docker images for testing, I install the plugin with "td-agent-gem install fluent-plugin-kafka" in my Dockerfile. Is there any way to test this with gem install?

@drughi

drughi commented Jun 10, 2016

OK, I patched the file manually and I get this error when starting fluentd:

Restarting td-agent:
agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require': cannot load such file -- kafka (LoadError)
from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.5/lib/fluent/plugin/out_kafka_buffered.rb:9:in `initialize'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/plugin.rb:128:in `new'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/plugin.rb:128:in `new_impl'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/plugin.rb:57:in `new_output'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:127:in `add_match'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:60:in `block in configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:54:in `each'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:54:in `configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/root_agent.rb:82:in `configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/engine.rb:117:in `configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/engine.rb:91:in `run_configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:515:in `run_configure'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:186:in `dry_run'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:133:in `start'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/command/fluentd.rb:171:in `<top (required)>'
from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require'
from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require'
from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/bin/fluentd:6:in `<top (required)>'
from /opt/td-agent/embedded/bin/fluentd:23:in `load'
from /opt/td-agent/embedded/bin/fluentd:23:in `<top (required)>'
from /usr/sbin/td-agent:7:in `load'
from /usr/sbin/td-agent:7:in `<main>'

I am not a developer, by the way, but it seems to be a problem with the require 'kafka'.

@drughi

drughi commented Jun 10, 2016

OK, I realize now there is a dependency on ruby kafka.

I installed the kafka-rb package with gem and now I get a different error:

/usr/sbin/td-agent-gem install kafka-rb
Fetching: kafka-rb-0.0.15.gem (100%)
Successfully installed kafka-rb-0.0.15
Parsing documentation for kafka-rb-0.0.15

2016-06-10 11:35:02 +0000 [error]: undefined method `new' for Kafka:Module
2016-06-10 11:35:02 +0000 [error]: unexpected error error_class=NoMethodError error=#<NoMethodError: undefined method `new' for Kafka:Module>
2016-06-10 11:35:02 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.5/lib/fluent/plugin/out_kafka_buffered.rb:84:in `refresh_client'
2016-06-10 11:35:02 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kafka-0.1.5/lib/fluent/plugin/out_kafka_buffered.rb:136:in `start'

@repeatedly

Member

repeatedly commented Jun 10, 2016

@drughi kafka-rb is a different library. Please use ruby-kafka, which is mentioned in the title ;)
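
The mix-up above can be checked before restarting td-agent. The following is a minimal sketch, assuming it is run with the same Ruby that td-agent embeds; `gem_installed?` and `diagnose_kafka_gems` are hypothetical helper names, and the `:conflict` case rests on the assumption that both gems install a top-level `kafka` file.

```ruby
require 'rubygems'

# True if any version of the named gem is installed for the current Ruby.
def gem_installed?(name)
  !Gem::Specification.find_all_by_name(name).empty?
end

# Returns :ok, :wrong_gem, :conflict or :missing.
# `installed` is injectable so the decision logic can be exercised
# without depending on what is installed on this machine.
def diagnose_kafka_gems(installed = method(:gem_installed?))
  ruby_kafka = installed.call('ruby-kafka')
  kafka_rb   = installed.call('kafka-rb')

  if ruby_kafka && kafka_rb
    :conflict   # assumption: both provide "kafka", so load order decides which wins
  elsif ruby_kafka
    :ok
  elsif kafka_rb
    :wrong_gem  # require 'kafka' succeeds but Kafka.new is undefined, as in the log
  else
    :missing    # require 'kafka' raises LoadError
  end
end
```

Calling `diagnose_kafka_gems` with no arguments inspects the gems visible to the running interpreter.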

@drughi

drughi commented Jun 10, 2016

Yes, sorry, I am on it right now :(

@fire

fire commented Jun 10, 2016

ssl_ca_cert and the related options take a filesystem path, given as a string, rather than the raw certificate string.
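
To illustrate the distinction, here is a minimal sketch of turning path-style settings into raw PEM strings. `ssl_options_from_paths` is a hypothetical helper, not the plugin's actual code; the keyword names simply mirror the parameters mentioned in this thread, and the idea that the resulting hash could be passed on to the Kafka client is an assumption.

```ruby
# Reads PEM files and returns their contents keyed by option name.
# Keys whose path is nil are left out of the result.
def ssl_options_from_paths(ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil)
  paths = {
    ssl_ca_cert: ssl_ca_cert,
    ssl_client_cert: ssl_client_cert,
    ssl_client_cert_key: ssl_client_cert_key
  }

  paths.each_with_object({}) do |(name, path), opts|
    next if path.nil?
    # Fail early with the option name so a bad path is easy to spot in the config.
    raise ArgumentError, "#{name}: no such file #{path}" unless File.file?(path)
    opts[name] = File.read(path)
  end
end
```

Raising on a missing file keeps a misconfigured path from surfacing later as an opaque TLS handshake failure.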

@fire

fire commented Jun 10, 2016

I was able to get the kafka plugin working with kafkacloud, verified through their Python consumer (I had some trouble with snappy): http://www.cloudkafka.com/docs-python.html

Kafkacloud uses TLS.

Here is how I modified the td-agent Chef recipe:

# Gems needed by the plugin and its codecs; none need extra attributes.
%w[snappy ruby-kafka zookeeper msgpack yajl-ruby ltsv poseidon_cluster].each do |gem_name|
  td_agent_gem gem_name
end

git "/usr/local/src/fluent-plugin-kafka" do
  repository "https://github.com/repeatedly/fluent-plugin-kafka.git"
  reference "use-ruby-kafka"
  action :sync
end

# https://stackoverflow.com/questions/16365146/install-ruby-gem-globally-from-github-repository
execute 'gem build fluent-plugin-kafka.gemspec' do
  cwd '/usr/local/src/fluent-plugin-kafka'
end

td_agent_gem 'kafka' do
  source '/usr/local/src/fluent-plugin-kafka/fluent-plugin-kafka-0.1.5.gem'
  plugin true
end

td_agent_source 'example_in_http' do
  type 'http'
  params(format: 'json', add_http_headers: true)
end

template '/etc/td-agent/conf.d/kafka-ca.pem' do
  source "ca.pem.erb"
  owner 'td-agent'
  group 'td-agent'
  mode 00600
end

template '/etc/td-agent/conf.d/kafka-cert.pem' do
  source "cert.pem.erb"
  owner 'td-agent'
  group 'td-agent'
  mode 00600
end

template '/etc/td-agent/conf.d/kafka-key.pem' do
  source "key.pem.erb"
  owner 'td-agent'
  group 'td-agent'
  mode 00600
end

td_agent_match 'forward_kafka' do
  type 'kafka_buffered'
  tag '*'
  params(
    brokers: '01.example.com:9093,02.example.com:9093,03.example.com:9093',
    default_topic: 'xxxx-xxxx',
    output_data_type: 'json',
    compression_codec: 'snappy',
    ssl_ca_cert: '/etc/td-agent/conf.d/kafka-ca.pem',
    ssl_client_cert: '/etc/td-agent/conf.d/kafka-cert.pem',
    ssl_client_cert_key: '/etc/td-agent/conf.d/kafka-key.pem'
  )
end

@repeatedly repeatedly changed the title from [WIP] Use ruby-kafka to Use ruby-kafka Jun 13, 2016

@repeatedly

Member

repeatedly commented Jun 13, 2016

Thanks for testing!

@repeatedly

Member

repeatedly commented Jun 13, 2016

It seems this PR is ready to merge.
Does anyone have any concerns?

@drughi

drughi commented Jun 13, 2016

Hello,

I also managed to get it working without problems!

Thanks!

On Jun 13, 2016, at 10:12, Masahiro Nakagawa notifications@github.com wrote:

It seems this PR is ready to merge.
Does anyone have any concern?



@repeatedly

Member

repeatedly commented Jun 16, 2016

@htgc Do you have any concerns about merging this?

@htgc

Collaborator

htgc commented Jun 18, 2016

@repeatedly Nope. Thank you very much for the contribution!!
I will merge and release it as v0.2.0 soon.

@htgc htgc merged commit e301a45 into fluent:master Jun 18, 2016
