Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 84 additions & 103 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,6 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
size_t size = 0;
simdjson::error_code error;
size_t num_rows = block.rows();

try {
// step1: get and parse buf to get json doc
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
Expand All @@ -1067,25 +1066,19 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc
return Status::OK();
}

for (_json_stream_iterator = _json_stream.begin();
_json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) {
if (_json_stream_iterator.current_index() >= _original_doc_size) {
break;
}
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row || *eof) {
return Status::OK();
}

// step 3: write columns by json value
RETURN_IF_ERROR(_simdjson_handle_simple_json_write_columns(block, slot_descs,
is_empty_row, eof));
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row || *eof) {
return Status::OK();
}

// step 3: write columns by json value
RETURN_IF_ERROR(
_simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
Expand Down Expand Up @@ -1175,25 +1168,19 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
return Status::OK();
}

for (_json_stream_iterator = _json_stream.begin();
_json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) {
if (_json_stream_iterator.current_index() >= _original_doc_size) {
break;
}
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
return Status::OK();
}

// step 3: write columns by json value
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(
block, slot_descs, is_empty_row, eof));
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
return Status::OK();
}

// step 3: write columns by json value
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs,
is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
Expand Down Expand Up @@ -1269,26 +1256,20 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json(
RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// nested complex json
size_t num_rows = block.rows();
simdjson::ondemand::object cur;
size_t size = 0;
simdjson::error_code error;

try {
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}

for (_json_stream_iterator = _json_stream.begin();
_json_stream_iterator != _json_stream.end(); ++_json_stream_iterator) {
if (_json_stream_iterator.current_index() >= _original_doc_size) {
break;
while (true) {
size_t num_rows = block.rows();
simdjson::ondemand::object cur;
size_t size = 0;
simdjson::error_code error;
try {
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
Expand Down Expand Up @@ -1318,14 +1299,16 @@ Status NewJsonReader::_simdjson_handle_nested_complex_json(
// so that the caller will continue reading next line.
*is_empty_row = true;
}
}
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
break; // read a valid row
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
continue;
}
}
return Status::OK();
Expand Down Expand Up @@ -1532,13 +1515,14 @@ Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, boo
return Status::OK();
}

// step2: init json stream iterate.
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
_simdjson_ondemand_padding_buffer.clear();
_padded_size = *size + simdjson::SIMDJSON_PADDING;
_simdjson_ondemand_padding_buffer.resize(_padded_size);
_simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);

// step2: init json parser iterate.
if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
// Hence, a re-allocation is needed if the space is not enough.
_simdjson_ondemand_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
_simdjson_ondemand_unscape_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
_padded_size = *size + simdjson::SIMDJSON_PADDING;
}
// trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') {
Expand All @@ -1548,16 +1532,10 @@ Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, boo
}
memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
_original_doc_size = *size;

*error = _ondemand_json_parser->iterate_many(_simdjson_ondemand_padding_buffer)
.get(_json_stream);
if (*error != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
*error, simdjson::error_message(*error));
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
}

*error = _ondemand_json_parser
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
_padded_size)
.get(_original_json_doc);
return Status::OK();
}

Expand All @@ -1579,40 +1557,43 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row
return Status::OK();
}

// Reports one rejected JSON document as a data quality problem.
//
// The raw document text (`doc_info`) and the formatted reason (`error_msg`) are handed to
// the runtime state's error log through `append_error_msg_to_file`, and the filtered-row
// counter is incremented.
//
// Returns:
//   - the error from `append_error_msg_to_file` if logging itself fails;
//   - OK with `*eof` set to true when `*_scanner_eof` is true after logging, so the caller
//     stops as if the end of the file had been reached;
//   - DataQualityError carrying the formatted message otherwise.
Status NewJsonReader::_return_quality_error(fmt::memory_buffer& error_msg,
                                            const std::string& doc_info, bool* eof) {
    auto rejected_doc = [&]() -> std::string { return doc_info; };
    auto reject_reason = [&]() -> std::string { return fmt::to_string(error_msg); };
    RETURN_IF_ERROR(_state->append_error_msg_to_file(rejected_doc, reject_reason, _scanner_eof));

    _counter->num_rows_filtered++;

    if (!*_scanner_eof) {
        // The scan goes on: surface this document as a data quality error.
        return Status::DataQualityError(fmt::to_string(error_msg));
    }

    // Case A: _scanner_eof was set to true in "append_error_msg_to_file", which means
    // enough invalid rows have been met and the scanner should be stopped.
    // Set eof to true and return OK so the caller ends processing as at end of file.
    *eof = true;
    return Status::OK();
}

Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
bool* is_empty_row) {
SCOPED_TIMER(_file_read_timer);
_original_json_doc = (*_json_stream_iterator).value();

auto return_quality_error = [&](fmt::memory_buffer& error_msg,
const std::string& doc_info) -> Status {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return doc_info; },
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
_counter->num_rows_filtered++;
if (*_scanner_eof) {
// Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
// we meet enough invalid rows and the scanner should be stopped.
// So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
};
if (*error != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
*error, simdjson::error_message(*error));
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
auto type_res = _original_json_doc.type();
if (type_res.error() != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
type_res.error(), simdjson::error_message(type_res.error()));
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
simdjson::ondemand::json_type type = type_res.value();
if (type != simdjson::ondemand::json_type::object &&
type != simdjson::ondemand::json_type::array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Not an json object or json array");
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) {
try {
Expand All @@ -1624,13 +1605,13 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c
if (!st.ok()) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", st.to_string());
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
} catch (simdjson::simdjson_error& e) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}",
e.what());
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
} else {
_json_value = _original_json_doc;
Expand All @@ -1640,14 +1621,14 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be TRUE.");
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}

if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array` must be FALSE.");
return _return_quality_error(error_msg, std::string((char*)_json_str, *size), eof);
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
return Status::OK();
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ class NewJsonReader : public GenericReader {
Status _simdjson_init_reader();
Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error);
Status _return_quality_error(fmt::memory_buffer& error_msg, const std::string& doc_info,
bool* eof);
Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
bool* is_empty_row);
Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
Expand Down Expand Up @@ -275,10 +273,8 @@ class NewJsonReader : public GenericReader {
std::string _simdjson_ondemand_padding_buffer;
std::string _simdjson_ondemand_unscape_padding_buffer;
// char _simdjson_ondemand_padding_buffer[_padded_size];
simdjson::ondemand::document_reference _original_json_doc;
simdjson::ondemand::document _original_json_doc;
simdjson::ondemand::value _json_value;
simdjson::ondemand::document_stream _json_stream;
simdjson::ondemand::document_stream::iterator _json_stream_iterator;
// for strip outer array
// array_iter pointed to _array
simdjson::ondemand::array_iterator _array_iter;
Expand Down

This file was deleted.

5 changes: 0 additions & 5 deletions regression-test/data/load_p0/stream_load/test_json_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,6 @@ John 30 New York {"email":"john@example.com","phone":"+1-123-456-7890"}
android \N \N \N \N \N
android \N \N \N \N \N

-- !iterate_read_json --
Name1 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
Name2 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525
Name3 21 5fbfefd2-ea1c-44fd-bc54-6eb2582e1525

-- !select28 --
test k2_value

14 changes: 1 addition & 13 deletions regression-test/suites/load_p0/stream_load/test_json_load.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}

} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
Expand Down Expand Up @@ -822,20 +823,7 @@ suite("test_json_load", "p0") {
assertEquals("${reason}", "${out}")
}
}
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}

// iterate read json when read_json_by_line = false
try {
sql "DROP TABLE IF EXISTS ${testTable}"

create_json_test_table.call(testTable)
def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
load_json_data.call("${testTable}", test_load_label, 'false', 'false', 'json', '', '', '', '', '', 'iterate_read_json.json')
sql "sync"

qt_iterate_read_json "select * from ${testTable} order by name"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
Expand Down