Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2800,8 +2800,10 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
msg = fmt::format("failed to check whether rowset exists, err={}", err);
return;
}
// write schema kv if rowset_meta has schema
if (config::write_schema_kv && rowset_meta.has_tablet_schema()) {
// Delete rowsets may only carry the columns needed to build predicates, so they must not
// become the shared schema source for the whole schema version.
if (config::write_schema_kv && rowset_meta.has_tablet_schema() &&
!rowset_meta.has_delete_predicate()) {
if (!rowset_meta.has_index_id() && !is_versioned_read) {
TabletIndexPB tablet_idx;
get_tablet_idx(code, msg, txn.get(), instance_id, rowset_meta.tablet_id(), tablet_idx);
Expand Down Expand Up @@ -3362,9 +3364,11 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
// get referenced schema
std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema;
for (auto& rowset_meta : *response->mutable_rowset_meta()) {
if (rowset_meta.has_tablet_schema()) {
if (rowset_meta.has_tablet_schema() && !rowset_meta.has_delete_predicate()) {
version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(),
rowset_meta.mutable_tablet_schema());
}
if (rowset_meta.has_tablet_schema()) {
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
}
rowset_meta.set_index_id(idx.index_id());
Expand Down
110 changes: 110 additions & 0 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10845,6 +10845,116 @@ TEST(MetaServiceTest, StaleCommitRowset) {
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
}

TEST(MetaServiceTest, DeleteRowsetDoesNotWriteSchemaKv) {
auto meta_service = get_meta_service();

constexpr int64_t db_id = 100301;
constexpr int64_t table_id = 100302;
constexpr int64_t index_id = 100303;
constexpr int64_t partition_id = 100304;
constexpr int64_t tablet_id = 100305;
constexpr int32_t schema_version = 1;
const std::string instance_id = "delete_rowset_schema_test_instance_id";
const bool old_write_schema_kv = config::write_schema_kv;
config::write_schema_kv = true;

auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
config::write_schema_kv = old_write_schema_kv;
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
};
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();

auto build_schema = [&] {
doris::TabletSchemaCloudPB schema;
schema.set_schema_version(schema_version);
auto* column = schema.add_column();
column->set_unique_id(1);
column->set_name("c1");
column->set_type("INT");
auto* index = schema.add_index();
index->set_index_id(101);
index->set_index_name("c1_idx");
index->set_index_type(doris::INVERTED);
index->add_col_unique_id(1);
return schema;
};

ASSERT_NO_FATAL_FAILURE(
create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id));

int64_t delete_txn_id = 0;
ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, "delete_rowset_schema_label",
table_id, delete_txn_id));
CreateRowsetResponse res;
auto delete_rowset = create_rowset(delete_txn_id, tablet_id, partition_id);
delete_rowset.mutable_tablet_schema()->CopyFrom(build_schema());
delete_rowset.mutable_delete_predicate()->set_version(-1);
delete_rowset.mutable_delete_predicate()->add_sub_predicates("c1 = 1");
ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), delete_rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
res.Clear();
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), delete_rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);

ValueBuf buf;
std::string schema_key = meta_schema_key({instance_id, index_id, schema_version});
ASSERT_EQ(blob_get(txn.get(), schema_key, &buf), TxnErrorCode::TXN_KEY_NOT_FOUND);

std::string tmp_rs_key = meta_rowset_tmp_key({instance_id, delete_txn_id, tablet_id});
std::string tmp_rs_val;
ASSERT_EQ(txn->get(tmp_rs_key, &tmp_rs_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB stored_delete_rowset;
ASSERT_TRUE(stored_delete_rowset.ParseFromString(tmp_rs_val));
ASSERT_TRUE(stored_delete_rowset.has_tablet_schema());
EXPECT_EQ(stored_delete_rowset.tablet_schema().schema_version(), schema_version);
EXPECT_EQ(stored_delete_rowset.tablet_schema().index_size(), 1);
}

int64_t load_txn_id = 0;
ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, "normal_rowset_schema_label",
table_id, load_txn_id));
auto load_rowset = create_rowset(load_txn_id, tablet_id, partition_id);
load_rowset.mutable_tablet_schema()->CopyFrom(build_schema());
res.Clear();
ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service.get(), load_rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
res.Clear();
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), load_rowset, res));
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);

ValueBuf buf;
std::string schema_key = meta_schema_key({instance_id, index_id, schema_version});
ASSERT_EQ(blob_get(txn.get(), schema_key, &buf), TxnErrorCode::TXN_OK);
doris::TabletSchemaCloudPB stored_schema;
ASSERT_TRUE(buf.to_pb(&stored_schema));
EXPECT_EQ(stored_schema.schema_version(), schema_version);
EXPECT_EQ(stored_schema.index_size(), 1);

std::string tmp_rs_key = meta_rowset_tmp_key({instance_id, load_txn_id, tablet_id});
std::string tmp_rs_val;
ASSERT_EQ(txn->get(tmp_rs_key, &tmp_rs_val), TxnErrorCode::TXN_OK);
doris::RowsetMetaCloudPB stored_load_rowset;
ASSERT_TRUE(stored_load_rowset.ParseFromString(tmp_rs_val));
ASSERT_FALSE(stored_load_rowset.has_tablet_schema());
EXPECT_EQ(stored_load_rowset.schema_version(), schema_version);
}
}

TEST(MetaServiceTest, AlterObjInfoTest) {
auto meta_service = get_meta_service();

Expand Down
Loading