-
Notifications
You must be signed in to change notification settings - Fork 466
/
line_buffer.go
160 lines (129 loc) · 4.17 KB
/
line_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/grafana/loki/blob/7c78d7ea44afb420847255f9f5a4f677ad0f47bf/pkg/util/log/line_buffer.go
// Provenance-includes-copyright: Grafana Labs
package log
import (
"bytes"
"io"
"sync"
"time"
"go.uber.org/atomic"
)
// LineBufferedLogger buffers log lines to be flushed periodically. Without a line buffer, Log() will call the write
// syscall for every log line which is expensive if logging thousands of lines per second.
//
// Lines are accumulated through Write and handed to the underlying writer in one batch by Flush.
type LineBufferedLogger struct {
// buf accumulates the bytes of every line written since the last flush.
buf *threadsafeBuffer
// entries counts the Write calls accepted since the last flush; Flush swaps it back to 0.
entries atomic.Uint32
// cap is the entry count at which Write flushes before buffering the next line.
cap uint32
// w is the destination the buffered bytes are written to on flush.
w io.Writer
// onFlush, if non-nil, is called after a successful flush with the number of entries flushed.
onFlush func(entries uint32)
}
// Size reports how many entries have been written since the last flush.
func (l *LineBufferedLogger) Size() uint32 {
	pending := l.entries.Load()
	return pending
}
// Write appends p to the line buffer and counts it as one entry.
//
// When the entry count has already reached cap, the buffer is flushed first. If that flush fails,
// the buffered bytes are discarded, p is not buffered, and the flush error is returned with n == 0.
func (l *LineBufferedLogger) Write(p []byte) (n int, err error) {
	// Make room before accepting the new line; a successful Flush zeroes the entry counter.
	if full := l.Size() >= l.cap; full {
		if flushErr := l.Flush(); flushErr != nil {
			// Drop whatever the failed flush left behind in the buffer.
			l.buf.Reset()
			return 0, flushErr
		}
	}

	l.entries.Inc()
	return l.buf.Write(p)
}
// Flush writes everything currently buffered to the underlying writer.
//
// The entry counter is reset to zero up front; when it was already zero, Flush returns nil without
// touching the writer. The onFlush callback, if set, receives the number of entries that were
// pending and is only invoked when the write succeeded. A write error is returned to the caller.
func (l *LineBufferedLogger) Flush() error {
	// Take the pending count and zero the counter in a single atomic step.
	pending := l.entries.Swap(0)
	if pending == 0 {
		return nil
	}

	// bytes.Buffer.WriteTo resets the buffer once it has been fully drained, so no explicit
	// Reset is needed here.
	if _, err := l.buf.WriteTo(l.w); err != nil {
		return err
	}

	if l.onFlush != nil {
		l.onFlush(pending)
	}
	return nil
}
// LineBufferedLoggerOption is a functional option that configures a LineBufferedLogger.
// NewLineBufferedLogger applies each option to the logger it constructs, in the order given.
type LineBufferedLoggerOption func(*LineBufferedLogger)
// WithFlushPeriod creates a new LineBufferedLoggerOption that flushes the LineBufferedLogger every d.
//
// Applying the option starts a background goroutine that calls Flush on each tick. The goroutine
// has no stop signal, so it keeps running once started.
//
// A non-positive d disables periodic flushing: time.NewTicker panics on such a value, and because
// the ticker is created inside the background goroutine that panic could not be recovered by the
// caller.
func WithFlushPeriod(d time.Duration) LineBufferedLoggerOption {
	return func(l *LineBufferedLogger) {
		if d <= 0 {
			return
		}

		go func() {
			tick := time.NewTicker(d)
			defer tick.Stop()

			for range tick.C {
				// Best effort: there is no caller to hand the error to from this goroutine.
				// The next tick or a full buffer triggers another flush attempt.
				_ = l.Flush()
			}
		}()
	}
}
// WithFlushCallback registers fn to be invoked by Flush after the buffered lines have been
// written successfully. fn receives the number of entries that were flushed.
func WithFlushCallback(fn func(entries uint32)) LineBufferedLoggerOption {
	setCallback := func(logger *LineBufferedLogger) {
		logger.onFlush = fn
	}
	return setCallback
}
// WithPrellocatedBuffer replaces the logger's buffer with an empty one whose backing array already
// has room for size bytes, reducing GC cycles and slice resizing.
func WithPrellocatedBuffer(size uint32) LineBufferedLoggerOption {
	return func(logger *LineBufferedLogger) {
		// Zero length, size capacity: the buffer starts empty but will not grow until size bytes are held.
		backing := make([]byte, 0, size)
		logger.buf = newThreadsafeBuffer(bytes.NewBuffer(backing))
	}
}
// NewLineBufferedLogger creates a new LineBufferedLogger that writes to w and holds up to cap
// entries before flushing.
//
// Lines are flushed when Write finds the buffer full, when Flush is called explicitly, or
// periodically if the WithFlushPeriod option is supplied. Options are applied in the order given.
func NewLineBufferedLogger(w io.Writer, cap uint32, opts ...LineBufferedLoggerOption) *LineBufferedLogger {
	logger := new(LineBufferedLogger)
	logger.w = w
	logger.cap = cap
	// Start with an empty, unallocated buffer; options may swap in a preallocated one.
	logger.buf = newThreadsafeBuffer(bytes.NewBuffer([]byte{}))

	for i := range opts {
		opts[i](logger)
	}

	return logger
}
// threadsafeBuffer wraps the non-threadsafe bytes.Buffer, pairing it with an embedded sync.RWMutex.
type threadsafeBuffer struct {
// RWMutex is embedded, so its Lock/Unlock/RLock/RUnlock methods are available directly on the wrapper.
sync.RWMutex
// buf is the wrapped buffer that holds the data.
buf *bytes.Buffer
}
// Read reads up to len(p) bytes from the buffer into p. It returns the number of bytes read
// (0 <= n <= len(p)) and any error encountered.
//
// The exclusive lock is taken rather than the read lock: bytes.Buffer.Read consumes the bytes it
// returns by advancing the buffer's internal read offset, so it mutates the buffer, and concurrent
// callers holding only a shared lock would race with each other.
func (t *threadsafeBuffer) Read(p []byte) (n int, err error) {
	t.Lock()
	defer t.Unlock()

	return t.buf.Read(p)
}
// Write appends p to the wrapped buffer while holding the exclusive lock.
// It returns the number of bytes written and any error reported by the buffer.
func (t *threadsafeBuffer) Write(p []byte) (n int, err error) {
	t.RWMutex.Lock()
	defer t.RWMutex.Unlock()

	n, err = t.buf.Write(p)
	return n, err
}
// WriteTo drains the buffered lines into w. The exclusive lock is held for the whole duration of
// the write. It returns the number of bytes written and any error encountered.
func (t *threadsafeBuffer) WriteTo(w io.Writer) (n int64, err error) {
	t.RWMutex.Lock()
	defer t.RWMutex.Unlock()

	n, err = t.buf.WriteTo(w)
	return n, err
}
// Reset empties the buffer under the exclusive lock. The underlying storage is kept so that
// future writes can reuse it.
func (t *threadsafeBuffer) Reset() {
	t.RWMutex.Lock()
	defer t.RWMutex.Unlock()

	t.buf.Reset()
}
// newThreadsafeBuffer wraps buf in a threadsafeBuffer. The wrapper uses buf directly rather than
// copying it.
func newThreadsafeBuffer(buf *bytes.Buffer) *threadsafeBuffer {
	wrapped := new(threadsafeBuffer)
	wrapped.buf = buf
	return wrapped
}