backupds: Compact log on restart #5875

Merged 2 commits on Mar 25, 2021
10 changes: 7 additions & 3 deletions cmd/lotus-shed/datastore.go
@@ -180,8 +180,11 @@ var datastoreBackupStatCmd = &cli.Command{
}
defer f.Close() // nolint:errcheck

var keys, kbytes, vbytes uint64
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
var keys, logs, kbytes, vbytes uint64
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, log bool) error {
if log {
logs++
}
keys++
kbytes += uint64(len(key.String()))
vbytes += uint64(len(value))
@@ -192,6 +195,7 @@ var datastoreBackupStatCmd = &cli.Command{
}

fmt.Println("Keys: ", keys)
fmt.Println("Log values: ", log)
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
fmt.Println("Value bytes: ", units.BytesSize(float64(vbytes)))

@@ -225,7 +229,7 @@ var datastoreBackupListCmd = &cli.Command{
defer f.Close() // nolint:errcheck

printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, _ bool) error {
return printKv(key.String(), value)
})
if err != nil {
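
Note: the stat command now distinguishes values replayed from the appended log from those in the base snapshot, using the new third callback argument. A minimal standalone sketch of the same counting pass, assuming the `github.com/filecoin-project/lotus/lib/backupds` and `github.com/ipfs/go-datastore` import paths; the ratio it prints is the quantity the new compaction check in `lib/backupds/log.go` acts on.

```go
package main

import (
	"fmt"
	"os"

	"github.com/filecoin-project/lotus/lib/backupds"
	"github.com/ipfs/go-datastore"
)

func main() {
	// Open a backupds log/backup file passed on the command line.
	f, err := os.Open(os.Args[1])
	if err != nil {
		panic(err)
	}
	defer f.Close() // nolint:errcheck

	var base, logged uint64
	err = backupds.ReadBackup(f, func(key datastore.Key, value []byte, log bool) error {
		// The new third argument reports whether this entry came from the
		// appended log (true) or the base snapshot section (false).
		if log {
			logged++
		} else {
			base++
		}
		return nil
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("base values:", base)
	fmt.Println("log values: ", logged)
	if base > 0 {
		fmt.Printf("log/base ratio: %.2f\n", float64(logged)/float64(base))
	}
}
```
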
59 changes: 43 additions & 16 deletions lib/backupds/log.go
@@ -55,7 +55,7 @@ func (d *Datastore) startLog(logdir string) error {
return xerrors.Errorf("creating log: %w", err)
}
} else {
l, err = d.openLog(filepath.Join(logdir, latest))
l, latest, err = d.openLog(filepath.Join(logdir, latest))
if err != nil {
return xerrors.Errorf("opening log: %w", err)
}
@@ -97,6 +97,8 @@ type logfile struct {
file *os.File
}

var compactThresh = 2

func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
p := filepath.Join(logdir, strconv.FormatInt(time.Now().Unix(), 10)+".log.cbor")
log.Infow("creating log", "file", p)
@@ -119,65 +121,90 @@ func (d *Datastore) createLog(logdir string) (*logfile, string, error) {
}, filepath.Base(p), nil
}

func (d *Datastore) openLog(p string) (*logfile, error) {
func (d *Datastore) openLog(p string) (*logfile, string, error) {
log.Infow("opening log", "file", p)
lh, err := d.child.Get(loghead)
if err != nil {
return nil, xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
return nil, "", xerrors.Errorf("checking log head (logfile '%s'): %w", p, err)
}

lhp := strings.Split(string(lh), ";")
if len(lhp) != 3 {
return nil, xerrors.Errorf("expected loghead to have 3 parts")
return nil, "", xerrors.Errorf("expected loghead to have 3 parts")
}

if lhp[0] != filepath.Base(p) {
return nil, xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
return nil, "", xerrors.Errorf("loghead log file doesn't match, opening %s, expected %s", p, lhp[0])
}

f, err := os.OpenFile(p, os.O_RDWR, 0644)
if err != nil {
return nil, err
return nil, "", err
}

var lastLogHead string
var openCount, logvals int64
var openCount, vals, logvals int64
// check file integrity
err = ReadBackup(f, func(k datastore.Key, v []byte) error {
logvals++
err = ReadBackup(f, func(k datastore.Key, v []byte, log bool) error {
if log {
logvals++
} else {
vals++
}
if k == loghead {
lastLogHead = string(v)
openCount++
}
return nil
})
if err != nil {
return nil, xerrors.Errorf("reading backup part of the logfile: %w", err)
return nil, "", xerrors.Errorf("reading backup part of the logfile: %w", err)
}
if string(lh) != lastLogHead {
return nil, xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
return nil, "", xerrors.Errorf("loghead didn't match, expected '%s', last in logfile '%s'", string(lh), lastLogHead)
}

// make sure we're at the end of the file
at, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return nil, xerrors.Errorf("get current logfile offset: %w", err)
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
}
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
return nil, xerrors.Errorf("get current logfile offset: %w", err)
return nil, "", xerrors.Errorf("get current logfile offset: %w", err)
}
if at != end {
return nil, xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
return nil, "", xerrors.Errorf("logfile %s validated %d bytes, but the file has %d bytes (%d more)", p, at, end, end-at)
}

log.Infow("log opened", "file", p, "openCount", openCount, "logValues", logvals)
compact := logvals > vals*int64(compactThresh)
if compact {
log.Infow("compacting log", "current", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)
if err := f.Close(); err != nil {
return nil, "", xerrors.Errorf("closing current log: %w", err)
}

l, latest, err := d.createLog(filepath.Dir(p))
if err != nil {
return nil, "", xerrors.Errorf("creating compacted log: %w", err)
}

log.Infow("compacted log created, cleaning up old", "old", p, "new", latest)
if err := os.Remove(p); err != nil {
l.Close() // nolint
return nil, "", xerrors.Errorf("cleaning up old logfile: %w", err)
}

return l, latest, nil
}

log.Infow("log opened", "file", p, "openCount", openCount, "baseValues", vals, "logValues", logvals)

// todo: maybe write a magic 'opened at' entry; pad the log to filesystem page to prevent more exotic types of corruption

return &logfile{
file: f,
}, nil
}, filepath.Base(p), nil
}

func (l *logfile) writeLogHead(logname string, ds datastore.Batching) error {
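
The compaction rule itself is simple: when a log is reopened, its entries are counted, and if the values replayed from the log outnumber the base snapshot values by more than `compactThresh`, a fresh log is written from the live datastore and the old file is removed. A minimal sketch of just that decision (illustrative only, the names follow the diff and are not an exported lotus API):

```go
package main

import "fmt"

// shouldCompact mirrors the check added in openLog: compact when the log
// entries outnumber the base snapshot values by more than compactThresh.
func shouldCompact(baseVals, logVals, compactThresh int64) bool {
	return logVals > baseVals*compactThresh
}

func main() {
	// With the default compactThresh of 2, a snapshot of 1000 values
	// tolerates up to 2000 appended log entries before the next open
	// rewrites the log.
	fmt.Println(shouldCompact(1000, 2000, 2)) // false
	fmt.Println(shouldCompact(1000, 2001, 2)) // true
}
```
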
8 changes: 4 additions & 4 deletions lib/backupds/read.go
@@ -11,7 +11,7 @@ import (
"golang.org/x/xerrors"
)

func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error {
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte, log bool) error) error {
scratch := make([]byte, 9)

// read array[2](
@@ -61,7 +61,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err
return xerrors.Errorf("reading value: %w", err)
}

if err := cb(key, value); err != nil {
if err := cb(key, value, false); err != nil {
return err
}
}
@@ -110,7 +110,7 @@ func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) err

key := datastore.NewKey(string(ent.Key))

if err := cb(key, ent.Value); err != nil {
if err := cb(key, ent.Value, true); err != nil {
return err
}
}
@@ -122,7 +122,7 @@ func RestoreInto(r io.Reader, dest datastore.Batching) error {
return xerrors.Errorf("creating batch: %w", err)
}

err = ReadBackup(r, func(key datastore.Key, value []byte) error {
err = ReadBackup(r, func(key datastore.Key, value []byte, _ bool) error {
if err := batch.Put(key, value); err != nil {
return xerrors.Errorf("put key: %w", err)
}
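
With this change, `ReadBackup` tells the callback which section of the file each entry came from: it first walks the base snapshot (`log == false`) and then decodes any entries appended after it (`log == true`). Since `RestoreInto` simply `Put`s entries in the order they are read, a log entry delivered later overwrites a snapshot value for the same key. A hypothetical helper, not part of this PR, that uses the flag to split a backup stream:

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/filecoin-project/lotus/lib/backupds"
	"github.com/ipfs/go-datastore"
)

// splitBackup separates base snapshot entries from log-appended entries
// using the log flag now passed to the ReadBackup callback.
func splitBackup(r io.Reader) (base, logged map[string][]byte, err error) {
	base = map[string][]byte{}
	logged = map[string][]byte{}
	err = backupds.ReadBackup(r, func(key datastore.Key, value []byte, log bool) error {
		if log {
			logged[key.String()] = value // appended after the snapshot
		} else {
			base[key.String()] = value // part of the base snapshot
		}
		return nil
	})
	return base, logged, err
}

func main() {
	f, err := os.Open(os.Args[1])
	if err != nil {
		panic(err)
	}
	defer f.Close() // nolint:errcheck

	base, logged, err := splitBackup(f)
	if err != nil {
		panic(err)
	}
	fmt.Println("snapshot entries:", len(base), "log entries:", len(logged))
}
```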