diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2f5b3804886a77..5853d034769f7b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1120,6 +1120,7 @@ DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true"); DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048"); DEFINE_mInt32(variant_max_json_key_length, "255"); DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false"); +DEFINE_mBool(variant_enable_duplicate_json_path_check, "false"); DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true"); DEFINE_Validator(variant_max_json_key_length, diff --git a/be/src/common/config.h b/be/src/common/config.h index db3a904c4a006c..fc65e3ce68d123 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1374,6 +1374,8 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); DECLARE_mInt32(variant_max_json_key_length); // Treat invalid json format str as string, instead of throwing exception if false DECLARE_mBool(variant_throw_exeception_on_invalid_json); +// Enable duplicate path check when parsing json into variant subcolumns/jsonb. +DECLARE_mBool(variant_enable_duplicate_json_path_check); // Enable vertical compact subcolumns of variant column DECLARE_mBool(enable_vertical_compact_variant_subcolumns); diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index b8a434d5aadaf9..b35d4764d2f0a9 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -105,6 +105,7 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) vectorized::ParseConfig config; config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested(); + config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check; RETURN_IF_ERROR( vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config)); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 7cffa35cc0fc07..f72cedb19ce405 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -104,6 +104,7 @@ Status parse_variant_columns_in_block(vectorized::Block& block, const TabletSche } vectorized::ParseConfig config; config.enable_flatten_nested = tablet_schema.variant_flatten_nested(); + config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check; return vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config); } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 52a7cde37f90b5..73e7d54cf6ae2a 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -109,6 +109,7 @@ Status parse_variant_columns_in_block(vectorized::Block& block, const TabletSche } vectorized::ParseConfig config; config.enable_flatten_nested = tablet_schema.variant_flatten_nested(); + config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check; return vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config); } diff --git a/be/src/vec/data_types/serde/data_type_variant_serde.cpp b/be/src/vec/data_types/serde/data_type_variant_serde.cpp index 36e69cfd6e3d19..5370006c6322f4 100644 --- a/be/src/vec/data_types/serde/data_type_variant_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_variant_serde.cpp @@ -21,6 +21,7 @@ #include #include "common/cast_set.h" +#include "common/config.h" #include "common/exception.h" #include "common/status.h" #include "runtime/jsonb_value.h" @@ -110,10 +111,11 @@ Status DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i Status DataTypeVariantSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice, const FormatOptions& options) const { - vectorized::ParseConfig config; + vectorized::ParseConfig parse_config; + parse_config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check; auto parser = parsers_pool.get([] { return new JsonParser(); }); RETURN_IF_CATCH_EXCEPTION( - parse_json_to_variant(column, slice.data, slice.size, parser.get(), config)); + parse_json_to_variant(column, slice.data, slice.size, parser.get(), parse_config)); return Status::OK(); } diff --git a/be/src/vec/json/json_parser.cpp b/be/src/vec/json/json_parser.cpp index eb4d6c5e2b5fee..45f02140d80ce5 100644 --- a/be/src/vec/json/json_parser.cpp +++ b/be/src/vec/json/json_parser.cpp @@ -45,6 +45,7 @@ std::optional JSONDataParser::parse(const char* begin, } ParseContext context; context.enable_flatten_nested = config.enable_flatten_nested; + context.check_duplicate_json_path = config.check_duplicate_json_path; context.is_top_array = document.isArray(); traverse(document, context); ParseResult result; @@ -74,19 +75,34 @@ void JSONDataParser::traverse(const Element& element, ParseContext& // Parse nested arrays to JsonbField JsonbWriter writer; traverseArrayAsJsonb(element.getArray(), writer); - ctx.paths.push_back(ctx.builder.get_parts()); - ctx.values.push_back(Field::create_field( - JsonbField(writer.getOutput()->getBuffer(), writer.getOutput()->getSize()))); + appendValueIfNotDuplicate( + ctx, ctx.builder.get_parts(), + Field::create_field(JsonbField(writer.getOutput()->getBuffer(), + writer.getOutput()->getSize()))); } else { traverseArray(element.getArray(), ctx); } // we should set has_nested_in_flatten to false when traverse array finished for next array otherwise it will be true for next array ctx.has_nested_in_flatten = false; } else { - ctx.paths.push_back(ctx.builder.get_parts()); - ctx.values.push_back(getValueAsField(element)); + appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(), getValueAsField(element)); } } + +template +void JSONDataParser::appendValueIfNotDuplicate(ParseContext& ctx, + const PathInData::Parts& path, + Field&& value) { + if (ctx.check_duplicate_json_path) { + PathInData path_in_data(path); + if (!ctx.visited_path_names.emplace(path_in_data.get_path()).second) { + return; + } + } + ctx.paths.push_back(path); + ctx.values.push_back(std::move(value)); +} + template void JSONDataParser::traverseObject(const JSONObject& object, ParseContext& ctx) { ctx.paths.reserve(ctx.paths.size() + object.size()); @@ -178,6 +194,7 @@ void JSONDataParser::traverseArray(const JSONArray& array, ParseCont ParseArrayContext array_ctx; array_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten; array_ctx.is_top_array = ctx.is_top_array; + array_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path; array_ctx.total_size = array.size(); for (auto it = array.begin(); it != array.end(); ++it) { traverseArrayElement(*it, array_ctx); @@ -185,16 +202,17 @@ void JSONDataParser::traverseArray(const JSONArray& array, ParseCont } auto&& arrays_by_path = array_ctx.arrays_by_path; if (arrays_by_path.empty()) { - ctx.paths.push_back(ctx.builder.get_parts()); - ctx.values.push_back(Field::create_field(Array())); + appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(), + Field::create_field(Array())); } else { ctx.paths.reserve(ctx.paths.size() + arrays_by_path.size()); ctx.values.reserve(ctx.values.size() + arrays_by_path.size()); for (auto it = arrays_by_path.begin(); it != arrays_by_path.end(); ++it) { auto&& [path, path_array] = it->second; /// Merge prefix path and path of array element. - ctx.paths.push_back(ctx.builder.append(path, true).get_parts()); - ctx.values.push_back(Field::create_field(std::move(path_array))); + ctx.builder.append(path, true); + appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(), + Field::create_field(std::move(path_array))); ctx.builder.pop_back(path.size()); } } @@ -206,10 +224,12 @@ void JSONDataParser::traverseArrayElement(const Element& element, ParseContext element_ctx; element_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten; element_ctx.is_top_array = ctx.is_top_array; + element_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path; traverse(element, element_ctx); - auto& [_, paths, values, flatten_nested, __, is_top_array] = element_ctx; + auto& paths = element_ctx.paths; + auto& values = element_ctx.values; - if (element_ctx.has_nested_in_flatten && is_top_array) { + if (element_ctx.has_nested_in_flatten && element_ctx.is_top_array) { checkAmbiguousStructure(ctx, paths); } @@ -231,7 +251,7 @@ void JSONDataParser::traverseArrayElement(const Element& element, } } - if (keys_to_update && !(is_top_array && ctx.has_nested_in_flatten)) { + if (keys_to_update && !(element_ctx.is_top_array && ctx.has_nested_in_flatten)) { fillMissedValuesInArrays(ctx); } } diff --git a/be/src/vec/json/json_parser.h b/be/src/vec/json/json_parser.h index ae5b937ccee1d9..98e71b31029a48 100644 --- a/be/src/vec/json/json_parser.h +++ b/be/src/vec/json/json_parser.h @@ -101,6 +101,7 @@ void writeValueAsJsonb(const Element& element, JsonbWriter& writer) { struct ParseConfig { bool enable_flatten_nested = false; + bool check_duplicate_json_path = false; }; /// Result of parsing of a document. /// Contains all paths extracted from document @@ -122,7 +123,9 @@ class JSONDataParser { PathInDataBuilder builder; std::vector paths; std::vector values; + phmap::flat_hash_set visited_path_names; bool enable_flatten_nested = false; + bool check_duplicate_json_path = false; bool has_nested_in_flatten = false; bool is_top_array = false; }; @@ -136,10 +139,12 @@ class JSONDataParser { KeyToSizes nested_sizes_by_key; bool has_nested_in_flatten = false; bool is_top_array = false; + bool check_duplicate_json_path = false; }; void traverse(const Element& element, ParseContext& ctx); void traverseObject(const JSONObject& object, ParseContext& ctx); void traverseArray(const JSONArray& array, ParseContext& ctx); + void appendValueIfNotDuplicate(ParseContext& ctx, const PathInData::Parts& path, Field&& value); void traverseArrayElement(const Element& element, ParseArrayContext& ctx); void checkAmbiguousStructure(const ParseArrayContext& ctx, const std::vector& paths); diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp index 607548117327ef..1704e7796dc3ab 100644 --- a/be/src/vec/json/parse2column.cpp +++ b/be/src/vec/json/parse2column.cpp @@ -165,24 +165,49 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length, check_paths.insert(check_paths.end(), paths.begin(), paths.end()); THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths)); } - for (size_t i = 0; i < paths.size(); ++i) { - FieldInfo field_info; - schema_util::get_field_info(values[i], &field_info); - if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) { - continue; + + auto is_plain_path = [](const PathInData& path) { + for (const auto& part : path.get_parts()) { + if (part.is_nested || part.anonymous_array_level != 0) { + return false; + } } - if (column_variant.get_subcolumn(paths[i], i) == nullptr) { - if (paths[i].has_nested_part()) { - column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows); + return true; + }; + + auto get_or_create_subcolumn = [&](const PathInData& path, size_t index_hint, + const FieldInfo& field_info) -> ColumnVariant::Subcolumn* { + auto* subcolumn = column_variant.get_subcolumn(path, index_hint); + if (subcolumn == nullptr) { + if (path.has_nested_part()) { + column_variant.add_nested_subcolumn(path, field_info, old_num_rows); } else { - column_variant.add_sub_column(paths[i], old_num_rows); + column_variant.add_sub_column(path, old_num_rows); } + subcolumn = column_variant.get_subcolumn(path, index_hint); } - auto* subcolumn = column_variant.get_subcolumn(paths[i], i); if (!subcolumn) { throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}", - paths[i].get_path()); + path.get_path()); + } + return subcolumn; + }; + + auto normalize_plain_path = [&](const PathInData& path) { + if (!config.check_duplicate_json_path || path.empty() || !is_plain_path(path)) { + return path; + } + return PathInData(path.get_path()); + }; + + for (size_t i = 0; i < paths.size(); ++i) { + FieldInfo field_info; + schema_util::get_field_info(values[i], &field_info); + if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) { + continue; } + auto path = normalize_plain_path(paths[i]); + auto* subcolumn = get_or_create_subcolumn(path, i, field_info); if (subcolumn->cur_num_of_defaults() > 0) { subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults()); subcolumn->reset_current_num_of_defaults(); @@ -190,7 +215,7 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length, if (subcolumn->size() != old_num_rows) { throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "subcolumn {} size missmatched, may contains duplicated entry", - paths[i].get_path()); + path.get_path()); } subcolumn->insert(std::move(values[i]), std::move(field_info)); } diff --git a/be/test/vec/common/schema_util_test.cpp b/be/test/vec/common/schema_util_test.cpp index 6339372ca1f435..cabaea7a87ddba 100644 --- a/be/test/vec/common/schema_util_test.cpp +++ b/be/test/vec/common/schema_util_test.cpp @@ -731,6 +731,48 @@ TEST_F(SchemaUtilTest, TestParseVariantColumns) { EXPECT_TRUE(obj_column.is_scalar_variant()); } +TEST_F(SchemaUtilTest, TestParseVariantColumnsDuplicateJsonPathCheck) { + Block block; + + auto variant_type = std::make_shared(10); + auto variant_column = ColumnVariant::create(10); + auto root_column = ColumnString::create(); + root_column->insert( + vectorized::Field::create_field(R"({"a":123,"a":"123"})")); + root_column->insert(vectorized::Field::create_field( + R"({"a.b":1,"a":{"b":2}})")); + root_column->insert(vectorized::Field::create_field( + R"({"a":{"b":3},"a.b":4})")); + variant_column->create_root(std::make_shared(), root_column->get_ptr()); + + block.insert({variant_column->get_ptr(), variant_type, "variant_col"}); + + ParseConfig config; + config.check_duplicate_json_path = true; + auto status = schema_util::parse_variant_columns(block, {0}, config); + ASSERT_TRUE(status.ok()) << status.to_string(); + + const auto& result_column = assert_cast(*block.get_by_position(0).column); + ASSERT_TRUE(result_column.sanitize().ok()); + + const auto* sub_a = result_column.get_subcolumn(PathInData("a")); + const auto* sub_ab = result_column.get_subcolumn(PathInData("a.b")); + ASSERT_NE(sub_a, nullptr); + ASSERT_NE(sub_ab, nullptr); + + FieldWithDataType field; + sub_a->get(0, field); + EXPECT_EQ(field.field.get_type(), PrimitiveType::TYPE_BIGINT); + EXPECT_EQ(field.field.get(), 123); + + sub_ab->get(1, field); + EXPECT_EQ(field.field.get_type(), PrimitiveType::TYPE_BIGINT); + EXPECT_EQ(field.field.get(), 1); + sub_ab->get(2, field); + EXPECT_EQ(field.field.get_type(), PrimitiveType::TYPE_BIGINT); + EXPECT_EQ(field.field.get(), 3); +} + TEST_F(SchemaUtilTest, TestGetLeastCommonSchema) { // Create test schemas TabletSchemaPB schema1_pb; diff --git a/be/test/vec/jsonb/json_parser_test.cpp b/be/test/vec/jsonb/json_parser_test.cpp index e4790f6786c16a..b5e362361e2e3d 100644 --- a/be/test/vec/jsonb/json_parser_test.cpp +++ b/be/test/vec/jsonb/json_parser_test.cpp @@ -219,6 +219,38 @@ TEST(JsonParserTest, ParseCornerCases) { ASSERT_TRUE(result.has_value()); } +TEST(JsonParserTest, ParseDuplicateJsonPathCheckKeepsFirstLeafValue) { + JSONDataParser parser; + ParseConfig config; + config.check_duplicate_json_path = true; + + std::string json = R"({"a":123,"a":"123","b":1})"; + auto result = parser.parse(json.c_str(), json.size(), config); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->paths.size(), 2); + ASSERT_EQ(result->values.size(), 2); + EXPECT_EQ(result->paths[0].get_path(), "a"); + EXPECT_EQ(result->values[0].get_type(), doris::PrimitiveType::TYPE_BIGINT); + EXPECT_EQ(result->values[0].get(), 123); + EXPECT_EQ(result->paths[1].get_path(), "b"); +} + +TEST(JsonParserTest, ParseDuplicateJsonPathCheckNormalizesDottedAndNestedPaths) { + JSONDataParser parser; + ParseConfig config; + config.check_duplicate_json_path = true; + + std::string json = R"({"a.b":1,"a":{"b":2},"a":{"c":3}})"; + auto result = parser.parse(json.c_str(), json.size(), config); + ASSERT_TRUE(result.has_value()); + ASSERT_EQ(result->paths.size(), 2); + ASSERT_EQ(result->values.size(), 2); + EXPECT_EQ(result->paths[0].get_path(), "a.b"); + EXPECT_EQ(result->values[0].get(), 1); + EXPECT_EQ(result->paths[1].get_path(), "a.c"); + EXPECT_EQ(result->values[1].get(), 3); +} + // Test cases for the selected code functionality TEST(JsonParserTest, TestIsPrefixFunction) { JSONDataParser parser; diff --git a/regression-test/data/variant_p0/duplicate_json_path.json b/regression-test/data/variant_p0/duplicate_json_path.json new file mode 100644 index 00000000000000..e065c9b23314b6 --- /dev/null +++ b/regression-test/data/variant_p0/duplicate_json_path.json @@ -0,0 +1,7 @@ +{"k":8,"v":{"a":42,"a":{"b":42}}} +{"k":9,"v":{"a":123,"a":"123"}} +{"k":10,"v":{"a.b":8,"a":{"b":9}}} +{"k":11,"v":{"a":{"b":10},"a.b":11}} +{"k":12,"v":{"a":{"b":11},"a":{"c":12}}} +{"k":13,"v":{"a":[13],"a":14}} +{"k":14,"v":{"a":14,"a":[13]}} diff --git a/regression-test/suites/variant_p0/duplicate_json_path.groovy b/regression-test/suites/variant_p0/duplicate_json_path.groovy new file mode 100644 index 00000000000000..0c6802f461e7d9 --- /dev/null +++ b/regression-test/suites/variant_p0/duplicate_json_path.groovy @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("duplicate_json_path", "p0") { + def customBeConfig = [ + variant_enable_duplicate_json_path_check: true + ] + setBeConfigTemporary(customBeConfig) { + sql "DROP TABLE IF EXISTS duplicate_json_path" + sql """ + CREATE TABLE duplicate_json_path ( + k int, + v variant + ) + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "2000", + "disable_auto_compaction" = "true" + ); + """ + + sql """insert into duplicate_json_path values (1, '{"a":42,"a":{"b":42}}')""" + sql """insert into duplicate_json_path values (2, '{"a" : 123, "a" : "123"}')""" + sql """insert into duplicate_json_path values (3, '{"a.b":1,"a":{"b":2}}')""" + sql """insert into duplicate_json_path values (4, '{"a":{"b":3},"a.b":4}')""" + sql """insert into duplicate_json_path values (5, '{"a":{"b":5},"a":{"c":6}}')""" + sql """insert into duplicate_json_path values (6, '{"a":[1],"a":2}')""" + sql """insert into duplicate_json_path values (7, '{"a":2,"a":[1]}')""" + + streamLoad { + table "duplicate_json_path" + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'group_commit', 'async_mode' + unset 'label' + file 'duplicate_json_path.json' + time 10000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(7, json.NumberTotalRows) + assertEquals(7, json.NumberLoadedRows) + } + } + + for (int i = 0; i < 30; i++) { + def count = sql "select count(*) from duplicate_json_path" + if (count[0][0] == 14) { + break + } + sleep(1000) + } + def totalRows = sql "select count(*) from duplicate_json_path" + assertEquals(14, totalRows[0][0]) + + // When duplicate path check is enabled, duplicate Variant paths keep the first value. + def expectedResult = [ + [1, "{\"b\":42}", "42", null], + [2, "123", null, null], + [3, "{\"b\":1}", "1", null], + [4, "{\"b\":3}", "3", null], + [5, "{\"b\":5,\"c\":6}", "5", "6"], + [6, "[1]", null, null], + [7, "2", null, null], + [8, "{\"b\":42}", "42", null], + [9, "123", null, null], + [10, "{\"b\":8}", "8", null], + [11, "{\"b\":10}", "10", null], + [12, "{\"b\":11,\"c\":12}", "11", "12"], + [13, "[13]", null, null], + [14, "14", null, null] + ] + + def queryResult = { + sql """ + select k, cast(v['a'] as string), cast(v['a']['b'] as string), cast(v['a']['c'] as string) + from duplicate_json_path + order by k + """ + } + assertEquals(expectedResult, queryResult()) + + trigger_and_wait_compaction("duplicate_json_path", "full") + assertEquals(expectedResult, queryResult()) + } +}