Skip to content

Commit

Permalink
rgw/sync-policy: Support disabling per-bucket replication
Browse files Browse the repository at this point in the history
When the zones replicate, allow disabling replication
for specific buckets using sync policy.

These are the semantics to be followed while resolving the policy
conflicts -

==================================================
zonegroup		bucket		Result
==================================================
enabled			enabled		enabled
			allowed		enabled
			forbidden	disabled
allowed			enabled		enabled
			allowed		disabled
			forbidden	disabled
forbidden		enabled		disabled
			allowed		disabled
			forbidden	disabled

In case multiple group policies are set to reflect for any sync pair
(<source-zone,source-bucket>, <dest-zone,dest-bucket>), the following
rules are applied in the order -
1) Even if one policy status is FORBIDDEN, the sync will be disabled
2) Atleast one policy should be ENABLED for the sync to be allowed.

Various cases tested are outlined here -
https://docs.google.com/document/d/19oBQA-bYxLBR4BnekA2DTwJJaTFvjAfrqAk9G3RGU0I/edit#heading=h.4qac9dpc76m

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
(cherry picked from commit aeb3a74)
(cherry picked from commit 587910c)
  • Loading branch information
soumyakoduri committed May 29, 2023
1 parent 6f9eedf commit 3d98234
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 8 deletions.
87 changes: 80 additions & 7 deletions src/rgw/rgw_bucket_sync.cc
Expand Up @@ -467,6 +467,12 @@ RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSync
}

void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
/* Ensure this pipe doesn't match with any disabled pipes */
for (auto p: disabled_pipe_map) {
if (p.second.source.match(pipe.source) && p.second.dest.match(pipe.dest)) {
return;
}
}
pipe_map.insert(make_pair(pipe.id, pipe));

auto& rules_ref = rules[endpoints_pair(pipe)];
Expand All @@ -482,6 +488,32 @@ void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe
handlers.insert(h);
}

void RGWBucketSyncFlowManager::pipe_set::remove_all() {
pipe_map.clear();
disabled_pipe_map.clear();
rules.clear();
handlers.clear();
}

void RGWBucketSyncFlowManager::pipe_set::disable(const rgw_sync_bucket_pipe& pipe) {
/* This pipe is disabled. Add it to disabled pipes & remove any
* matching pipes already inserted
*/
disabled_pipe_map.insert(make_pair(pipe.id, pipe));
for (auto iter_p = pipe_map.begin(); iter_p != pipe_map.end(); ) {
auto p = iter_p++;
if (p->second.source.match(pipe.source) && p->second.dest.match(pipe.dest)) {
auto& rules_ref = rules[endpoints_pair(p->second)];
if (rules_ref) {
pipe_handler h(rules_ref, p->second);
handlers.erase(h);
}
rules.erase(endpoints_pair(p->second));
pipe_map.erase(p);
}
}
}

void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
{
encode_json("pipes", pipe_map, f);
Expand Down Expand Up @@ -557,6 +589,30 @@ void RGWBucketSyncFlowManager::init(const DoutPrefixProvider *dpp, const rgw_syn
}
}

/*
* These are the semantics to be followed while resolving the policy
* conflicts -
*
* ==================================================
* zonegroup bucket Result
* ==================================================
* enabled enabled enabled
* allowed enabled
* forbidden disabled
* allowed enabled enabled
* allowed disabled
* forbidden disabled
* forbidden enabled disabled
* allowed disabled
* forbidden disabled
*
* In case multiple group policies are set to reflect for any sync pair
* (<source-zone,source-bucket>, <dest-zone,dest-bucket>), the following
* rules are applied in the order-
* 1) Even if one policy status is FORBIDDEN, the sync will be disabled
* 2) Atleast one policy should be ENABLED for the sync to be allowed.
*
*/
void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
Expand All @@ -565,6 +621,7 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,

{
string effective_bucket_key;
bool is_forbidden = false;
if (effective_bucket) {
effective_bucket_key = effective_bucket->get_key();
}
Expand All @@ -574,10 +631,16 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,

for (auto& item : flow_groups) {
auto& flow_group_map = item.second;

/* only return enabled groups */
if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
is_forbidden = false;

if (flow_group_map.status == rgw_sync_policy_group::Status::FORBIDDEN) {
/* FORBIDDEN takes precedence over all the other rules.
* Remove any other pipes which may allow access.
*/
is_forbidden = true;
} else if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED &&
(only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) {
/* only return enabled groups */
continue;
}

Expand All @@ -590,8 +653,13 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);

ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
source_pipes->insert(pipe);
if (is_forbidden) {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing source pipe: " << pipe << dendl;
source_pipes->disable(pipe);
} else {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl;
source_pipes->insert(pipe);
}
}

for (auto& entry : flow_group_map.dests) {
Expand All @@ -604,8 +672,13 @@ void RGWBucketSyncFlowManager::reflect(const DoutPrefixProvider *dpp,
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);

ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
dest_pipes->insert(pipe);
if (is_forbidden) {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): removing dest pipe: " << pipe << dendl;
dest_pipes->disable(pipe);
} else {
ldpp_dout(dpp, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl;
dest_pipes->insert(pipe);
}
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/rgw/rgw_bucket_sync.h
Expand Up @@ -31,7 +31,7 @@ struct rgw_sync_group_pipe_map {
rgw_zone_id zone;
std::optional<rgw_bucket> bucket;

rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::UNKNOWN};

using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>;

Expand Down Expand Up @@ -209,6 +209,7 @@ class RGWBucketSyncFlowManager {
struct pipe_set {
std::map<endpoints_pair, pipe_rules_ref> rules;
std::multimap<std::string, rgw_sync_bucket_pipe> pipe_map;
std::multimap<std::string, rgw_sync_bucket_pipe> disabled_pipe_map;

std::set<pipe_handler> handlers;

Expand All @@ -217,10 +218,13 @@ class RGWBucketSyncFlowManager {
void clear() {
rules.clear();
pipe_map.clear();
disabled_pipe_map.clear();
handlers.clear();
}

void insert(const rgw_sync_bucket_pipe& pipe);
void remove_all();
void disable(const rgw_sync_bucket_pipe& pipe);

iterator begin() const {
return handlers.begin();
Expand Down

0 comments on commit 3d98234

Please sign in to comment.