/
livelog.go
151 lines (127 loc) · 3.72 KB
/
livelog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package livelog provides a buffer that can be simultaneously written to by
// one writer and read from by many readers.
package livelog
import (
"io"
"sync"
)
const (
	// MaxBufferSize caps the number of bytes a Buffer will hold. It is
	// more output than a reasonable test is expected to produce.
	MaxBufferSize = 2 * 1024 * 1024 // 2 MB

	// truncationMessage is appended to the log once it reaches the
	// maximum size.
	truncationMessage = "\n\n... log truncated ..."

	// maxUserSize is how many bytes of caller-supplied output fit in the
	// buffer while still leaving room for truncationMessage.
	maxUserSize = MaxBufferSize - len(truncationMessage)
)
// Buffer is an io.WriteCloser that provides multiple Readers that each yield
// the same data.
//
// It is safe to Write to a Buffer while Readers consume data. A Buffer has a
// maximum size of MaxBufferSize, after which Write will silently drop
// additional data and the buffer will contain a truncation note at the end.
//
// The zero value is a ready-to-use buffer.
//
// A Buffer contains a sync.Mutex and therefore must not be copied after
// first use; pass it around by pointer.
type Buffer struct {
mu sync.Mutex // Guards the fields below.
wake *sync.Cond // Created on demand by a reader's Read; nil until some reader has had to wait.
buf []byte // Length is in the range [0, MaxBufferSize].
// eof is set by Close. A reader that has consumed all of buf returns
// io.EOF once it is true.
eof bool
// lastID is the id given to the most recently created reader; Reader
// increments it for each new reader.
lastID int
}
// Write appends b2 to the Buffer and wakes any Readers blocked waiting for
// data.
//
// Once the Buffer holds maxUserSize bytes of caller data, whatever does not
// fit is silently dropped and truncationMessage is appended exactly once,
// bringing the Buffer to MaxBufferSize. Write always reports len(b2) bytes
// written and a nil error, even when data was dropped.
func (b *Buffer) Write(b2 []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	reported := len(b2)
	truncate := false
	switch held := len(b.buf); {
	case held == MaxBufferSize:
		// Already full, truncation note included: drop everything.
		b2 = nil
	case held+reported > maxUserSize:
		// Keep only the part that fits; the note appended below brings
		// b.buf to exactly MaxBufferSize.
		b2 = b2[:maxUserSize-held]
		truncate = true
	}

	b.buf = append(b.buf, b2...)
	if truncate {
		b.buf = append(b.buf, truncationMessage...)
	}

	b.wakeReaders()
	return reported, nil
}
// Close marks the Buffer as finished: Readers that have consumed all buffered
// data receive io.EOF, and any Reader currently blocked in Read is woken so it
// can observe that. Close always returns nil.
func (b *Buffer) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.eof = true
	b.wakeReaders()

	return nil
}
// wakeReaders broadcasts on the wake condition so that every Reader blocked
// in Read re-checks its state. It is a no-op while b.wake is nil, i.e. before
// any Reader has had to wait.
//
// The caller must hold b.mu.
func (b *Buffer) wakeReaders() {
	if b.wake == nil {
		return
	}
	b.wake.Broadcast()
}
// Bytes returns a snapshot of everything currently held in the Buffer. The
// result is a fresh copy, so the caller may modify it without affecting the
// Buffer.
func (b *Buffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()

	var snapshot []byte
	return append(snapshot, b.buf...)
}
// String returns the Buffer's current contents as a string. The conversion
// copies the data, so later writes do not affect the returned value.
func (b *Buffer) String() string {
	b.mu.Lock()
	defer b.mu.Unlock()

	contents := string(b.buf)
	return contents
}
// Reader returns a new io.ReadCloser that emits the Buffer's entire contents,
// starting from the first byte. Each reader gets its own read position and
// the next sequential id.
//
// It is safe to call Read and Close on the returned value concurrently.
func (b *Buffer) Reader() io.ReadCloser {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.lastID++
	r := &reader{
		buf: b,
		id:  b.lastID,
	}
	return r
}
// reader is the io.ReadCloser returned by Buffer.Reader. It records how far
// into the shared Buffer this particular consumer has read.
type reader struct {
// buf is the Buffer being read. Read and Close hold buf.mu while they
// touch the fields below.
buf *Buffer
id int // Read-only. Set from Buffer.lastID when the reader is created.
read int // Bytes read; accessed by only the Read method.
closed bool // Guarded by buf.mu.
}
// Read copies data this reader has not yet consumed into b and returns the
// number of bytes copied.
//
// If there is no unread data, Read blocks until the Buffer gains more data,
// the Buffer is closed, or this reader is closed. It returns io.EOF once this
// reader is closed (even if unread data remains), or once the Buffer is
// closed and this reader has consumed everything in it.
func (r *reader) Read(b []byte) (int, error) {
	src := r.buf
	src.mu.Lock()
	defer src.mu.Unlock()

	// Sleep until there is unread data, the writer has closed the Buffer, or
	// this reader has been closed. The condition variable is created lazily
	// by the first reader that needs to wait.
	for r.read == len(src.buf) && !src.eof && !r.closed {
		if src.wake == nil {
			src.wake = sync.NewCond(&src.mu)
		}
		src.wake.Wait()
	}

	// A closed reader reports EOF regardless of remaining data.
	if r.closed {
		return 0, io.EOF
	}
	// The writer is done and everything has been delivered.
	if src.eof && r.read == len(src.buf) {
		return 0, io.EOF
	}

	// Hand out as much unread data as fits in b.
	n := copy(b, src.buf[r.read:])
	r.read += n
	return n, nil
}
// Close marks this reader as closed so that a pending Read and all later
// Reads on it return io.EOF. It always returns nil.
func (r *reader) Close() error {
	src := r.buf
	src.mu.Lock()
	defer src.mu.Unlock()

	r.closed = true

	// The broadcast reaches every reader of the Buffer. This reader needs it
	// to abandon a blocked Read; the others simply re-check their wait
	// condition and go back to sleep.
	src.wakeReaders()
	return nil
}