Don't fail writes due to full WAL disk #3136

Merged · 3 commits · Jan 7, 2021
24 changes: 21 additions & 3 deletions docs/sources/operations/storage/wal.md
@@ -4,11 +4,29 @@ title: Write Ahead Log

# Write Ahead Log (WAL)

Ingesters temporarily store data in memory. In the event of a crash, there could be data loss. The WAL helps fill this gap in reliability.

Ingesters store all their data in memory. If there is a crash, there can be data loss. The WAL helps fill this gap in reliability.
This section will use Kubernetes as a reference.
The WAL in Loki records incoming data and stores it on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. Upon restart, Loki will "replay" all of the data in the log before registering itself as ready for subsequent writes. This allows Loki to maintain the performance & cost benefits of buffering data in memory _and_ durability benefits (it won't lose data once a write has been acknowledged).

To use the WAL, there are some changes that need to be made.
This section will use Kubernetes as a reference deployment paradigm in the examples.

## Disclaimer & WAL nuances

The Write Ahead Log in Loki makes a few particular tradeoffs compared to other WALs you may be familiar with. The WAL aims to add additional durability guarantees, but _not at the expense of availability_. In particular, there are two scenarios where the WAL sacrifices these guarantees.

1) Corruption/Deletion of the WAL prior to replaying it

In the event the WAL is corrupted/partially deleted, Loki will not be able to recover all of its data. In this case, Loki will attempt to recover any data it can, but the corruption will not prevent Loki from starting.

Note: the Prometheus metric `loki_ingester_wal_corruptions_total` can be used to track and alert when this happens.

1) No space left on disk

In the event the underlying WAL disk is full, Loki will not fail incoming writes, but neither will it log them to the WAL. In this case, the persistence guarantees across process restarts will not hold.
@slim-bean (Collaborator) commented on Jan 7, 2021:

Interesting thought, feel free to tell me this is too much scope.

If we know writing to the WAL is failing can we force a flush on shutdown?

PR author (Member) replied:

I think this is a great idea. Otherwise, we'd end up having to remove an ingester from traffic and wait for chunk_idle to elapse before shutting it down.


Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens.
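
For context, here is a sketch (not part of this diff) of how the counter behind `loki_ingester_wal_disk_full_failures_total` would typically be registered. The real definition lives in `pkg/ingester/metrics.go`, which is not shown in this PR; the field name `walDiskFullFailures` matches the one incremented in `instance.go` below, but the exact options here are an assumption.

```go
// Sketch only: assumed registration of the disk-full failure counter, using the
// prometheus and promauto packages already imported elsewhere in this PR.
func newWALDiskFullCounter(r prometheus.Registerer) prometheus.Counter {
	return promauto.With(r).NewCounter(prometheus.CounterOpts{
		Name: "loki_ingester_wal_disk_full_failures_total",
		Help: "Total number of WAL writes that failed because the disk was full.",
	})
}
```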

### Metrics

## Changes to deployment

29 changes: 15 additions & 14 deletions pkg/ingester/checkpoint_test.go
@@ -38,12 +38,7 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time,
require.Len(t, result.resps[0].Streams[1].Entries, ln)
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
@@ -52,6 +47,18 @@ func TestIngesterWAL(t *testing.T) {
Recover: true,
CheckpointDuration: time.Second,
}

return ingesterConfig
}

func TestIngesterWAL(t *testing.T) {

walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

@@ -134,14 +141,8 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.Nil(t, err)
defer os.RemoveAll(walDir)

ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

39 changes: 34 additions & 5 deletions pkg/ingester/flush_test.go
@@ -2,8 +2,11 @@ package ingester

import (
"fmt"
"io/ioutil"
"os"
"sort"
"sync"
"syscall"
"testing"
"time"

@@ -44,7 +47,7 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
testData := pushTestSamples(t, ing)

@@ -54,7 +57,25 @@
}

func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
}

type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }

func TestWALFullFlush(t *testing.T) {
// technically replaced with a fake wal, but the ingester New() function creates a regular wal first,
// so we enable creation/cleanup even though it remains unused.
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)

store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
@@ -66,7 +87,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

const userID = "testUser"
@@ -112,7 +133,7 @@ func TestFlushMaxAge(t *testing.T) {
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour

store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()

now := time.Unix(0, 0)
@@ -166,7 +187,10 @@ type testStore struct {
chunks map[string][]chunk.Chunk
}

func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
// Note: the ingester New() function creates its own WAL first, which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the wal.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
store := &testStore{
chunks: map[string][]chunk.Chunk{},
}
@@ -178,6 +202,11 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

if walOverride != nil {
_ = ing.wal.Stop()
ing.wal = walOverride
}

return store, ing
}

29 changes: 19 additions & 10 deletions pkg/ingester/ingester.go
@@ -146,6 +150,10 @@ type Ingester struct {

limiter *Limiter

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics

wal WAL
@@ -169,15 +173,16 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
metrics := newIngesterMetrics(registerer)

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
}

if cfg.WAL.Enabled {
@@ -319,6 +324,10 @@ func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs errUtil.MultiError
errs.Add(i.wal.Stop())

if i.flushOnShutdownSwitch.Get() {
i.lifecycler.SetFlushOnShutdown(true)
}
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
@@ -384,7 +393,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst
57 changes: 54 additions & 3 deletions pkg/ingester/instance.go
@@ -3,8 +3,11 @@ package ingester
import (
"context"
"net/http"
"os"
"sync"
"syscall"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -15,6 +18,7 @@

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
"github.com/cortexproject/cortex/pkg/util"
cutil "github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/helpers"
@@ -77,6 +81,10 @@ type instance struct {

wal WAL

// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch

metrics *ingesterMetrics
}

@@ -86,6 +94,7 @@ func newInstance(
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
) *instance {
i := &instance{
cfg: cfg,
@@ -101,8 +110,9 @@
tailers: map[uint32]*tailer{},
limiter: limiter,

wal: wal,
metrics: metrics,
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -161,8 +171,19 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {

if !record.IsEmpty() {
if err := i.wal.Log(record); err != nil {
return err
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
i.metrics.walDiskFullFailures.Inc()
Collaborator commented:

It would be nice if we could log that this was happening, but we don't want to spam the logs.

Thoughts on using a boolean kept in the instance to log something one time, and then perhaps clear the bool and log that the error is cleared if writes start succeeding again?

i.flushOnShutdownSwitch.TriggerAnd(func() {
level.Error(util.Logger).Log(
"msg",
"Error writing to WAL, disk full, no further messages will be logged for this error",
)
})
} else {
return err
}
}

}

return appendErr
@@ -578,3 +599,33 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
}
return false
}

// OnceSwitch is a write optimized switch that can only ever be switched "on".
// It uses a RWMutex underneath the hood to quickly and effectively (in a concurrent environment)
// check if the switch has already been triggered, only actually acquiring the mutex for writing if not.
type OnceSwitch struct {
sync.RWMutex
toggle bool
}

func (o *OnceSwitch) Get() bool {
o.RLock()
defer o.RUnlock()
return o.toggle
}

// TriggerAnd will ensure the switch is on and run the provided function if
// the switch was not already toggled on.
func (o *OnceSwitch) TriggerAnd(fn func()) {
o.RLock()
if o.toggle {
o.RUnlock()
return
}

o.RUnlock()
o.Lock()
o.toggle = true
o.Unlock()
fn()
}
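
A minimal usage sketch (not part of this PR's tests), assuming the `OnceSwitch` type above and an imported `fmt`: the callback runs on the first trigger and the switch then stays on.

```go
func ExampleOnceSwitch() {
	var s OnceSwitch
	for i := 0; i < 3; i++ {
		// Only the first iteration runs the callback; later calls see the toggle already set.
		s.TriggerAnd(func() { fmt.Println("WAL disk full, will flush on shutdown") })
	}
	fmt.Println(s.Get())
	// Output:
	// WAL disk full, will flush on shutdown
	// true
}
```

Note that two goroutines which both pass the read-locked check before either takes the write lock would each run `fn`, so the "once" guarantee applies to the switch state rather than the callback; for the single warning log in `Push` that is harmless.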
12 changes: 6 additions & 6 deletions pkg/ingester/instance_test.go
@@ -35,7 +35,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{})

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -62,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

const (
concurrent = 10
@@ -120,7 +120,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
lbls := makeRandomLabels()

tt := time.Now()
@@ -160,7 +160,7 @@ func Test_SeriesQuery(t *testing.T) {
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20

instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics)
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})

currentTime := time.Now()

@@ -271,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
ctx := context.Background()

for n := 0; n < b.N; n++ {
@@ -313,7 +313,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {

ctx := context.Background()

inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {