Skip to content

Commit

Permalink
fix: add mutex to synchronize read/write of active snapshot
Browse files Browse the repository at this point in the history
Though the discipline of the main goroutine handling the transition
from nil, and the InitiateSnapshot goroutine handling the transition
to nil *might* be safe, it's fragile and undocumented. Better to
put in explicit synchronization and avoid the issue.
  • Loading branch information
JimLarson committed May 3, 2023
1 parent 8edbb88 commit c0dacd2
Showing 1 changed file with 48 additions and 14 deletions.
62 changes: 48 additions & 14 deletions golang/cosmos/x/swingset/keeper/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"regexp"
"sync"

"github.com/Agoric/agoric-sdk/golang/cosmos/vm"
"github.com/Agoric/agoric-sdk/golang/cosmos/x/swingset/types"
Expand Down Expand Up @@ -64,13 +65,20 @@ type SwingStoreExporter interface {
}

type SwingsetSnapshotter struct {
sync.RWMutex // must hold to read/write activeSnapshot
isConfigured func() bool
takeSnapshot func(height int64)
newRestoreContext func(height int64) sdk.Context
logger log.Logger
exporter SwingStoreExporter
blockingSend func(action vm.Jsonable) (string, error)
activeSnapshot *activeSnapshot

// activeSnapshot points to the active snapshot.
// Must hold the snapshotter's RWMutex to read or write it.
// The main goroutine moves it from nil to non-nil,
// and the goroutine spawned by InitiateSnapshot() moves it from
// non-nil back to nil.
activeSnapshot *activeSnapshot
}

type snapshotAction struct {
Expand Down Expand Up @@ -116,8 +124,12 @@ func NewSwingsetSnapshotter(app *baseapp.BaseApp, exporter SwingStoreExporter, s
// The snapshot operation is performed in a goroutine, and synchronized with the
// main thread through the `WaitUntilSnapshotStarted` method.
func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
if snapshotter.activeSnapshot != nil {
return fmt.Errorf("snapshot already in progress for height %d", snapshotter.activeSnapshot.height)
snapshotter.RLock()
active := snapshotter.activeSnapshot
snapshotter.RUnlock()

if active != nil {
return fmt.Errorf("snapshot already in progress for height %d", active.height)
}

if !snapshotter.isConfigured() {
Expand All @@ -129,17 +141,20 @@ func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
// Indicate that a snapshot has been initiated by setting `activeSnapshot`.
// This structure is used to synchronize with the goroutine spawned below.
// The goroutine nils it out before exiting (and is the only code that does so).
ch := make(chan error, 1)
snapshotter.Lock()
snapshotter.activeSnapshot = &activeSnapshot{
height: height,
logger: logger,
startedResult: make(chan error, 1),
startedResult: ch,
retrieved: false,
}
snapshotter.Unlock()

go func(startedResult chan error) {
action := &snapshotAction{
Type: "COSMOS_SNAPSHOT",
BlockHeight: snapshotter.activeSnapshot.height,
BlockHeight: height,
Request: "initiate",
}

Expand All @@ -149,7 +164,9 @@ func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
if err != nil {
// First indicate a snapshot is no longer in progress if the call to
// `WaitUntilSnapshotStarted` hasn't happened yet.
snapshotter.Lock()
snapshotter.activeSnapshot = nil
snapshotter.Unlock()
// Then signal the current snapshot operation if a call to
// `WaitUntilSnapshotStarted` was already waiting.
startedResult <- err
Expand All @@ -162,18 +179,22 @@ func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
// `WaitUntilSnapshotStarted` will no longer block.
close(startedResult)

// In production this should indirectly call SnapshotExtension().
snapshotter.takeSnapshot(height)

// Check whether the cosmos Snapshot() method successfully handled our extension
snapshotter.Lock()
if snapshotter.activeSnapshot.retrieved {
snapshotter.activeSnapshot = nil
snapshotter.Unlock()
return
}
snapshotter.Unlock()

logger.Error("failed to make swingset snapshot")
action = &snapshotAction{
Type: "COSMOS_SNAPSHOT",
BlockHeight: snapshotter.activeSnapshot.height,
BlockHeight: height,
Request: "discard",
}
_, err = snapshotter.blockingSend(action)
Expand All @@ -182,8 +203,10 @@ func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
logger.Error("failed to discard swingset snapshot", "err", err)
}

snapshotter.Lock()
snapshotter.activeSnapshot = nil
}(snapshotter.activeSnapshot.startedResult)
snapshotter.Unlock()
}(ch)

return nil
}
Expand All @@ -199,7 +222,9 @@ func (snapshotter *SwingsetSnapshotter) InitiateSnapshot(height int64) error {
func (snapshotter *SwingsetSnapshotter) WaitUntilSnapshotStarted() error {
// First, copy the synchronization structure in case the snapshot operation
// completes while we check its status
snapshotter.RLock()
activeSnapshot := snapshotter.activeSnapshot
snapshotter.RUnlock()

if activeSnapshot == nil {
return nil
Expand Down Expand Up @@ -243,27 +268,33 @@ func (snapshotter *SwingsetSnapshotter) SnapshotExtension(height uint64, payload
// See https://go.dev/blog/defer-panic-and-recover for details
if err != nil {
var logger log.Logger
snapshotter.RLock()
if snapshotter.activeSnapshot != nil {
logger = snapshotter.activeSnapshot.logger
} else {
logger = snapshotter.logger
}
snapshotter.RUnlock()

logger.Error("swingset snapshot extension failed", "err", err)
}
}()

if snapshotter.activeSnapshot == nil {
snapshotter.RLock()
activeSnapshot := snapshotter.activeSnapshot
snapshotter.RUnlock()

if activeSnapshot == nil {
return errors.New("no active swingset snapshot")
}

if snapshotter.activeSnapshot.height != int64(height) {
return fmt.Errorf("swingset snapshot requested for unexpected height %d (expected %d)", height, snapshotter.activeSnapshot.height)
if activeSnapshot.height != int64(height) {
return fmt.Errorf("swingset snapshot requested for unexpected height %d (expected %d)", height, activeSnapshot.height)
}

action := &snapshotAction{
Type: "COSMOS_SNAPSHOT",
BlockHeight: snapshotter.activeSnapshot.height,
BlockHeight: activeSnapshot.height,
Request: "retrieve",
}
out, err := snapshotter.blockingSend(action)
Expand Down Expand Up @@ -335,9 +366,12 @@ func (snapshotter *SwingsetSnapshotter) SnapshotExtension(height uint64, payload
}
}

snapshotter.activeSnapshot.retrieved = true

snapshotter.activeSnapshot.logger.Info("retrieved snapshot", "exportDir", exportDir)
snapshotter.Lock()
if snapshotter.activeSnapshot != nil {
snapshotter.activeSnapshot.retrieved = true
snapshotter.activeSnapshot.logger.Info("retrieved snapshot", "exportDir", exportDir)
}
snapshotter.Unlock()

return nil
}
Expand Down

0 comments on commit c0dacd2

Please sign in to comment.