Skip to content

Commit

Permalink
Fix for an output plugin which has #format
Browse files Browse the repository at this point in the history
  • Loading branch information
ganmacs committed Sep 2, 2016
1 parent 32efffd commit 1709b78
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 3 deletions.
23 changes: 23 additions & 0 deletions example/in_dummy_with_compression.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<source>
@type dummy
@label @main
tag "test.data"
size 2
rate 10
dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"}
auto_increment_key number
</source>

<label @main>
<match test.data>
@type buffered_stdout
<buffer>
@type file
path "#{Dir.pwd}/compressed_buffers"
flush_at_shutdown false
chunk_limit_size 1m
flush_interval 10s
compress gzip
</buffer>
</match>
</label>
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
serialized = format.call(data)
chunk.concat(serialized, size ? size.call : data.size)
else
chunk.append(data)
chunk.append(data, compress: @compress)
end
adding_bytesize = chunk.bytesize - original_bytesize

Expand Down Expand Up @@ -561,7 +561,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if format
chunk.concat(format.call(split), split.size)
else
chunk.append(split)
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
Expand Down
18 changes: 17 additions & 1 deletion lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

require 'monitor'
require 'tempfile'
require 'zlib'

module Fluent
module Plugin
Expand Down Expand Up @@ -66,7 +67,8 @@ def initialize(metadata, compress: :text)
attr_reader :unique_id, :metadata, :created_at, :modified_at, :state

# data is array of formatted record string
def append(data)
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
Expand Down Expand Up @@ -165,6 +167,20 @@ def write_to(io, **kwargs)
module Decompressable
include Fluent::Plugin::Compressable

def append(data, **kwargs)
if kwargs[:compress] == :gzip
io = StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
data.each do |d|
gz.write d
end
end
concat(io.string, data.size)
else
super
end
end

def open(**kwargs, &block)
if kwargs[:compressed] == :gzip
super
Expand Down
1 change: 1 addition & 0 deletions test/plugin/test_buffer_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BufferChunkTest < Test::Unit::TestCase
assert_raise(ArgumentError){ chunk.read(compressed: :gzip) }
assert_raise(ArgumentError){ chunk.open(compressed: :gzip){} }
assert_raise(ArgumentError){ chunk.write_to(nil, compressed: :gzip) }
assert_raise(ArgumentError){ chunk.append(nil, compress: :gzip) }
end
end

Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_buffer_file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,17 @@ def gen_chunk_path(prefix, unique_id)
@gzipped_src = compress(@src)
end

test '#append with compress option writes compressed data to chunk when compress is gzip' do
c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
c.append([@src, @src], compress: :gzip)
c.commit

# check chunk is compressed
assert c.read(compressed: :gzip).size < [@src, @src].join("").size

assert_equal @src + @src, c.read
end

test '#open passes io object having decompressed data to a block when compress is gzip' do
c = @klass.new(gen_metadata, File.join(@chunkdir,'test.*.log'), :create, compress: :gzip)
c.concat(@gzipped_src, @src.size)
Expand Down
11 changes: 11 additions & 0 deletions test/plugin/test_buffer_memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ class BufferMemoryChunkTest < Test::Unit::TestCase
@gzipped_src = compress(@src)
end

test '#append with compress option writes compressed data to chunk when compress is gzip' do
c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
c.append([@src, @src], compress: :gzip)
c.commit

# check chunk is compressed
assert c.read(compressed: :gzip).size < [@src, @src].join("").size

assert_equal @src + @src, c.read
end

test '#open passes io object having decompressed data to a block when compress is gzip' do
c = Fluent::Plugin::Buffer::MemoryChunk.new(Object.new, compress: :gzip)
c.concat(@gzipped_src, @src.size)
Expand Down

0 comments on commit 1709b78

Please sign in to comment.