Adds WAL support (experimental) #2981

Merged
merged 58 commits
Nov 27, 2020
Commits
1b2b931
marshalable chunks
owen-d Oct 1, 2020
ddecf02
wal record types custom serialization
owen-d Oct 2, 2020
48fa77f
proto types for wal checkpoints
owen-d Oct 15, 2020
7ec0fb0
byteswith output unaffected by buffer
owen-d Oct 15, 2020
d6ccccf
wal & record pool ifcs
owen-d Oct 28, 2020
eb6a519
wal record can hold entries from multiple series
owen-d Oct 28, 2020
f089f09
entry pool
owen-d Oct 28, 2020
211356d
ingester uses noopWal
owen-d Oct 29, 2020
7adf966
removes duplicate argument passing in ingester code. adds ingester co…
owen-d Oct 29, 2020
29fafd6
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 4, 2020
5ed8ffb
segment writing
owen-d Nov 4, 2020
107b03b
[WIP] wal recovery from segments
owen-d Nov 5, 2020
fb194f5
replay uses sync.Maps & preserves WAL fingerprints
owen-d Nov 5, 2020
3d36b2d
in memory wal recovery
owen-d Nov 5, 2020
9a7eaa5
wal segment recovery
owen-d Nov 6, 2020
30cdbf2
ingester metrics struct
owen-d Nov 6, 2020
d723c0b
wal replay locks streamsMtx in instances, adds checkpoint codec
owen-d Nov 9, 2020
897196e
ingester metrics
owen-d Nov 9, 2020
8df2598
checkpointer
owen-d Nov 10, 2020
b5c2ef0
WAL checkpoint writer
owen-d Nov 10, 2020
60ebe5e
checkpointwriter can write multiple checkpoints
owen-d Nov 10, 2020
ed14e0c
reorgs checkpointing
owen-d Nov 10, 2020
5acf745
wires up checkpointwriter to wal
owen-d Nov 10, 2020
6dd28b1
ingester SeriesIter impl
owen-d Nov 10, 2020
a7e40fd
wires up ingesterRecoverer to consume checkpoints
owen-d Nov 10, 2020
957717b
generic recovery fn
owen-d Nov 10, 2020
9384e98
generic recovery fn
owen-d Nov 10, 2020
a1beb69
recover from both wal types
owen-d Nov 10, 2020
4e27f7c
cleans up old tmp checkpoints & allows aborting in flight checkpoints
owen-d Nov 12, 2020
45840bc
wires up wal checkpointing
owen-d Nov 12, 2020
d1b7f6c
more granular wal logging
owen-d Nov 12, 2020
2ca359d
fixes off by 1 wal truncation & removes double logging
owen-d Nov 12, 2020
ff6b35b
adds userID to wal records correctly
owen-d Nov 12, 2020
d811aef
wire chunk encoding tests
owen-d Nov 12, 2020
6eb4ec0
more granular wal metrics
owen-d Nov 12, 2020
7962164
checkpoint encoding test
owen-d Nov 12, 2020
1961726
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 13, 2020
67f298d
ignores debug bins
owen-d Nov 13, 2020
d4318c7
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 13, 2020
d43ecde
segment replay ignores out of orders
owen-d Nov 13, 2020
ce72f33
fixes bug between WAL reading []byte validity and proto unmarshalling…
owen-d Nov 13, 2020
c9614aa
conf validations, removes comments
owen-d Nov 13, 2020
571f8fc
flush on shutdown config
owen-d Nov 18, 2020
2c965f3
POST /ingester/shutdown
owen-d Nov 18, 2020
1c4d526
renames flush on shutdown
owen-d Nov 18, 2020
d9139de
wal & checkpoint use same segment size
owen-d Nov 18, 2020
9a4c810
Merge remote-tracking branch 'upstream/master' into wal/writes
owen-d Nov 19, 2020
bd9cee0
writes entries to wal regardless of tailers
owen-d Nov 19, 2020
e042812
makes wal checkpoing duration default to 5m
owen-d Nov 20, 2020
f632022
recovery metrics
owen-d Nov 20, 2020
a948785
encodes headchunks separately for wal purposes
owen-d Nov 20, 2020
2e972c7
merge upstream
owen-d Nov 23, 2020
43d8a78
typo
owen-d Nov 23, 2020
3166183
linting
owen-d Nov 23, 2020
6059256
Merge remote-tracking branch 'upstream/master' into wal/headblock
owen-d Nov 25, 2020
51ff8d7
addresses pr feedback
owen-d Nov 25, 2020
ff211ce
prevent shared access bug with tailers and entry pool
owen-d Nov 25, 2020
cb9bc46
removes stream push entry pool optimization
owen-d Nov 27, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@
*.output
*.tgz
*.exe
__debug_bin
requirements.lock
mixin/vendor/
cmd/loki/loki
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-config.yaml
@@ -4,6 +4,10 @@ server:
http_listen_port: 3100

ingester:
wal:
enabled: true
dir: /tmp/wal
recover: true
lifecycler:
address: 127.0.0.1
ring:
10 changes: 10 additions & 0 deletions docs/sources/api/_index.md
@@ -37,6 +37,7 @@ The HTTP API includes the following endpoints:
- [Examples](#examples-8)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)
- [`GET /metrics`](#get-metrics)
- [Series](#series)
- [Examples](#examples-9)
@@ -107,6 +108,7 @@ While these endpoints are exposed by just the distributor:
And these endpoints are exposed by just the ingester:

- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)

The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.

@@ -844,6 +846,14 @@ backing store. Mainly used for local testing.

In microservices mode, the `/flush` endpoint is exposed by the ingester.

## `POST /ingester/shutdown`

`/ingester/shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in-memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters, where we want to ensure old WAL directories are not orphaned
but are instead flushed to our chunk backend.

In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.

## `GET /metrics`

`/metrics` exposes Prometheus metrics. See
13 changes: 13 additions & 0 deletions pkg/chunkenc/encoding_helpers.go
@@ -119,4 +119,17 @@ func (d *decbuf) byte() byte {
return x
}

func (d *decbuf) bytes(n int) []byte {
if d.e != nil {
return nil
}
if len(d.b) < n {
d.e = ErrInvalidSize
return nil
}
x := d.b[:n]
d.b = d.b[n:]
return x
}

func (d *decbuf) err() error { return d.e }
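The new `bytes(n)` helper follows the decode buffer's sticky-error convention: once any read fails, every later read becomes a no-op and the error is reported once at the end. A minimal standalone sketch of that pattern (not Loki's actual types — `errInvalidSize` here is a local stand-in for `ErrInvalidSize`):

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalidSize = errors.New("invalid size")

// decbuf mirrors the sticky-error decode buffer pattern used in the diff:
// after the first failed read, every later read returns zero values.
type decbuf struct {
	b []byte
	e error
}

func (d *decbuf) bytes(n int) []byte {
	if d.e != nil {
		return nil
	}
	if len(d.b) < n {
		d.e = errInvalidSize
		return nil
	}
	x := d.b[:n]
	d.b = d.b[n:]
	return x
}

func (d *decbuf) err() error { return d.e }

func main() {
	d := &decbuf{b: []byte("hello!")}
	fmt.Printf("%s\n", d.bytes(5)) // prints "hello"
	d.bytes(10)                    // overruns the remaining byte: sets the sticky error
	fmt.Println(d.err())           // prints "invalid size"
}
```

The payoff is that callers can chain many reads and check `err()` once, which keeps decode code like `FromCheckpoint` below free of per-read error handling.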
130 changes: 124 additions & 6 deletions pkg/chunkenc/memchunk.go
@@ -152,6 +152,94 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
return outBuf.Bytes(), nil
}

// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
encB := BytesBufferPool.Get(1 << 10).([]byte)

defer func() {
BytesBufferPool.Put(encB[:0])
}()

buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
eb := encbuf{b: encB}

eb.putByte(version)
_, err := buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock version")
}
eb.reset()

eb.putUvarint(len(hb.entries))
eb.putUvarint(hb.size)
eb.putVarint64(hb.mint)
eb.putVarint64(hb.maxt)

_, err = buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock metas")
}
eb.reset()

for _, entry := range hb.entries {
eb.putVarint64(entry.t)
eb.putUvarint(len(entry.s))
_, err = buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock entry ts")
}
eb.reset()

_, err := buf.WriteString(entry.s)
if err != nil {
return nil, errors.Wrap(err, "write headblock entry line")
}
}
return buf.Bytes(), nil
}

func (hb *headBlock) FromCheckpoint(b []byte) error {
if len(b) < 1 {
return nil
}

db := decbuf{b: b}

version := db.byte()
if db.err() != nil {
return errors.Wrap(db.err(), "verifying headblock header")
}
if version != chunkFormatV3 {
return errors.New("incompatible headBlock version, only V3 is currently supported")
}

ln := db.uvarint()
hb.size = db.uvarint()
hb.mint = db.varint64()
hb.maxt = db.varint64()

if err := db.err(); err != nil {
return errors.Wrap(err, "verifying headblock metadata")
}

hb.entries = make([]entry, ln)
for i := 0; i < ln && db.err() == nil; i++ {
var entry entry
entry.t = db.varint64()
lineLn := db.uvarint()
entry.s = string(db.bytes(lineLn))
hb.entries[i] = entry
}

if err := db.err(); err != nil {
return errors.Wrap(err, "decoding entries")
}

return nil
}

type entry struct {
t int64
s string
@@ -256,6 +344,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
}

// BytesWith uses a provided []byte for buffer instantiation
// NOTE: This does not cut the head block nor include any head block data.
func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
if _, err := c.WriteTo(buf); err != nil {
@@ -265,18 +354,18 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
}

// Bytes implements Chunk.
// NOTE: Does not cut head block or include any head block data.
func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
}

// WriteTo Implements io.WriterTo
// NOTE: Does not cut head block or include any head block data.
// For this to be the case you must call Close() first.
// This decision notably enables WAL checkpointing, which would otherwise
// result in different content addressable chunks in storage based on the timing of when
// they were checkpointed (which would cause new blocks to be cut early).
func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
if c.head != nil {
// When generating the bytes, we need to flush the data held in-buffer.
if err := c.cut(); err != nil {
return 0, err
}
}
crc32Hash := newCRC32()

offset := int64(0)
@@ -348,6 +437,35 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
return offset, nil
}

// SerializeForCheckpoint returns byte slices representing the chunk & head. This ensures eventually
// flushed chunks don't have different substructures depending on when they were checkpointed.
// In turn, this allows us to maintain a more effective dedupe ratio in storage.
func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) {
Contributor suggestion: maybe `chk, head []byte` could be in a struct like:

type checkpoint struct {
chk, head []byte
}

chk, err = c.BytesWith(b)
if err != nil {
return nil, nil, err
}

if c.head.isEmpty() {
return chk, nil, nil
}

head, err = c.head.CheckpointBytes(c.format)
if err != nil {
return nil, nil, err
}

return chk, head, nil
}

func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.FromCheckpoint(head)
}

// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
return c.encoding
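The headblock checkpoint layout written by `CheckpointBytes` and read back by `FromCheckpoint` — version byte, uvarint entry count, uvarint size, varint mint/maxt, then per-entry varint timestamp, uvarint line length, and line bytes — can be sketched with stdlib varint helpers. This is an illustrative reimplementation, not Loki's code: `versionV3` and the helper names are stand-ins, and error handling is deliberately minimal.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// entry mirrors the headBlock entry in the diff: a timestamp and a log line.
type entry struct {
	t int64
	s string
}

const versionV3 = byte(3) // assumption: stand-in for chunkFormatV3

// encodeHead sketches the CheckpointBytes layout: version byte, uvarint entry
// count, uvarint size, varint mint/maxt, then per entry a varint timestamp,
// uvarint line length, and the raw line bytes.
func encodeHead(entries []entry, size int, mint, maxt int64) []byte {
	var buf bytes.Buffer
	tmp := make([]byte, binary.MaxVarintLen64)

	buf.WriteByte(versionV3)
	buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(entries)))])
	buf.Write(tmp[:binary.PutUvarint(tmp, uint64(size))])
	buf.Write(tmp[:binary.PutVarint(tmp, mint)])
	buf.Write(tmp[:binary.PutVarint(tmp, maxt)])
	for _, e := range entries {
		buf.Write(tmp[:binary.PutVarint(tmp, e.t)])
		buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(e.s)))])
		buf.WriteString(e.s)
	}
	return buf.Bytes()
}

// decodeHead reverses encodeHead, in the spirit of FromCheckpoint.
func decodeHead(b []byte) ([]entry, error) {
	r := bytes.NewReader(b)
	version, err := r.ReadByte()
	if err != nil || version != versionV3 {
		return nil, fmt.Errorf("bad headblock header")
	}
	n, _ := binary.ReadUvarint(r)
	binary.ReadUvarint(r) // size, ignored in this sketch
	binary.ReadVarint(r)  // mint
	binary.ReadVarint(r)  // maxt
	out := make([]entry, 0, n)
	for i := uint64(0); i < n; i++ {
		t, _ := binary.ReadVarint(r)
		ln, _ := binary.ReadUvarint(r)
		line := make([]byte, ln)
		if _, err := io.ReadFull(r, line); err != nil {
			return nil, err
		}
		out = append(out, entry{t: t, s: string(line)})
	}
	return out, nil
}

func main() {
	in := []entry{{t: 1, s: "hi there - 1"}, {t: 2, s: "hi there - 2"}}
	b := encodeHead(in, 24, 1, 2)
	out, err := decodeHead(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(out[0].s, out[1].t)
}
```

Keeping the head block in this separate, uncompressed form is what lets `SerializeForCheckpoint` avoid cutting the head into a block, so the chunk's eventual content address is unaffected by checkpoint timing.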
63 changes: 63 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -294,6 +294,7 @@ func TestSerialization(t *testing.T) {
for i := 0; i < numSamples; i++ {
require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i))))
}
require.NoError(t, chk.Close())

byt, err := chk.Bytes()
require.NoError(t, err)
@@ -840,6 +841,68 @@ func TestBytesWith(t *testing.T) {
require.Equal(t, exp, out)
}

func TestHeadBlockCheckpointing(t *testing.T) {
c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)

// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}

// ensure blocks are not cut
require.Equal(t, 0, len(c.blocks))

b, err := c.head.CheckpointBytes(c.format)
require.Nil(t, err)

hb := &headBlock{}
require.Nil(t, hb.FromCheckpoint(b))
require.Equal(t, c.head, hb)
}

func TestCheckpointEncoding(t *testing.T) {
blockSize, targetSize := 256*1024, 1500*1024
c := NewMemChunk(EncSnappy, blockSize, targetSize)

// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}

// cut it
require.Nil(t, c.cut())

// add a few more to head
for i := 5; i < 10; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}

// ensure new blocks are not cut
require.Equal(t, 1, len(c.blocks))

chk, head, err := c.SerializeForCheckpoint(nil)
require.Nil(t, err)

cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize)
require.Nil(t, err)
require.Equal(t, c, cpy)
}

var streams = []logproto.Stream{}
var series = []logproto.Series{}

1 change: 1 addition & 0 deletions pkg/chunkenc/util_test.go
@@ -52,5 +52,6 @@ func fillChunk(c Chunk) int64 {
entry.Line = testdata.LogString(i)

}
_ = c.Close()
return inserted
}