Skip to content

Commit

Permalink
Merge branch 'master' into cooldown_replica_as_clone_dst_tablet
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokang committed Aug 27, 2023
2 parents 7d45e9d + 9815861 commit b652347
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 44 deletions.
93 changes: 52 additions & 41 deletions be/src/vec/functions/function_jsonb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,48 @@ struct JsonbExtractStringImpl {
auto writer = std::make_unique<JsonbWriter>();
std::unique_ptr<JsonbToJson> formater;

// reuseable json path list, espacially for const path
std::vector<JsonbPath> json_path_list;
json_path_list.resize(rdata_columns.size());

// lambda function to parse json path for row i and path pi
auto parse_json_path = [&](size_t i, size_t pi) -> Status {
const ColumnString* path_col = rdata_columns[pi];
const ColumnString::Chars& rdata = path_col->get_chars();
const ColumnString::Offsets& roffsets = path_col->get_offsets();
size_t r_off = roffsets[index_check_const(i, path_const[pi]) - 1];
size_t r_size = roffsets[index_check_const(i, path_const[pi])] - r_off;
const char* r_raw = reinterpret_cast<const char*>(&rdata[r_off]);

JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
}

// if not valid json path , should return error message to user
if (is_invalid_json_path) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
}

json_path_list[pi] = std::move(path);

return Status::OK();
};

for (size_t pi = 0; pi < rdata_columns.size(); pi++) {
if (path_const[pi]) {
RETURN_IF_ERROR(parse_json_path(0, pi));
}
}

for (size_t i = 0; i < input_rows_count; ++i) {
if (null_map[i]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
Expand All @@ -526,61 +568,30 @@ struct JsonbExtractStringImpl {
size_t l_size = loffsets[index_check_const(i, json_data_const)] - l_off;
const char* l_raw = reinterpret_cast<const char*>(&ldata[l_off]);
if (rdata_columns.size() == 1) { // just return origin value
const ColumnString* path_col = rdata_columns[0];
const ColumnString::Chars& rdata = path_col->get_chars();
const ColumnString::Offsets& roffsets = path_col->get_offsets();
size_t r_off = roffsets[index_check_const(i, path_const[0]) - 1];
size_t r_size = roffsets[index_check_const(i, path_const[0])] - r_off;
const char* r_raw = reinterpret_cast<const char*>(&rdata[r_off]);

JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
if (!path_const[0]) {
RETURN_IF_ERROR(parse_json_path(i, 0));
}

inner_loop_impl(i, res_data, res_offsets, null_map, writer, formater, l_raw, l_size,
path);
json_path_list[0]);
} else { // will make array string to user
writer->reset();
writer->writeStartArray();

// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
JsonbDocument* doc = JsonbDocument::createDocument(l_raw, l_size);

for (size_t pi = 0; pi < rdata_columns.size(); ++pi) {
const ColumnString* path_col = rdata_columns[pi];
const ColumnString::Chars& rdata = path_col->get_chars();
const ColumnString::Offsets& roffsets = path_col->get_offsets();
size_t r_off = roffsets[index_check_const(i, path_const[pi]) - 1];
size_t r_size = roffsets[index_check_const(i, path_const[pi])] - r_off;
const char* r_raw = reinterpret_cast<const char*>(&rdata[r_off]);
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
JsonbDocument* doc = JsonbDocument::createDocument(l_raw, l_size);
if (UNLIKELY(!doc || !doc->getValue())) {
writer->writeNull();
continue;
}

JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
}

// if not valid json path , should return error message to user
if (is_invalid_json_path) {
return Status::InvalidArgument(
"Json path error: {} for value: {}",
JsonbErrMsg::getErrMsg(JsonbErrType::E_INVALID_JSON_PATH),
std::string_view(reinterpret_cast<const char*>(rdata.data()),
rdata.size()));
if (!path_const[pi]) {
RETURN_IF_ERROR(parse_json_path(i, pi));
}

// value is NOT necessary to be deleted since JsonbValue will not allocate memory
JsonbValue* value = doc->getValue()->findValue(path, nullptr);
JsonbValue* value = doc->getValue()->findValue(json_path_list[pi], nullptr);

if (UNLIKELY(!value)) {
writer->writeNull();
Expand Down
6 changes: 3 additions & 3 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class LoadStreamMgrTest : public testing::Test {
cntl->SetFailed("Tablet not found");
status->set_status_code(TStatusCode::NOT_FOUND);
response->set_allocated_status(status.get());
response->release_status();
static_cast<void>(response->release_status());
return;
}
auto resp = response->add_tablet_schemas();
Expand All @@ -381,15 +381,15 @@ class LoadStreamMgrTest : public testing::Test {
cntl->SetFailed("Fail to accept stream");
status->set_status_code(TStatusCode::CANCELLED);
response->set_allocated_status(status.get());
response->release_status();
static_cast<void>(response->release_status());
return;
}

load_stream->add_rpc_stream();

status->set_status_code(TStatusCode::OK);
response->set_allocated_status(status.get());
response->release_status();
static_cast<void>(response->release_status());
}

private:
Expand Down

0 comments on commit b652347

Please sign in to comment.