Adds WAL support (experimental) (#2981)
* marshalable chunks

* wal record types custom serialization

* proto types for wal checkpoints

* byteswith output unaffected by buffer

* wal & record pool ifcs

* wal record can hold entries from multiple series

* entry pool

* ingester uses noopWal

* removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding.

* segment writing

* [WIP] wal recovery from segments

* replay uses sync.Maps & preserves WAL fingerprints

* in memory wal recovery

* wal segment recovery

* ingester metrics struct

* wal replay locks streamsMtx in instances, adds checkpoint codec

* ingester metrics

* checkpointer

* WAL checkpoint writer

* checkpointwriter can write multiple checkpoints

* reorgs checkpointing

* wires up checkpointwriter to wal

* ingester SeriesIter impl

* wires up ingesterRecoverer to consume checkpoints

* generic recovery fn

* generic recovery fn

* recover from both wal types

* cleans up old tmp checkpoints & allows aborting in flight checkpoints

* wires up wal checkpointing

* more granular wal logging

* fixes off by 1 wal truncation & removes double logging

* adds userID to wal records correctly

* wire chunk encoding tests

* more granular wal metrics

* checkpoint encoding test

* ignores debug bins

* segment replay ignores out of orders

* fixes bug between WAL reading []byte validity and proto unmarshalling refs

* conf validations, removes comments

* flush on shutdown config

* POST /ingester/shutdown

* renames flush on shutdown

* wal & checkpoint use same segment size

* writes entries to wal regardless of tailers

* makes wal checkpoint duration default to 5m

* recovery metrics

* encodes headchunks separately for wal purposes

* merge upstream

* linting

* addresses pr feedback

  * uses entry pool in stream push/tailer
  * removes unnecessary pool interaction
  * checkpointbytes comment
  * fillchunk helper, record resetting in tests via pool
  * redundant comment
  * defers wg done in recovery
  * s/num/count/
  * checkpoint wal uses a logger
  * encodeWithTypeHeader now creates its own []byte
  * removes pool from decodeEntries
  * wal stop can error

* prevent shared access bug with tailers and entry pool

* removes stream push entry pool optimization
owen-d authored Nov 27, 2020
1 parent ae9c4b8 commit 4d9865a
Showing 26 changed files with 2,387 additions and 308 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@
*.output
*.tgz
*.exe
__debug_bin
requirements.lock
mixin/vendor/
cmd/loki/loki
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-config.yaml
@@ -4,6 +4,10 @@ server:
  http_listen_port: 3100

ingester:
  wal:
    enabled: true
    dir: /tmp/wal
    recover: true
  lifecycler:
    address: 127.0.0.1
    ring:
10 changes: 10 additions & 0 deletions docs/sources/api/_index.md
@@ -37,6 +37,7 @@ The HTTP API includes the following endpoints:
- [Examples](#examples-8)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)
- [`GET /metrics`](#get-metrics)
- [Series](#series)
- [Examples](#examples-9)
@@ -107,6 +108,7 @@ While these endpoints are exposed by just the distributor:
And these endpoints are exposed by just the ingester:

- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)

The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.

@@ -844,6 +846,14 @@ backing store. Mainly used for local testing.

In microservices mode, the `/flush` endpoint is exposed by the ingester.

## `POST /ingester/shutdown`

`/ingester/shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in-memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned,
but instead have their data flushed to our chunk backend.

In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.

## `GET /metrics`

`/metrics` exposes Prometheus metrics. See
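To illustrate the new endpoint (this example is not part of the commit), the sketch below POSTs to `/ingester/shutdown` on a locally running ingester. The `localhost:3100` address assumes the `loki-local-config.yaml` shown above; adjust it for your deployment.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Trigger a flush-and-shutdown of the ingester. The request body is empty.
	resp, err := http.Post("http://localhost:3100/ingester/shutdown", "text/plain", nil)
	if err != nil {
		log.Fatalf("shutdown request failed: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("ingester shutdown status:", resp.Status)
}
```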
13 changes: 13 additions & 0 deletions pkg/chunkenc/encoding_helpers.go
@@ -119,4 +119,17 @@ func (d *decbuf) byte() byte {
	return x
}

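// bytes returns the next n bytes from the buffer and advances past them,
// recording ErrInvalidSize when fewer than n bytes remain.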
func (d *decbuf) bytes(n int) []byte {
	if d.e != nil {
		return nil
	}
	if len(d.b) < n {
		d.e = ErrInvalidSize
		return nil
	}
	x := d.b[:n]
	d.b = d.b[n:]
	return x
}

func (d *decbuf) err() error { return d.e }
130 changes: 124 additions & 6 deletions pkg/chunkenc/memchunk.go
@@ -152,6 +152,94 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
	return outBuf.Bytes(), nil
}

// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
	encB := BytesBufferPool.Get(1 << 10).([]byte)

	defer func() {
		BytesBufferPool.Put(encB[:0])
	}()

	buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
	eb := encbuf{b: encB}

	eb.putByte(version)
	_, err := buf.Write(eb.get())
	if err != nil {
		return nil, errors.Wrap(err, "write headBlock version")
	}
	eb.reset()

	eb.putUvarint(len(hb.entries))
	eb.putUvarint(hb.size)
	eb.putVarint64(hb.mint)
	eb.putVarint64(hb.maxt)

	_, err = buf.Write(eb.get())
	if err != nil {
		return nil, errors.Wrap(err, "write headBlock metas")
	}
	eb.reset()

	for _, entry := range hb.entries {
		eb.putVarint64(entry.t)
		eb.putUvarint(len(entry.s))
		_, err = buf.Write(eb.get())
		if err != nil {
			return nil, errors.Wrap(err, "write headBlock entry ts")
		}
		eb.reset()

		_, err := buf.WriteString(entry.s)
		if err != nil {
			return nil, errors.Wrap(err, "write headblock entry line")
		}
	}
	return buf.Bytes(), nil
}

func (hb *headBlock) FromCheckpoint(b []byte) error {
	if len(b) < 1 {
		return nil
	}

	db := decbuf{b: b}

	version := db.byte()
	if db.err() != nil {
		return errors.Wrap(db.err(), "verifying headblock header")
	}
	if version != chunkFormatV3 {
		return errors.New("incompatible headBlock version, only V3 is currently supported")
	}

	ln := db.uvarint()
	hb.size = db.uvarint()
	hb.mint = db.varint64()
	hb.maxt = db.varint64()

	if err := db.err(); err != nil {
		return errors.Wrap(err, "verifying headblock metadata")
	}

	hb.entries = make([]entry, ln)
	for i := 0; i < ln && db.err() == nil; i++ {
		var entry entry
		entry.t = db.varint64()
		lineLn := db.uvarint()
		entry.s = string(db.bytes(lineLn))
		hb.entries[i] = entry
	}

	if err := db.err(); err != nil {
		return errors.Wrap(err, "decoding entries")
	}

	return nil
}

type entry struct {
	t int64
	s string
@@ -256,6 +344,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
}

// BytesWith uses a provided []byte for buffer instantiation
// NOTE: This does not cut the head block nor include any head block data.
func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
	buf := bytes.NewBuffer(b[:0])
	if _, err := c.WriteTo(buf); err != nil {
@@ -265,18 +354,18 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
}

// Bytes implements Chunk.
// NOTE: Does not cut head block or include any head block data.
func (c *MemChunk) Bytes() ([]byte, error) {
	return c.BytesWith(nil)
}

// WriteTo Implements io.WriterTo
// NOTE: Does not cut head block or include any head block data.
// For this to be the case you must call Close() first.
// This decision notably enables WAL checkpointing, which would otherwise
// result in different content addressable chunks in storage based on the timing of when
// they were checkpointed (which would cause new blocks to be cut early).
func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
	if c.head != nil {
		// When generating the bytes, we need to flush the data held in-buffer.
		if err := c.cut(); err != nil {
			return 0, err
		}
	}
	crc32Hash := newCRC32()

	offset := int64(0)
@@ -348,6 +437,35 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
	return offset, nil
}

// SerializeForCheckpoint returns []bytes representing the chunk & head. This is to ensure eventually
// flushed chunks don't have different substructures depending on when they were checkpointed.
// In turn this allows us to maintain a more effective dedupe ratio in storage.
func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) {
	chk, err = c.BytesWith(b)
	if err != nil {
		return nil, nil, err
	}

	if c.head.isEmpty() {
		return chk, nil, nil
	}

	head, err = c.head.CheckpointBytes(c.format)
	if err != nil {
		return nil, nil, err
	}

	return chk, head, nil
}

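// MemchunkFromCheckpoint rebuilds a MemChunk from the chunk and head block
// bytes produced by SerializeForCheckpoint.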
func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
	mc, err := NewByteChunk(chk, blockSize, targetSize)
	if err != nil {
		return nil, err
	}
	return mc, mc.head.FromCheckpoint(head)
}

// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
	return c.encoding
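The two functions above are meant to round-trip: `SerializeForCheckpoint` emits the flushed-block bytes and the (uncut) head block bytes separately, and `MemchunkFromCheckpoint` rebuilds an equivalent chunk from them, which is what WAL replay relies on. The following caller-side sketch mirrors what `TestCheckpointEncoding` below exercises; it is not part of the commit, and the import paths and size constants are assumptions based on the Loki tree at the time.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	blockSize, targetSize := 256*1024, 1500*1024
	c := chunkenc.NewMemChunk(chunkenc.EncSnappy, blockSize, targetSize)

	// Append a handful of entries; the head block is deliberately left uncut.
	for i := 0; i < 5; i++ {
		if err := c.Append(&logproto.Entry{
			Timestamp: time.Unix(int64(i), 0),
			Line:      fmt.Sprintf("hi there - %d", i),
		}); err != nil {
			panic(err)
		}
	}

	// Serialize cut blocks and head block separately so the chunk's eventual
	// content address does not depend on checkpoint timing.
	chk, head, err := c.SerializeForCheckpoint(nil)
	if err != nil {
		panic(err)
	}

	// Rebuild an equivalent chunk, as WAL replay would on startup.
	restored, err := chunkenc.MemchunkFromCheckpoint(chk, head, blockSize, targetSize)
	if err != nil {
		panic(err)
	}
	fmt.Println("restored encoding:", restored.Encoding())
}
```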
63 changes: 63 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -294,6 +294,7 @@ func TestSerialization(t *testing.T) {
	for i := 0; i < numSamples; i++ {
		require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i))))
	}
	require.NoError(t, chk.Close())

	byt, err := chk.Bytes()
	require.NoError(t, err)
@@ -840,6 +841,68 @@ func TestBytesWith(t *testing.T) {
	require.Equal(t, exp, out)
}

func TestHeadBlockCheckpointing(t *testing.T) {
	c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)

	// add a few entries
	for i := 0; i < 5; i++ {
		entry := &logproto.Entry{
			Timestamp: time.Unix(int64(i), 0),
			Line:      fmt.Sprintf("hi there - %d", i),
		}
		require.Equal(t, true, c.SpaceFor(entry))
		require.Nil(t, c.Append(entry))
	}

	// ensure blocks are not cut
	require.Equal(t, 0, len(c.blocks))

	b, err := c.head.CheckpointBytes(c.format)
	require.Nil(t, err)

	hb := &headBlock{}
	require.Nil(t, hb.FromCheckpoint(b))
	require.Equal(t, c.head, hb)
}

func TestCheckpointEncoding(t *testing.T) {
	blockSize, targetSize := 256*1024, 1500*1024
	c := NewMemChunk(EncSnappy, blockSize, targetSize)

	// add a few entries
	for i := 0; i < 5; i++ {
		entry := &logproto.Entry{
			Timestamp: time.Unix(int64(i), 0),
			Line:      fmt.Sprintf("hi there - %d", i),
		}
		require.Equal(t, true, c.SpaceFor(entry))
		require.Nil(t, c.Append(entry))
	}

	// cut it
	require.Nil(t, c.cut())

	// add a few more to head
	for i := 5; i < 10; i++ {
		entry := &logproto.Entry{
			Timestamp: time.Unix(int64(i), 0),
			Line:      fmt.Sprintf("hi there - %d", i),
		}
		require.Equal(t, true, c.SpaceFor(entry))
		require.Nil(t, c.Append(entry))
	}

	// ensure new blocks are not cut
	require.Equal(t, 1, len(c.blocks))

	chk, head, err := c.SerializeForCheckpoint(nil)
	require.Nil(t, err)

	cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize)
	require.Nil(t, err)
	require.Equal(t, c, cpy)
}

var streams = []logproto.Stream{}
var series = []logproto.Series{}

1 change: 1 addition & 0 deletions pkg/chunkenc/util_test.go
@@ -52,5 +52,6 @@ func fillChunk(c Chunk) int64 {
		entry.Line = testdata.LogString(i)

	}
	_ = c.Close()
	return inserted
}
(diffs for the remaining changed files are not shown)
