Add kafka-subscribe and kafka-publish commands

*kafka-subscribe* and *kafka-publish* are command-line programs that help
with debugging a Kafka installation.
commit 7ea55aab50df583cebec29fb9c0245da5f419089 (parent: cd3d312)
Authored by @guillermo
README.md (+50 lines)
@@ -66,6 +66,56 @@ to your Gemfile.
end
+### Using the CLI
+
+There are two CLI programs for talking to Kafka from the command line, mainly
+intended for debugging: `kafka-publish` and `kafka-consumer`. You can configure
+them through command-line arguments or through the environment variables
+*KAFKA_HOST*, *KAFKA_PORT*, *KAFKA_TOPIC* and *KAFKA_COMPRESSION*.
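+For example, a minimal sketch of the environment-variable style (the host,
+port, topic and message values below are only placeholders for your own setup):
+
+```
+$ export KAFKA_HOST=localhost
+$ export KAFKA_PORT=9092
+$ export KAFKA_TOPIC=test
+$ echo "hello" | kafka-publish
+```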
+
+#### kafka-publish
+
+```
+$ kafka-publish --help
+Usage: kafka-publish [options]
+
+    -h, --host HOST                  Set the kafka hostname
+    -p, --port PORT                  Set the kafka port
+    -t, --topic TOPIC                Set the kafka topic
+    -c, --compression no|gzip|snappy Set the compression method
+    -m, --message MESSAGE            Message to send
+```
+
+If _message_ is omitted, `kafka-publish` reads from *STDIN* until EOF or
+SIGINT.
+
+NOTE: `kafka-publish` does not batch messages for the moment.
+
+This can be quite handy for piping directly into Kafka:
+
+```
+$ tail -f /var/log/syslog | kafka-publish -t syslog
+```
+
+#### kafka-consumer
+
+```
+$ kafka-consumer --help
+Usage: kafka-consumer [options]
+
+    -h, --host HOST                  Set the kafka hostname
+    -p, --port PORT                  Set the kafka port
+    -t, --topic TOPIC                Set the kafka topic
+```
+
+`kafka-consumer` will loop and wait for messages until it is interrupted.
+
+This is handy, for example, for taking a quick sample of the messages in a topic.
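+For instance, reusing the `syslog` topic from the publishing example above
+(substitute any topic that exists on your broker), the following stops after
+the first 20 messages it receives:
+
+```
+$ kafka-consumer -t syslog | head -n 20
+```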
+
+
## Questions?
alejandrocrosa at(@) gmail.com
http://twitter.com/alejandrocrosa
bin/kafka-consumer (+6 lines)
@@ -0,0 +1,6 @@
+#!/usr/bin/env ruby
+
+require 'kafka'
+require 'kafka/cli'
+
+Kafka::CLI.subscribe!
bin/kafka-publish (+6 lines)
@@ -0,0 +1,6 @@
+#!/usr/bin/env ruby
+
+require 'kafka'
+require 'kafka/cli'
+
+Kafka::CLI.publish!
kafka-rb.gemspec (+1 line)
@@ -13,6 +13,7 @@ Gem::Specification.new do |s|
  s.homepage = %q{http://github.com/acrosa/kafka-rb}
  s.require_paths = ["lib"]
  s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}
+  s.executables = Dir['bin/*']
  if s.respond_to? :specification_version then
    current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
lib/kafka/cli.rb (+170 lines)
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'optparse'
+
+module Kafka
+  module CLI #:nodoc: all
+    extend self
+
+    def publish!
+      read_env
+      parse_args
+      validate_config
+      if config[:message]
+        send(config, config.delete(:message))
+      else
+        publish(config)
+      end
+    end
+
+
+    def subscribe!
+      read_env
+      parse_args
+      validate_config
+      subscribe(config)
+    end
+
+    def validate_config
+      if config[:help]
+        puts help
+        exit
+      end
+      config[:host] ||= IO::HOST
+      config[:port] ||= IO::PORT
+      config[:topic].is_a?(String) or raise "Missing topic"
+
+    rescue RuntimeError => e
+      puts e.message
+      puts help
+      exit
+    end
+
+    def parse_args(args = ARGV)
+      option_parser.parse(args)
+    end
+
+    def read_env(env = ENV)
+      config[:host] = env["KAFKA_HOST"] if env["KAFKA_HOST"]
+      config[:port] = env["KAFKA_PORT"].to_i if env["KAFKA_PORT"]
+      config[:topic] = env["KAFKA_TOPIC"] if env["KAFKA_TOPIC"]
+      config[:compression] = string_to_compression(env["KAFKA_COMPRESSION"]) if env["KAFKA_COMPRESSION"]
+    end
+
+    def config
+      @config ||= {:compression => string_to_compression("no")}
+    end
+
+    def help
+      option_parser.to_s
+    end
+
+    def option_parser
+      OptionParser.new do |opts|
+        opts.banner = "Usage: #{program_name} [options]"
+        opts.separator ""
+
+        opts.on("-h","--host HOST", "Set the kafka hostname") do |h|
+          config[:host] = h
+        end
+
+        opts.on("-p", "--port PORT", "Set the kafka port") do |port|
+          config[:port] = port.to_i
+        end
+
+        opts.on("-t", "--topic TOPIC", "Set the kafka topic") do |topic|
+          config[:topic] = topic
+        end
+
+        opts.on("-c", "--compression no|gzip|snappy", "Set the compression method") do |meth|
+          config[:compression] = string_to_compression(meth)
+        end if publish?
+
+        opts.on("-m","--message MESSAGE", "Message to send") do |msg|
+          config[:message] = msg
+        end if publish?
+
+        opts.separator ""
+
+        opts.on("--help", "show the help") do
+          config[:help] = true
+        end
+
+        opts.separator ""
+        opts.separator "You can set the host, port, topic and compression from the environment variables: KAFKA_HOST, KAFKA_PORT, KAFKA_TOPIC and KAFKA_COMPRESSION"
+      end
+    end
+
+    def publish?
+      program_name == "kafka-publish"
+    end
+
+    def subscribe?
+      program_name == "kafka-subscribe"
+    end
+
+    def program_name(pn = $0)
+      File.basename(pn)
+    end
+
+    def string_to_compression(meth)
+      case meth
+      when "no" then Message::NO_COMPRESSION
+      when "gzip" then Message::GZIP_COMPRESSION
+      when "snappy" then Message::SNAPPY_COMPRESSION
+      else raise "Unsupported compression method"
+      end
+    end
+
+    def send(options, message)
+      Producer.new(options).send(Message.new(message))
+    end
+
+    def publish(options)
+      trap(:INT){ exit }
+      producer = Producer.new(options)
+      loop do
+        publish_loop(producer)
+      end
+    end
+
+    def publish_loop(producer)
+      message = read_input
+      producer.send(Message.new(message))
+    end
+
+    def read_input
+      input = $stdin.gets
+      if input
+        input.strip
+      else
+        exit # gets returns nil on EOF
+      end
+    end
+
+    def subscribe(options)
+      trap(:INT){ exit }
+      consumer = Consumer.new(options)
+      consumer.loop do |messages|
+        messages.each do |message|
+          puts message.payload
+        end
+      end
+    end
+
+  end
+end
spec/cli_spec.rb (+133 lines)
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require File.dirname(__FILE__) + '/spec_helper'
+require 'kafka/cli'
+
+describe CLI do
+
+  before(:each) do
+    CLI.instance_variable_set("@config", {})
+    CLI.stub(:puts)
+  end
+
+  describe "should read from env" do
+    describe "kafka host" do
+      it "should read KAFKA_HOST from env" do
+        CLI.read_env("KAFKA_HOST" => "google.com")
+        CLI.config[:host].should == "google.com"
+      end
+
+      it "kafka port" do
+        CLI.read_env("KAFKA_PORT" => "1234")
+        CLI.config[:port].should == 1234
+      end
+
+      it "kafka topic" do
+        CLI.read_env("KAFKA_TOPIC" => "news")
+        CLI.config[:topic].should == "news"
+      end
+
+      it "kafka compression" do
+        CLI.read_env("KAFKA_COMPRESSION" => "no")
+        CLI.config[:compression].should == Message::NO_COMPRESSION
+
+        CLI.read_env("KAFKA_COMPRESSION" => "gzip")
+        CLI.config[:compression].should == Message::GZIP_COMPRESSION
+
+        CLI.read_env("KAFKA_COMPRESSION" => "snappy")
+        CLI.config[:compression].should == Message::SNAPPY_COMPRESSION
+      end
+    end
+  end
+
+  describe "should read from command line" do
+    it "kafka host" do
+      CLI.parse_args(%w(--host google.com))
+      CLI.config[:host].should == "google.com"
+
+      CLI.parse_args(%w(-h google.com))
+      CLI.config[:host].should == "google.com"
+    end
+
+    it "kafka port" do
+      CLI.parse_args(%w(--port 1234))
+      CLI.config[:port].should == 1234
+
+      CLI.parse_args(%w(-p 1234))
+      CLI.config[:port].should == 1234
+    end
+
+    it "kafka topic" do
+      CLI.parse_args(%w(--topic news))
+      CLI.config[:topic].should == "news"
+
+      CLI.parse_args(%w(-t news))
+      CLI.config[:topic].should == "news"
+    end
+
+    it "kafka compression" do
+      CLI.stub(:publish? => true)
+
+      CLI.parse_args(%w(--compression no))
+      CLI.config[:compression].should == Message::NO_COMPRESSION
+      CLI.parse_args(%w(-c no))
+      CLI.config[:compression].should == Message::NO_COMPRESSION
+
+      CLI.parse_args(%w(--compression gzip))
+      CLI.config[:compression].should == Message::GZIP_COMPRESSION
+      CLI.parse_args(%w(-c gzip))
+      CLI.config[:compression].should == Message::GZIP_COMPRESSION
+
+      CLI.parse_args(%w(--compression snappy))
+      CLI.config[:compression].should == Message::SNAPPY_COMPRESSION
+      CLI.parse_args(%w(-c snappy))
+      CLI.config[:compression].should == Message::SNAPPY_COMPRESSION
+    end
+
+    it "message" do
+      CLI.stub(:publish? => true)
+      CLI.parse_args(%w(--message YEAH))
+      CLI.config[:message].should == "YEAH"
+
+      CLI.parse_args(%w(-m YEAH))
+      CLI.config[:message].should == "YEAH"
+    end
+
+  end
+
+  describe "config validation" do
+    it "should assign a default port" do
+      CLI.stub(:exit)
+      CLI.stub(:puts)
+      CLI.validate_config
+      CLI.config[:port].should == Kafka::IO::PORT
+    end
+  end
+
+  it "should assign a default host" do
+    CLI.stub(:exit)
+    CLI.validate_config
+    CLI.config[:host].should == Kafka::IO::HOST
+  end
+
+
+  it "read compression method" do
+    CLI.string_to_compression("no").should == Message::NO_COMPRESSION
+    CLI.string_to_compression("gzip").should == Message::GZIP_COMPRESSION
+    CLI.string_to_compression("snappy").should == Message::SNAPPY_COMPRESSION
+    lambda { CLI.send(:string_to_compression,nil) }.should raise_error
+  end
+
+end