Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <memory>
#include <numeric>
#include <ostream>
#include <set>
#include <sstream>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -1764,6 +1765,15 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
TabletStats tablet_stat;
int64_t converted_rowset_num = 0;
int32_t max_batch_size = config::max_restore_job_rowsets_per_batch;
// Track schema keys for which `put_schema_kv_on_restore` has already been
// invoked within this RPC, so the same (index_id, schema_version) does not
// issue redundant FDB reads/writes across rowsets. Only covers the
// `meta_schema_key` path; `put_versioned_schema_kv` remains independent.
std::set<std::string> restored_schema_keys;
int64_t rs_meta_schema_put_cnt = 0;
int64_t rs_meta_schema_skip_cnt = 0;
int64_t tablet_meta_schema_put_cnt = 0;
int64_t tablet_meta_schema_skip_cnt = 0;
for (size_t i = 0; i < restore_job_rs_metas.size(); i += max_batch_size) {
size_t end = (i + max_batch_size) > restore_job_rs_metas.size()
? restore_job_rs_metas.size()
Expand Down Expand Up @@ -1791,9 +1801,16 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
return;
}
}
put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema());
if (code != MetaServiceCode::OK) {
return;
if (restored_schema_keys.count(schema_key) == 0) {
put_schema_kv_on_restore(code, msg, txn.get(), schema_key,
rowset_meta.tablet_schema());
if (code != MetaServiceCode::OK) {
return;
}
restored_schema_keys.insert(schema_key);
++rs_meta_schema_put_cnt;
} else {
++rs_meta_schema_skip_cnt;
}
if (is_versioned_write) {
std::string versioned_schema_key = versioned::meta_schema_key(
Expand Down Expand Up @@ -2052,8 +2069,14 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
fix_column_type(tablet_meta->mutable_schema());
auto schema_key = meta_schema_key(
{instance_id, tablet_meta->index_id(), tablet_meta->schema_version()});
put_schema_kv(code, msg, txn0.get(), schema_key, tablet_meta->schema());
if (code != MetaServiceCode::OK) return;
if (restored_schema_keys.count(schema_key) == 0) {
put_schema_kv_on_restore(code, msg, txn0.get(), schema_key, tablet_meta->schema());
if (code != MetaServiceCode::OK) return;
restored_schema_keys.insert(schema_key);
++tablet_meta_schema_put_cnt;
} else {
++tablet_meta_schema_skip_cnt;
}

bool is_versioned_write = is_version_write_enabled(instance_id);
if (is_versioned_write) {
Expand Down Expand Up @@ -2161,7 +2184,11 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
.tag("tablet_id", tablet_idx.tablet_id())
.tag("state", restore_job_pb.state())
.tag("mtime_s", restore_job_pb.mtime_s())
.tag("committed_rowset_num", converted_rowset_num);
.tag("committed_rowset_num", converted_rowset_num)
.tag("rs_meta_schema_put_cnt", rs_meta_schema_put_cnt)
.tag("rs_meta_schema_skip_cnt", rs_meta_schema_skip_cnt)
.tag("tablet_meta_schema_put_cnt", tablet_meta_schema_put_cnt)
.tag("tablet_meta_schema_skip_cnt", tablet_meta_schema_skip_cnt);
err = txn0->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down
52 changes: 52 additions & 0 deletions cloud/src/meta-service/meta_service_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "cpp/sync_point.h"
#include "meta-service/meta_service_helper.h"
#include "meta-store/blob_message.h"
#include "meta-store/codec.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
Expand Down Expand Up @@ -133,6 +134,57 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
}
}

void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key,
const doris::TabletSchemaCloudPB& schema) {
// Decide whether we need to (re)write the schema at this key.
bool need_put = false;
ValueBuf val_buf;
TxnErrorCode err = cloud::blob_get(txn, schema_key, &val_buf);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
need_put = true;
} else if (err == TxnErrorCode::TXN_OK) {
// Overwrite if the existing schema value cannot be parsed, or if its
// first column has unique_id == -1 which is the signature of a broken
// schema written by create_tablet for a light_schema_change=false
// table during restore.
doris::TabletSchemaCloudPB saved_schema;
need_put = !parse_schema_value(val_buf, &saved_schema) ||
(saved_schema.column_size() > 0 && saved_schema.column(0).unique_id() == -1);
} else {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get schema during restore, err={}", err);
return;
}
if (!need_put) {
return;
}
// Defensive check: refuse to overwrite with a schema that is itself
// broken (empty columns, or column(0).unique_id == -1). This guarantees
// we never replace a bad schema with another bad one. On rejection we
// only log, so callers can continue committing other work.
if (schema.column_size() == 0 || schema.column(0).unique_id() == -1) {
LOG_WARNING("skip put schema during restore, incoming schema is broken")
.tag("key", hex(schema_key))
.tag("column_size", schema.column_size());
return;
}
// `put_schema_kv` stores the schema as either a single KV (when
// meta_schema_value_version == 0) or multiple blob chunks keyed by
// `schema_key + encode_int64(ver << 56 + i)`. To clean up all possible
// existing chunks we do a range remove over `[schema_key, schema_key + INT64_MAX)`.
std::string schema_key_end(schema_key);
encode_int64(INT64_MAX, &schema_key_end);
txn->remove(schema_key, schema_key_end);
uint8_t ver = config::meta_schema_value_version;
if (ver > 0) {
cloud::blob_put(txn, schema_key, schema, ver);
} else {
txn->put(schema_key, schema.SerializeAsString());
}
LOG_INFO("put schema during restore").tag("key", hex(schema_key));
}

void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key,
const doris::TabletSchemaCloudPB& schema) {
Expand Down
9 changes: 9 additions & 0 deletions cloud/src/meta-service/meta_service_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ struct ValueBuf;
void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key, const doris::TabletSchemaCloudPB& schema);

// Put schema during a restore job. Unlike put_schema_kv, this function will
// overwrite an existing schema key whose contents are broken (cannot be
// parsed, or column(0).unique_id == -1 produced by create_tablet for tables
// with light_schema_change=false). It also refuses to write a new schema
// that is itself broken, to avoid replacing a bad schema with another bad one.
void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key,
const doris::TabletSchemaCloudPB& schema);

void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
std::string_view schema_key, const doris::TabletSchemaCloudPB& schema);

Expand Down
Loading
Loading