Skip to content

Commit

Permalink
Merge pull request #21395 from liewegas/wip-log-channels
Browse files Browse the repository at this point in the history
mon/LogMonitor: separate out summary by channel

Reviewed-by: John Spray <john.spray@redhat.com>
  • Loading branch information
tchaikov committed Apr 15, 2018
2 parents 7720ef5 + 648aaf2 commit a07c344
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 73 deletions.
73 changes: 63 additions & 10 deletions src/common/LogEntry.cc
Expand Up @@ -263,33 +263,86 @@ void LogEntry::generate_test_instances(list<LogEntry*>& o)

// -----

void LogSummary::build_ordered_tail(list<LogEntry> *tail) const
{
tail->clear();
// channel -> (begin, end)
map<string,pair<list<pair<uint64_t,LogEntry>>::const_iterator,
list<pair<uint64_t,LogEntry>>::const_iterator>> pos;
for (auto& i : tail_by_channel) {
pos.emplace(i.first, make_pair(i.second.begin(), i.second.end()));
}
while (true) {
uint64_t min_seq = 0;
list<pair<uint64_t,LogEntry>>::const_iterator *minp = 0;
for (auto& i : pos) {
if (i.second.first == i.second.second) {
continue;
}
if (min_seq == 0 || i.second.first->first < min_seq) {
min_seq = i.second.first->first;
minp = &i.second.first;
}
}
if (min_seq == 0) {
break; // done
}
tail->push_back((*minp)->second);
++(*minp);
}
}

void LogSummary::encode(bufferlist& bl, uint64_t features) const
{
ENCODE_START(2, 2, bl);
if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
ENCODE_START(2, 2, bl);
encode(version, bl);
list<LogEntry> tail;
build_ordered_tail(&tail);
encode(tail, bl, features);
ENCODE_FINISH(bl);
return;
}
ENCODE_START(3, 3, bl);
encode(version, bl);
encode(tail, bl, features);
encode(seq, bl);
encode(tail_by_channel, bl, features);
ENCODE_FINISH(bl);
}

void LogSummary::decode(bufferlist::iterator& bl)
{
DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
decode(version, bl);
decode(tail, bl);
if (struct_v < 3) {
list<LogEntry> tail;
decode(tail, bl);
for (auto& i : tail) {
add(i);
}
} else {
decode(seq, bl);
decode(tail_by_channel, bl);
}
DECODE_FINISH(bl);
keys.clear();
for (auto& p : tail) {
keys.insert(p.key());
for (auto& i : tail_by_channel) {
for (auto& e : i.second) {
keys.insert(e.second.key());
}
}
}

void LogSummary::dump(Formatter *f) const
{
f->dump_unsigned("version", version);
f->open_array_section("tail");
for (list<LogEntry>::const_iterator p = tail.begin(); p != tail.end(); ++p) {
f->open_object_section("entry");
p->dump(f);
f->open_object_section("tail_by_channel");
for (auto& i : tail_by_channel) {
f->open_object_section(i.first.c_str());
for (auto& j : i.second) {
string s = stringify(j.first);
f->dump_object(s.c_str(), j.second);
}
f->close_section();
}
f->close_section();
Expand Down
18 changes: 12 additions & 6 deletions src/common/LogEntry.h
Expand Up @@ -121,19 +121,25 @@ WRITE_CLASS_ENCODER_FEATURES(LogEntry)

struct LogSummary {
version_t version;
list<LogEntry> tail;
// channel -> [(seq#, entry), ...]
map<string,list<pair<uint64_t,LogEntry>>> tail_by_channel;
uint64_t seq = 0;
ceph::unordered_set<LogEntryKey> keys;

LogSummary() : version(0) {}

void build_ordered_tail(list<LogEntry> *tail) const;

void add(const LogEntry& e) {
tail.push_back(e);
keys.insert(tail.back().key());
keys.insert(e.key());
tail_by_channel[e.channel].push_back(make_pair(++seq, e));
}
void prune(size_t max) {
while (tail.size() > max) {
keys.erase(tail.front().key());
tail.pop_front();
for (auto& i : tail_by_channel) {
while (i.second.size() > max) {
keys.erase(i.second.front().second.key());
i.second.pop_front();
}
}
}
bool contains(const LogEntryKey& k) const {
Expand Down
104 changes: 48 additions & 56 deletions src/mon/LogMonitor.cc
Expand Up @@ -428,29 +428,57 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op)

// We'll apply this twice, once while counting out lines
// and once while outputting them.
auto match = [level, channel](const LogEntry &entry) {
return entry.prio >= level && (entry.channel == channel || channel == "*");
auto match = [level](const LogEntry &entry) {
return entry.prio >= level;
};

auto rp = summary.tail.rbegin();
for (; num > 0 && rp != summary.tail.rend(); ++rp) {
if (match(*rp)) {
num--;
}
}
if (rp == summary.tail.rend()) {
--rp;
}
ostringstream ss;
for (; rp != summary.tail.rbegin(); --rp) {
if (!match(*rp)) {
continue;
if (channel == "*") {
list<LogEntry> full_tail;
summary.build_ordered_tail(&full_tail);
derr << "full " << full_tail << dendl;
auto rp = full_tail.rbegin();
for (; num > 0 && rp != full_tail.rend(); ++rp) {
if (match(*rp)) {
num--;
}
}

if (f) {
f->dump_object("entry", *rp);
} else {
ss << *rp << "\n";
if (rp == full_tail.rend()) {
--rp;
}
for (; rp != full_tail.rbegin(); --rp) {
if (!match(*rp)) {
continue;
}
if (f) {
f->dump_object("entry", *rp);
} else {
ss << *rp << "\n";
}
}
} else {
derr << "bar" << dendl;
auto p = summary.tail_by_channel.find(channel);
if (p != summary.tail_by_channel.end()) {
auto rp = p->second.rbegin();
for (; num > 0 && rp != p->second.rend(); ++rp) {
if (match(rp->second)) {
num--;
}
}
if (rp == p->second.rend()) {
--rp;
}
for (; rp != p->second.rbegin(); --rp) {
if (!match(rp->second)) {
continue;
}
if (f) {
f->dump_object("entry", rp->second);
} else {
ss << rp->second << "\n";
}
}
}
}
if (f) {
Expand Down Expand Up @@ -563,12 +591,7 @@ void LogMonitor::check_sub(Subscription *s)

if (s->next == 0) {
/* First timer, heh? */
bool ret = _create_sub_summary(mlog, sub_level);
if (!ret) {
dout(1) << __func__ << " ret = " << ret << dendl;
mlog->put();
return;
}
_create_sub_incremental(mlog, sub_level, get_last_committed());
} else {
/* let us send you an incremental log... */
_create_sub_incremental(mlog, sub_level, s->next);
Expand All @@ -589,37 +612,6 @@ void LogMonitor::check_sub(Subscription *s)
s->next = summary_version+1;
}

/**
* Create a log message containing only the last message in the summary.
*
* @param mlog Log message we'll send to the client.
* @param level Maximum log level the client is interested in.
* @return 'true' if we consider we successfully populated @mlog;
* 'false' otherwise.
*/
bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
{
dout(10) << __func__ << dendl;

assert(mlog != NULL);

if (!summary.tail.size())
return false;

list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
for (; it != summary.tail.rend(); ++it) {
LogEntry e = *it;
if (e.prio < level)
continue;

mlog->entries.push_back(e);
mlog->version = summary.version;
break;
}

return true;
}

/**
* Create an incremental log message from version \p sv to \p summary.version
*
Expand Down
1 change: 0 additions & 1 deletion src/mon/LogMonitor.h
Expand Up @@ -155,7 +155,6 @@ class LogMonitor : public PaxosService,
bool preprocess_command(MonOpRequestRef op);
bool prepare_command(MonOpRequestRef op);

bool _create_sub_summary(MLog *mlog, int level);
void _create_sub_incremental(MLog *mlog, int level, version_t sv);

public:
Expand Down

0 comments on commit a07c344

Please sign in to comment.