Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mimic: multisite: intermittent test_bucket_index_log_trim failures #24400

Merged
merged 2 commits into from Oct 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/rgw/rgw_bucket.cc
Expand Up @@ -1800,6 +1800,10 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
if (!store->need_to_log_data())
return 0;

if (observer) {
observer->on_bucket_changed(bucket.get_key());
}

rgw_bucket_shard bs(bucket, shard_id);

int index = choose_oid(bs);
Expand Down
9 changes: 9 additions & 0 deletions src/rgw/rgw_bucket.h
Expand Up @@ -408,9 +408,14 @@ struct RGWDataChangesLogInfo {
void decode_json(JSONObj *obj);
};

namespace rgw {
struct BucketChangeObserver;
}

class RGWDataChangesLog {
CephContext *cct;
RGWRados *store;
rgw::BucketChangeObserver *observer = nullptr;

int num_shards;
string *oids;
Expand Down Expand Up @@ -521,6 +526,10 @@ class RGWDataChangesLog {
void mark_modified(int shard_id, const rgw_bucket_shard& bs);
void read_clear_modified(map<int, set<string> > &modified);

void set_observer(rgw::BucketChangeObserver *observer) {
this->observer = observer;
}

bool going_down();
};

Expand Down
10 changes: 3 additions & 7 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -665,7 +665,7 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSy
RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
{
sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
_sync_tracer, _source_zone, _sync_module, observer);
_sync_tracer, _source_zone, _sync_module);

if (initialized) {
return 0;
Expand Down Expand Up @@ -1126,9 +1126,6 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
<< error_repo->get_obj() << " retcode=" << retcode));
}
}
if (sync_env->observer) {
sync_env->observer->on_bucket_changed(bs.bucket.get_key());
}
/* FIXME: what do do in case of error */
if (marker_tracker && !entry_marker.empty()) {
/* update marker */
Expand Down Expand Up @@ -1917,8 +1914,7 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
bs.shard_id = shard_id;

sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
_error_logger, _sync_tracer, source_zone, _sync_module,
nullptr);
_error_logger, _sync_tracer, source_zone, _sync_module);

return 0;
}
Expand Down Expand Up @@ -3431,7 +3427,7 @@ int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
env.init(store->ctx(), store, nullptr, store->get_async_rados(),
nullptr, nullptr, nullptr, source_zone, module, nullptr);
nullptr, nullptr, nullptr, source_zone, module);

RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
Expand Down
25 changes: 7 additions & 18 deletions src/rgw/rgw_data_sync.h
Expand Up @@ -14,10 +14,6 @@
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"

namespace rgw {
struct BucketChangeObserver;
}

struct rgw_datalog_info {
uint32_t num_shards;

Expand Down Expand Up @@ -243,15 +239,13 @@ struct RGWDataSyncEnv {
RGWSyncTraceManager *sync_tracer{nullptr};
string source_zone;
RGWSyncModuleInstanceRef sync_module{nullptr};
rgw::BucketChangeObserver *observer{nullptr};

RGWDataSyncEnv() {}

void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer,
const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module,
rgw::BucketChangeObserver *_observer) {
const string& _source_zone, RGWSyncModuleInstanceRef& _sync_module) {
cct = _cct;
store = _store;
conn = _conn;
Expand All @@ -261,7 +255,6 @@ struct RGWDataSyncEnv {
sync_tracer = _sync_tracer;
source_zone = _source_zone;
sync_module = _sync_module;
observer = _observer;
}

string shard_obj_name(int shard_id);
Expand All @@ -271,7 +264,6 @@ struct RGWDataSyncEnv {
class RGWRemoteDataLog : public RGWCoroutinesManager {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
rgw::BucketChangeObserver *observer;
RGWHTTPManager http_manager;

RGWDataSyncEnv sync_env;
Expand All @@ -284,10 +276,9 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
bool initialized;

public:
RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
rgw::BucketChangeObserver *observer)
RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
: RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
store(_store), async_rados(async_rados), observer(observer),
store(_store), async_rados(async_rados),
http_manager(store->ctx(), completion_mgr),
lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
initialized(false) {}
Expand Down Expand Up @@ -327,17 +318,15 @@ class RGWDataSyncStatusManager {

public:
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone,
rgw::BucketChangeObserver *observer = nullptr)
const string& _source_zone)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(nullptr),
source_log(store, async_rados, observer), num_shards(0) {}
source_log(store, async_rados), num_shards(0) {}
RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module,
rgw::BucketChangeObserver *observer = nullptr)
const string& _source_zone, const RGWSyncModuleInstanceRef& _sync_module)
: store(_store), source_zone(_source_zone), conn(NULL), error_logger(NULL),
sync_module(_sync_module),
source_log(store, async_rados, observer), num_shards(0) {}
source_log(store, async_rados), num_shards(0) {}
~RGWDataSyncStatusManager() {
finalize();
}
Expand Down
9 changes: 4 additions & 5 deletions src/rgw/rgw_rados.cc
Expand Up @@ -3230,10 +3230,9 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
}
public:
RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
const string& _source_zone,
rgw::BucketChangeObserver *observer)
const string& _source_zone)
: RGWSyncProcessorThread(_store, "data-sync"),
sync(_store, async_rados, _source_zone, observer),
sync(_store, async_rados, _source_zone),
initialized(false) {}

void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
Expand Down Expand Up @@ -4618,12 +4617,12 @@ int RGWRados::init_complete()
ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
return ret;
}
data_log->set_observer(&*bucket_trim);

Mutex::Locker dl(data_sync_thread_lock);
for (auto iter : zone_data_sync_from_map) {
ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
&*bucket_trim);
auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
Expand Down