-
Notifications
You must be signed in to change notification settings - Fork 643
/
async_logger.h
206 lines (172 loc) · 6.74 KB
/
async_logger.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex> // IWYU pragma: keep
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/gutil/macros.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/mutex.h"
namespace kudu {
// Wrapper for a glog Logger which asynchronously writes log messages.
// This class starts a new thread responsible for forwarding the messages
// to the logger, and performs double buffering. Writers append to the
// current buffer and then wake up the logger thread. The logger swaps in
// a new buffer and writes any accumulated messages to the wrapped
// Logger.
//
// This double-buffering behavior dramatically improves performance, especially
// for logging messages which require flushing the underlying file (i.e WARNING
// and above for default). The flush can take a couple of milliseconds, and in
// some cases can even block for hundreds of milliseconds or more. With the
// double-buffered approach, threads can proceed with useful work while the IO
// thread blocks.
//
// The semantics provided by this wrapper are slightly weaker than the default
// glog semantics. By default, glog will immediately (synchronously) flush WARNING
// and above to the underlying file, whereas here we are deferring that flush to
// the separate thread. This means that a crash just after a 'LOG(WARNING)' would
// may be missing the message in the logs, but the perf benefit is probably
// worth it. We do take care that a glog FATAL message flushes all buffered log
// messages before exiting.
//
// NOTE: the logger limits the total amount of buffer space, so if the underlying
// log blocks for too long, eventually the threads generating the log messages
// will block as well. This prevents runaway memory usage.
class AsyncLogger final : public google::base::Logger {
public:
AsyncLogger(google::base::Logger* wrapped,
size_t max_buffer_bytes);
~AsyncLogger() override = default;
void Start();
// Stop the thread. Flush() and Write() must not be called after this.
//
// NOTE: this is currently only used in tests: in real life, we enable async
// logging once when the program starts and then never disable it.
//
// REQUIRES: Start() must have been called.
void Stop();
// Write a message to the log.
//
// 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
// flag. Any messages logged at the configured level or higher result in 'force_flush'
// being set to true, indicating that the message should be immediately written to the
// log rather than buffered in memory. See the class-level docs above for more detail
// about the implementation provided here.
//
// REQUIRES: Start() must have been called.
void Write(bool force_flush,
time_t timestamp,
const char* message,
size_t message_len) override;
// Flush any buffered messages.
void Flush() override;
// Get the current LOG file size.
// The returned value is approximate since some
// logged data may not have been flushed to disk yet.
uint32_t LogSize() override;
// Return a count of how many times an application thread was
// blocked due to the buffers being full and the writer thread
// not keeping up.
int app_threads_blocked_count_for_tests() const {
std::lock_guard l(lock_);
return app_threads_blocked_count_for_tests_;
}
private:
// A buffered message.
//
// TODO(todd): using std::string for buffered messages is convenient but not
// as efficient as it could be. Better would be to make the buffers just be
// Arenas and allocate both the message data and Msg struct from them, forming
// a linked list.
struct Msg {
time_t ts;
std::string message;
Msg(time_t ts, std::string message)
: ts(ts),
message(std::move(message)) {
}
};
// A buffer of messages waiting to be flushed.
struct Buffer {
std::vector<Msg> messages;
// Estimate of the size of 'messages'.
size_t size = 0;
// Whether this buffer needs an explicit flush of the
// underlying logger.
bool flush = false;
Buffer() {}
void clear() {
messages.clear();
size = 0;
flush = false;
}
void add(Msg msg, bool flush) {
size += sizeof(msg) + msg.message.size();
messages.emplace_back(std::move(msg));
this->flush |= flush;
}
bool needs_flush_or_write() const {
return flush || !messages.empty();
}
private:
DISALLOW_COPY_AND_ASSIGN(Buffer);
};
bool BufferFull(const Buffer& buf) const;
void RunThread();
// The maximum number of bytes used by the entire class.
const size_t max_buffer_bytes_;
google::base::Logger* const wrapped_;
std::thread thread_;
// Count of how many times an application thread was blocked due to
// a full buffer.
int app_threads_blocked_count_for_tests_ = 0;
// Count of how many times the writer thread has flushed the buffers.
// 64 bits should be enough to never worry about overflow.
uint64_t flush_count_ = 0;
// Protects buffers as well as 'state_'.
mutable Mutex lock_;
// Signaled by app threads to wake up the flusher, either for new
// data or because 'state_' changed.
ConditionVariable wake_flusher_cond_;
// Signaled by the flusher thread when the flusher has swapped in
// a free buffer to write to.
ConditionVariable free_buffer_cond_;
// Signaled by the flusher thread when it has completed flushing
// the current buffer.
ConditionVariable flush_complete_cond_;
// The buffer to which application threads append new log messages.
std::unique_ptr<Buffer> active_buf_;
// The buffer currently being flushed by the logger thread, cleared
// after a successful flush.
std::unique_ptr<Buffer> flushing_buf_;
// Trigger for the logger thread to stop.
enum State {
INITTED,
RUNNING,
STOPPED
};
State state_ = INITTED;
DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
};
} // namespace kudu