diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp index 1c511488673369..ed36b54322cde0 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -325,6 +325,96 @@ static int find_cascade_instances(TxnKv* txn_kv, const std::string& root_instanc return 0; } +static int find_storage_vault_position_by_id(const InstanceInfoPB& instance, + std::string_view vault_id) { + auto id_itr = + std::find(instance.resource_ids().begin(), instance.resource_ids().end(), vault_id); + if (id_itr == instance.resource_ids().end()) { + return -1; + } + return static_cast(id_itr - instance.resource_ids().begin()); +} + +static int find_storage_vault_id_by_name(const InstanceInfoPB& instance, + std::string_view vault_name, std::string* vault_id) { + auto name_itr = std::find_if( + instance.storage_vault_names().begin(), instance.storage_vault_names().end(), + [&](const auto& current_name) { return current_name == vault_name; }); + if (name_itr == instance.storage_vault_names().end()) { + return -1; + } + int pos = static_cast(name_itr - instance.storage_vault_names().begin()); + *vault_id = instance.resource_ids().Get(pos); + return 0; +} + +static int alter_instance_obj_store_info_by_id(InstanceInfoPB& instance, + std::string_view target_obj_id, std::string_view ak, + std::string_view sk, std::string_view role_arn, + std::string_view external_id, + const EncryptionInfoPB& encryption_info, + MetaServiceCode& code, std::string& msg) { + auto& obj_info = const_cast&>(instance.obj_info()); + for (auto& it : obj_info) { + if (it.id() != target_obj_id) { + continue; + } + + if (role_arn.empty()) { + if (it.ak() == ak && it.sk() == sk) { + code = MetaServiceCode::OK; + msg = "ak/sk not changed"; + return 1; + } + it.clear_role_arn(); + it.clear_external_id(); + it.clear_cred_provider_type(); + + it.set_ak(std::string(ak)); + it.set_sk(std::string(sk)); + it.mutable_encryption_info()->CopyFrom(encryption_info); + } else { + if (!ak.empty() || !sk.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "invaild argument, both set ak/sk and role_arn is not allowed"; + LOG(INFO) << msg; + return -1; + } + + if (it.provider() != ObjectStoreInfoPB::S3) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "role_arn is only supported for s3 provider"; + LOG(INFO) << msg << " provider=" << it.provider(); + return -1; + } + + if (it.role_arn() == role_arn && it.external_id() == external_id) { + code = MetaServiceCode::OK; + msg = "ak/sk not changed"; + return 1; + } + it.clear_ak(); + it.clear_sk(); + it.clear_encryption_info(); + + it.set_role_arn(std::string(role_arn)); + it.set_external_id(std::string(external_id)); + it.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE); + } + + auto now_time = std::chrono::system_clock::now(); + uint64_t time = + std::chrono::duration_cast(now_time.time_since_epoch()) + .count(); + it.set_mtime(time); + return 0; + } + + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("obj info id={} not found", target_obj_id); + return -1; +} + // Helper function to update AK/SK for a single instance // Returns 0 on success, -1 on error static int update_instance_ak_sk(InstanceInfoPB& instance, const UpdateAkSkRequest* request, @@ -805,9 +895,11 @@ static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_v return false; } -static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr& txn, - const StorageVaultPB& vault, MetaServiceCode& code, - std::string& msg, AlterObjStoreInfoResponse* response) { +static int alter_hdfs_storage_vault_by_id(InstanceInfoPB& instance, + std::unique_ptr& txn, + std::string_view target_vault_id, + const StorageVaultPB& vault, MetaServiceCode& code, + std::string& msg, AlterObjStoreInfoResponse* response) { if (!vault.has_hdfs_info()) { code = MetaServiceCode::INVALID_ARGUMENT; std::stringstream ss; @@ -825,19 +917,25 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptrbegin(), - instance.mutable_storage_vault_names()->end(), - [&](const auto& vault_name) { return name == vault_name; }); - if (name_itr == instance.storage_vault_names().end()) { + int pos = find_storage_vault_position_by_id(instance, target_vault_id); + if (pos < 0) { code = MetaServiceCode::INVALID_ARGUMENT; std::stringstream ss; - ss << "invalid storage vault name, not found, name =" << name; + ss << "invalid storage vault id, not found, id =" << target_vault_id; msg = ss.str(); return -1; } - auto pos = name_itr - instance.storage_vault_names().begin(); - std::string vault_id = instance.resource_ids().begin()[pos]; + auto* storage_vault_names = instance.mutable_storage_vault_names(); + auto* name_ptr = storage_vault_names->Mutable(pos); + DCHECK(name_ptr != nullptr); + const std::string old_name = *name_ptr; + if (old_name != name) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("storage vault id={} name mismatch, expected={}, actual={}", + target_vault_id, name, old_name); + return -1; + } + std::string vault_id = instance.resource_ids().Get(pos); auto vault_key = storage_vault_key({instance.instance_id(), vault_id}); std::string val; @@ -881,7 +979,10 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr& txn, - const StorageVaultPB& vault, MetaServiceCode& code, - std::string& msg, AlterObjStoreInfoResponse* response) { +static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr& txn, + const StorageVaultPB& vault, MetaServiceCode& code, + std::string& msg, AlterObjStoreInfoResponse* response) { + std::string vault_id; + if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0) { + code = MetaServiceCode::INVALID_ARGUMENT; + std::stringstream ss; + ss << "invalid storage vault name, not found, name =" << vault.name(); + msg = ss.str(); + return -1; + } + return alter_hdfs_storage_vault_by_id(instance, txn, vault_id, vault, code, msg, response); +} + +static int alter_s3_storage_vault_by_id(InstanceInfoPB& instance, std::unique_ptr& txn, + std::string_view target_vault_id, + const StorageVaultPB& vault, MetaServiceCode& code, + std::string& msg, AlterObjStoreInfoResponse* response) { if (!vault.has_obj_info()) { code = MetaServiceCode::INVALID_ARGUMENT; std::stringstream ss; @@ -939,19 +1055,25 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptrbegin(), - instance.mutable_storage_vault_names()->end(), - [&](const auto& vault_name) { return name == vault_name; }); - if (name_itr == instance.storage_vault_names().end()) { + int pos = find_storage_vault_position_by_id(instance, target_vault_id); + if (pos < 0) { code = MetaServiceCode::INVALID_ARGUMENT; std::stringstream ss; - ss << "invalid storage vault name, not found, name =" << name; + ss << "invalid storage vault id, not found, id =" << target_vault_id; msg = ss.str(); return -1; } - auto pos = name_itr - instance.storage_vault_names().begin(); - std::string vault_id = instance.resource_ids().begin()[pos]; + auto* storage_vault_names = instance.mutable_storage_vault_names(); + auto* name_ptr = storage_vault_names->Mutable(pos); + DCHECK(name_ptr != nullptr); + const std::string old_name = *name_ptr; + if (old_name != name) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("storage vault id={} name mismatch, expected={}, actual={}", + target_vault_id, name, old_name); + return -1; + } + std::string vault_id = instance.resource_ids().Get(pos); auto vault_key = storage_vault_key({instance.instance_id(), vault_id}); std::string val; @@ -995,7 +1117,10 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr& txn, + const StorageVaultPB& vault, MetaServiceCode& code, + std::string& msg, AlterObjStoreInfoResponse* response) { + std::string vault_id; + if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0) { + code = MetaServiceCode::INVALID_ARGUMENT; + std::stringstream ss; + ss << "invalid storage vault name, not found, name =" << vault.name(); + msg = ss.str(); + return -1; + } + return alter_s3_storage_vault_by_id(instance, txn, vault_id, vault, code, msg, response); +} + struct ObjectStorageDesc { std::string& ak; std::string& sk; @@ -1307,6 +1446,17 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr return; } + bool supports_cascade = request->op() == AlterObjStoreInfoRequest::ALTER_S3_VAULT || + request->op() == AlterObjStoreInfoRequest::ALTER_HDFS_VAULT; + std::string root_vault_id; + if (supports_cascade && + find_storage_vault_id_by_name(instance, request->vault().name(), &root_vault_id) != 0) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("invalid storage vault name, not found, name ={}", + request->vault().name()); + return; + } + switch (request->op()) { case AlterObjStoreInfoRequest::ADD_S3_VAULT: { if (!instance.enable_storage_vault()) { @@ -1494,6 +1644,115 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr code = cast_as(err); msg = fmt::format("failed to commit kv txn, err={}", err); LOG(WARNING) << msg; + return; + } + + async_notify_refresh_instance(txn_kv_, instance_id, true); + + if (!supports_cascade) { + return; + } + + if (!instance.has_snapshot_switch_status() || + instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) { + LOG(INFO) << "snapshot disabled for instance_id=" << instance_id + << ", skip cascade updating derived instances after alter_storage_vault"; + return; + } + + std::vector cascade_instance_ids; + if (find_cascade_instances(txn_kv_.get(), instance_id, &cascade_instance_ids) != 0) { + LOG(WARNING) << "failed to find derived instances for storage vault cascade, instance_id=" + << instance_id; + return; + } + + for (const auto& cascade_id : cascade_instance_ids) { + std::unique_ptr cascade_txn; + TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format( + "failed to create txn for derived storage vault update, instance_id={}, " + "err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + std::string cascade_key; + std::string cascade_val; + instance_key({cascade_id}, &cascade_key); + cascade_err = cascade_txn->get(cascade_key, &cascade_val); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format( + "failed to get derived instance for storage vault update, instance_id={}, " + "err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + InstanceInfoPB cascade_instance; + if (!cascade_instance.ParseFromString(cascade_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format( + "failed to parse derived InstanceInfoPB for storage vault update, " + "instance_id={}", + cascade_id); + LOG(WARNING) << msg; + return; + } + + MetaServiceCode cascade_code = MetaServiceCode::OK; + std::string cascade_msg; + AlterObjStoreInfoResponse cascade_response; + int ret = -1; + if (request->op() == AlterObjStoreInfoRequest::ALTER_S3_VAULT) { + ret = alter_s3_storage_vault_by_id(cascade_instance, cascade_txn, root_vault_id, + request->vault(), cascade_code, cascade_msg, + &cascade_response); + } else if (request->op() == AlterObjStoreInfoRequest::ALTER_HDFS_VAULT) { + ret = alter_hdfs_storage_vault_by_id(cascade_instance, cascade_txn, root_vault_id, + request->vault(), cascade_code, cascade_msg, + &cascade_response); + } + if (ret != 0) { + code = cascade_code; + msg = fmt::format( + "failed to cascade storage vault update, instance_id={}, vault_id={}, msg={}", + cascade_id, root_vault_id, cascade_msg); + LOG(WARNING) << msg << " code=" << static_cast(code); + return; + } + + cascade_val = cascade_instance.SerializeAsString(); + if (cascade_val.empty()) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + msg = fmt::format( + "failed to serialize derived instance after storage vault update, " + "instance_id={}", + cascade_id); + LOG(WARNING) << msg; + return; + } + + cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1); + cascade_txn->put(cascade_key, cascade_val); + cascade_err = cascade_txn->commit(); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format( + "failed to commit derived storage vault update, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + async_notify_refresh_instance(txn_kv_, cascade_id, true); + LOG(INFO) << "cascade storage vault update finished, root_instance_id=" << instance_id + << " derived_instance_id=" << cascade_id << " vault_id=" << root_vault_id; } } @@ -1585,73 +1844,28 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont return; } + bool supports_cascade = request->op() == AlterObjStoreInfoRequest::ALTER_OBJ_INFO || + request->op() == AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK; + std::string root_obj_id = + request->has_obj() && request->obj().has_id() ? request->obj().id() : "0"; + switch (request->op()) { case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: { - // get id - std::string id = request->obj().has_id() ? request->obj().id() : "0"; - int idx = std::stoi(id); + int idx = std::stoi(root_obj_id); if (idx < 1 || idx > instance.obj_info().size()) { // err code = MetaServiceCode::INVALID_ARGUMENT; msg = "id invalid, please check it"; return; } - auto& obj_info = - const_cast&>(instance.obj_info()); - for (auto& it : obj_info) { - if (std::stoi(it.id()) == idx) { - if (role_arn.empty()) { - if (it.ak() == ak && it.sk() == sk) { - // not change, just return ok - code = MetaServiceCode::OK; - msg = "ak/sk not changed"; - return; - } - it.clear_role_arn(); - it.clear_external_id(); - it.clear_cred_provider_type(); - - it.set_ak(ak); - it.set_sk(sk); - it.mutable_encryption_info()->CopyFrom(encryption_info); - } else { - if (!ak.empty() || !sk.empty()) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "invaild argument, both set ak/sk and role_arn is not allowed"; - LOG(INFO) << msg; - return; - } - - if (it.provider() != ObjectStoreInfoPB::S3) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "role_arn is only supported for s3 provider"; - LOG(INFO) << msg << " provider=" << it.provider(); - return; - } - - if (it.role_arn() == role_arn && it.external_id() == external_id && - get_cred_provider_type(it) == get_cred_provider_type(request->obj())) { - // not change, just return ok - code = MetaServiceCode::OK; - msg = "ak/sk not changed"; - return; - } - it.clear_ak(); - it.clear_sk(); - it.clear_encryption_info(); - - it.set_role_arn(role_arn); - it.set_external_id(external_id); - it.set_cred_provider_type(get_cred_provider_type(request->obj())); - } - - auto now_time = std::chrono::system_clock::now(); - uint64_t time = std::chrono::duration_cast( - now_time.time_since_epoch()) - .count(); - it.set_mtime(time); + int ret = alter_instance_obj_store_info_by_id(instance, root_obj_id, ak, sk, role_arn, + external_id, encryption_info, code, msg); + if (ret != 0) { + if (ret > 0) { + return; } + return; } } break; case AlterObjStoreInfoRequest::ADD_OBJ_INFO: { @@ -1727,6 +1941,105 @@ void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont code = cast_as(err); msg = fmt::format("failed to commit kv txn, err={}", err); LOG(WARNING) << msg; + return; + } + + async_notify_refresh_instance(txn_kv_, instance_id, true); + + if (!supports_cascade) { + return; + } + + if (!instance.has_snapshot_switch_status() || + instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) { + LOG(INFO) << "snapshot disabled for instance_id=" << instance_id + << ", skip cascade updating derived instances after alter_obj_store_info"; + return; + } + + std::vector cascade_instance_ids; + if (find_cascade_instances(txn_kv_.get(), instance_id, &cascade_instance_ids) != 0) { + LOG(WARNING) << "failed to find derived instances for obj store cascade, instance_id=" + << instance_id; + return; + } + + for (const auto& cascade_id : cascade_instance_ids) { + std::unique_ptr cascade_txn; + TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format( + "failed to create txn for derived obj store update, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + std::string cascade_key; + std::string cascade_val; + instance_key({cascade_id}, &cascade_key); + cascade_err = cascade_txn->get(cascade_key, &cascade_val); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format( + "failed to get derived instance for obj store update, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + InstanceInfoPB cascade_instance; + if (!cascade_instance.ParseFromString(cascade_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format( + "failed to parse derived InstanceInfoPB for obj store update, instance_id={}", + cascade_id); + LOG(WARNING) << msg; + return; + } + + MetaServiceCode cascade_code = MetaServiceCode::OK; + std::string cascade_msg; + int ret = alter_instance_obj_store_info_by_id(cascade_instance, root_obj_id, ak, sk, + role_arn, external_id, encryption_info, + cascade_code, cascade_msg); + if (ret != 0) { + if (ret < 0) { + code = cascade_code; + msg = fmt::format( + "failed to cascade obj store update, instance_id={}, obj_info_id={}, " + "msg={}", + cascade_id, root_obj_id, cascade_msg); + LOG(WARNING) << msg << " code=" << static_cast(code); + return; + } + } + + cascade_val = cascade_instance.SerializeAsString(); + if (cascade_val.empty()) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + msg = fmt::format( + "failed to serialize derived instance after obj store update, instance_id={}", + cascade_id); + LOG(WARNING) << msg; + return; + } + + cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1); + cascade_txn->put(cascade_key, cascade_val); + cascade_err = cascade_txn->commit(); + if (cascade_err != TxnErrorCode::TXN_OK) { + code = cast_as(cascade_err); + msg = fmt::format("failed to commit derived obj store update, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; + } + + async_notify_refresh_instance(txn_kv_, cascade_id, true); + LOG(INFO) << "cascade obj store update finished, root_instance_id=" << instance_id + << " derived_instance_id=" << cascade_id << " obj_info_id=" << root_obj_id; } } @@ -1849,9 +2162,11 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller, std::unique_ptr cascade_txn; TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn); if (cascade_err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to create txn for derived instance, instance_id=" << cascade_id - << " err=" << cascade_err; - continue; + code = cast_as(cascade_err); + msg = fmt::format("failed to create txn for derived instance, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; } InstanceKeyInfo cascade_key_info {cascade_id}; @@ -1861,15 +2176,20 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller, cascade_err = cascade_txn->get(cascade_key, &cascade_val); if (cascade_err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to get derived instance, instance_id=" << cascade_id - << " err=" << cascade_err; - continue; + code = cast_as(cascade_err); + msg = fmt::format("failed to get derived instance, instance_id={}, err={}", cascade_id, + cascade_err); + LOG(WARNING) << msg; + return; } InstanceInfoPB cascade_instance; if (!cascade_instance.ParseFromString(cascade_val)) { - LOG(WARNING) << "failed to parse InstanceInfoPB for derived instance_id=" << cascade_id; - continue; + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("failed to parse InstanceInfoPB for derived instance_id={}", + cascade_id); + LOG(WARNING) << msg; + return; } // Update the cascade instance using helper function @@ -1878,24 +2198,30 @@ void MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller, std::string cascade_msg; if (update_instance_ak_sk(cascade_instance, request, time, cascade_code, cascade_msg, cascade_update_record) != 0) { - LOG(WARNING) << "failed to update derived instance, instance_id=" << cascade_id - << " msg=" << cascade_msg; - continue; + code = cascade_code; + msg = fmt::format("failed to update derived instance, instance_id={}, msg={}", + cascade_id, cascade_msg); + LOG(WARNING) << msg << " code=" << static_cast(code); + return; } cascade_val = cascade_instance.SerializeAsString(); if (cascade_val.empty()) { - LOG(WARNING) << "failed to serialize derived instance, instance_id=" << cascade_id; - continue; + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + msg = fmt::format("failed to serialize derived instance, instance_id={}", cascade_id); + LOG(WARNING) << msg; + return; } cascade_txn->put(cascade_key, cascade_val); cascade_err = cascade_txn->commit(); if (cascade_err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to commit derived instance txn, instance_id=" << cascade_id - << " err=" << cascade_err; - continue; + code = cast_as(cascade_err); + msg = fmt::format("failed to commit derived instance txn, instance_id={}, err={}", + cascade_id, cascade_err); + LOG(WARNING) << msg; + return; } async_notify_refresh_instance(txn_kv_, cascade_id, true); diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp index ee3f8598df8be3..2c02d9bfccfab2 100644 --- a/cloud/test/resource_test.cpp +++ b/cloud/test/resource_test.cpp @@ -567,6 +567,84 @@ static void verify_instance_aksk(MetaServiceProxy* meta_service, const std::stri EXPECT_EQ(instance.obj_info(0).sk(), expected_sk); } +static void create_instance_with_storage_vault(MetaServiceProxy* meta_service, + const std::string& instance_id, + const std::string& source_instance_id, + const std::string& vault_id, + const std::string& vault_name, const std::string& ak, + const std::string& sk, bool enable_snapshot = true) { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + instance.set_enable_storage_vault(true); + instance.add_resource_ids(vault_id); + instance.add_storage_vault_names(vault_name); + instance.set_default_storage_vault_id(vault_id); + instance.set_default_storage_vault_name(vault_name); + + std::optional snapshot_version; + if (!source_instance_id.empty()) { + instance.set_source_instance_id(source_instance_id); + snapshot_version = next_test_snapshot_versionstamp(); + instance.set_source_snapshot_id(snapshot_version->to_string()); + } + instance.set_snapshot_switch_status(enable_snapshot ? SNAPSHOT_SWITCH_ON + : SNAPSHOT_SWITCH_DISABLED); + + StorageVaultPB vault; + vault.set_id(vault_id); + vault.set_name(vault_name); + auto* obj_info = vault.mutable_obj_info(); + obj_info->set_id(vault_id); + obj_info->set_ak(ak); + obj_info->set_sk(sk); + obj_info->set_bucket("bucket"); + obj_info->set_prefix("prefix"); + obj_info->set_endpoint("endpoint"); + obj_info->set_external_endpoint("external-endpoint"); + obj_info->set_region("region"); + obj_info->set_provider(ObjectStoreInfoPB::S3); + + txn->put(instance_key({instance_id}), instance.SerializeAsString()); + txn->put(storage_vault_key({instance_id, vault_id}), vault.SerializeAsString()); + + if (snapshot_version.has_value()) { + versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, *snapshot_version, + instance_id}; + txn->put(versioned::snapshot_reference_key(ref_key_info), ""); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); +} + +static void verify_storage_vault(MetaServiceProxy* meta_service, const std::string& instance_id, + const std::string& vault_id, const std::string& expected_name, + const std::string& expected_ak, const std::string& expected_sk) { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + + std::string val; + ASSERT_EQ(txn->get(storage_vault_key({instance_id, vault_id}), &val), TxnErrorCode::TXN_OK); + + StorageVaultPB vault; + ASSERT_TRUE(vault.ParseFromString(val)); + ASSERT_TRUE(vault.has_obj_info()); + EXPECT_EQ(vault.name(), expected_name); + EXPECT_EQ(vault.obj_info().ak(), expected_ak); + EXPECT_EQ(vault.obj_info().sk(), expected_sk); + + ASSERT_EQ(txn->get(instance_key({instance_id}), &val), TxnErrorCode::TXN_OK); + InstanceInfoPB instance; + ASSERT_TRUE(instance.ParseFromString(val)); + ASSERT_EQ(instance.resource_ids_size(), 1); + ASSERT_EQ(instance.storage_vault_names_size(), 1); + EXPECT_EQ(instance.resource_ids(0), vault_id); + EXPECT_EQ(instance.storage_vault_names(0), expected_name); + EXPECT_EQ(instance.default_storage_vault_id(), vault_id); + EXPECT_EQ(instance.default_storage_vault_name(), expected_name); +} + // Test AK/SK cascade update: two-level cascade TEST(AkSkCascadeTest, TwoLevelCascade) { auto meta_service = get_meta_service(); @@ -856,6 +934,94 @@ TEST(AkSkCascadeTest, ChildWithoutObjInfo) { sp->clear_all_call_backs(); } +TEST(StorageVaultCascadeTest, AlterS3VaultCascadesToDerivedInstances) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { + auto* ret = try_any_cast(args[0]); + *ret = 0; + auto* key = try_any_cast(args[1]); + *key = "selectdbselectdbselectdbselectdb"; + auto* key_id = try_any_cast(args[2]); + *key_id = 1; + }); + + create_instance_with_storage_vault(meta_service.get(), "vault_parent", "", "2", "vault_old", + "old_ak", "old_sk"); + create_instance_with_storage_vault(meta_service.get(), "vault_child", "vault_parent", "2", + "vault_old", "old_ak", "old_sk"); + + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("1:vault_parent:test"); + req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT); + StorageVaultPB vault; + vault.set_name("vault_old"); + vault.set_alter_name("vault_new"); + vault.mutable_obj_info()->set_ak("new_ak"); + vault.mutable_obj_info()->set_sk("new_sk"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw=="; + verify_storage_vault(meta_service.get(), "vault_parent", "2", "vault_new", "new_ak", cipher_sk); + verify_storage_vault(meta_service.get(), "vault_child", "2", "vault_new", "new_ak", cipher_sk); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + +TEST(StorageVaultCascadeTest, SnapshotDisabledSkipsCascade) { + auto meta_service = get_meta_service(); + + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { + auto* ret = try_any_cast(args[0]); + *ret = 0; + auto* key = try_any_cast(args[1]); + *key = "selectdbselectdbselectdbselectdb"; + auto* key_id = try_any_cast(args[2]); + *key_id = 1; + }); + + create_instance_with_storage_vault(meta_service.get(), "vault_parent_disabled", "", "2", + "vault_old", "old_ak", "old_sk", + /*enable_snapshot=*/false); + create_instance_with_storage_vault(meta_service.get(), "vault_child_disabled", + "vault_parent_disabled", "2", "vault_old", "old_ak", + "old_sk"); + + AlterObjStoreInfoRequest req; + req.set_cloud_unique_id("1:vault_parent_disabled:test"); + req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT); + StorageVaultPB vault; + vault.set_name("vault_old"); + vault.set_alter_name("vault_new"); + vault.mutable_obj_info()->set_ak("new_ak"); + vault.mutable_obj_info()->set_sk("new_sk"); + req.mutable_vault()->CopyFrom(vault); + + brpc::Controller cntl; + AlterObjStoreInfoResponse res; + meta_service->alter_storage_vault(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + + std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw=="; + verify_storage_vault(meta_service.get(), "vault_parent_disabled", "2", "vault_new", "new_ak", + cipher_sk); + verify_storage_vault(meta_service.get(), "vault_child_disabled", "2", "vault_old", "old_ak", + "old_sk"); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + TEST(ResourceTest, RollbackInstance) { auto sp = SyncPoint::get_instance(); sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {