Skip to content

Commit

Permalink
Merge pull request #2054 from okkez/calculate-timekey
Browse files Browse the repository at this point in the history
Consider timezone when calculate timekey
  • Loading branch information
repeatedly committed Jul 23, 2018
2 parents d2137c2 + 54bda48 commit 1c55789
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 6 deletions.
23 changes: 17 additions & 6 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ def configure(conf)
raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
Fluent::Timezone.validate!(@buffer_config.timekey_zone)
@timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
@timekey = @buffer_config.timekey
@timekey_use_utc = @buffer_config.timekey_use_utc
@offset = Fluent::Timezone.utc_offset(@timekey_zone)
@calculate_offset = @offset.respond_to?(:call) ? @offset : nil
@output_time_formatter_cache = {}
end

Expand Down Expand Up @@ -803,20 +807,17 @@ def metadata(tag, time, record)
if !@chunk_key_time && !@chunk_key_tag
@buffer.metadata()
elsif @chunk_key_time && @chunk_key_tag
time_int = time.to_i
timekey = (time_int - (time_int % @buffer_config.timekey)).to_i
timekey = calculate_timekey(time)
@buffer.metadata(timekey: timekey, tag: tag)
elsif @chunk_key_time
time_int = time.to_i
timekey = (time_int - (time_int % @buffer_config.timekey)).to_i
timekey = calculate_timekey(time)
@buffer.metadata(timekey: timekey)
else
@buffer.metadata(tag: tag)
end
else
timekey = if @chunk_key_time
time_int = time.to_i
(time_int - (time_int % @buffer_config.timekey)).to_i
calculate_timekey(time)
else
nil
end
Expand All @@ -825,6 +826,16 @@ def metadata(tag, time, record)
end
end

def calculate_timekey(time)
time_int = time.to_i
if @timekey_use_utc
(time_int - (time_int % @timekey)).to_i
else
offset = @calculate_offset ? @calculate_offset.call(time) : @offset
(time_int - ((time_int + offset)% @timekey)).to_i
end
end

def chunk_for_test(tag, time, record)
require 'fluent/plugin/buffer/memory_chunk'

Expand Down
14 changes: 14 additions & 0 deletions lib/fluent/timezone.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,19 @@ def self.formatter(timezone = nil, format = nil)

return nil
end

def self.utc_offset(timezone)
return 0 if timezone.nil?

case timezone
when NUMERIC_PATTERN
Time.zone_offset(timezone)
when NAME_PATTERN
tz = TZInfo::Timezone.get(timezone)
->(time) {
tz.period_for_utc(time).utc_total_offset
}
end
end
end
end
73 changes: 73 additions & 0 deletions test/plugin/test_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,79 @@ def parse_system(text)
check_gzipped_result(path, formatted_lines * 3)
end

test 'append when JST' do
with_timezone(Fluent.windows? ? "JST-9" : "Asia/Tokyo") do
time = event_time("2011-01-02 03:14:15+09:00")
formatted_lines = %[2011-01-02T03:14:15+09:00\ttest\t{"a":1}\n] + %[2011-01-02T03:14:15+09:00\ttest\t{"a":2}\n]

write_once = ->(){
d = create_driver %[
path #{TMP_DIR}/out_file_test
compress gz
append true
<buffer>
timekey_use_utc false
timekey_zone Asia/Tokyo
</buffer>
]
d.run(default_tag: 'test'){
d.feed(time, {"a"=>1})
d.feed(time, {"a"=>2})
}
d.instance.last_written_path
}

path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
check_gzipped_result(path, formatted_lines)

path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
check_gzipped_result(path, formatted_lines * 2)

path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path
check_gzipped_result(path, formatted_lines * 3)
end
end

test 'append when UTC-02 but timekey_zone is +0900' do
with_timezone("UTC-02") do # +0200
time = event_time("2011-01-02 17:14:15+02:00")
formatted_lines = %[2011-01-02T17:14:15+02:00\ttest\t{"a":1}\n] + %[2011-01-02T17:14:15+02:00\ttest\t{"a":2}\n]

write_once = ->(){
d = create_driver %[
path #{TMP_DIR}/out_file_test
compress gz
append true
<buffer>
timekey_use_utc false
timekey_zone +0900
</buffer>
]
d.run(default_tag: 'test'){
d.feed(time, {"a"=>1})
d.feed(time, {"a"=>2})
}
d.instance.last_written_path
}

path = write_once.call
# Rotated at 2011-01-02 17:00:00+02:00
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
check_gzipped_result(path, formatted_lines)

path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
check_gzipped_result(path, formatted_lines * 2)

path = write_once.call
assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path
check_gzipped_result(path, formatted_lines * 3)
end
end

test '${chunk_id}' do
time = event_time("2011-01-02 13:14:15 UTC")
formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]
Expand Down

0 comments on commit 1c55789

Please sign in to comment.