forked from ruiaylin/pgparser
-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_flush.go
167 lines (146 loc) · 4.74 KB
/
log_flush.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package log
import (
"context"
"io"
"time"
"github.com/ruiaylin/pgparser/utils/envutil"
"github.com/ruiaylin/pgparser/utils/sysutil"
)
// flushSyncWriter is the interface satisfied by logging destinations.
type flushSyncWriter interface {
Flush() error
Sync() error
io.Writer
}
// Flush explicitly flushes all pending log I/O.
// See also flushDaemon() that manages background (asynchronous)
// flushes, and signalFlusher() that manages flushes in reaction to a
// user signal.
func Flush() {
mainLog.lockAndFlushAndSync(true /*doSync*/)
secondaryLogRegistry.mu.Lock()
defer secondaryLogRegistry.mu.Unlock()
for _, l := range secondaryLogRegistry.mu.loggers {
// Some loggers (e.g. the audit log) want to keep all the files.
l.logger.lockAndFlushAndSync(true /*doSync*/)
}
}
func init() {
go flushDaemon()
go signalFlusher()
}
// flushInterval is the delay between periodic flushes of the buffered log data.
const flushInterval = time.Second
// syncInterval is the multiple of flushInterval where the log is also synced to disk.
const syncInterval = 30
// maxSyncDuration is set to a conservative value since this is a new mechanism.
// In practice, even a fraction of that would indicate a problem.
var maxSyncDuration = envutil.EnvOrDefaultDuration("COCKROACH_LOG_MAX_SYNC_DURATION", 30*time.Second)
// flushDaemon periodically flushes and syncs the log file buffers.
// This manages both the primary and secondary loggers.
//
// Flush propagates the in-memory buffer inside CockroachDB to the
// in-memory buffer(s) of the OS. The flush is relatively frequent so
// that a human operator can see "up to date" logging data in the log
// file.
//
// Syncs ensure that the OS commits the data to disk. Syncs are less
// frequent because they can incur more significant I/O costs.
func flushDaemon() {
syncCounter := 1
// This doesn't need to be Stop()'d as the loop never escapes.
for range time.Tick(flushInterval) {
doSync := syncCounter == syncInterval
syncCounter = (syncCounter + 1) % syncInterval
// Is flushing disabled?
logging.mu.Lock()
disableDaemons := logging.mu.disableDaemons
logging.mu.Unlock()
// Flush the main log.
if !disableDaemons {
mainLog.lockAndFlushAndSync(doSync)
// Flush the secondary logs.
secondaryLogRegistry.mu.Lock()
for _, l := range secondaryLogRegistry.mu.loggers {
l.logger.lockAndFlushAndSync(doSync)
}
secondaryLogRegistry.mu.Unlock()
}
}
}
// signalFlusher flushes the log(s) every time SIGHUP is received.
// This handles both the primary and secondary loggers.
func signalFlusher() {
ch := sysutil.RefreshSignaledChan()
for sig := range ch {
Infof(context.Background(), "%s received, flushing logs", sig)
Flush()
}
}
// lockAndFlushAndSync is like flushAndSync but locks l.mu first.
func (l *loggerT) lockAndFlushAndSync(doSync bool) {
l.mu.Lock()
l.flushAndSync(doSync)
l.mu.Unlock()
}
// SetSync configures whether logging synchronizes all writes.
// This overrides the synchronization setting for both primary
// and secondary loggers.
// This is used e.g. in `cockroach start` when an error occurs,
// to ensure that all log writes from the point the error
// occurs are flushed to logs (in case the error degenerates
// into a panic / segfault on the way out).
func SetSync(sync bool) {
mainLog.lockAndSetSync(sync)
func() {
secondaryLogRegistry.mu.Lock()
defer secondaryLogRegistry.mu.Unlock()
for _, l := range secondaryLogRegistry.mu.loggers {
if !sync && l.forceSyncWrites {
// We're not changing this.
continue
}
l.logger.lockAndSetSync(sync)
}
}()
if sync {
// There may be something in the buffers already; flush it.
Flush()
}
}
// lockAndSetSync configures syncWrites.
func (l *loggerT) lockAndSetSync(sync bool) {
l.mu.Lock()
l.mu.syncWrites = sync
l.mu.Unlock()
}
// flushAndSync flushes the current log and, if doSync is set,
// attempts to sync its data to disk.
//
// l.mu is held.
func (l *loggerT) flushAndSync(doSync bool) {
if l.mu.file == nil {
return
}
// If we can't sync within this duration, exit the process.
t := time.AfterFunc(maxSyncDuration, func() {
// NB: the disk-stall-detected roachtest matches on this message.
Shoutf(context.Background(), Severity_FATAL,
"disk stall detected: unable to sync log files within %s", maxSyncDuration,
)
})
defer t.Stop()
_ = l.mu.file.Flush() // ignore error
if doSync {
_ = l.mu.file.Sync() // ignore error
}
}