Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure JSON parser returns Hash #4106

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def filter_with_time(tag, time, record)
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)
# Note: If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# Essentially, we should use `yield` here, but it is difficult with
# the current Filter structure.
return t, r
else
if @emit_invalid_record_to_error
Expand Down
76 changes: 24 additions & 52 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -203,54 +203,24 @@ def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
tag = path.split('/').join('.')
record_time, record = parse_params(params)

# Skip nil record
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
if @respond_with_empty_img
return RESPONSE_IMG
else
if @use_204_response
return RESPONSE_204
else
return RESPONSE_200
end
mes = Fluent::MultiEventStream.new
parse_params(params) do |record_time, record|
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
next
end
end

mes = nil
# Support batched requests
if record.is_a?(Array)
mes = Fluent::MultiEventStream.new
record.each do |single_record|
add_params_to_record(single_record, params)

if param_time = params['time']
param_time = param_time.to_f
single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
elsif @custom_parser
single_time = @custom_parser.parse_time(single_record)
single_time, single_record = @custom_parser.convert_values(single_time, single_record)
ashie marked this conversation as resolved.
Show resolved Hide resolved
else
single_time = convert_time_field(single_record)
end

mes.add(single_time, single_record)
end
else
add_params_to_record(record, params)

time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
if record_time.nil?
convert_time_field(record)
else
record_time
end
record_time.nil? ? convert_time_field(record) : record_time
end

mes.add(time, record)
end
rescue => e
if @dump_error_log
Expand All @@ -261,11 +231,7 @@ def on_request(path_info, params)

# TODO server error
begin
if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
end
router.emit_stream(tag, mes) unless mes.empty?
rescue => e
if @dump_error_log
log.error "failed to emit data", error: e
Expand Down Expand Up @@ -308,31 +274,37 @@ def on_server_connect(conn)
def parse_params_default(params)
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
if record.is_a?(Array)
# TODO: Temporarily supporting this case for compatibility.
# We should not consider this case here.
# We should fix MessagePackParser so that it doesn't return Array.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you do this in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it should be summarized as fixing the JSON parser.
Since there is no time before the next release, I also wanted to keep the scope of impact as small as possible.
About in_http, I had no choice but to fix it in this PR.

If it looks like I should fix it in this PR, I will try today or tomorrow.
Or I can try to fix it in another PR after this PR is merged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be done in this PR instead of adding such workaround, since it's considered as a same problem and sometimes such workaround lives long unexpectedly.

I’ll postpone merging this to v1.17.0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review. I Understood.
My understanding was that the existing workaround was minimized here, not adding a new workaround.
But certainly, if it is not fixed entirely, it may lead to unnecessary problems.
Especially for MessagePackParser as @custom_parser.

record.each do |single_record|
yield nil, single_record
end
else
yield nil, record
end
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
yield nil, record
end
elsif ndjson = params['ndjson']
events = []
ndjson.split(/\r?\n/).each do |js|
@parser_json.parse(js) do |_time, record|
events.push(record)
yield nil, record
end
end
return nil, events
else
raise "'json', 'ndjson' or 'msgpack' parameter is required"
end
end

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@custom_parser.parse(content) { |time, record|
raise "Received event is not #{@format_name}: #{content}" if record.nil?
Copy link
Contributor Author

@daipom daipom Mar 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explanation of the diff

There did not seem to be any particular reason for raise only here.
We should align with parse_params_default.

return time, record
}
@custom_parser.parse(content) do |time, record|
yield time, record
end
else
raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
end
Expand Down
25 changes: 20 additions & 5 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,31 @@ def configure_json_parser(name)
end

def parse(text)
record = @load_proc.call(text)
time = parse_time(record)
if @execute_convert_values
time, record = convert_values(time, record)
parsed_json = @load_proc.call(text)

if parsed_json.is_a?(Hash)
yield parse_one_record(parsed_json)
elsif parsed_json.is_a?(Array)
parsed_json.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
yield parse_one_record(record)
end
else
yield nil, nil
end
yield time, record

rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
end

def parse_one_record(record)
time = parse_time(record)
convert_values(time, record)
end

def parser_type
:text
end
Expand Down
56 changes: 56 additions & 0 deletions test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,60 @@ def test_yajl_parse_io_with_buffer_smaller_than_input
end
end
end

sub_test_case "various record pattern" do
data("Only string", { record: '"message"', expected: [nil] }, keep: true)
data("Only string without quotation", { record: "message", expected: [nil] }, keep: true)
data("Only number", { record: "0", expected: [nil] }, keep: true)
data(
"Array of Hash",
{
record: '[{"k1": 1}, {"k2": 2}]',
expected: [{"k1" => 1}, {"k2" => 2}]
},
keep: true,
)
data(
"Array of both Hash and invalid",
{
record: '[{"k1": 1}, "string", {"k2": 2}, 0]',
expected: [{"k1" => 1}, nil, {"k2" => 2}, nil]
},
keep: true,
)
data(
"Array of all invalid",
{
record: '["string", 0, [{"k": 0}]]',
expected: [nil, nil, nil]
},
keep: true,
)
def test_oj(data)
i = 0
@parser.configure('json_parser' => "oj")
@parser.instance.parse(data[:record]) { |time, record|
assert_equal(data[:expected][i], record)
i += 1
}
end

def test_yajl(data)
i = 0
@parser.configure('json_parser' => "yajl")
@parser.instance.parse(data[:record]) { |time, record|
assert_equal(data[:expected][i], record)
i += 1
}
end

def test_json(json)
i = 0
@parser.configure('json_parser' => "json")
@parser.instance.parse(data[:record]) { |time, record|
assert_equal(data[:expected][i], record)
i += 1
}
end
end
end