Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,11 @@ class MetaServiceProxy final : public MetaService {

if (!config::enable_txn_store_retry) {
(impl_.get()->*method)(ctrl, req, resp, brpc::DoNothing());
if (resp->status().code() == MetaServiceCode::KV_TXN_MAYBE_COMMITTED) {
// Keep maybe-committed as an internal retry signal only. Older proto2
// clients may treat unknown enum values as unset and fall back to OK.
resp->mutable_status()->set_code(MetaServiceCode::KV_TXN_COMMIT_ERR);
}
if (DCHECK_IS_ON()) {
MetaServiceCode code = resp->status().code();
DCHECK_NE(code, MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE)
Expand All @@ -1049,6 +1054,8 @@ class MetaServiceProxy final : public MetaService {
<< "KV_TXN_STORE_COMMIT_RETRYABLE should not be sent back to client";
DCHECK_NE(code, MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE)
<< "KV_TXN_STORE_CREATE_RETRYABLE should not be sent back to client";
DCHECK_NE(code, MetaServiceCode::KV_TXN_MAYBE_COMMITTED)
<< "KV_TXN_MAYBE_COMMITTED should not be sent back to client";
}
return;
}
Expand Down Expand Up @@ -1088,8 +1095,7 @@ class MetaServiceProxy final : public MetaService {
code == MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE ? KV_TXN_COMMIT_ERR
: code == MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE ? KV_TXN_GET_ERR
: code == MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE ? KV_TXN_CREATE_ERR
: code == MetaServiceCode::KV_TXN_MAYBE_COMMITTED
? MetaServiceCode::KV_TXN_MAYBE_COMMITTED
: code == MetaServiceCode::KV_TXN_MAYBE_COMMITTED ? KV_TXN_COMMIT_ERR
: code == MetaServiceCode::KV_TXN_CONFLICT
? KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES
: MetaServiceCode::KV_TXN_TOO_OLD);
Expand Down
147 changes: 145 additions & 2 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8458,7 +8458,111 @@ TEST(MetaServiceTxnStoreRetryableTest, DoNotReturnRetryableCode) {
config::txn_store_retry_times = retry_times;
}

TEST(MetaServiceTxnStoreRetryableTest, RetryMaybeCommittedCode) {
TEST(MetaServiceTxnStoreRetryableTest, CastAsPreservesMaybeCommittedForProxyRetry) {
bool enable_retry = config::enable_txn_store_retry;
DORIS_CLOUD_DEFER {
config::enable_txn_store_retry = enable_retry;
};

config::enable_txn_store_retry = false;
ASSERT_EQ(cast_as<ErrCategory::COMMIT>(TxnErrorCode::TXN_MAYBE_COMMITTED),
MetaServiceCode::KV_TXN_MAYBE_COMMITTED);
ASSERT_EQ(cast_as<ErrCategory::READ>(TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED),
MetaServiceCode::KV_TXN_MAYBE_COMMITTED);
ASSERT_EQ(cast_as<ErrCategory::CREATE>(TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED),
MetaServiceCode::KV_TXN_MAYBE_COMMITTED);

config::enable_txn_store_retry = true;
ASSERT_EQ(cast_as<ErrCategory::READ>(TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED),
MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE);
ASSERT_EQ(cast_as<ErrCategory::COMMIT>(TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED),
MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE);
ASSERT_EQ(cast_as<ErrCategory::CREATE>(TxnErrorCode::TXN_RETRYABLE_NOT_COMMITTED),
MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE);
ASSERT_EQ(cast_as<ErrCategory::READ>(TxnErrorCode::TXN_TIMEOUT),
MetaServiceCode::KV_TXN_GET_ERR);
ASSERT_EQ(cast_as<ErrCategory::CREATE>(TxnErrorCode::TXN_TIMEOUT),
MetaServiceCode::KV_TXN_CREATE_ERR);
}

TEST(MetaServiceTxnStoreRetryableTest, MaybeCommittedCodeWithoutRetryReturnsCommitErr) {
size_t index = 0;
SyncPoint::get_instance()->set_call_back("update_delete_bitmap:commit:err", [&](auto&& args) {
++index;
*doris::try_any_cast<TxnErrorCode*>(args[2]) = TxnErrorCode::TXN_MAYBE_COMMITTED;
});
SyncPoint::get_instance()->enable_processing();
bool enable_retry = config::enable_txn_store_retry;
int64_t max_txn_commit_byte = config::max_txn_commit_byte;
config::enable_txn_store_retry = false;
config::max_txn_commit_byte = 1;

auto service = get_meta_service();

brpc::Controller cntl;
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse resp;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_table_id(100);
req.set_partition_id(123);
req.set_lock_id(-3);
req.set_without_lock(true);
req.set_initiator(-1);
req.set_tablet_id(333);
req.add_rowset_ids("r1");
req.add_segment_ids(0);
req.add_versions(2);
req.add_segment_delete_bitmaps("abc");

service->update_delete_bitmap(&cntl, &req, &resp, nullptr);

ASSERT_EQ(resp.status().code(), MetaServiceCode::KV_TXN_COMMIT_ERR)
<< " status is " << resp.status().msg() << ", code=" << resp.status().code();
EXPECT_EQ(index, 1);

SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
config::enable_txn_store_retry = enable_retry;
config::max_txn_commit_byte = max_txn_commit_byte;
}

TEST(MetaServiceTxnStoreRetryableTest, ReadMaybeCommittedCodeWithoutRetryReturnsCommitErr) {
size_t index = 0;
auto* sync_point = SyncPoint::get_instance();
sync_point->set_call_back("get_version_code", [&](auto&& args) {
++index;
*doris::try_any_cast<MetaServiceCode*>(args[0]) = MetaServiceCode::KV_TXN_MAYBE_COMMITTED;
});
sync_point->enable_processing();
bool enable_retry = config::enable_txn_store_retry;
DORIS_CLOUD_DEFER {
config::enable_txn_store_retry = enable_retry;
sync_point->disable_processing();
sync_point->clear_all_call_backs();
};
config::enable_txn_store_retry = false;

auto service = get_meta_service();
create_tablet(service.get(), 1, 1, 1, 1);
insert_rowset(service.get(), 1, "read_maybe_committed_without_retry", 1, 1, 1);

brpc::Controller ctrl;
GetVersionRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(1);
req.set_table_id(1);
req.set_partition_id(1);

GetVersionResponse resp;
service->get_version(&ctrl, &req, &resp, nullptr);

ASSERT_EQ(resp.status().code(), MetaServiceCode::KV_TXN_COMMIT_ERR)
<< " status is " << resp.status().msg() << ", code=" << resp.status().code();
EXPECT_EQ(resp.version(), 2);
EXPECT_EQ(index, 1);
}

TEST(MetaServiceTxnStoreRetryableTest, RetryMaybeCommittedCodeReturnsCommitErr) {
size_t index = 0;
SyncPoint::get_instance()->set_call_back("update_delete_bitmap:commit:err", [&](auto&& args) {
++index;
Expand Down Expand Up @@ -8491,7 +8595,7 @@ TEST(MetaServiceTxnStoreRetryableTest, RetryMaybeCommittedCode) {

service->update_delete_bitmap(&cntl, &req, &resp, nullptr);

ASSERT_EQ(resp.status().code(), MetaServiceCode::KV_TXN_MAYBE_COMMITTED)
ASSERT_EQ(resp.status().code(), MetaServiceCode::KV_TXN_COMMIT_ERR)
<< " status is " << resp.status().msg() << ", code=" << resp.status().code();
EXPECT_GE(index, static_cast<size_t>(config::txn_store_retry_times + 1));

Expand All @@ -8502,6 +8606,45 @@ TEST(MetaServiceTxnStoreRetryableTest, RetryMaybeCommittedCode) {
config::max_txn_commit_byte = max_txn_commit_byte;
}

TEST(MetaServiceTxnStoreRetryableTest, RetryReadMaybeCommittedCodeReturnsCommitErr) {
size_t index = 0;
auto* sync_point = SyncPoint::get_instance();
sync_point->set_call_back("get_version_code", [&](auto&& args) {
++index;
*doris::try_any_cast<MetaServiceCode*>(args[0]) = MetaServiceCode::KV_TXN_MAYBE_COMMITTED;
});
sync_point->enable_processing();
int32_t retry_times = config::txn_store_retry_times;
bool enable_retry = config::enable_txn_store_retry;
DORIS_CLOUD_DEFER {
config::txn_store_retry_times = retry_times;
config::enable_txn_store_retry = enable_retry;
sync_point->disable_processing();
sync_point->clear_all_call_backs();
};
config::txn_store_retry_times = 2;
config::enable_txn_store_retry = true;

auto service = get_meta_service();
create_tablet(service.get(), 1, 1, 1, 1);
insert_rowset(service.get(), 1, "retry_read_maybe_committed", 1, 1, 1);

brpc::Controller ctrl;
GetVersionRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(1);
req.set_table_id(1);
req.set_partition_id(1);

GetVersionResponse resp;
service->get_version(&ctrl, &req, &resp, nullptr);

ASSERT_EQ(resp.status().code(), MetaServiceCode::KV_TXN_COMMIT_ERR)
<< " status is " << resp.status().msg() << ", code=" << resp.status().code();
EXPECT_EQ(resp.version(), 2);
EXPECT_GE(index, static_cast<size_t>(config::txn_store_retry_times + 1));
}

TEST(MetaServiceTest, GetClusterStatusTest) {
auto meta_service = get_meta_service();

Expand Down
9 changes: 9 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,10 @@ enum MetaServiceCode {
KV_TXN_STORE_COMMIT_RETRYABLE = 1009;
KV_TXN_STORE_CREATE_RETRYABLE = 1010;
KV_TXN_TOO_OLD = 1011;
// WARNING: KV_TXN_MAYBE_COMMITTED is NOT returned to clients. It is kept as an
// internal retry signal inside MetaServiceProxy::call_impl(), then downgraded to
// KV_TXN_COMMIT_ERR before the response is sent back. Older BE/FE versions do not
// recognize this enum value, and proto2 would otherwise fall back to OK (= 0).
KV_TXN_MAYBE_COMMITTED = 1012;

//Doris error
Expand Down Expand Up @@ -1862,6 +1866,11 @@ enum MetaServiceCode {

SCHEMA_DICT_NOT_FOUND = 11001;

// WARNING: Before adding a new MetaServiceCode, consider backward compatibility.
// This is a proto2 optional enum field. If an older client receives an unrecognized
// enum value, the field is treated as unset and defaults to OK (= 0), silently
// turning errors into success. Any new error code that may be sent to older clients
// MUST be downgraded to a legacy code before the response is sent to the client.
UNDEFINED_ERR = 1000000;
}

Expand Down
Loading