diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 74e0c08885..5cdd74575a 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -114,7 +114,9 @@ def load_plugin_dir(dir) end def emit(tag, time, record) - emit_stream tag, OneEventStream.new(time, record) + unless record.nil? + emit_stream tag, OneEventStream.new(time, record) + end end def emit_array(tag, array) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 62c6acf0e5..516a938024 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -124,18 +124,20 @@ def on_message(msg) # Forward es = MultiEventStream.new entries.each {|e| + record = e[1] + next if record.nil? time = e[0].to_i time = (now ||= Engine.now) if time == 0 - record = e[1] es.add(time, record) } Engine.emit_stream(tag, es) else # Message + record = msg[2] + return if record.nil? time = msg[1] time = Engine.now if time == 0 - record = msg[2] Engine.emit(tag, time, record) end end diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index d7e32b6f2a..3bb7dedefa 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -117,6 +117,11 @@ def on_request(path_info, params) raise "'json' or 'msgpack' parameter is required" end + # Skip nil record + if record.nil? + return ["200 OK", {'Content-type'=>'text/plain'}, ""] + end + time = params['time'] time = time.to_i if time == 0 diff --git a/lib/fluent/plugin/in_stream.rb b/lib/fluent/plugin/in_stream.rb index 03157dcd6e..728b7f606b 100644 --- a/lib/fluent/plugin/in_stream.rb +++ b/lib/fluent/plugin/in_stream.rb @@ -84,18 +84,21 @@ def on_message(msg) # Forward es = MultiEventStream.new entries.each {|e| + record = e[1] + next if record.nil? time = e[0].to_i time = (now ||= Engine.now) if time == 0 - record = e[1] es.add(time, record) } Engine.emit_stream(tag, es) else # Message + record = msg[2] + return if record.nil? + time = msg[1] time = Engine.now if time == 0 - record = msg[2] Engine.emit(tag, time, record) end end