From 93bbb413e27585d45e6f08a5a7af7e4310ab6ce9 Mon Sep 17 00:00:00 2001 From: carolinchen Date: Wed, 22 Apr 2026 16:56:59 +0800 Subject: [PATCH] [fix](cloud)(restore) fix broken schema during restore of lsc=false tables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem ------- In cloud mode, when a snapshot of a table created with `light_schema_change = false` is restored, the schema KV persisted under `meta_schema_key({instance_id, index_id, schema_version})` ends up with every column carrying `unique_id = -1`, because the schema is first written by `create_tablet` and at that point the unique_ids are not yet assigned. `commit_restore_job` then receives the correct schema (with valid `unique_id >= 0`) in each rowset meta from the backup, but the existing `put_schema_kv` is a no-op when the key already exists, so the broken schema leaks through and subsequent reads fail with errors such as `column reader is nullptr` or `different type between schema and column reader`. Fix --- Introduce `put_schema_kv_on_restore()` in the cloud MetaService. It reads the existing schema value, detects the broken-schema signature (`column(0).unique_id() == -1` or an unparseable value), range-removes all chunks of the stale schema, and writes the correct one. To avoid replacing a bad schema with another bad one, it also refuses to write a schema that is itself broken (empty columns or `column(0).unique_id == -1`) and only logs a warning in that case. In `MetaServiceImpl::commit_restore_job`, replace both existing `put_schema_kv` call sites with `put_schema_kv_on_restore`, guarded by an in-RPC `std::set` so the same `(index_id, schema_version)` pair does not issue redundant FDB reads/writes when the restore spans many rowsets. Four counters (`rs_meta_schema_put_cnt/skip_cnt`, `tablet_meta_schema_put_cnt/skip_cnt`) are logged at the end of the RPC to make the behaviour observable in production. `put_versioned_schema_kv()` is intentionally NOT wrapped by the dedup set: it targets a different key space (`versioned::meta_schema_key`) and is already skip-if-exists inside the function. Tests ----- `cloud/test/meta_service_test.cpp` adds 5 unit tests covering every branch of `put_schema_kv_on_restore`: - `PutWhenKeyNotExist` — first-write path - `NoopWhenExistingSchemaIsGood` — skip-if-healthy path - `OverwriteWhenExistingIsBroken` — the fix itself - `DefensiveSkipWhenIncomingHasEmptyColumns` — defensive guard - `DefensiveSkipWhenIncomingHasUidNegativeOne` — defensive guard --- cloud/src/meta-service/meta_service.cpp | 39 ++- .../src/meta-service/meta_service_schema.cpp | 52 +++ cloud/src/meta-service/meta_service_schema.h | 9 + cloud/test/meta_service_test.cpp | 325 ++++++++++++++++++ 4 files changed, 419 insertions(+), 6 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index c6f423381a9f7c..a6b324ee747360 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include #include @@ -1764,6 +1765,15 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont TabletStats tablet_stat; int64_t converted_rowset_num = 0; int32_t max_batch_size = config::max_restore_job_rowsets_per_batch; + // Track schema keys for which `put_schema_kv_on_restore` has already been + // invoked within this RPC, so the same (index_id, schema_version) does not + // issue redundant FDB reads/writes across rowsets. Only covers the + // `meta_schema_key` path; `put_versioned_schema_kv` remains independent. + std::set restored_schema_keys; + int64_t rs_meta_schema_put_cnt = 0; + int64_t rs_meta_schema_skip_cnt = 0; + int64_t tablet_meta_schema_put_cnt = 0; + int64_t tablet_meta_schema_skip_cnt = 0; for (size_t i = 0; i < restore_job_rs_metas.size(); i += max_batch_size) { size_t end = (i + max_batch_size) > restore_job_rs_metas.size() ? restore_job_rs_metas.size() @@ -1791,9 +1801,16 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont return; } } - put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema()); - if (code != MetaServiceCode::OK) { - return; + if (restored_schema_keys.count(schema_key) == 0) { + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, + rowset_meta.tablet_schema()); + if (code != MetaServiceCode::OK) { + return; + } + restored_schema_keys.insert(schema_key); + ++rs_meta_schema_put_cnt; + } else { + ++rs_meta_schema_skip_cnt; } if (is_versioned_write) { std::string versioned_schema_key = versioned::meta_schema_key( @@ -2052,8 +2069,14 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont fix_column_type(tablet_meta->mutable_schema()); auto schema_key = meta_schema_key( {instance_id, tablet_meta->index_id(), tablet_meta->schema_version()}); - put_schema_kv(code, msg, txn0.get(), schema_key, tablet_meta->schema()); - if (code != MetaServiceCode::OK) return; + if (restored_schema_keys.count(schema_key) == 0) { + put_schema_kv_on_restore(code, msg, txn0.get(), schema_key, tablet_meta->schema()); + if (code != MetaServiceCode::OK) return; + restored_schema_keys.insert(schema_key); + ++tablet_meta_schema_put_cnt; + } else { + ++tablet_meta_schema_skip_cnt; + } bool is_versioned_write = is_version_write_enabled(instance_id); if (is_versioned_write) { @@ -2161,7 +2184,11 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont .tag("tablet_id", tablet_idx.tablet_id()) .tag("state", restore_job_pb.state()) .tag("mtime_s", restore_job_pb.mtime_s()) - .tag("committed_rowset_num", converted_rowset_num); + .tag("committed_rowset_num", converted_rowset_num) + .tag("rs_meta_schema_put_cnt", rs_meta_schema_put_cnt) + .tag("rs_meta_schema_skip_cnt", rs_meta_schema_skip_cnt) + .tag("tablet_meta_schema_put_cnt", tablet_meta_schema_put_cnt) + .tag("tablet_meta_schema_skip_cnt", tablet_meta_schema_skip_cnt); err = txn0->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); diff --git a/cloud/src/meta-service/meta_service_schema.cpp b/cloud/src/meta-service/meta_service_schema.cpp index 93bbdfc2c193ba..858a0159ce3ded 100644 --- a/cloud/src/meta-service/meta_service_schema.cpp +++ b/cloud/src/meta-service/meta_service_schema.cpp @@ -34,6 +34,7 @@ #include "cpp/sync_point.h" #include "meta-service/meta_service_helper.h" #include "meta-store/blob_message.h" +#include "meta-store/codec.h" #include "meta-store/document_message.h" #include "meta-store/keys.h" #include "meta-store/txn_kv.h" @@ -133,6 +134,57 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn, } } +void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn, + std::string_view schema_key, + const doris::TabletSchemaCloudPB& schema) { + // Decide whether we need to (re)write the schema at this key. + bool need_put = false; + ValueBuf val_buf; + TxnErrorCode err = cloud::blob_get(txn, schema_key, &val_buf); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + need_put = true; + } else if (err == TxnErrorCode::TXN_OK) { + // Overwrite if the existing schema value cannot be parsed, or if its + // first column has unique_id == -1 which is the signature of a broken + // schema written by create_tablet for a light_schema_change=false + // table during restore. + doris::TabletSchemaCloudPB saved_schema; + need_put = !parse_schema_value(val_buf, &saved_schema) || + (saved_schema.column_size() > 0 && saved_schema.column(0).unique_id() == -1); + } else { + code = cast_as(err); + msg = fmt::format("failed to get schema during restore, err={}", err); + return; + } + if (!need_put) { + return; + } + // Defensive check: refuse to overwrite with a schema that is itself + // broken (empty columns, or column(0).unique_id == -1). This guarantees + // we never replace a bad schema with another bad one. On rejection we + // only log, so callers can continue committing other work. + if (schema.column_size() == 0 || schema.column(0).unique_id() == -1) { + LOG_WARNING("skip put schema during restore, incoming schema is broken") + .tag("key", hex(schema_key)) + .tag("column_size", schema.column_size()); + return; + } + // `put_schema_kv` stores the schema as either a single KV (when + // meta_schema_value_version == 0) or multiple blob chunks keyed by + // `schema_key + encode_int64(ver << 56 + i)`. To clean up all possible + // existing chunks we do a range remove over `[schema_key, schema_key + INT64_MAX)`. + std::string schema_key_end(schema_key); + encode_int64(INT64_MAX, &schema_key_end); + txn->remove(schema_key, schema_key_end); + uint8_t ver = config::meta_schema_value_version; + if (ver > 0) { + cloud::blob_put(txn, schema_key, schema, ver); + } else { + txn->put(schema_key, schema.SerializeAsString()); + } + LOG_INFO("put schema during restore").tag("key", hex(schema_key)); +} + void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn, std::string_view schema_key, const doris::TabletSchemaCloudPB& schema) { diff --git a/cloud/src/meta-service/meta_service_schema.h b/cloud/src/meta-service/meta_service_schema.h index ffade3a150fbcf..e910333c028251 100644 --- a/cloud/src/meta-service/meta_service_schema.h +++ b/cloud/src/meta-service/meta_service_schema.h @@ -27,6 +27,15 @@ struct ValueBuf; void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn, std::string_view schema_key, const doris::TabletSchemaCloudPB& schema); +// Put schema during a restore job. Unlike put_schema_kv, this function will +// overwrite an existing schema key whose contents are broken (cannot be +// parsed, or column(0).unique_id == -1 produced by create_tablet for tables +// with light_schema_change=false). It also refuses to write a new schema +// that is itself broken, to avoid replacing a bad schema with another bad one. +void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn, + std::string_view schema_key, + const doris::TabletSchemaCloudPB& schema); + void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn, std::string_view schema_key, const doris::TabletSchemaCloudPB& schema); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 6f18493ee2ed3a..906bf1af32cbc2 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -35,10 +35,12 @@ #include #include "common/config.h" +#include "common/defer.h" #include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/meta_service_helper.h" +#include "meta-service/meta_service_schema.h" #include "meta-store/blob_message.h" #include "meta-store/document_message.h" #include "meta-store/keys.h" @@ -13213,4 +13215,327 @@ TEST(MetaServiceTest, CleanTxnLabelVersionedWriteMixedTxns) { } } +// ============================================================================= +// Tests for put_schema_kv_on_restore(). +// +// Background: when restoring a table with `light_schema_change = false` in +// cloud mode, create_tablet writes a schema whose columns all have +// `unique_id = -1`. The subsequent commit_restore_job receives the correct +// schema from the backup's rowset meta and must overwrite that broken one. +// The original put_schema_kv() is a no-op when the key already exists, so +// the broken schema leaks through. put_schema_kv_on_restore() fixes this, +// and also defensively refuses to replace a broken schema with another +// broken one. +// ============================================================================= + +namespace { + +// Builds a TabletSchemaCloudPB whose columns all have unique_id == -1. +// Simulates the schema that create_tablet seeds for light_schema_change=false. +doris::TabletSchemaCloudPB make_broken_schema(int32_t schema_version) { + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(schema_version); + for (int i = 0; i < 3; ++i) { + auto* col = schema.add_column(); + col->set_unique_id(-1); + col->set_name("c" + std::to_string(i)); + col->set_type("INT"); + } + return schema; +} + +// Builds a TabletSchemaCloudPB with valid unique_ids (>= 0). Simulates a +// schema loaded from a backup's rowset meta. +doris::TabletSchemaCloudPB make_good_schema(int32_t schema_version) { + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(schema_version); + for (int i = 0; i < 3; ++i) { + auto* col = schema.add_column(); + col->set_unique_id(1000 + i); + col->set_name("c" + std::to_string(i)); + col->set_type("INT"); + } + return schema; +} + +// Seed a schema KV directly, mirroring put_schema_kv's write path so that +// blob_get can later read it back regardless of meta_schema_value_version. +void seed_schema_kv(MetaServiceProxy* meta_service, std::string_view schema_key, + const doris::TabletSchemaCloudPB& schema) { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + uint8_t ver = config::meta_schema_value_version; + if (ver > 0) { + cloud::blob_put(txn.get(), schema_key, schema, ver); + } else { + txn->put(schema_key, schema.SerializeAsString()); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +// Read back a schema KV and parse it into *out. Returns the err from blob_get. +TxnErrorCode read_schema_kv(MetaServiceProxy* meta_service, std::string_view schema_key, + doris::TabletSchemaCloudPB* out) { + std::unique_ptr txn; + EXPECT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ValueBuf buf; + auto err = cloud::blob_get(txn.get(), schema_key, &buf); + if (err == TxnErrorCode::TXN_OK) { + EXPECT_TRUE(parse_schema_value(buf, out)); + } + return err; +} + +} // namespace + +TEST(PutSchemaKvOnRestoreTest, PutWhenKeyNotExist) { + auto meta_service = get_meta_service(); + std::string instance_id = "put_schema_kv_on_restore_case1"; + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + const int64_t index_id = 30001; + const int32_t schema_version = 5; + const int8_t saved_ver = config::meta_schema_value_version; + config::meta_schema_value_version = 0; + DORIS_CLOUD_DEFER { + config::meta_schema_value_version = saved_ver; + }; + + auto schema_key = meta_schema_key({instance_id, index_id, schema_version}); + auto good = make_good_schema(schema_version); + + // Precondition: key does not exist. + { + doris::TabletSchemaCloudPB tmp; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &tmp), + TxnErrorCode::TXN_KEY_NOT_FOUND); + } + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, good); + ASSERT_EQ(code, MetaServiceCode::OK) << msg; + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + doris::TabletSchemaCloudPB got; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &got), TxnErrorCode::TXN_OK); + ASSERT_EQ(got.column_size(), good.column_size()); + for (int i = 0; i < got.column_size(); ++i) { + EXPECT_EQ(got.column(i).unique_id(), good.column(i).unique_id()) << i; + } +} + +TEST(PutSchemaKvOnRestoreTest, NoopWhenExistingSchemaIsGood) { + auto meta_service = get_meta_service(); + std::string instance_id = "put_schema_kv_on_restore_case2"; + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + const int64_t index_id = 30002; + const int32_t schema_version = 7; + const int8_t saved_ver = config::meta_schema_value_version; + config::meta_schema_value_version = 0; + DORIS_CLOUD_DEFER { + config::meta_schema_value_version = saved_ver; + }; + + auto schema_key = meta_schema_key({instance_id, index_id, schema_version}); + auto existing_good = make_good_schema(schema_version); + existing_good.mutable_column(0)->set_unique_id(9999); // distinctive sentinel + ASSERT_NO_FATAL_FAILURE(seed_schema_kv(meta_service.get(), schema_key, existing_good)); + + auto incoming = make_good_schema(schema_version); + ASSERT_NE(incoming.column(0).unique_id(), existing_good.column(0).unique_id()); + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, incoming); + ASSERT_EQ(code, MetaServiceCode::OK) << msg; + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + doris::TabletSchemaCloudPB got; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &got), TxnErrorCode::TXN_OK); + ASSERT_EQ(got.column_size(), existing_good.column_size()); + EXPECT_EQ(got.column(0).unique_id(), 9999); // unchanged sentinel +} + +TEST(PutSchemaKvOnRestoreTest, OverwriteWhenExistingIsBroken) { + auto meta_service = get_meta_service(); + std::string instance_id = "put_schema_kv_on_restore_case3"; + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + const int64_t index_id = 30003; + const int32_t schema_version = 3; + const int8_t saved_ver = config::meta_schema_value_version; + config::meta_schema_value_version = 0; + DORIS_CLOUD_DEFER { + config::meta_schema_value_version = saved_ver; + }; + + auto schema_key = meta_schema_key({instance_id, index_id, schema_version}); + ASSERT_NO_FATAL_FAILURE( + seed_schema_kv(meta_service.get(), schema_key, make_broken_schema(schema_version))); + + // Sanity: existing is indeed broken. + { + doris::TabletSchemaCloudPB tmp; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &tmp), TxnErrorCode::TXN_OK); + ASSERT_GT(tmp.column_size(), 0); + ASSERT_EQ(tmp.column(0).unique_id(), -1); + } + + auto good = make_good_schema(schema_version); + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, good); + ASSERT_EQ(code, MetaServiceCode::OK) << msg; + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + doris::TabletSchemaCloudPB got; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &got), TxnErrorCode::TXN_OK); + ASSERT_EQ(got.column_size(), good.column_size()); + for (int i = 0; i < got.column_size(); ++i) { + EXPECT_GE(got.column(i).unique_id(), 0) << i; + EXPECT_EQ(got.column(i).unique_id(), good.column(i).unique_id()) << i; + } +} + +TEST(PutSchemaKvOnRestoreTest, DefensiveSkipWhenIncomingHasEmptyColumns) { + auto meta_service = get_meta_service(); + std::string instance_id = "put_schema_kv_on_restore_case4"; + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + const int64_t index_id = 30004; + const int32_t schema_version = 1; + const int8_t saved_ver = config::meta_schema_value_version; + config::meta_schema_value_version = 0; + DORIS_CLOUD_DEFER { + config::meta_schema_value_version = saved_ver; + }; + + auto schema_key = meta_schema_key({instance_id, index_id, schema_version}); + ASSERT_NO_FATAL_FAILURE( + seed_schema_kv(meta_service.get(), schema_key, make_broken_schema(schema_version))); + + // Snapshot the existing raw bytes so we can prove they are untouched. + std::string original_bytes; + { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(schema_key, &original_bytes), TxnErrorCode::TXN_OK); + } + + // Incoming schema has no columns — defensive guard must reject it. + doris::TabletSchemaCloudPB incoming_empty; + incoming_empty.set_schema_version(schema_version); + ASSERT_EQ(incoming_empty.column_size(), 0); + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, incoming_empty); + EXPECT_EQ(code, MetaServiceCode::OK) << msg; // defensive skip returns OK + LOG(WARNING) + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + std::string after_bytes; + { + std::unique_ptr txn2; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn2), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn2->get(schema_key, &after_bytes), TxnErrorCode::TXN_OK); + } + EXPECT_EQ(original_bytes, after_bytes); +} + +TEST(PutSchemaKvOnRestoreTest, DefensiveSkipWhenIncomingHasUidNegativeOne) { + auto meta_service = get_meta_service(); + std::string instance_id = "put_schema_kv_on_restore_case5"; + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + const int64_t index_id = 30005; + const int32_t schema_version = 2; + const int8_t saved_ver = config::meta_schema_value_version; + config::meta_schema_value_version = 0; + DORIS_CLOUD_DEFER { + config::meta_schema_value_version = saved_ver; + }; + + auto schema_key = meta_schema_key({instance_id, index_id, schema_version}); + auto existing_broken = make_broken_schema(schema_version); + existing_broken.mutable_column(0)->set_name("existing_broken_marker"); + ASSERT_NO_FATAL_FAILURE(seed_schema_kv(meta_service.get(), schema_key, existing_broken)); + + auto incoming_broken = make_broken_schema(schema_version); + incoming_broken.mutable_column(0)->set_name("incoming_broken_marker"); + + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + put_schema_kv_on_restore(code, msg, txn.get(), schema_key, incoming_broken); + EXPECT_EQ(code, MetaServiceCode::OK) << msg; + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + doris::TabletSchemaCloudPB got; + ASSERT_EQ(read_schema_kv(meta_service.get(), schema_key, &got), TxnErrorCode::TXN_OK); + ASSERT_GT(got.column_size(), 0); + EXPECT_EQ(got.column(0).unique_id(), -1); + EXPECT_EQ(got.column(0).name(), "existing_broken_marker"); // unchanged +} + } // namespace doris::cloud