Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added BufferChunk#unique_id which is an unique identifier of the buff…

…ered chunk
  • Loading branch information...
commit e463eb114b003b1ef9b38fd5cc4e94f44f5b4892 1 parent d93a151
@frsyuki frsyuki authored
Showing with 35 additions and 17 deletions.
  1. +30 −17 lib/fluent/plugin/buf_file.rb
  2. +5 −0 lib/fluent/plugin/buf_memory.rb
View
47 lib/fluent/plugin/buf_file.rb
@@ -19,14 +19,17 @@ module Fluent
class FileBufferChunk < BufferChunk
- def initialize(key, path, mode="a+")
+ def initialize(key, path, unique_id, mode="a+")
super(key)
@path = path
+ @unique_id = unique_id
@file = File.open(@path, mode)
@file.sync = true
@size = @file.stat.size
end
+ attr_reader :unique_id
+
def <<(data)
@file.write(data)
@size += data.bytesize
@@ -103,8 +106,9 @@ def start
def new_chunk(key)
encoded_key = encode_key(key)
- path = make_path(encoded_key, "b")
- FileBufferChunk.new(key, path)
+ path, tsuffix = make_path(encoded_key, "b")
+ unique_id = tsuffix_to_unique_id(tsuffix)
+ FileBufferChunk.new(key, path, unique_id)
end
def resume
@@ -116,28 +120,30 @@ def resume
if m = PATH_MATCH.match(match)
key = decode_key(m[1])
bq = m[2]
- tsuffix = m[3].to_i(16)
+ tsuffix = m[3]
+ timestamp = m[3].to_i(16)
+ unique_id = tsuffix_to_unique_id(tsuffix)
if bq == 'b'
- chunk = FileBufferChunk.new(key, path, "a+")
- maps << [tsuffix, chunk]
+ chunk = FileBufferChunk.new(key, path, unique_id, "a+")
+ maps << [timestamp, chunk]
elsif bq == 'q'
- chunk = FileBufferChunk.new(key, path, "r")
- queues << [tsuffix, chunk]
+ chunk = FileBufferChunk.new(key, path, unique_id, "r")
+ queues << [timestamp, chunk]
end
end
}
map = {}
- maps.sort_by {|(tsuffix,chunk)|
- tsuffix
- }.each {|(tsuffix,chunk)|
+ maps.sort_by {|(timestamp,chunk)|
+ timestamp
+ }.each {|(timestamp,chunk)|
map[chunk.key] = chunk
}
- queue = queues.sort_by {|(tsuffix,chunk)|
- tsuffix
- }.map {|(tsuffix,chunk)|
+ queue = queues.sort_by {|(timestamp,chunk)|
+ timestamp
+ }.map {|(timestamp,chunk)|
chunk
}
@@ -150,7 +156,8 @@ def enqueue(chunk)
m = PATH_MATCH.match(mp)
encoded_key = m ? m[1] : ""
- npath = make_path(encoded_key, "q")
+ tsuffix = m[3]
+ npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"
chunk.mv(npath)
end
@@ -166,8 +173,14 @@ def decode_key(encoded_key)
def make_path(encoded_key, bq)
now = Time.now.utc
- tsuffix = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff)).to_s(16)
- "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{tsuffix}#{@buffer_path_suffix}"
+ timestamp = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff))
+ tsuffix = timestamp.to_s(16)
+ path = "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{tsuffix}#{@buffer_path_suffix}"
+ return path, tsuffix
+ end
+
+ def tsuffix_to_unique_id(tsuffix)
+ tsuffix.scan(/../).map {|x| x.to_i(16) }.pack('C*') * 2
end
end
View
5 lib/fluent/plugin/buf_memory.rb
@@ -22,9 +22,14 @@ class MemoryBufferChunk < BufferChunk
def initialize(key, data='')
@data = data
@data.force_encoding('ASCII-8BIT')
+ now = Time.now.utc
+ u1 = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff))
+ @unique_id = [u1 >> 32, u1 & u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
super(key)
end
+ attr_reader :unique_id
+
def <<(data)
data.force_encoding('ASCII-8BIT')
@data << data
Please sign in to comment.
Something went wrong with that request. Please try again.