Skip to content

Commit

Permalink
Merge PR #21847 into wip-sage-testing-20180521.023125
Browse files Browse the repository at this point in the history
* refs/pull/21847/head:
	log: disk write coalescing

Reviewed-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed May 21, 2018
2 parents 47dffc5 + 65da5ba commit ec418e0
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 28 deletions.
109 changes: 81 additions & 28 deletions src/log/Log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#define DEFAULT_MAX_RECENT 10000

#define PREALLOC 1000000

#define MAX_LOG_BUF 65536

namespace ceph {
namespace logging {
Expand Down Expand Up @@ -52,6 +52,7 @@ Log::Log(SubsystemMap *s)
m_syslog_log(-2), m_syslog_crash(-2),
m_stderr_log(1), m_stderr_crash(-1),
m_graylog_log(-3), m_graylog_crash(-3),
m_log_buf(nullptr), m_log_buf_pos(0),
m_stop(false),
m_max_new(DEFAULT_MAX_NEW),
m_max_recent(DEFAULT_MAX_RECENT),
Expand All @@ -71,6 +72,8 @@ Log::Log(SubsystemMap *s)
ret = pthread_cond_init(&m_cond_flusher, NULL);
assert(ret == 0);

m_log_buf = (char*)malloc(MAX_LOG_BUF);

// kludge for prealloc testing
if (false)
for (int i=0; i < PREALLOC; i++)
Expand All @@ -86,6 +89,7 @@ Log::~Log()
assert(!is_started());
if (m_fd >= 0)
VOID_TEMP_FAILURE_RETRY(::close(m_fd));
free(m_log_buf);

pthread_mutex_destroy(&m_queue_mutex);
pthread_mutex_destroy(&m_flush_mutex);
Expand Down Expand Up @@ -296,6 +300,45 @@ void Log::flush()
pthread_mutex_unlock(&m_flush_mutex);
}

void Log::_log_safe_write(const char* what, size_t write_len)
{
if (m_fd < 0)
return;
int r = safe_write(m_fd, m_log_buf, m_log_buf_pos);
if (r != m_fd_last_error) {
if (r < 0)
cerr << "problem writing to " << m_log_file
<< ": " << cpp_strerror(r)
<< std::endl;
m_fd_last_error = r;
}
}

void Log::_flush_logbuf()
{
if (m_log_buf_pos) {
_log_safe_write(m_log_buf, m_log_buf_pos);
m_log_buf_pos = 0;
}
}

// write part of "what" directly to disk, copy remaining part to m_log_buf
// for later coalescing
void Log::_write_and_copy(char* what, size_t len)
{
size_t write_len = len - (len & (MAX_LOG_BUF - 1));
if (write_len) {
_log_safe_write(what, write_len);
what += write_len;
}

write_len = len - write_len;
if (write_len) {
maybe_inline_memcpy((void*)m_log_buf, (void*)what, write_len, 32);
m_log_buf_pos = write_len;
}
}

void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)
{
Entry *e;
Expand All @@ -310,59 +353,66 @@ void Log::_flush(EntryQueue *t, EntryQueue *requeue, bool crash)

e->hint_size();
if (do_fd || do_syslog || do_stderr) {
size_t buflen = 0;
size_t line_used = 0;

char *buf;
size_t buf_size = 80 + e->size();
bool need_dynamic = buf_size >= 0x10000; //avoids >64K buffers
//allocation at stack
char buf0[need_dynamic ? 1 : buf_size];
char *line;
size_t line_size = 80 + e->size();
bool need_dynamic = line_size >= MAX_LOG_BUF;

// this flushes the existing buffers if either line is longer
// than our buffer, or buffer is too full to fit it
if (m_log_buf_pos + line_size >= MAX_LOG_BUF) {
_flush_logbuf();
}
if (need_dynamic) {
buf = new char[buf_size];
line = new char[line_size];
} else {
buf = buf0;
line = &m_log_buf[m_log_buf_pos];
}

if (crash)
buflen += snprintf(buf, buf_size, "%6d> ", -t->m_len);
buflen += append_time(e->m_stamp, buf + buflen, buf_size - buflen);
buflen += snprintf(buf + buflen, buf_size-buflen, " %lx %2d ",
line_used += snprintf(line, line_size, "%6d> ", -t->m_len);
line_used += append_time(e->m_stamp, line + line_used, line_size - line_used);
line_used += snprintf(line + line_used, line_size - line_used, " %lx %2d ",
(unsigned long)e->m_thread, e->m_prio);

buflen += e->snprintf(buf + buflen, buf_size - buflen - 1);
if (buflen > buf_size - 1) { //paranoid check, buf was declared
line_used += e->snprintf(line + line_used, line_size - line_used - 1);
if (line_used > line_size - 1) { //paranoid check, buf was declared
//to hold everything
buflen = buf_size - 1;
buf[buflen] = 0;
line_used = line_size - 1;
line[line_used] = 0;
}

if (do_syslog) {
syslog(LOG_USER|LOG_INFO, "%s", buf);
syslog(LOG_USER|LOG_INFO, "%s", line);
}

if (do_stderr) {
cerr << m_log_stderr_prefix << buf << std::endl;
cerr << m_log_stderr_prefix << line << std::endl;
}

if (do_fd) {
buf[buflen] = '\n';
int r = safe_write(m_fd, buf, buflen+1);
if (r != m_fd_last_error) {
if (r < 0)
cerr << "problem writing to " << m_log_file
<< ": " << cpp_strerror(r)
<< std::endl;
m_fd_last_error = r;
}
line[line_used] = '\n';
if (need_dynamic) {
_write_and_copy(line, line_used + 1);
}
}

if (need_dynamic)
delete[] buf;
delete[] line;

m_log_buf_pos += line_used + 1;
}

if (do_graylog2 && m_graylog) {
m_graylog->log_entry(e);
}

requeue->enqueue(e);
}

_flush_logbuf();

}

void Log::_log_message(const char *s, bool crash)
Expand Down Expand Up @@ -400,6 +450,7 @@ void Log::dump_recent()
m_queue_mutex_holder = 0;
pthread_mutex_unlock(&m_queue_mutex);
_flush(&t, &m_recent, false);
_flush_logbuf();

EntryQueue old;
_log_message("--- begin dump of recent events ---", true);
Expand All @@ -425,6 +476,8 @@ void Log::dump_recent()

_log_message("--- end dump of recent events ---", true);

_flush_logbuf();

m_flush_mutex_holder = 0;
pthread_mutex_unlock(&m_flush_mutex);
}
Expand Down
6 changes: 6 additions & 0 deletions src/log/Log.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class Log : private Thread

std::shared_ptr<Graylog> m_graylog;

char* m_log_buf; ///< coalescing buffer
int m_log_buf_pos; ///< where we're at within coalescing buffer

bool m_stop;

int m_max_new, m_max_recent;
Expand All @@ -58,6 +61,9 @@ class Log : private Thread

void *entry() override;

void _log_safe_write(const char* what, size_t write_len);
void _write_and_copy(char* what, size_t len);
void _flush_logbuf();
void _flush(EntryQueue *q, EntryQueue *requeue, bool crash);

void _log_message(const char *s, bool crash);
Expand Down

0 comments on commit ec418e0

Please sign in to comment.