diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb index e9c03fd944..f15c8568ec 100644 --- a/lib/fluent/plugin/filter_parser.rb +++ b/lib/fluent/plugin/filter_parser.rb @@ -79,6 +79,10 @@ def filter_with_time(tag, time, record) end @accessor.delete(record) if @remove_key_name_field r = handle_parsed(tag, record, t, values) + # Note: If the parser returns multiple records from one raw_value, + # this returns only the first one record. + # Essentially, we should use `yield` here, but it is difficult with + # the current Filter structure. return t, r else if @emit_invalid_record_to_error diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index e738f2db69..92148fd8a2 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -203,54 +203,24 @@ def on_request(path_info, params) begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') - record_time, record = parse_params(params) - # Skip nil record - if record.nil? - log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } - if @respond_with_empty_img - return RESPONSE_IMG - else - if @use_204_response - return RESPONSE_204 - else - return RESPONSE_200 - end + mes = Fluent::MultiEventStream.new + parse_params(params) do |record_time, record| + if record.nil? + log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" } + next end - end - mes = nil - # Support batched requests - if record.is_a?(Array) - mes = Fluent::MultiEventStream.new - record.each do |single_record| - add_params_to_record(single_record, params) - - if param_time = params['time'] - param_time = param_time.to_f - single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) - elsif @custom_parser - single_time = @custom_parser.parse_time(single_record) - single_time, single_record = @custom_parser.convert_values(single_time, single_record) - else - single_time = convert_time_field(single_record) - end - - mes.add(single_time, single_record) - end - else add_params_to_record(record, params) time = if param_time = params['time'] param_time = param_time.to_f param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) else - if record_time.nil? - convert_time_field(record) - else - record_time - end + record_time.nil? ? convert_time_field(record) : record_time end + + mes.add(time, record) end rescue => e if @dump_error_log @@ -261,11 +231,7 @@ def on_request(path_info, params) # TODO server error begin - if mes - router.emit_stream(tag, mes) - else - router.emit(tag, time, record) - end + router.emit_stream(tag, mes) unless mes.empty? rescue => e if @dump_error_log log.error "failed to emit data", error: e @@ -308,20 +274,27 @@ def on_server_connect(conn) def parse_params_default(params) if msgpack = params['msgpack'] @parser_msgpack.parse(msgpack) do |_time, record| - return nil, record + if record.is_a?(Array) + # TODO: Temporarily supporting this case for compatibility. + # We should not consider this case here. + # We should fix MessagePackParser so that it doesn't return Array. + record.each do |single_record| + yield nil, single_record + end + else + yield nil, record + end end elsif js = params['json'] @parser_json.parse(js) do |_time, record| - return nil, record + yield nil, record end elsif ndjson = params['ndjson'] - events = [] ndjson.split(/\r?\n/).each do |js| @parser_json.parse(js) do |_time, record| - events.push(record) + yield nil, record end end - return nil, events else raise "'json', 'ndjson' or 'msgpack' parameter is required" end @@ -329,10 +302,9 @@ def parse_params_default(params) def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] - @custom_parser.parse(content) { |time, record| - raise "Received event is not #{@format_name}: #{content}" if record.nil? - return time, record - } + @custom_parser.parse(content) do |time, record| + yield time, record + end else raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" end diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 829aa4b72a..4b365bcaa5 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -70,16 +70,31 @@ def configure_json_parser(name) end def parse(text) - record = @load_proc.call(text) - time = parse_time(record) - if @execute_convert_values - time, record = convert_values(time, record) + parsed_json = @load_proc.call(text) + + if parsed_json.is_a?(Hash) + yield parse_one_record(parsed_json) + elsif parsed_json.is_a?(Array) + parsed_json.each do |record| + unless record.is_a?(Hash) + yield nil, nil + next + end + yield parse_one_record(record) + end + else + yield nil, nil end - yield time, record + rescue @error_class, EncodingError # EncodingError is for oj 3.x or later yield nil, nil end + def parse_one_record(record) + time = parse_time(record) + convert_values(time, record) + end + def parser_type :text end diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 19c45402d1..2c90cc71b8 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -135,4 +135,60 @@ def test_yajl_parse_io_with_buffer_smaller_than_input end end end + + sub_test_case "various record pattern" do + data("Only string", { record: '"message"', expected: [nil] }, keep: true) + data("Only string without quotation", { record: "message", expected: [nil] }, keep: true) + data("Only number", { record: "0", expected: [nil] }, keep: true) + data( + "Array of Hash", + { + record: '[{"k1": 1}, {"k2": 2}]', + expected: [{"k1" => 1}, {"k2" => 2}] + }, + keep: true, + ) + data( + "Array of both Hash and invalid", + { + record: '[{"k1": 1}, "string", {"k2": 2}, 0]', + expected: [{"k1" => 1}, nil, {"k2" => 2}, nil] + }, + keep: true, + ) + data( + "Array of all invalid", + { + record: '["string", 0, [{"k": 0}]]', + expected: [nil, nil, nil] + }, + keep: true, + ) + def test_oj(data) + i = 0 + @parser.configure('json_parser' => "oj") + @parser.instance.parse(data[:record]) { |time, record| + assert_equal(data[:expected][i], record) + i += 1 + } + end + + def test_yajl(data) + i = 0 + @parser.configure('json_parser' => "yajl") + @parser.instance.parse(data[:record]) { |time, record| + assert_equal(data[:expected][i], record) + i += 1 + } + end + + def test_json(json) + i = 0 + @parser.configure('json_parser' => "json") + @parser.instance.parse(data[:record]) { |time, record| + assert_equal(data[:expected][i], record) + i += 1 + } + end + end end