Skip to content

Commit

Permalink
rgw/lc: cleanup duplicate code
Browse files Browse the repository at this point in the history
Cleaning up duplicate code around updating head

Signed-off-by: Soumya Koduri <skoduri@redhat.com>
  • Loading branch information
soumyakoduri committed Apr 23, 2024
1 parent 86a8709 commit e2b3a30
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 70 deletions.
133 changes: 63 additions & 70 deletions src/rgw/rgw_lc.cc
Expand Up @@ -2198,6 +2198,55 @@ inline int RGWLC::advance_head(const std::string& lc_shard,
return ret;
} /* advance head */

inline int RGWLC::check_if_shard_done(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head, int worker_ix)
{
int ret{0};

if (head.get_marker().empty()) {
/* done with this shard */
ldpp_dout(this, 5) <<
"RGWLC::process() next_entry not found. cycle finished lc_shard="
<< lc_shard << " worker=" << worker_ix
<< dendl;
head.set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
ret = 1; // to mark that shard is done
}
return ret;
}

inline int RGWLC::update_head(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date, int worker_ix)
{
int ret{0};

ret = advance_head(lc_shard, head, entry, start_date);
if (ret != 0) {
ldpp_dout(this, 0) << "RGWLC::update_head() failed to advance head "
<< lc_shard
<< dendl;
goto exit;
}

ret = check_if_shard_done(lc_shard, head, worker_ix);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::update_head() failed to check if shard is done "
<< lc_shard
<< dendl;
}

exit:
return ret;
}

int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
bool once = false)
{
Expand Down Expand Up @@ -2280,27 +2329,13 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry);
if (ret == -ENOENT) {
/* skip to next entry */
std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry();
tmp_entry->set_bucket(head->get_marker());
if (advance_head(lc_shard, *head.get(), *tmp_entry.get(), now) < 0) {
goto exit;
}
/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() next_entry not found. cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
goto exit;
}
continue;
std::unique_ptr<rgw::sal::Lifecycle::LCEntry> tmp_entry = sal_lc->get_entry();
tmp_entry->set_bucket(head->get_marker());

if (update_head(lc_shard, *head.get(), *tmp_entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
Expand All @@ -2321,51 +2356,21 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
<< "RGWLC::process(): ACTIVE entry: " << entry
<< " index: " << index << " worker ix: " << worker->ix << dendl;
/* skip to next entry */
if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
goto exit;
}
/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
goto exit;
if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
} else {
if ((entry->get_status() == lc_complete) &&
already_run_today(cct, entry->get_start_time())) {
/* skip to next entry */
if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
goto exit;
}
ldpp_dout(this, 5) << "RGWLC::process() worker ix: " << worker->ix
<< " SKIP processing for already-processed bucket " << entry->get_bucket()
<< dendl;
/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
goto exit;
}
/* skip to next entry */
if (update_head(lc_shard, *head.get(), *entry.get(), now, worker->ix) != 0) {
goto exit;
}
continue;
}
}
Expand Down Expand Up @@ -2447,19 +2452,7 @@ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
}
}

/* done with this shard */
if (head->get_marker().empty()) {
ldpp_dout(this, 5) <<
"RGWLC::process() cycle finished lc_shard="
<< lc_shard << " worker=" << worker->ix
<< dendl;
head->set_shard_rollover_date(ceph_clock_now());
ret = sal_lc->put_head(lc_shard, *head.get());
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< lc_shard
<< dendl;
}
if (check_if_shard_done(lc_shard, *head.get(), worker->ix) != 0 ) {
goto exit;
}
} while(1 && !once && !going_down());
Expand Down
7 changes: 7 additions & 0 deletions src/rgw/rgw_lc.h
Expand Up @@ -631,6 +631,13 @@ class RGWLC : public DoutPrefixProvider {
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date);
int check_if_shard_done(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
int worker_ix);
int update_head(const std::string& lc_shard,
rgw::sal::Lifecycle::LCHead& head,
rgw::sal::Lifecycle::LCEntry& entry,
time_t start_date, int worker_ix);
int process(int index, int max_lock_secs, LCWorker* worker, bool once);
int process_bucket(int index, int max_lock_secs, LCWorker* worker,
const std::string& bucket_entry_marker, bool once);
Expand Down

0 comments on commit e2b3a30

Please sign in to comment.