Skip to content

Commit

Permalink
Added support for Snappy compression to the Consumer, and added compression (Snappy and GZip) to the Producer and MultiProducer.
Browse files Browse the repository at this point in the history

To enable snappy compression, include the following in your Gemfile (This branch is necessary until a pending pull request is accepted):

    gem "snappy", "0.0.4", :git => "git://github.com/watersofoblivion/snappy.git", :branch => "snappy-streaming"
  • Loading branch information
Bob Cotton authored and Jonathan Bryant committed Dec 11, 2012
1 parent 7bd4bf3 commit f4eda48
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 87 deletions.
6 changes: 6 additions & 0 deletions Gemfile
@@ -0,0 +1,6 @@
source 'https://rubygems.org'

# Specify your gem's dependencies in kafka-rb.gemspec
gemspec

# rake is needed for the packaging and spec tasks in the Rakefile.
gem "rake"
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -7,6 +7,11 @@ and is used in production at wooga.
You need to have access to your Kafka instance and be able to connect through TCP.
You can obtain a copy and instructions on how to setup kafka at http://incubator.apache.org/kafka/


To make Snappy compression available, add

gem "snappy", "0.0.4", :git => "git://github.com/watersofoblivion/snappy.git", :branch => "snappy-streaming"

to your Gemfile.


## Installation


Expand Down
36 changes: 1 addition & 35 deletions Rakefile
Expand Up @@ -19,35 +19,7 @@ require 'rubygems/specification'
require 'date' require 'date'
require 'rspec/core/rake_task' require 'rspec/core/rake_task'


spec = Gem::Specification.new do |s| spec = eval(File.open("kafka-rb.gemspec", "r").read)
s.name = %q{kafka-rb}
s.version = "0.0.11"

s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Alejandro Crosa", "Stefan Mees", "Tim Lossen", "Liam Stewart"]
s.autorequire = %q{kafka-rb}
s.date = Time.now.strftime("%Y-%m-%d")
s.description = %q{kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.}
s.extra_rdoc_files = ["LICENSE"]
s.files = ["LICENSE", "README.md", "Rakefile"] + Dir.glob("lib/**/*.rb")
s.test_files = Dir.glob("spec/**/*.rb")
s.homepage = %q{http://github.com/acrosa/kafka-rb}
s.require_paths = ["lib"]
s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}

if s.respond_to? :specification_version then
current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
s.specification_version = 3

if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
s.add_development_dependency(%q<rspec>, [">= 0"])
else
s.add_dependency(%q<rspec>, [">= 0"])
end
else
s.add_dependency(%q<rspec>, [">= 0"])
end
end


Gem::PackageTask.new(spec) do |pkg| Gem::PackageTask.new(spec) do |pkg|
pkg.gem_spec = spec pkg.gem_spec = spec
Expand All @@ -58,12 +30,6 @@ task :install => [:package] do
sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}} sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}}
end end


desc "Run all examples with RCov"
RSpec::Core::RakeTask.new(:rcov) do |t|
t.pattern = FileList['spec/**/*_spec.rb']
t.rcov = true
end

desc "Run specs" desc "Run specs"
RSpec::Core::RakeTask.new do |t| RSpec::Core::RakeTask.new do |t|
t.pattern = FileList['spec/**/*_spec.rb'] t.pattern = FileList['spec/**/*_spec.rb']
Expand Down
29 changes: 29 additions & 0 deletions kafka-rb.gemspec
@@ -0,0 +1,29 @@
# Gem packaging metadata for the kafka-rb client library.
# This file is also eval'd by the Rakefile to build the package task.
Gem::Specification.new do |s|
  s.name    = "kafka-rb"
  s.version = "0.0.12"

  s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to?(:required_rubygems_version=)
  s.authors     = ["Alejandro Crosa", "Stefan Mees", "Tim Lossen", "Liam Stewart"]
  s.autorequire = "kafka-rb"
  # Stamp the build date at packaging time.
  s.date        = Time.now.strftime("%Y-%m-%d")
  s.description = "kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service."
  s.extra_rdoc_files = ["LICENSE"]
  s.files       = ["LICENSE", "README.md", "Rakefile"] + Dir.glob("lib/**/*.rb")
  s.test_files  = Dir.glob("spec/**/*.rb")
  s.homepage    = "http://github.com/acrosa/kafka-rb"
  s.require_paths = ["lib"]
  s.summary     = "A Ruby client for the Kafka distributed publish/subscribe messaging service"

  # Declare rspec as a development dependency where RubyGems supports the
  # concept (>= 1.2.0); otherwise fall back to a plain dependency so older
  # RubyGems can still install the gem.
  if s.respond_to?(:specification_version)
    current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
    s.specification_version = 3

    if Gem::Version.new(Gem::VERSION) >= Gem::Version.new("1.2.0")
      s.add_development_dependency("rspec", [">= 0"])
    else
      s.add_dependency("rspec", [">= 0"])
    end
  else
    s.add_dependency("rspec", [">= 0"])
  end
end
8 changes: 6 additions & 2 deletions lib/kafka.rb
Expand Up @@ -14,8 +14,12 @@
# limitations under the License. # limitations under the License.
require 'socket' require 'socket'
require 'zlib' require 'zlib'
if RUBY_VERSION[0,3] == "1.8" require "stringio"
require 'iconv'
begin
require 'snappy'
rescue LoadError
nil
end end


require File.join(File.dirname(__FILE__), "kafka", "io") require File.join(File.dirname(__FILE__), "kafka", "io")
Expand Down
34 changes: 16 additions & 18 deletions lib/kafka/encoder.rb
Expand Up @@ -15,22 +15,12 @@


module Kafka module Kafka
module Encoder module Encoder
def self.message(message) def self.message(message, compression = Message::NO_COMPRESSION)
payload = \ message.encode(compression)
if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
Iconv.new('UTF-8//IGNORE', 'UTF-8').iconv(message.payload.to_s)
else
message.payload.to_s.force_encoding(Encoding::ASCII_8BIT)
end
data = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + payload

[data.length].pack("N") + data
end end


def self.message_block(topic, partition, messages) def self.message_block(topic, partition, messages, compression)
message_set = Array(messages).collect { |message| message_set = message_set(messages, compression)
self.message(message)
}.join("")


topic = [topic.length].pack("n") + topic topic = [topic.length].pack("n") + topic
partition = [partition].pack("N") partition = [partition].pack("N")
Expand All @@ -39,16 +29,24 @@ def self.message_block(topic, partition, messages)
return topic + partition + messages return topic + partition + messages
end end


def self.produce(topic, partition, messages) def self.message_set(messages, compression)
message_set = Array(messages).collect { |message|
self.message(message)
}.join("")
message_set = self.message(Message.new(message_set), compression) unless compression == Message::NO_COMPRESSION
message_set
end

def self.produce(topic, partition, messages, compression = Message::NO_COMPRESSION)
request = [RequestType::PRODUCE].pack("n") request = [RequestType::PRODUCE].pack("n")
data = request + self.message_block(topic, partition, messages) data = request + self.message_block(topic, partition, messages, compression)


return [data.length].pack("N") + data return [data.length].pack("N") + data
end end


def self.multiproduce(producer_requests) def self.multiproduce(producer_requests, compression = Message::NO_COMPRESSION)
part_set = Array(producer_requests).map { |req| part_set = Array(producer_requests).map { |req|
self.message_block(req.topic, req.partition, req.messages) self.message_block(req.topic, req.partition, req.messages, compression)
} }


request = [RequestType::MULTIPRODUCE].pack("n") request = [RequestType::MULTIPRODUCE].pack("n")
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka/io.rb
Expand Up @@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
module Kafka module Kafka
module IO module IO
attr_accessor :socket, :host, :port attr_accessor :socket, :host, :port, :compression


HOST = "localhost" HOST = "localhost"
PORT = 9092 PORT = 9092
Expand Down
122 changes: 107 additions & 15 deletions lib/kafka/message.rb
Expand Up @@ -33,6 +33,10 @@ module Kafka
class Message class Message


MAGIC_IDENTIFIER_DEFAULT = 0 MAGIC_IDENTIFIER_DEFAULT = 0
MAGIC_IDENTIFIER_COMPRESSION = 1
NO_COMPRESSION = 0
GZIP_COMPRESSION = 1
SNAPPY_COMPRESSION = 2
BASIC_MESSAGE_HEADER = 'NC'.freeze BASIC_MESSAGE_HEADER = 'NC'.freeze
VERSION_0_HEADER = 'N'.freeze VERSION_0_HEADER = 'N'.freeze
VERSION_1_HEADER = 'CN'.freeze VERSION_1_HEADER = 'CN'.freeze
Expand All @@ -41,9 +45,10 @@ class Message
attr_accessor :magic, :checksum, :payload attr_accessor :magic, :checksum, :payload


def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil) def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
self.magic = magic self.magic = magic
self.payload = payload || "" self.payload = payload || ""
self.checksum = checksum || self.calculate_checksum self.checksum = checksum || self.calculate_checksum
@compression = NO_COMPRESSION
end end


def calculate_checksum def calculate_checksum
Expand All @@ -66,7 +71,7 @@ def self.parse_from(data)
break if bytes_processed + message_size + 4 > data.length # message is truncated break if bytes_processed + message_size + 4 > data.length # message is truncated


case magic case magic
when 0 when MAGIC_IDENTIFIER_DEFAULT
# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ... # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ...
# | | | | # | | | |
# | message_size |magic| checksum | payload ... # | message_size |magic| checksum | payload ...
Expand All @@ -75,7 +80,7 @@ def self.parse_from(data)
payload = data[bytes_processed + 9, payload_size] payload = data[bytes_processed + 9, payload_size]
messages << Kafka::Message.new(payload, magic, checksum) messages << Kafka::Message.new(payload, magic, checksum)


when 1 when MAGIC_IDENTIFIER_COMPRESSION
# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ... # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ...
# | | | | | # | | | | |
# | size |magic|attrs| checksum | payload ... # | size |magic|attrs| checksum | payload ...
Expand All @@ -84,18 +89,22 @@ def self.parse_from(data)
payload = data[bytes_processed + 10, payload_size] payload = data[bytes_processed + 10, payload_size]


case attributes & COMPRESSION_CODEC_MASK case attributes & COMPRESSION_CODEC_MASK
when 0 # a single uncompressed message when NO_COMPRESSION # a single uncompressed message
messages << Kafka::Message.new(payload, magic, checksum) messages << Kafka::Message.new(payload, magic, checksum)
when 1 # a gzip-compressed message set -- parse recursively when GZIP_COMPRESSION # a gzip-compressed message set -- parse recursively
uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
message_set = parse_from(uncompressed) message_set = parse_from(uncompressed)
raise 'malformed compressed message' if message_set.size != uncompressed.size raise 'malformed compressed message' if message_set.size != uncompressed.size
messages.concat(message_set.messages) messages.concat(message_set.messages)
when SNAPPY_COMPRESSION # a snappy-compressed message set -- parse recursively
ensure_snappy! do
uncompressed = Snappy::Reader.new(StringIO.new(payload)).read
message_set = parse_from(uncompressed)
raise 'malformed compressed message' if message_set.size != uncompressed.size
messages.concat(message_set.messages)
end
else else
# https://cwiki.apache.org/confluence/display/KAFKA/Compression # https://cwiki.apache.org/confluence/display/KAFKA/Compression
# claims that 2 is for Snappy compression, but Kafka's Scala client
# implementation doesn't seem to support it yet, so I don't have
# a reference implementation to test against.
raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}" raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
end end


Expand All @@ -108,10 +117,93 @@ def self.parse_from(data)


MessageSet.new(bytes_processed, messages) MessageSet.new(bytes_processed, messages)
end end
end


# Encapsulates a list of Kafka messages (as Kafka::Message objects in the def encode(compression = NO_COMPRESSION)
# +messages+ attribute) and their total serialized size in bytes (the +size+ @compression = compression
# attribute).
class MessageSet < Struct.new(:size, :messages); end self.payload = asciify_payload
self.payload = compress_payload if compression?

data = magic_and_compression + [calculate_checksum].pack("N") + payload
[data.length].pack("N") + data
end


# Encapsulates a list of Kafka messages (as Kafka::Message objects in the
# +messages+ attribute) and their total serialized size in bytes (the +size+
# attribute).
class MessageSet < Struct.new(:size, :messages); end

# Runs the given block only when the optional Snappy library has been
# loaded; otherwise aborts with a RuntimeError. Snappy is an optional
# dependency (see lib/kafka.rb's guarded require).
def self.ensure_snappy!
  fail "Snappy not available!" unless Object.const_defined?(:Snappy)
  yield
end

# Instance-level shorthand for the class-level Snappy guard.
def ensure_snappy!(&block)
  self.class.ensure_snappy!(&block)
end

private

# Compression codec for this message; set by #encode, defaults to
# NO_COMPRESSION in #initialize.
attr_reader :compression

# True when a compression codec (gzip or snappy) is in effect.
def compression?
  compression != NO_COMPRESSION
end

# Serialized header prefix for the wire format: magic byte 1 plus an
# attributes byte carrying the codec when compressed, magic byte 0 alone
# otherwise.
def magic_and_compression
  return [MAGIC_IDENTIFIER_DEFAULT].pack("C") unless compression?
  [MAGIC_IDENTIFIER_COMPRESSION, compression].pack("CC")
end

# Returns the payload coerced to a binary-encoded string. Ruby 1.8
# strings carry no encoding, so the payload is passed through untouched
# there.
def asciify_payload
  return payload if RUBY_VERSION[0, 3] == "1.8"
  payload.to_s.force_encoding(Encoding::ASCII_8BIT)
end

# Dispatches to the compressor matching the configured codec.
# Returns nil for any other codec value; callers only invoke this when
# compression? is true.
def compress_payload
  if compression == GZIP_COMPRESSION
    gzip
  elsif compression == SNAPPY_COMPRESSION
    snappy
  end
end

# Gzip-compresses the message payload, returning the compressed bytes.
def gzip
  with_buffer do |buffer|
    # nil, nil => default compression level and strategy.
    gz = Zlib::GzipWriter.new buffer, nil, nil
    gz.write payload
    # NOTE(review): GzipWriter#close also closes the underlying buffer;
    # with_buffer then calls rewind on it -- confirm this is safe on the
    # targeted Ruby versions (StringIO#string works either way).
    gz.close
  end
end

# Snappy-compresses the message payload, returning the compressed bytes.
# Fails with "Snappy not available!" when the optional snappy gem is not
# loaded (see ensure_snappy!).
def snappy
  ensure_snappy! do
    with_buffer do |buffer|
      # presumably Snappy::Writer flushes its frame when the block exits --
      # verify against the snappy-streaming branch named in the README
      Snappy::Writer.new buffer do |w|
        w << payload
      end
    end
  end
end

# Yields a fresh in-memory buffer to the block (if given) and returns the
# accumulated bytes as a String. On Ruby >= 1.9 the buffer is forced to
# binary encoding so compressed output is not transcoded.
def with_buffer
  io = StringIO.new
  io.set_encoding(Encoding::ASCII_8BIT) if RUBY_VERSION !~ /^1\.8/
  yield io if block_given?
  io.rewind
  io.string
end
end
end end

5 changes: 3 additions & 2 deletions lib/kafka/multi_producer.rb
Expand Up @@ -19,16 +19,17 @@ class MultiProducer
def initialize(options={}) def initialize(options={})
self.host = options[:host] || HOST self.host = options[:host] || HOST
self.port = options[:port] || PORT self.port = options[:port] || PORT
self.compression = options[:compression] || Message::NO_COMPRESSION
self.connect(self.host, self.port) self.connect(self.host, self.port)
end end


def send(topic, messages, options={}) def send(topic, messages, options={})
partition = options[:partition] || 0 partition = options[:partition] || 0
self.write(Encoder.produce(topic, partition, messages)) self.write(Encoder.produce(topic, partition, messages, compression))
end end


def multi_send(producer_requests) def multi_send(producer_requests)
self.write(Encoder.multiproduce(producer_requests)) self.write(Encoder.multiproduce(producer_requests, compression))
end end
end end
end end
11 changes: 6 additions & 5 deletions lib/kafka/producer.rb
Expand Up @@ -20,15 +20,16 @@ class Producer
attr_accessor :topic, :partition attr_accessor :topic, :partition


def initialize(options = {}) def initialize(options = {})
self.topic = options[:topic] || "test" self.topic = options[:topic] || "test"
self.partition = options[:partition] || 0 self.partition = options[:partition] || 0
self.host = options[:host] || HOST self.host = options[:host] || HOST
self.port = options[:port] || PORT self.port = options[:port] || PORT
self.compression = options[:compression] || Message::NO_COMPRESSION
self.connect(self.host, self.port) self.connect(self.host, self.port)
end end


def send(messages) def send(messages)
self.write(Encoder.produce(self.topic, self.partition, messages)) self.write(Encoder.produce(self.topic, self.partition, messages, compression))
end end


def batch(&block) def batch(&block)
Expand Down

0 comments on commit f4eda48

Please sign in to comment.