Skip to content

Commit

Permalink
Add support for global size-based retention policy (grafana/phlare#628)
Browse files Browse the repository at this point in the history
* feat: support for global size-based retention policy

* feat: use manager for ingester subservices

* add querier block eviction test

* decouple retention enforcer and ingester
  • Loading branch information
kolesnikovae committed May 12, 2023
1 parent 9b37118 commit d32dbce
Show file tree
Hide file tree
Showing 7 changed files with 641 additions and 334 deletions.
74 changes: 45 additions & 29 deletions pkg/ingester/ingester.go
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
Expand Down Expand Up @@ -51,8 +53,9 @@ type Ingester struct {
logger log.Logger
phlarectx context.Context

lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher
lifecycler *ring.Lifecycler
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

storageBucket phlareobjstore.Bucket

Expand Down Expand Up @@ -98,36 +101,42 @@ func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storag
return nil, err
}

i.lifecyclerWatcher = services.NewFailureWatcher()
i.lifecyclerWatcher.WatchService(i.lifecycler)
rpEnforcer := newRetentionPolicyEnforcer(phlarecontext.Logger(phlarectx), i, defaultRetentionPolicy(), dbConfig)
i.subservices, err = services.NewManager(i.lifecycler, rpEnforcer)
if err != nil {
return nil, errors.Wrap(err, "services manager")
}
i.subservicesWatcher = services.NewFailureWatcher()
i.subservicesWatcher.WatchManager(i.subservices)
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
return i, nil
}

func (i *Ingester) starting(ctx context.Context) error {
// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
err := i.lifecycler.StartAsync(context.Background())
if err != nil {
return err
}

err = i.lifecycler.AwaitRunning(ctx)
if err != nil {
return err
}

return nil
return services.StartManagerAndAwaitHealthy(ctx, i.subservices)
}

func (i *Ingester) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-i.lifecyclerWatcher.Chan(): // handle lifecycler errors
case err := <-i.subservicesWatcher.Chan(): // handle lifecycler errors
return fmt.Errorf("lifecycler failed: %w", err)
}
}

func (i *Ingester) stopping(_ error) error {
errs := multierror.New()
errs.Add(services.StopManagerAndAwaitStopped(context.Background(), i.subservices))
// stop all instances
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
for _, inst := range i.instances {
errs.Add(inst.Stop())
}
return errs.Err()
}

func (i *Ingester) GetOrCreateInstance(tenantID string) (*instance, error) { //nolint:revive
inst, ok := i.getInstanceByID(tenantID)
if ok {
Expand Down Expand Up @@ -184,6 +193,25 @@ func (i *Ingester) forInstance(ctx context.Context, f func(*instance) error) err
return f(instance)
}

func (i *Ingester) evictBlock(tenantID string, b ulid.ULID, fn func() error) error {
// We lock instances map for writes to ensure that no new instances are
// created during the procedure. Otherwise, during initialization, the
// new PhlareDB instance may try to load a block that has already been
// deleted, or is being deleted.
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
// The map only contains PhlareDB instances that has been initialized since
// the process start, therefore there is no guarantee that we will find the
// discovered candidate block there. If it is the case, we have to ensure that
// the block won't be accessed, before and during deleting it from the disk.
if tenantInstance, ok := i.instances[tenantID]; ok {
if _, err := tenantInstance.Evict(b); err != nil {
return fmt.Errorf("failed to evict block %s/%s: %w", tenantID, b, err)
}
}
return fn()
}

func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) {
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[pushv1.PushResponse], error) {
level.Debug(instance.logger).Log("msg", "message received by ingester push")
Expand Down Expand Up @@ -218,18 +246,6 @@ func (i *Ingester) Push(ctx context.Context, req *connect.Request[pushv1.PushReq
})
}

func (i *Ingester) stopping(_ error) error {
errs := multierror.New()
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
// stop all instances
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
for _, inst := range i.instances {
errs.Add(inst.Stop())
}
return errs.Err()
}

func (i *Ingester) Flush(ctx context.Context, req *connect.Request[ingesterv1.FlushRequest]) (*connect.Response[ingesterv1.FlushResponse], error) {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
Expand Down
225 changes: 225 additions & 0 deletions pkg/ingester/retention.go
@@ -0,0 +1,225 @@
package ingester

import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"sort"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"

"github.com/grafana/phlare/pkg/phlaredb"
"github.com/grafana/phlare/pkg/phlaredb/block"
diskutil "github.com/grafana/phlare/pkg/util/disk"
)

const (
defaultMinFreeDisk = 10 * 1024 * 1024 * 1024 // 10Gi
defaultMinDiskAvailablePercentage = 0.05
defaultRetentionPolicyEnforcementInterval = 5 * time.Minute

// TODO(kolesnikovae): Unify with pkg/phlaredb.
phlareDBLocalPath = "local"
)

type retentionPolicy struct {
MinFreeDisk uint64
MinDiskAvailablePercentage float64
EnforcementInterval time.Duration
}

func defaultRetentionPolicy() retentionPolicy {
return retentionPolicy{
MinFreeDisk: defaultMinFreeDisk,
MinDiskAvailablePercentage: defaultMinDiskAvailablePercentage,
EnforcementInterval: defaultRetentionPolicyEnforcementInterval,
}
}

type retentionPolicyEnforcer struct {
services.Service

logger log.Logger
retentionPolicy retentionPolicy
blockEvicter blockEvicter
dbConfig phlaredb.Config
fileSystem fileSystem
volumeChecker diskutil.VolumeChecker

stopCh chan struct{}
wg sync.WaitGroup
}

type tenantBlock struct {
ulid ulid.ULID
tenantID string
path string
}

type fileSystem interface {
fs.ReadDirFS
RemoveAll(string) error
}

type realFileSystem struct{}

func (*realFileSystem) Open(name string) (fs.File, error) { return os.Open(name) }
func (*realFileSystem) ReadDir(name string) ([]fs.DirEntry, error) { return os.ReadDir(name) }
func (*realFileSystem) RemoveAll(path string) error { return os.RemoveAll(path) }

// blockEvicter unloads blocks from tenant instance.
type blockEvicter interface {
// evictBlock evicts the block by its ID for the given tenant and invokes
// fn callback, if the tenant is found. The call is thread-safe: tenant
// can't be added or removed during the execution.
evictBlock(tenant string, b ulid.ULID, fn func() error) error
}

func newRetentionPolicyEnforcer(logger log.Logger, blockEvicter blockEvicter, retentionPolicy retentionPolicy, dbConfig phlaredb.Config) *retentionPolicyEnforcer {
e := retentionPolicyEnforcer{
logger: logger,
blockEvicter: blockEvicter,
retentionPolicy: retentionPolicy,
dbConfig: dbConfig,
stopCh: make(chan struct{}),
fileSystem: new(realFileSystem),
volumeChecker: diskutil.NewVolumeChecker(retentionPolicy.MinFreeDisk, retentionPolicy.MinDiskAvailablePercentage),
}
e.Service = services.NewBasicService(nil, e.running, e.stopping)
return &e
}

func (e *retentionPolicyEnforcer) running(ctx context.Context) error {
e.wg.Add(1)
retentionPolicyEnforcerTicker := time.NewTicker(e.retentionPolicy.EnforcementInterval)
defer func() {
retentionPolicyEnforcerTicker.Stop()
e.wg.Done()
}()
for {
// Enforce retention policy immediately at start.
level.Debug(e.logger).Log("msg", "enforcing retention policy")
if err := e.cleanupBlocksWhenHighDiskUtilization(ctx); err != nil {
level.Error(e.logger).Log("msg", "failed to enforce retention policy", "err", err)
}
select {
case <-retentionPolicyEnforcerTicker.C:
case <-ctx.Done():
return nil
case <-e.stopCh:
return nil
}
}
}

func (e *retentionPolicyEnforcer) stopping(_ error) error {
close(e.stopCh)
e.wg.Wait()
return nil
}

func (e *retentionPolicyEnforcer) localBlocks(dir string) ([]*tenantBlock, error) {
blocks := make([]*tenantBlock, 0, 32)
tenants, err := fs.ReadDir(e.fileSystem, dir)
if err != nil {
return nil, err
}
var blockDirs []fs.DirEntry
for _, tenantDir := range tenants {
if !tenantDir.IsDir() {
continue
}
tenantID := tenantDir.Name()
tenantDirPath := filepath.Join(dir, tenantID, phlareDBLocalPath)
if blockDirs, err = fs.ReadDir(e.fileSystem, tenantDirPath); err != nil {
if os.IsNotExist(err) {
// Must be created by external means, skipping.
continue
}
return nil, err
}
for _, blockDir := range blockDirs {
if !blockDir.IsDir() {
continue
}
blockPath := filepath.Join(tenantDirPath, blockDir.Name())
if blockID, ok := block.IsBlockDir(blockPath); ok {
blocks = append(blocks, &tenantBlock{
ulid: blockID,
path: blockPath,
tenantID: tenantID,
})
}
// A malformed/invalid ULID likely means that the
// directory is not a valid block, ignoring.
}
}

// Sort the blocks by their id, which will be the time they've been created.
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].ulid.Compare(blocks[j].ulid) < 0
})

return blocks, nil
}

func (e *retentionPolicyEnforcer) cleanupBlocksWhenHighDiskUtilization(ctx context.Context) error {
var volumeStatsPrev *diskutil.VolumeStats
volumeStatsCurrent, err := e.volumeChecker.HasHighDiskUtilization(e.dbConfig.DataPath)
if err != nil {
return err
}
// Not in high disk utilization, nothing to do.
if !volumeStatsCurrent.HighDiskUtilization {
return nil
}
// Get all block across all the tenants. Any block
// produced or imported during the procedure is ignored.
blocks, err := e.localBlocks(e.dbConfig.DataPath)
if err != nil {
return err
}

for volumeStatsCurrent.HighDiskUtilization && len(blocks) > 0 && ctx.Err() == nil {
// When disk utilization is not lower since the last loop, we end the
// cleanup there to avoid deleting all blocks when disk usage reporting
// is delayed.
if volumeStatsPrev != nil && volumeStatsPrev.BytesAvailable >= volumeStatsCurrent.BytesAvailable {
level.Warn(e.logger).Log("msg", "disk utilization is not lowered by deletion of a block, pausing until next cycle")
break
}
// Delete the oldest block.
var b *tenantBlock
b, blocks = blocks[0], blocks[1:]
level.Warn(e.logger).Log("msg", "disk utilization is high, deleting the oldest block", "path", b.path)
if err = e.deleteBlock(b); err != nil {
return err
}
volumeStatsPrev = volumeStatsCurrent
if volumeStatsCurrent, err = e.volumeChecker.HasHighDiskUtilization(e.dbConfig.DataPath); err != nil {
return err
}
}

return ctx.Err()
}

func (e *retentionPolicyEnforcer) deleteBlock(b *tenantBlock) error {
return e.blockEvicter.evictBlock(b.tenantID, b.ulid, func() error {
switch err := e.fileSystem.RemoveAll(b.path); {
case err == nil:
case os.IsNotExist(err):
level.Warn(e.logger).Log("msg", "block not found on disk", "path", b.path)
default:
return fmt.Errorf("failed to delete block %q: %w", b.path, err)
}
return nil
})
}

0 comments on commit d32dbce

Please sign in to comment.