Skip to content

Commit

Permalink
Improved restore/sync logging
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi authored and Athos Couto committed May 30, 2023
1 parent 18760d2 commit b4cfb93
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
// Obtain initial position from shadow reader.
// It may have moved to the next index if previous position was at the end.
pos := rd.Pos()
initialPos := pos
startTime := time.Now()
var bytesWritten int

log.Printf("%s(%s): write wal segment %s/%08x:%08x", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset)

// Copy through pipe into client from the starting position.
var g errgroup.Group
Expand Down Expand Up @@ -263,6 +268,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}

// Copy frames.
Expand All @@ -289,6 +295,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
return err
}
walBytesCounter.Add(float64(n))
bytesWritten += n
}

// Flush LZ4 writer, encryption writer and close pipe.
Expand All @@ -314,6 +321,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index))
replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset))

log.Printf("%s(%s): wal segment written %s/%08x:%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset, time.Since(startTime).String(), bytesWritten)
return nil
}

Expand Down Expand Up @@ -531,14 +539,17 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
return wc.Close()
})

log.Printf("%s(%s): write snapshot %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
startTime := time.Now()

// Delegate write to client & wait for writer goroutine to finish.
if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil {
return info, err
} else if err := g.Wait(); err != nil {
return info, err
}

r.Logger.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
r.Logger.Printf("%s(%s): snapshot written %s/%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), pos.Generation, pos.Index, time.Since(startTime))

return info, nil
}
Expand Down Expand Up @@ -1067,9 +1078,12 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {

// Copy snapshot to output path.
logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath)
if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil {
startTime := time.Now()
bytes, err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath)
if err != nil {
return fmt.Errorf("cannot restore snapshot: %w", err)
}
logger.Printf("%s: restored snapshot %s/%08x elapsed=%s sz=%d (uncompressed)", logPrefix, opt.Generation, minWALIndex, time.Since(startTime).String(), bytes)

// If no WAL files available, move snapshot to final path & exit early.
if snapshotOnly {
Expand Down Expand Up @@ -1284,44 +1298,45 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex
}

// restoreSnapshot copies a snapshot from the replica to a file.
func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) error {
func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) (int64, error) {
// Determine the user/group & mode based on the DB, if available.
var fileInfo, dirInfo os.FileInfo
if db := r.DB(); db != nil {
fileInfo, dirInfo = db.fileInfo, db.dirInfo
}

if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil {
return err
return 0, err
}

f, err := internal.CreateFile(filename, fileInfo)
if err != nil {
return err
return 0, err
}
defer f.Close()

rd, err := r.Client.SnapshotReader(ctx, generation, index)
if err != nil {
return err
return 0, err
}
defer rd.Close()

if len(r.AgeIdentities) > 0 {
drd, err := age.Decrypt(rd, r.AgeIdentities...)
if err != nil {
return err
return 0, err
}

rd = io.NopCloser(drd)
}

if _, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
return err
if bytes, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
return 0, err
} else if err := f.Sync(); err != nil {
return err
return 0, err
} else {
return bytes, f.Close()
}
return f.Close()
}

// downloadWAL copies a WAL file from the replica to a local copy next to the DB.
Expand Down

0 comments on commit b4cfb93

Please sign in to comment.