Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip rgw sync status #8030

Merged
merged 15 commits into from Mar 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mstart.sh
Expand Up @@ -38,7 +38,7 @@ if [ $? -ne 0 ]; then
fi

pos=`echo $pos | cut -d: -f1`
base_port=$((6800+pos*10))
base_port=$((6800+pos*20))

export VSTART_DEST=$RUN_ROOT_PATH/$instance
export CEPH_PORT=$base_port
Expand Down
346 changes: 346 additions & 0 deletions src/rgw/rgw_admin.cc
Expand Up @@ -364,6 +364,7 @@ enum {
OPT_PERIOD_LIST,
OPT_PERIOD_UPDATE,
OPT_PERIOD_COMMIT,
OPT_SYNC_STATUS,
};

static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
Expand Down Expand Up @@ -716,6 +717,9 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
return OPT_REPLICALOG_UPDATE;
if (strcmp(cmd, "delete") == 0)
return OPT_REPLICALOG_DELETE;
} else if (strcmp(prev_cmd, "sync") == 0) {
if (strcmp(cmd, "status") == 0)
return OPT_SYNC_STATUS;
}

return -EINVAL;
Expand Down Expand Up @@ -1570,6 +1574,344 @@ static int read_current_period_id(RGWRados* store, const std::string& realm_id,
return 0;
}

/*
 * Append the text currently buffered in ss to l as a single entry, then
 * reset the stream's buffer to empty. An empty buffer appends nothing.
 */
void flush_ss(stringstream& ss, list<string>& l)
{
  const string pending = ss.str();
  if (pending.size() > 0) {
    l.push_back(pending);
  }
  ss.str(string());
}

/*
 * Begin a new output line in ss: whatever was pending is first moved into l
 * (via flush_ss), and when tab is positive the new line is started with tab
 * fill characters of indentation. The stream is returned so the caller can
 * keep chaining << onto the fresh line.
 */
stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
{
  flush_ss(ss, l);

  if (tab <= 0) {
    return ss;
  }

  /* an empty string padded to 'tab' columns yields the indentation; the
   * field width is then set to 1 for whatever the caller writes next */
  ss << setw(tab) << "" << setw(1);
  return ss;
}

static void get_md_sync_status(list<string>& status)
{
RGWMetaSyncStatusManager sync(store, store->get_async_rados());

int ret = sync.init();
if (ret < 0) {
status.push_back(string("failed to retrieve sync info: sync.init() failed: ") + cpp_strerror(-ret));
return;
}

ret = sync.read_sync_status();
if (ret < 0) {
status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
return;
}

const rgw_meta_sync_status& sync_status = sync.get_sync_status();

string status_str;
switch (sync_status.sync_info.state) {
case rgw_meta_sync_info::StateInit:
status_str = "init";
break;
case rgw_meta_sync_info::StateBuildingFullSyncMaps:
status_str = "preparing for full sync";
break;
case rgw_meta_sync_info::StateSync:
status_str = "syncing";
break;
default:
status_str = "unknown";
}

status.push_back(status_str);

uint64_t full_total = 0;
uint64_t full_complete = 0;

int num_full = 0;
int num_inc = 0;
int total_shards = 0;

for (auto marker_iter : sync_status.sync_markers) {
full_total += marker_iter.second.total_entries;
total_shards++;
if (marker_iter.second.state == rgw_meta_sync_marker::SyncState::FullSync) {
num_full++;
full_complete += marker_iter.second.pos;
} else {
full_complete += marker_iter.second.total_entries;
}
if (marker_iter.second.state == rgw_meta_sync_marker::SyncState::IncrementalSync) {
num_inc++;
}
}

stringstream ss;
push_ss(ss, status) << "full sync: " << num_full << "/" << total_shards << " shards";

if (num_full > 0) {
push_ss(ss, status) << "full sync: " << full_total - full_complete << " entries to sync";
}

push_ss(ss, status) << "incremental sync: " << num_inc << "/" << total_shards << " shards";

rgw_mdlog_info log_info;
ret = sync.read_log_info(&log_info);
if (ret < 0) {
status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
return;
}

map<int, RGWMetadataLogInfo> master_shards_info;
string master_period;

ret = sync.read_master_log_shards_info(&master_period, &master_shards_info);
if (ret < 0) {
status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
return;
}

map<int, string> shards_behind;

if (sync_status.sync_info.period != master_period) {
status.push_back(string("master is on a different period: master_period=" + master_period + " local_period=" + sync_status.sync_info.period));
} else {
for (auto local_iter : sync_status.sync_markers) {
int shard_id = local_iter.first;
auto iter = master_shards_info.find(shard_id);

if (iter == master_shards_info.end()) {
/* huh? */
derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
continue;
}
auto master_marker = iter->second.marker;
if (master_marker > local_iter.second.marker) {
shards_behind[shard_id] = local_iter.second.marker;
}
}
}

int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
if (total_behind == 0) {
status.push_back("metadata is caught up with master");
} else {
push_ss(ss, status) << "metadata is behind on " << total_behind << " shards";

map<int, rgw_mdlog_shard_data> master_pos;
ret = sync.read_master_log_shards_next(sync_status.sync_info.period, shards_behind, &master_pos);
if (ret < 0) {
derr << "ERROR: failed to fetch master next positions (" << cpp_strerror(-ret) << ")" << dendl;
} else {
utime_t oldest;
for (auto iter : master_pos) {
rgw_mdlog_shard_data& shard_data = iter.second;

if (!shard_data.entries.empty()) {
rgw_mdlog_entry& entry = shard_data.entries.front();
if (oldest.is_zero()) {
oldest = entry.timestamp;
} else if (!entry.timestamp.is_zero() && entry.timestamp < oldest) {
oldest = entry.timestamp;
}
}
}

if (!oldest.is_zero()) {
push_ss(ss, status) << "oldest incremental change not applied: " << oldest;
}
}
}

flush_ss(ss, status);
}

/*
 * Collect a line-by-line, human-readable description of the data sync state
 * against one source zone into status.
 *
 * source_zone  id of the zone data is being synced from
 * status       list the report lines are appended to
 * tab          indentation width handed to push_ss() for every line
 *
 * The lines appended are, in order: the overall sync state, the number of
 * shards in full sync (plus the number of buckets still to sync when any
 * shard is in full sync), the number of shards in incremental sync, and how
 * far the local markers are behind the source's.
 *
 * Every failure is reported as a line in status; the function returns no
 * error code. Each early return flushes the stream first: push_ss() only
 * buffers the new line, so returning without flush_ss() would drop the error
 * message (and any line still pending before it).
 *
 * NOTE(review): `store` is not declared in this block; presumably a
 * file-scope RGWRados handle -- confirm.
 */
static void get_data_sync_status(const string& source_zone, list<string>& status, int tab)
{
  RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);

  stringstream ss;

  int ret = sync.init();
  if (ret < 0) {
    push_ss(ss, status, tab) << string("failed to retrieve sync info: ") + cpp_strerror(-ret);
    flush_ss(ss, status);
    return;
  }

  ret = sync.read_sync_status();
  if (ret < 0) {
    push_ss(ss, status, tab) << string("failed read sync status: ") + cpp_strerror(-ret);
    flush_ss(ss, status);
    return;
  }

  const rgw_data_sync_status& sync_status = sync.get_sync_status();

  string status_str;
  switch (sync_status.sync_info.state) {
    case rgw_data_sync_info::StateInit:
      status_str = "init";
      break;
    case rgw_data_sync_info::StateBuildingFullSyncMaps:
      status_str = "preparing for full sync";
      break;
    case rgw_data_sync_info::StateSync:
      status_str = "syncing";
      break;
    default:
      status_str = "unknown";
  }

  push_ss(ss, status, tab) << status_str;

  uint64_t full_total = 0;
  uint64_t full_complete = 0;

  int num_full = 0;
  int num_inc = 0;
  int total_shards = 0;

  /* Tally per-shard progress. A shard that is not in full sync is counted as
   * having completed all of its entries. */
  for (const auto& marker_iter : sync_status.sync_markers) {
    full_total += marker_iter.second.total_entries;
    total_shards++;
    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::FullSync) {
      num_full++;
      full_complete += marker_iter.second.pos;
    } else {
      full_complete += marker_iter.second.total_entries;
    }
    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::IncrementalSync) {
      num_inc++;
    }
  }

  push_ss(ss, status, tab) << "full sync: " << num_full << "/" << total_shards << " shards";

  if (num_full > 0) {
    push_ss(ss, status, tab) << "full sync: " << full_total - full_complete << " buckets to sync";
  }

  push_ss(ss, status, tab) << "incremental sync: " << num_inc << "/" << total_shards << " shards";

  rgw_datalog_info log_info;
  ret = sync.read_log_info(&log_info);
  if (ret < 0) {
    push_ss(ss, status, tab) << string("failed to fetch local sync status: ") + cpp_strerror(-ret);
    flush_ss(ss, status);
    return;
  }

  map<int, RGWDataChangesLogInfo> source_shards_info;

  ret = sync.read_source_log_shards_info(&source_shards_info);
  if (ret < 0) {
    push_ss(ss, status, tab) << string("failed to fetch source sync status: ") + cpp_strerror(-ret);
    flush_ss(ss, status);
    return;
  }

  /* shard id -> local marker, for shards whose source marker is ahead */
  map<int, string> shards_behind;

  for (const auto& local_iter : sync_status.sync_markers) {
    int shard_id = local_iter.first;
    auto iter = source_shards_info.find(shard_id);

    if (iter == source_shards_info.end()) {
      /* huh? */
      derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
      continue;
    }
    auto master_marker = iter->second.marker;
    if (master_marker > local_iter.second.marker) {
      shards_behind[shard_id] = local_iter.second.marker;
    }
  }

  /* shards with a lagging marker plus shards not yet in incremental sync */
  int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
  if (total_behind == 0) {
    push_ss(ss, status, tab) << "data is caught up with source";
  } else {
    push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";

    map<int, rgw_datalog_shard_data> master_pos;
    ret = sync.read_source_log_shards_next(shards_behind, &master_pos);
    if (ret < 0) {
      derr << "ERROR: failed to fetch next positions (" << cpp_strerror(-ret) << ")" << dendl;
    } else {
      /* earliest non-zero timestamp among the first entry of each shard */
      utime_t oldest;
      for (auto& iter : master_pos) {
        rgw_datalog_shard_data& shard_data = iter.second;

        if (!shard_data.entries.empty()) {
          rgw_datalog_entry& entry = shard_data.entries.front();
          if (oldest.is_zero()) {
            oldest = entry.timestamp;
          } else if (!entry.timestamp.is_zero() && entry.timestamp < oldest) {
            oldest = entry.timestamp;
          }
        }
      }

      if (!oldest.is_zero()) {
        push_ss(ss, status, tab) << "oldest incremental change not applied: " << oldest;
      }
    }
  }

  flush_ss(ss, status);
}

/*
 * Print entries to stdout as a two-column table, one entry per row. The
 * left column is right-aligned to width characters and shows header on the
 * first row only; on later rows it is blank. Nothing is printed when
 * entries is empty.
 */
static void tab_dump(const string& header, int width, const list<string>& entries)
{
  string label = header;

  for (auto it = entries.begin(); it != entries.end(); ++it) {
    cout << std::setw(width) << label << std::setw(1) << " " << *it << std::endl;
    /* only the first row carries the header */
    label.clear();
  }
}


/*
 * Print the overall sync report to stdout: the realm, zonegroup and zone
 * (id followed by name), then a "metadata sync" section and a "data sync"
 * section laid out with tab_dump().
 *
 * Metadata sync is reported as "no sync (zone is master)" when this zone's
 * id equals the zonegroup's master_zone; otherwise the lines come from
 * get_md_sync_status(). The data sync section holds, for every entry in
 * store->zone_conn_map, a "source: <id> (<name>)" line followed by the lines
 * from get_data_sync_status() indented to the width of "source: ".
 *
 * formatter is accepted but not used by this function.
 */
static void sync_status(Formatter *formatter)
{
  RGWRealm& realm = store->realm;
  RGWZoneGroup& zonegroup = store->get_zonegroup();
  RGWZone& zone = store->get_zone();

  const int width = 15;

  cout << std::setw(width) << "realm" << std::setw(1) << " "
       << realm.get_id() << " (" << realm.get_name() << ")" << std::endl;
  cout << std::setw(width) << "zonegroup" << std::setw(1) << " "
       << zonegroup.get_id() << " (" << zonegroup.get_name() << ")" << std::endl;
  cout << std::setw(width) << "zone" << std::setw(1) << " "
       << zone.id << " (" << zone.name << ")" << std::endl;

  /* metadata section */
  list<string> md_status;
  if (zone.id == zonegroup.master_zone) {
    md_status.push_back("no sync (zone is master)");
  } else {
    get_md_sync_status(md_status);
  }
  tab_dump("metadata sync", width, md_status);

  /* data section: one group of lines per source zone connection */
  list<string> data_status;
  const string source_str = "source: ";

  for (const auto& conn : store->zone_conn_map) {
    const string& source_id = conn.first;

    string line = source_str + source_id;
    auto name_iter = store->zone_name_by_id.find(source_id);
    if (name_iter != store->zone_name_by_id.end()) {
      /* append the zone name when the id is known */
      line += string(" (") + name_iter->second + ")";
    }
    data_status.push_back(line);

    get_data_sync_status(source_id, data_status, source_str.size());
  }

  tab_dump("data sync", width, data_status);
}

int main(int argc, char **argv)
{
vector<const char*> args;
Expand Down Expand Up @@ -4489,6 +4831,10 @@ int main(int argc, char **argv)
}
}

if (opt_cmd == OPT_SYNC_STATUS) {
sync_status(formatter);
}

if (opt_cmd == OPT_METADATA_SYNC_STATUS) {
RGWMetaSyncStatusManager sync(store, store->get_async_rados());

Expand Down