Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgwlc: permit lifecycle to reduce data conditionally in archive zone #46928

Merged
merged 8 commits into from
Jul 19, 2022
58 changes: 50 additions & 8 deletions src/rgw/rgw_bucket.cc
Expand Up @@ -2139,7 +2139,10 @@ class RGWBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler
RGWSI_BucketIndex *bi{nullptr};
} svc;

RGWBucketInstanceMetadataHandler() {}
rgw::sal::Store* store;

RGWBucketInstanceMetadataHandler(rgw::sal::Store* store)
: store(store) {}

void init(RGWSI_Zone *zone_svc,
RGWSI_Bucket *bucket_svc,
Expand Down Expand Up @@ -2371,12 +2374,51 @@ int RGWMetadataHandlerPut_BucketInstance::put_post(const DoutPrefixProvider *dpp
return ret;
}

/* update lifecyle policy */
{
std::unique_ptr<rgw::sal::Bucket> bucket;
ret = bihandler->store->get_bucket(nullptr, bci.info, &bucket);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << " failed to get_bucket(...) for "
<< bci.info.bucket.name
<< dendl;
return ret;
}

auto lc = bihandler->store->get_rgwlc();

auto lc_it = bci.attrs.find(RGW_ATTR_LC);
if (lc_it != bci.attrs.end()) {
ldpp_dout(dpp, 20) << "set lc config for " << bci.info.bucket.name << dendl;
ret = lc->set_bucket_config(bucket.get(), bci.attrs, nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this will copy lc_config to all the zones but not just archive_zone. Is it intentional? i.e, can every zone now execute LC rules (provided archivezone filter not applied)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been a long-standing issue, tracked in https://tracker.ceph.com/issues/44268 and https://tracker.ceph.com/issues/55487

the current model, where only the primary zone runs lifecycle processing, works okay for the 'normal' disaster recovery use case where the primary and secondary zones have the same set of data. any expirations/transitions that happen on the primary would sync to the secondary, so we should get the same result as if we'd run lifecycle processing on both zones

however, with per-bucket replication in the picture, zones may not have the same data sets. for example, a bucket replication policy may specify that only a subset of objects (like those beginning with prefix 'foo') should replicate from the secondary zone to the primary. lifecycle processing on the primary zone would only run on that subset of objects and skip the rest. so in this case, we really do need to run lifecycle processing on every zone to get the expected result

had we fixed this issue earlier, we would have broken the archive zone feature because lifecycle could delete the archived object versions. so we're fixing both at the same time :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's a better answer than what I had in mind, as for full sync, it's probably not even as efficient to run lc everywhere--but still feels more correct for symmetry. but per-bucket replication does change things.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay..thanks for confirming.

if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << " failed to set lc config for "
<< bci.info.bucket.name
<< dendl;
return ret;
}

} else {
ldpp_dout(dpp, 20) << "remove lc config for " << bci.info.bucket.name << dendl;
ret = lc->remove_bucket_config(bucket.get(), bci.attrs);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << " failed to remove lc config for "
<< bci.info.bucket.name
<< dendl;
return ret;
Comment on lines +2394 to +2408
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this error path seems to be causing the regression in metadata sync. i've opened https://tracker.ceph.com/issues/56997 with what we've learned so far

cc @yuvalif

}
}
} /* update lc */

return STATUS_APPLIED;
}

class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
public:
RGWArchiveBucketInstanceMetadataHandler() {}
RGWArchiveBucketInstanceMetadataHandler(rgw::sal::Store* store)
: RGWBucketInstanceMetadataHandler(store) {}

// N.B. replication of lifecycle policy relies on logic in RGWBucketInstanceMetadataHandler::do_put(...), override with caution

int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override {
ldpp_dout(dpp, 0) << "SKIP: bucket instance removal is not allowed on archive zone: bucket.instance:" << entry << dendl;
Expand Down Expand Up @@ -3027,24 +3069,24 @@ int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
return handler->bucket_imports_data();
}

RGWBucketMetadataHandlerBase *RGWBucketMetaHandlerAllocator::alloc()
RGWBucketMetadataHandlerBase* RGWBucketMetaHandlerAllocator::alloc()
{
return new RGWBucketMetadataHandler();
}

RGWBucketInstanceMetadataHandlerBase *RGWBucketInstanceMetaHandlerAllocator::alloc()
RGWBucketInstanceMetadataHandlerBase* RGWBucketInstanceMetaHandlerAllocator::alloc(rgw::sal::Store* store)
{
return new RGWBucketInstanceMetadataHandler();
return new RGWBucketInstanceMetadataHandler(store);
}

RGWBucketMetadataHandlerBase *RGWArchiveBucketMetaHandlerAllocator::alloc()
RGWBucketMetadataHandlerBase* RGWArchiveBucketMetaHandlerAllocator::alloc()
{
return new RGWArchiveBucketMetadataHandler();
}

RGWBucketInstanceMetadataHandlerBase *RGWArchiveBucketInstanceMetaHandlerAllocator::alloc()
RGWBucketInstanceMetadataHandlerBase* RGWArchiveBucketInstanceMetaHandlerAllocator::alloc(rgw::sal::Store* store)
{
return new RGWArchiveBucketInstanceMetadataHandler();
return new RGWArchiveBucketInstanceMetadataHandler(store);
}


Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_bucket.h
Expand Up @@ -204,7 +204,7 @@ class RGWBucketMetaHandlerAllocator {

class RGWBucketInstanceMetaHandlerAllocator {
public:
static RGWBucketInstanceMetadataHandlerBase *alloc();
static RGWBucketInstanceMetadataHandlerBase *alloc(rgw::sal::Store* store);
};

class RGWArchiveBucketMetaHandlerAllocator {
Expand All @@ -214,7 +214,7 @@ class RGWArchiveBucketMetaHandlerAllocator {

class RGWArchiveBucketInstanceMetaHandlerAllocator {
public:
static RGWBucketInstanceMetadataHandlerBase *alloc();
static RGWBucketInstanceMetadataHandlerBase *alloc(rgw::sal::Store* store);
};

extern int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Store* store, rgw::sal::Bucket* bucket, rgw_obj_key& key);
Expand Down
6 changes: 3 additions & 3 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -520,7 +520,7 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
static constexpr uint32_t lock_duration = 30;
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw::sal::RadosStore* store;
rgw::sal::RadosStore* store; // RGWDataSyncEnv also has a pointer to store
const rgw_pool& pool;
const uint32_t num_shards;

Expand Down Expand Up @@ -2500,8 +2500,8 @@ class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
RGWMetadataHandler *alloc_bucket_meta_handler() override {
return RGWArchiveBucketMetaHandlerAllocator::alloc();
}
RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override {
return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler(rgw::sal::Store* store) override {
return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc(store);
}
};

Expand Down
52 changes: 40 additions & 12 deletions src/rgw/rgw_lc.cc
Expand Up @@ -17,6 +17,7 @@
#include "include/function2.hpp"
#include "common/Formatter.h"
#include "common/containers.h"
#include "common/split.h"
#include <common/errno.h>
#include "include/random.h"
#include "cls/lock/cls_lock_client.h"
Expand Down Expand Up @@ -150,10 +151,10 @@ bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
} else {
prefix = rule.get_prefix();
}

if (rule.get_filter().has_tags()){
op.obj_tags = rule.get_filter().get_tags();
}
op.rule_flags = rule.get_filter().get_flags();
prefix_map.emplace(std::move(prefix), std::move(op));
return true;
}
Expand Down Expand Up @@ -910,7 +911,7 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,

worker->workpool->drain();
return 0;
}
} /* RGWLC::handle_multipart_expiration */

static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl)
{
Expand All @@ -930,6 +931,16 @@ static bool is_valid_op(const lc_op& op)
|| !op.noncur_transitions.empty()));
}

static bool zone_check(const lc_op& op, rgw::sal::Zone* zone)
{

if (zone->get_tier_type() == "archive") {
return (op.rule_flags & uint32_t(LCFlagType::ArchiveZone));
} else {
return (! (op.rule_flags & uint32_t(LCFlagType::ArchiveZone)));
}
}

static inline bool has_all_tags(const lc_op& rule_action,
const RGWObjTags& object_tags)
{
Expand Down Expand Up @@ -1572,6 +1583,9 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
return -1;
}

/* fetch information for zone checks */
rgw::sal::Zone* zone = store->get_zone();

auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt =
boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
Expand Down Expand Up @@ -1624,11 +1638,17 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
LCObjsLister ol(store, bucket.get());
ol.set_prefix(prefix_iter->first);

if (! zone_check(op, zone)) {
ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type()
<< " zone, skipping" << dendl;
continue;
}

ret = ol.init(this);
if (ret < 0) {
if (ret == (-ENOENT))
return 0;
ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
ldpp_dout(this, 0) << "ERROR: store->list_objects():" << dendl;
return ret;
}

Expand Down Expand Up @@ -2435,16 +2455,21 @@ int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket,
const rgw::sal::Attrs& bucket_attrs,
RGWLifecycleConfiguration *config)
{
int ret{0};
rgw::sal::Attrs attrs = bucket_attrs;
bufferlist lc_bl;
config->encode(lc_bl);

attrs[RGW_ATTR_LC] = std::move(lc_bl);

int ret =
bucket->merge_and_store_attrs(this, attrs, null_yield);
if (ret < 0)
return ret;
if (config) {
/* if no RGWLifecycleconfiguration provided, it means
* RGW_ATTR_LC is already valid and present */
bufferlist lc_bl;
config->encode(lc_bl);
attrs[RGW_ATTR_LC] = std::move(lc_bl);

ret =
bucket->merge_and_store_attrs(this, attrs, null_yield);
if (ret < 0) {
return ret;
}
}

rgw_bucket& b = bucket->get_key();

Expand Down Expand Up @@ -2782,6 +2807,9 @@ void LCFilter::dump(Formatter *f) const
{
f->dump_string("prefix", prefix);
f->dump_object("obj_tags", obj_tags);
if (have_flag(LCFlagType::ArchiveZone)) {
f->dump_string("archivezone", "");
}
}

void LCExpiration::dump(Formatter *f) const
Expand Down
67 changes: 61 additions & 6 deletions src/rgw/rgw_lc.h
Expand Up @@ -5,6 +5,7 @@
#define CEPH_RGW_LC_H

#include <map>
#include <array>
#include <string>
#include <iostream>

Expand Down Expand Up @@ -159,13 +160,50 @@ class LCTransition
};
WRITE_CLASS_ENCODER(LCTransition)

enum class LCFlagType : uint16_t
{
none = 0,
ArchiveZone,
};

class LCFlag {
public:
LCFlagType bit;
const char* name;

constexpr LCFlag(LCFlagType ord, const char* name) : bit(ord), name(name)
{}
};

class LCFilter
{
protected:
public:

static constexpr uint32_t make_flag(LCFlagType type) {
switch (type) {
case LCFlagType::none:
return 0;
break;
default:
return 1 << (uint32_t(type) - 1);
}
}

static constexpr std::array<LCFlag, 2> filter_flags =
{
LCFlag(LCFlagType::none, "none"),
LCFlag(LCFlagType::ArchiveZone, "ArchiveZone"),
};

protected:
std::string prefix;
RGWObjTags obj_tags;
uint32_t flags;

public:
public:

LCFilter() : flags(make_flag(LCFlagType::none))
{}

const std::string& get_prefix() const {
return prefix;
Expand All @@ -175,13 +213,17 @@ class LCFilter
return obj_tags;
}

const uint32_t get_flags() const {
return flags;
}

bool empty() const {
return !(has_prefix() || has_tags());
return !(has_prefix() || has_tags() || has_flags());
}

// Determine if we need AND tag when creating xml
bool has_multi_condition() const {
if (obj_tags.count() > 1)
if (obj_tags.count() + int(has_prefix()) + int(has_flags()) > 1) // Prefix is a member of Filter
return true;
return false;
}
Expand All @@ -194,17 +236,29 @@ class LCFilter
return !obj_tags.empty();
}

bool has_flags() const {
return !(flags == uint32_t(LCFlagType::none));
}

bool have_flag(LCFlagType flag) const {
return flags & make_flag(flag);
}

void encode(bufferlist& bl) const {
ENCODE_START(2, 1, bl);
ENCODE_START(3, 1, bl);
encode(prefix, bl);
encode(obj_tags, bl);
encode(flags, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(2, bl);
DECODE_START(3, bl);
decode(prefix, bl);
if (struct_v >= 2) {
decode(obj_tags, bl);
if (struct_v >= 3) {
decode(flags, bl);
}
}
DECODE_FINISH(bl);
}
Expand Down Expand Up @@ -392,6 +446,7 @@ struct lc_op
boost::optional<RGWObjTags> obj_tags;
std::map<std::string, transition_action> transitions;
std::map<std::string, transition_action> noncur_transitions;
uint32_t rule_flags;

/* ctors are nice */
lc_op() = delete;
Expand Down