Skip to content

Commit

Permalink
mon/LogMonitor: separate out summary by channel
Browse files Browse the repository at this point in the history
Instead of keeping the last N entries, keep the last N entries for each
channel.  This ensures that lots of audit records don't age out the
cluster records (or vice versa).

The overall approach does not change, and that approach is overall pretty
lame.  We're still rewriting a big summary blob on every log commit.  We
still should refactor this later.  This solves the immediate problem at a
small cost of increasing the log summary structure by ~2x.

Signed-off-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Apr 12, 2018
1 parent ad4f0fb commit 87164fd
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 35 deletions.
71 changes: 61 additions & 10 deletions src/common/LogEntry.cc
Expand Up @@ -263,33 +263,84 @@ void LogEntry::generate_test_instances(list<LogEntry*>& o)

// -----

void LogSummary::build_ordered_tail(list<LogEntry> *tail) const
{
tail->clear();
// channel -> (begin, end)
map<string,pair<list<pair<uint64_t,LogEntry>>::const_iterator,
list<pair<uint64_t,LogEntry>>::const_iterator>> pos;
for (auto& i : tail_by_channel) {
pos.emplace(i.first, make_pair(i.second.begin(), i.second.end()));
}
while (true) {
uint64_t min_seq = 0;
list<pair<uint64_t,LogEntry>>::const_iterator *minp = 0;
for (auto& i : pos) {
if (i.second.first == i.second.second) {
continue;
}
if (min_seq == 0 || i.second.first->first < min_seq) {
min_seq = i.second.first->first;
minp = &i.second.first;
}
}
if (min_seq == 0) {
break; // done
}
tail->push_back((*minp)->second);
++(*minp);
}
}

void LogSummary::encode(bufferlist& bl, uint64_t features) const
{
ENCODE_START(2, 2, bl);
if (!HAVE_FEATURE(features, SERVER_MIMIC)) {
ENCODE_START(2, 2, bl);
encode(version, bl);
list<LogEntry> tail;
build_ordered_tail(&tail);
encode(tail, bl, features);
ENCODE_FINISH(bl);
return;
}
ENCODE_START(3, 3, bl);
encode(version, bl);
encode(tail, bl, features);
encode(tail_by_channel, bl, features);
ENCODE_FINISH(bl);
}

void LogSummary::decode(bufferlist::iterator& bl)
{
DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
decode(version, bl);
decode(tail, bl);
if (struct_v < 3) {
list<LogEntry> tail;
decode(tail, bl);
for (auto& i : tail) {
add(i);
}
} else {
decode(tail_by_channel, bl);
}
DECODE_FINISH(bl);
keys.clear();
for (auto& p : tail) {
keys.insert(p.key());
for (auto& i : tail_by_channel) {
for (auto& e : i.second) {
keys.insert(e.second.key());
}
}
}

void LogSummary::dump(Formatter *f) const
{
f->dump_unsigned("version", version);
f->open_array_section("tail");
for (list<LogEntry>::const_iterator p = tail.begin(); p != tail.end(); ++p) {
f->open_object_section("entry");
p->dump(f);
f->open_object_section("tail_by_channel");
for (auto& i : tail_by_channel) {
f->open_object_section(i.first.c_str());
for (auto& j : i.second) {
string s = stringify(j.first);
f->dump_object(s.c_str(), j.second);
}
f->close_section();
}
f->close_section();
Expand Down
18 changes: 12 additions & 6 deletions src/common/LogEntry.h
Expand Up @@ -121,19 +121,25 @@ WRITE_CLASS_ENCODER_FEATURES(LogEntry)

struct LogSummary {
version_t version;
list<LogEntry> tail;
// channel -> [(seq#, entry), ...]
map<string,list<pair<uint64_t,LogEntry>>> tail_by_channel;
uint64_t seq = 0;
ceph::unordered_set<LogEntryKey> keys;

LogSummary() : version(0) {}

void build_ordered_tail(list<LogEntry> *tail) const;

void add(const LogEntry& e) {
tail.push_back(e);
keys.insert(tail.back().key());
keys.insert(e.key());
tail_by_channel[e.channel].push_back(make_pair(++seq, e));
}
void prune(size_t max) {
while (tail.size() > max) {
keys.erase(tail.front().key());
tail.pop_front();
for (auto& i : tail_by_channel) {
while (i.second.size() > max) {
keys.erase(i.second.front().second.key());
i.second.pop_front();
}
}
}
bool contains(const LogEntryKey& k) const {
Expand Down
66 changes: 47 additions & 19 deletions src/mon/LogMonitor.cc
Expand Up @@ -428,29 +428,57 @@ bool LogMonitor::preprocess_command(MonOpRequestRef op)

// We'll apply this twice, once while counting out lines
// and once while outputting them.
auto match = [level, channel](const LogEntry &entry) {
return entry.prio >= level && (entry.channel == channel || channel == "*");
auto match = [level](const LogEntry &entry) {
return entry.prio >= level;
};

auto rp = summary.tail.rbegin();
for (; num > 0 && rp != summary.tail.rend(); ++rp) {
if (match(*rp)) {
num--;
}
}
if (rp == summary.tail.rend()) {
--rp;
}
ostringstream ss;
for (; rp != summary.tail.rbegin(); --rp) {
if (!match(*rp)) {
continue;
if (channel == "*") {
list<LogEntry> full_tail;
summary.build_ordered_tail(&full_tail);
derr << "full " << full_tail << dendl;
auto rp = full_tail.rbegin();
for (; num > 0 && rp != full_tail.rend(); ++rp) {
if (match(*rp)) {
num--;
}
}

if (f) {
f->dump_object("entry", *rp);
} else {
ss << *rp << "\n";
if (rp == full_tail.rend()) {
--rp;
}
for (; rp != full_tail.rbegin(); --rp) {
if (!match(*rp)) {
continue;
}
if (f) {
f->dump_object("entry", *rp);
} else {
ss << *rp << "\n";
}
}
} else {
derr << "bar" << dendl;
auto p = summary.tail_by_channel.find(channel);
if (p != summary.tail_by_channel.end()) {
auto rp = p->second.rbegin();
for (; num > 0 && rp != p->second.rend(); ++rp) {
if (match(rp->second)) {
num--;
}
}
if (rp == p->second.rend()) {
--rp;
}
for (; rp != p->second.rbegin(); --rp) {
if (!match(rp->second)) {
continue;
}
if (f) {
f->dump_object("entry", rp->second);
} else {
ss << rp->second << "\n";
}
}
}
}
if (f) {
Expand Down

0 comments on commit 87164fd

Please sign in to comment.