Skip to content

Commit

Permalink
Improved restore/sync logging
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi committed Apr 26, 2023
1 parent e3cfc35 commit 2d3c3c4
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// Obtain initial position from shadow reader.
// It may have moved to the next index if previous position was at the end.
pos := rd.Pos()
initialPos := pos
startTime := time.Now()
var bytesWritten int

log.Printf("%s(%s): write wal segment %s/%08x:%08x", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset)

// Copy through pipe into client from the starting position.
var g errgroup.Group
Expand Down Expand Up @@ -242,6 +247,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}

// Copy frames.
Expand All @@ -268,6 +274,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}

// Flush LZ4 writer and close pipe.
Expand All @@ -291,6 +298,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))

log.Printf("%s(%s): wal segment written %s/%08x:%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset, time.Since(startTime).String(), bytesWritten)
return nil
}

Expand Down Expand Up @@ -483,14 +491,17 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return pw.Close()
})

log.Printf("%s(%s): write snapshot %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
startTime := time.Now()

// Delegate write to client & wait for writer goroutine to finish.
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
return info, err
} else if err := g.Wait(); err != nil {
return info, err
}

log.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
log.Printf("%s(%s): snapshot written %s/%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), pos.Generation, pos.Index, time.Since(startTime).String(), info.Size)

return info, nil
}
Expand Down Expand Up @@ -1019,9 +1030,12 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {

// Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
startTime := time.Now()
bytes, err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath)
if err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
logger.Printf("%s: restored snapshot %s/%08x elapsed=%s sz=%d (uncompressed)", logPrefix, opt.Generation, minWALIndex, time.Since(startTime).String(), bytes)

// If no WAL files available, move snapshot to final path & exit early.
if snapshotOnly {
Expand Down Expand Up @@ -1236,35 +1250,36 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex
}

// restoreSnapshot copies a snapshot from the replica to a file.
func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) error {
func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) (int64, error) {
// Determine the user/group & mode based on the DB, if available.
var fileInfo, dirInfo os.FileInfo
if db := r.DB(); db != nil {
fileInfo, dirInfo = db.fileInfo, db.dirInfo
}

if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil {
return err
return 0, err
}

f, err := internal.CreateFile(filename, fileInfo)
if err != nil {
return err
return 0, err
}
defer f.Close()

rd, err := r.Client.SnapshotReader(ctx, generation, index)
if err != nil {
return err
return 0, err
}
defer rd.Close()

if _, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
return err
if bytes, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
return 0, err
} else if err := f.Sync(); err != nil {
return err
return 0, err
} else {
return bytes, f.Close()
}
return f.Close()
}

// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
Expand Down

0 comments on commit 2d3c3c4

Please sign in to comment.