-
Notifications
You must be signed in to change notification settings - Fork 482
/
wal.go
129 lines (110 loc) · 3.52 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package wal
import (
"fmt"
"os"
"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/grafana/loki/pkg/ingester/wal"
)
var (
recordPool = wal.NewRecordPool()
)
// WAL is an interface that allows us to abstract ourselves from Prometheus WAL implementation.
type WAL interface {
// Log marshals the records and writes it into the WAL.
Log(*wal.Record) error
Delete() error
Sync() error
Dir() string
Close()
NextSegment() (int, error)
}
type wrapper struct {
wal *wlog.WL
log log.Logger
}
// New creates a new wrapper, instantiating the actual wlog.WL underneath.
func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (WAL, error) {
// TODO: We should fine-tune the WAL instantiated here to allow some buffering of written entries, but not written to disk
// yet. This will attest for the lack of buffering in the channel Writer exposes.
tsdbWAL, err := wlog.NewSize(log, registerer, cfg.Dir, wlog.DefaultSegmentSize, wlog.CompressionSnappy)
if err != nil {
return nil, fmt.Errorf("failde to create tsdb WAL: %w", err)
}
return &wrapper{
wal: tsdbWAL,
log: log,
}, nil
}
// Close closes the underlying wal, flushing pending writes and closing the active segment. Safe to call more than once
func (w *wrapper) Close() {
// Avoid checking the error since it's safe to call Close more than once on wlog.WL
_ = w.wal.Close()
}
func (w *wrapper) Delete() error {
err := w.wal.Close()
if err != nil {
level.Warn(w.log).Log("msg", "failed to close WAL", "err", err)
}
err = os.RemoveAll(w.wal.Dir())
return err
}
func (w *wrapper) Log(record *wal.Record) error {
if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) {
return nil
}
// The code below extracts the wal write operations to when possible, batch both series and records writes
if len(record.Series) > 0 && len(record.RefEntries) > 0 {
return w.logBatched(record)
}
return w.logSingle(record)
}
// logBatched logs to the WAL both series and records, batching the operation to prevent unnecessary page flushes.
func (w *wrapper) logBatched(record *wal.Record) error {
seriesBuf := recordPool.GetBytes()
entriesBuf := recordPool.GetBytes()
defer func() {
recordPool.PutBytes(seriesBuf)
recordPool.PutBytes(entriesBuf)
}()
*seriesBuf = record.EncodeSeries(*seriesBuf)
*entriesBuf = record.EncodeEntries(wal.CurrentEntriesRec, *entriesBuf)
// Always write series then entries
return w.wal.Log(*seriesBuf, *entriesBuf)
}
// logSingle logs to the WAL series and records in separate WAL operation. This causes a page flush after each operation.
func (w *wrapper) logSingle(record *wal.Record) error {
buf := recordPool.GetBytes()
defer func() {
recordPool.PutBytes(buf)
}()
// Always write series then entries.
if len(record.Series) > 0 {
*buf = record.EncodeSeries(*buf)
if err := w.wal.Log(*buf); err != nil {
return err
}
*buf = (*buf)[:0]
}
if len(record.RefEntries) > 0 {
*buf = record.EncodeEntries(wal.CurrentEntriesRec, *buf)
if err := w.wal.Log(*buf); err != nil {
return err
}
}
return nil
}
// Sync flushes changes to disk. Mainly to be used for testing.
func (w *wrapper) Sync() error {
return w.wal.Sync()
}
// Dir returns the path to the WAL directory.
func (w *wrapper) Dir() string {
return w.wal.Dir()
}
// NextSegment closes the current segment synchronously. Mainly used for testing.
func (w *wrapper) NextSegment() (int, error) {
return w.wal.NextSegmentSync()
}