Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mInt32(variant_max_json_key_length, "255");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_mBool(variant_enable_duplicate_json_path_check, "false");
DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true");

DEFINE_Validator(variant_max_json_key_length,
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,8 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
DECLARE_mInt32(variant_max_json_key_length);
// If false, treat an invalid JSON string as a plain string instead of throwing an exception
DECLARE_mBool(variant_throw_exeception_on_invalid_json);
// Enable duplicate path check when parsing json into variant subcolumns/jsonb.
DECLARE_mBool(variant_enable_duplicate_json_path_check);
// Enable vertical compact subcolumns of variant column
DECLARE_mBool(enable_vertical_compact_variant_subcolumns);

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block)

vectorized::ParseConfig config;
config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
RETURN_IF_ERROR(
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Status parse_variant_columns_in_block(vectorized::Block& block, const TabletSche
}
vectorized::ParseConfig config;
config.enable_flatten_nested = tablet_schema.variant_flatten_nested();
config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
return vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Status parse_variant_columns_in_block(vectorized::Block& block, const TabletSche
}
vectorized::ParseConfig config;
config.enable_flatten_nested = tablet_schema.variant_flatten_nested();
config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
return vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config);
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/data_types/serde/data_type_variant_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>

#include "common/cast_set.h"
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "runtime/jsonb_value.h"
Expand Down Expand Up @@ -110,10 +111,11 @@ Status DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i

Status DataTypeVariantSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
vectorized::ParseConfig config;
vectorized::ParseConfig parse_config;
parse_config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
auto parser = parsers_pool.get([] { return new JsonParser(); });
RETURN_IF_CATCH_EXCEPTION(
parse_json_to_variant(column, slice.data, slice.size, parser.get(), config));
parse_json_to_variant(column, slice.data, slice.size, parser.get(), parse_config));
return Status::OK();
}

Expand Down
44 changes: 32 additions & 12 deletions be/src/vec/json/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ std::optional<ParseResult> JSONDataParser<ParserImpl>::parse(const char* begin,
}
ParseContext context;
context.enable_flatten_nested = config.enable_flatten_nested;
context.check_duplicate_json_path = config.check_duplicate_json_path;
context.is_top_array = document.isArray();
traverse(document, context);
ParseResult result;
Expand Down Expand Up @@ -74,19 +75,34 @@ void JSONDataParser<ParserImpl>::traverse(const Element& element, ParseContext&
// Parse nested arrays to JsonbField
JsonbWriter writer;
traverseArrayAsJsonb(element.getArray(), writer);
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(Field::create_field<TYPE_JSONB>(
JsonbField(writer.getOutput()->getBuffer(), writer.getOutput()->getSize())));
appendValueIfNotDuplicate(
ctx, ctx.builder.get_parts(),
Field::create_field<TYPE_JSONB>(JsonbField(writer.getOutput()->getBuffer(),
writer.getOutput()->getSize())));
} else {
traverseArray(element.getArray(), ctx);
}
// we should set has_nested_in_flatten to false when traverse array finished for next array otherwise it will be true for next array
ctx.has_nested_in_flatten = false;
} else {
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(getValueAsField(element));
appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(), getValueAsField(element));
}
}

/// Appends the (path, value) pair to the parse context.
/// When ctx.check_duplicate_json_path is set, the path string built from
/// `path` is first recorded in ctx.visited_path_names; if that string was
/// already recorded, the pair is dropped, so the first value seen for a path
/// is the one that is kept.
template <typename ParserImpl>
void JSONDataParser<ParserImpl>::appendValueIfNotDuplicate(ParseContext& ctx,
                                                           const PathInData::Parts& path,
                                                           Field&& value) {
    bool is_new_path = true;
    if (ctx.check_duplicate_json_path) {
        PathInData full_path(path);
        // emplace() reports via `.second` whether the name was newly inserted.
        is_new_path = ctx.visited_path_names.emplace(full_path.get_path()).second;
    }
    if (is_new_path) {
        ctx.paths.push_back(path);
        ctx.values.push_back(std::move(value));
    }
}

template <typename ParserImpl>
void JSONDataParser<ParserImpl>::traverseObject(const JSONObject& object, ParseContext& ctx) {
ctx.paths.reserve(ctx.paths.size() + object.size());
Expand Down Expand Up @@ -178,23 +194,25 @@ void JSONDataParser<ParserImpl>::traverseArray(const JSONArray& array, ParseCont
ParseArrayContext array_ctx;
array_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten;
array_ctx.is_top_array = ctx.is_top_array;
array_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path;
array_ctx.total_size = array.size();
for (auto it = array.begin(); it != array.end(); ++it) {
traverseArrayElement(*it, array_ctx);
++array_ctx.current_size;
}
auto&& arrays_by_path = array_ctx.arrays_by_path;
if (arrays_by_path.empty()) {
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(Field::create_field<TYPE_ARRAY>(Array()));
appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(),
Field::create_field<TYPE_ARRAY>(Array()));
} else {
ctx.paths.reserve(ctx.paths.size() + arrays_by_path.size());
ctx.values.reserve(ctx.values.size() + arrays_by_path.size());
for (auto it = arrays_by_path.begin(); it != arrays_by_path.end(); ++it) {
auto&& [path, path_array] = it->second;
/// Merge prefix path and path of array element.
ctx.paths.push_back(ctx.builder.append(path, true).get_parts());
ctx.values.push_back(Field::create_field<TYPE_ARRAY>(std::move(path_array)));
ctx.builder.append(path, true);
appendValueIfNotDuplicate(ctx, ctx.builder.get_parts(),
Field::create_field<TYPE_ARRAY>(std::move(path_array)));
ctx.builder.pop_back(path.size());
}
}
Expand All @@ -206,10 +224,12 @@ void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
ParseContext element_ctx;
element_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten;
element_ctx.is_top_array = ctx.is_top_array;
element_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path;
traverse(element, element_ctx);
auto& [_, paths, values, flatten_nested, __, is_top_array] = element_ctx;
auto& paths = element_ctx.paths;
auto& values = element_ctx.values;

if (element_ctx.has_nested_in_flatten && is_top_array) {
if (element_ctx.has_nested_in_flatten && element_ctx.is_top_array) {
checkAmbiguousStructure(ctx, paths);
}

Expand All @@ -231,7 +251,7 @@ void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
}
}

if (keys_to_update && !(is_top_array && ctx.has_nested_in_flatten)) {
if (keys_to_update && !(element_ctx.is_top_array && ctx.has_nested_in_flatten)) {
fillMissedValuesInArrays(ctx);
}
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/json/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ void writeValueAsJsonb(const Element& element, JsonbWriter& writer) {

/// Options that control how a JSON document is parsed. Both default to off.
struct ParseConfig {
    /// Forwarded to the parser's traversal context as `enable_flatten_nested`.
    bool enable_flatten_nested {false};
    /// When true, a value whose path name was already produced for the same
    /// document is skipped, so only the first value per path is kept.
    bool check_duplicate_json_path {false};
};
/// Result of parsing of a document.
/// Contains all paths extracted from document
Expand All @@ -122,7 +123,9 @@ class JSONDataParser {
PathInDataBuilder builder;
std::vector<PathInData::Parts> paths;
std::vector<Field> values;
phmap::flat_hash_set<std::string> visited_path_names;
bool enable_flatten_nested = false;
bool check_duplicate_json_path = false;
bool has_nested_in_flatten = false;
bool is_top_array = false;
};
Expand All @@ -136,10 +139,12 @@ class JSONDataParser {
KeyToSizes nested_sizes_by_key;
bool has_nested_in_flatten = false;
bool is_top_array = false;
bool check_duplicate_json_path = false;
};
void traverse(const Element& element, ParseContext& ctx);
void traverseObject(const JSONObject& object, ParseContext& ctx);
void traverseArray(const JSONArray& array, ParseContext& ctx);
void appendValueIfNotDuplicate(ParseContext& ctx, const PathInData::Parts& path, Field&& value);
void traverseArrayElement(const Element& element, ParseArrayContext& ctx);
void checkAmbiguousStructure(const ParseArrayContext& ctx,
const std::vector<PathInData::Parts>& paths);
Expand Down
49 changes: 37 additions & 12 deletions be/src/vec/json/parse2column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,32 +165,57 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
check_paths.insert(check_paths.end(), paths.begin(), paths.end());
THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
}
for (size_t i = 0; i < paths.size(); ++i) {
FieldInfo field_info;
schema_util::get_field_info(values[i], &field_info);
if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
continue;

auto is_plain_path = [](const PathInData& path) {
for (const auto& part : path.get_parts()) {
if (part.is_nested || part.anonymous_array_level != 0) {
return false;
}
}
if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
if (paths[i].has_nested_part()) {
column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
return true;
};

auto get_or_create_subcolumn = [&](const PathInData& path, size_t index_hint,
const FieldInfo& field_info) -> ColumnVariant::Subcolumn* {
auto* subcolumn = column_variant.get_subcolumn(path, index_hint);
if (subcolumn == nullptr) {
if (path.has_nested_part()) {
column_variant.add_nested_subcolumn(path, field_info, old_num_rows);
} else {
column_variant.add_sub_column(paths[i], old_num_rows);
column_variant.add_sub_column(path, old_num_rows);
}
subcolumn = column_variant.get_subcolumn(path, index_hint);
}
auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
if (!subcolumn) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
paths[i].get_path());
path.get_path());
}
return subcolumn;
};

auto normalize_plain_path = [&](const PathInData& path) {
if (!config.check_duplicate_json_path || path.empty() || !is_plain_path(path)) {
return path;
}
return PathInData(path.get_path());
};

for (size_t i = 0; i < paths.size(); ++i) {
FieldInfo field_info;
schema_util::get_field_info(values[i], &field_info);
if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
continue;
}
auto path = normalize_plain_path(paths[i]);
auto* subcolumn = get_or_create_subcolumn(path, i, field_info);
if (subcolumn->cur_num_of_defaults() > 0) {
subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
subcolumn->reset_current_num_of_defaults();
}
if (subcolumn->size() != old_num_rows) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"subcolumn {} size missmatched, may contains duplicated entry",
paths[i].get_path());
path.get_path());
}
subcolumn->insert(std::move(values[i]), std::move(field_info));
}
Expand Down
42 changes: 42 additions & 0 deletions be/test/vec/common/schema_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,48 @@ TEST_F(SchemaUtilTest, TestParseVariantColumns) {
EXPECT_TRUE(obj_column.is_scalar_variant());
}

// Parses three rows that each repeat a logical JSON path (same key twice,
// dotted key vs. nested object, and the reverse order) with duplicate-path
// checking enabled, then verifies the first value per path is the one stored.
TEST_F(SchemaUtilTest, TestParseVariantColumnsDuplicateJsonPathCheck) {
    // Raw JSON input rows that become the root of the variant column.
    auto json_rows = ColumnString::create();
    json_rows->insert(
            vectorized::Field::create_field<PrimitiveType::TYPE_STRING>(R"({"a":123,"a":"123"})"));
    json_rows->insert(vectorized::Field::create_field<PrimitiveType::TYPE_STRING>(
            R"({"a.b":1,"a":{"b":2}})"));
    json_rows->insert(vectorized::Field::create_field<PrimitiveType::TYPE_STRING>(
            R"({"a":{"b":3},"a.b":4})"));

    auto variant_col = ColumnVariant::create(10);
    variant_col->create_root(std::make_shared<DataTypeString>(), json_rows->get_ptr());

    auto variant_data_type = std::make_shared<DataTypeVariant>(10);
    Block block;
    block.insert({variant_col->get_ptr(), variant_data_type, "variant_col"});

    ParseConfig parse_config;
    parse_config.check_duplicate_json_path = true;
    auto parse_status = schema_util::parse_variant_columns(block, {0}, parse_config);
    ASSERT_TRUE(parse_status.ok()) << parse_status.to_string();

    const auto& parsed_variant =
            assert_cast<const ColumnVariant&>(*block.get_by_position(0).column);
    ASSERT_TRUE(parsed_variant.sanitize().ok());

    const auto* a_subcolumn = parsed_variant.get_subcolumn(PathInData("a"));
    const auto* ab_subcolumn = parsed_variant.get_subcolumn(PathInData("a.b"));
    ASSERT_NE(a_subcolumn, nullptr);
    ASSERT_NE(ab_subcolumn, nullptr);

    FieldWithDataType cell;

    // Row 0: {"a":123,"a":"123"} -> the first "a" (integer 123) is kept.
    a_subcolumn->get(0, cell);
    EXPECT_EQ(cell.field.get_type(), PrimitiveType::TYPE_BIGINT);
    EXPECT_EQ(cell.field.get<PrimitiveType::TYPE_BIGINT>(), 123);

    // Row 1: {"a.b":1,"a":{"b":2}} -> the dotted key comes first, so 1 is kept.
    ab_subcolumn->get(1, cell);
    EXPECT_EQ(cell.field.get_type(), PrimitiveType::TYPE_BIGINT);
    EXPECT_EQ(cell.field.get<PrimitiveType::TYPE_BIGINT>(), 1);

    // Row 2: {"a":{"b":3},"a.b":4} -> the nested key comes first, so 3 is kept.
    ab_subcolumn->get(2, cell);
    EXPECT_EQ(cell.field.get_type(), PrimitiveType::TYPE_BIGINT);
    EXPECT_EQ(cell.field.get<PrimitiveType::TYPE_BIGINT>(), 3);
}

TEST_F(SchemaUtilTest, TestGetLeastCommonSchema) {
// Create test schemas
TabletSchemaPB schema1_pb;
Expand Down
32 changes: 32 additions & 0 deletions be/test/vec/jsonb/json_parser_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,38 @@ TEST(JsonParserTest, ParseCornerCases) {
ASSERT_TRUE(result.has_value());
}

// With duplicate-path checking enabled, a key that appears twice at the same
// level yields a single entry holding the first value; other keys are unaffected.
TEST(JsonParserTest, ParseDuplicateJsonPathCheckKeepsFirstLeafValue) {
    const std::string input = R"({"a":123,"a":"123","b":1})";

    ParseConfig parse_config;
    parse_config.check_duplicate_json_path = true;

    JSONDataParser<SimdJSONParser> json_parser;
    auto parsed = json_parser.parse(input.c_str(), input.size(), parse_config);
    ASSERT_TRUE(parsed.has_value());

    // Only "a" (first occurrence) and "b" survive.
    ASSERT_EQ(parsed->paths.size(), 2);
    ASSERT_EQ(parsed->values.size(), 2);

    EXPECT_EQ(parsed->paths[0].get_path(), "a");
    EXPECT_EQ(parsed->values[0].get_type(), doris::PrimitiveType::TYPE_BIGINT);
    EXPECT_EQ(parsed->values[0].get<doris::PrimitiveType::TYPE_BIGINT>(), 123);

    EXPECT_EQ(parsed->paths[1].get_path(), "b");
}

// A dotted key ("a.b") and a nested object ({"a":{"b":...}}) resolve to the
// same path name, so with duplicate-path checking enabled only the first one
// is kept; a later non-conflicting path ("a.c") is still emitted.
TEST(JsonParserTest, ParseDuplicateJsonPathCheckNormalizesDottedAndNestedPaths) {
    const std::string input = R"({"a.b":1,"a":{"b":2},"a":{"c":3}})";

    ParseConfig parse_config;
    parse_config.check_duplicate_json_path = true;

    JSONDataParser<SimdJSONParser> json_parser;
    auto parsed = json_parser.parse(input.c_str(), input.size(), parse_config);
    ASSERT_TRUE(parsed.has_value());

    ASSERT_EQ(parsed->paths.size(), 2);
    ASSERT_EQ(parsed->values.size(), 2);

    // "a.b" from the dotted key wins over the nested {"a":{"b":2}}.
    EXPECT_EQ(parsed->paths[0].get_path(), "a.b");
    EXPECT_EQ(parsed->values[0].get<doris::PrimitiveType::TYPE_BIGINT>(), 1);

    // "a.c" has no earlier occurrence and is preserved.
    EXPECT_EQ(parsed->paths[1].get_path(), "a.c");
    EXPECT_EQ(parsed->values[1].get<doris::PrimitiveType::TYPE_BIGINT>(), 3);
}

// Test cases for the selected code functionality
TEST(JsonParserTest, TestIsPrefixFunction) {
JSONDataParser<SimdJSONParser> parser;
Expand Down
7 changes: 7 additions & 0 deletions regression-test/data/variant_p0/duplicate_json_path.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"k":8,"v":{"a":42,"a":{"b":42}}}
{"k":9,"v":{"a":123,"a":"123"}}
{"k":10,"v":{"a.b":8,"a":{"b":9}}}
{"k":11,"v":{"a":{"b":10},"a.b":11}}
{"k":12,"v":{"a":{"b":11},"a":{"c":12}}}
{"k":13,"v":{"a":[13],"a":14}}
{"k":14,"v":{"a":14,"a":[13]}}
Loading
Loading