datamanager: clean old data files #160

Merged · 1 commit merged on Nov 7, 2019
196 changes: 181 additions & 15 deletions internal/datamanager/data.go
@@ -16,10 +16,13 @@ package datamanager

import (
"bytes"
"container/ring"
"context"
"encoding/json"
"fmt"
"io"
"path"
"regexp"
"sort"
"strings"

@@ -32,6 +35,12 @@ import (

const (
DefaultMaxDataFileSize = 10 * 1024 * 1024
dataStatusToKeep = 3
)

var (
DataFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)-([a-zA-Z0-9-]+)\.(data|index)$`)
DataStatusFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)\.status$`)
)

type DataStatus struct {
@@ -527,33 +536,49 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
return bytes.NewReader(de.Data), nil
}

func (d *DataManager) GetLastDataStatusPath() (string, error) {
func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) {
if n < 1 {
return nil, errors.Errorf("n must be greater than 0")
}
r := ring.New(n)
re := r

doneCh := make(chan struct{})
defer close(doneCh)

var dataStatusPath string
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return "", object.Err
return nil, object.Err
}
if strings.HasSuffix(object.Path, ".status") {
dataStatusPath = object.Path
if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
seq, err := sequence.Parse(m[1])
if err != nil {
d.log.Warnf("cannot parse sequence for data status file %q", object.Path)
continue
}
re.Value = seq
re = re.Next()
} else {
d.log.Warnf("bad file %q found in storage data dir", object.Path)
}
}
if dataStatusPath == "" {
return "", ostypes.ErrNotExist
}

return dataStatusPath, nil
}
dataStatusSequences := []*sequence.Sequence{}
re.Do(func(x interface{}) {
if x != nil {
dataStatusSequences = append([]*sequence.Sequence{x.(*sequence.Sequence)}, dataStatusSequences...)
}
})

func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
dataStatusPath, err := d.GetLastDataStatusPath()
if err != nil {
return nil, err
if len(dataStatusSequences) == 0 {
return nil, ostypes.ErrNotExist
}

dataStatusf, err := d.ost.ReadObject(dataStatusPath)
return dataStatusSequences, nil
}

func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatus, error) {
dataStatusf, err := d.ost.ReadObject(d.dataStatusPath(dataSequence))
if err != nil {
return nil, err
}
@@ -564,6 +589,24 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
return dataStatus, dec.Decode(&dataStatus)
}

func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
dataStatusSequences, err := d.GetLastDataStatusSequences(1)
if err != nil {
return nil, err
}

return dataStatusSequences[0], nil
}

func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
dataStatusSequence, err := d.GetLastDataStatusSequence()
if err != nil {
return nil, err
}

return d.GetDataStatus(dataStatusSequence)
}

func (d *DataManager) Export(ctx context.Context, w io.Writer) error {
if err := d.checkpoint(ctx, true); err != nil {
return err
@@ -725,3 +768,126 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {

return nil
}

func (d *DataManager) CleanOldCheckpoints(ctx context.Context) error {
dataStatusSequences, err := d.GetLastDataStatusSequences(dataStatusToKeep)
if err != nil {
return err
}

return d.cleanOldCheckpoints(ctx, dataStatusSequences)
}

func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequences []*sequence.Sequence) error {
if len(dataStatusSequences) == 0 {
return nil
}

lastDataStatusSequence := dataStatusSequences[0]

// Remove old data status paths
if len(dataStatusSequences) >= dataStatusToKeep {
dataStatusPathsMap := map[string]struct{}{}
for _, seq := range dataStatusSequences {
dataStatusPathsMap[d.dataStatusPath(seq)] = struct{}{}
}

doneCh := make(chan struct{})
defer close(doneCh)
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return object.Err
}

skip := false
if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
seq, err := sequence.Parse(m[1])
if err == nil && seq.String() > lastDataStatusSequence.String() {
d.log.Infof("skipping file %q since its sequence is greater than %q", object.Path, lastDataStatusSequence)
skip = true
}
}
if skip {
continue
}

if _, ok := dataStatusPathsMap[object.Path]; !ok {
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
}
}

// the set of data file base paths to keep, i.e. those referenced by the retained data statuses
files := map[string]struct{}{}

for _, dataStatusSequence := range dataStatusSequences {
dataStatus, err := d.GetDataStatus(dataStatusSequence)
if err != nil {
return err
}

for dataType := range dataStatus.Files {
for _, file := range dataStatus.Files[dataType] {
files[d.DataFileBasePath(dataType, file.ID)] = struct{}{}
}
}
}

doneCh := make(chan struct{})
defer close(doneCh)

for object := range d.ost.List(d.storageDataDir()+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}

p := object.Path
// object file relative to the storageDataDir
pr := strings.TrimPrefix(p, d.storageDataDir()+"/")
// object file full path without final extension
pne := strings.TrimSuffix(p, path.Ext(p))
// object file base name
pb := path.Base(p)

// skip status files
if !strings.Contains(pr, "/") && strings.HasSuffix(pr, ".status") {
continue
}

// skip data files with a sequence greater than the last known sequence.
// this avoids a possible race where a Clean runs concurrently with a
// Checkpoint (even if protected by etcd locks, those locks cannot
// guard operations on resources external to etcd during network
// errors) and would remove the objects created by that checkpoint
// before its data status file exists.
skip := false
// extract the data sequence from the object name
if m := DataFileRegexp.FindStringSubmatch(pb); m != nil {
seq, err := sequence.Parse(m[1])
if err == nil && seq.String() > lastDataStatusSequence.String() {
d.log.Infof("skipping file %q since its sequence is greater than %q", p, lastDataStatusSequence)
skip = true
}
}
if skip {
continue
}

if _, ok := files[pne]; !ok {
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
}

return nil
}
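
Aside (not part of the diff): GetLastDataStatusSequences above walks the object list in ascending order and uses a container/ring of size n as a sliding window, so only the newest n status files survive; unwinding the ring with Do while prepending yields them newest first. A minimal, self-contained sketch of that pattern — with hypothetical object paths, not agola's real ones — looks like this:

```go
package main

import (
	"container/ring"
	"fmt"
	"path"
	"regexp"
)

// same shape as the DataStatusFileRegexp added in the diff
var statusFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)\.status$`)

// lastN scans paths in ascending order and returns the last n captured
// sequences, newest first, using a fixed-size ring as a sliding window.
func lastN(paths []string, n int) []string {
	r := ring.New(n)
	for _, p := range paths {
		if m := statusFileRegexp.FindStringSubmatch(path.Base(p)); m != nil {
			r.Value = m[1] // overwrite the oldest slot
			r = r.Next()
		}
	}
	out := []string{}
	r.Do(func(x interface{}) {
		if x != nil {
			// prepend so the newest entry ends up first
			out = append([]string{x.(string)}, out...)
		}
	})
	return out
}

func main() {
	// hypothetical object paths; real ones come from d.ost.List
	paths := []string{
		"data/016e-a1.status",
		"data/016e-a2.status",
		"data/016e-a3.status",
		"data/016e-a4.status",
	}
	fmt.Println(lastN(paths, 3)) // [016e-a4 016e-a3 016e-a2]
}
```

This is why dataStatusSequences[0] in the diff is always the most recent status, and why cleanOldCheckpoints can treat it as the cut-off sequence when deciding which data files are safe to delete.
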
70 changes: 39 additions & 31 deletions internal/datamanager/datamanager.go
@@ -34,9 +34,10 @@ import (
// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one

const (
DefaultCheckpointInterval = 10 * time.Second
DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100
DefaultCheckpointInterval = 10 * time.Second
DefaultCheckpointCleanInterval = 5 * time.Minute
DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100
)

var (
@@ -79,30 +80,32 @@ const (
)

type DataManagerConfig struct {
BasePath string
E *etcd.Store
OST *objectstorage.ObjStorage
DataTypes []string
EtcdWalsKeepNum int
CheckpointInterval time.Duration
BasePath string
E *etcd.Store
OST *objectstorage.ObjStorage
DataTypes []string
EtcdWalsKeepNum int
CheckpointInterval time.Duration
CheckpointCleanInterval time.Duration
// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
MinCheckpointWalsNum int
MaxDataFileSize int64
MaintenanceMode bool
}

type DataManager struct {
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
changes *WalChanges
dataTypes []string
etcdWalsKeepNum int
checkpointInterval time.Duration
minCheckpointWalsNum int
maxDataFileSize int64
maintenanceMode bool
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
changes *WalChanges
dataTypes []string
etcdWalsKeepNum int
checkpointInterval time.Duration
checkpointCleanInterval time.Duration
minCheckpointWalsNum int
maxDataFileSize int64
maintenanceMode bool
}

func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
@@ -115,6 +118,9 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
if conf.CheckpointInterval == 0 {
conf.CheckpointInterval = DefaultCheckpointInterval
}
if conf.CheckpointCleanInterval == 0 {
conf.CheckpointCleanInterval = DefaultCheckpointCleanInterval
}
if conf.MinCheckpointWalsNum == 0 {
conf.MinCheckpointWalsNum = DefaultMinCheckpointWalsNum
}
@@ -126,17 +132,18 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
}

d := &DataManager{
basePath: conf.BasePath,
log: logger.Sugar(),
e: conf.E,
ost: conf.OST,
changes: NewWalChanges(conf.DataTypes),
dataTypes: conf.DataTypes,
etcdWalsKeepNum: conf.EtcdWalsKeepNum,
checkpointInterval: conf.CheckpointInterval,
minCheckpointWalsNum: conf.MinCheckpointWalsNum,
maxDataFileSize: conf.MaxDataFileSize,
maintenanceMode: conf.MaintenanceMode,
basePath: conf.BasePath,
log: logger.Sugar(),
e: conf.E,
ost: conf.OST,
changes: NewWalChanges(conf.DataTypes),
dataTypes: conf.DataTypes,
etcdWalsKeepNum: conf.EtcdWalsKeepNum,
checkpointInterval: conf.CheckpointInterval,
checkpointCleanInterval: conf.CheckpointCleanInterval,
minCheckpointWalsNum: conf.MinCheckpointWalsNum,
maxDataFileSize: conf.MaxDataFileSize,
maintenanceMode: conf.MaintenanceMode,
}

// add a trailing slash to the basepath
Expand Down Expand Up @@ -231,6 +238,7 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
go d.watcherLoop(ctx)
go d.syncLoop(ctx)
go d.checkpointLoop(ctx)
go d.checkpointCleanLoop(ctx)
go d.walCleanerLoop(ctx)
go d.compactChangeGroupsLoop(ctx)
go d.etcdPingerLoop(ctx)
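
Aside (not part of the diff): Run now also starts a checkpointCleanLoop goroutine, and NewDataManager defaults CheckpointCleanInterval to five minutes when the config leaves it at zero. The loop body is outside this excerpt; assuming it follows the same ticker pattern as the other loops, a sketch might look like the following (the method body and the use of time.After are guesses, not the merged code):

```go
// Hypothetical sketch only: the real checkpointCleanLoop body is not part of
// this excerpt. It assumes the DataManager fields from the diff plus "time".
func (d *DataManager) checkpointCleanLoop(ctx context.Context) {
	for {
		d.log.Debugf("checkpointCleanLoop")
		// periodically drop data files and status files older than the
		// last dataStatusToKeep checkpoints
		if err := d.CleanOldCheckpoints(ctx); err != nil {
			d.log.Errorf("checkpoint clean error: %v", err)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(d.checkpointCleanInterval):
		}
	}
}
```
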