Skip to content

Commit

Permalink
Update metadata snapshotter to lease on exists
Browse files Browse the repository at this point in the history
Currently the metadata snapshotter is not consistently adding keys to a
lease when already exists is returned. When a lease is provided, any
already exists errors should add the relevant key to the lease. It is
not expected that clients must explicitly lease a key after calling
Prepare/Commit.

Signed-off-by: Derek McGowan <derek@mcg.dev>
  • Loading branch information
dmcgowan committed May 7, 2024
1 parent 114ef75 commit c7fb8a9
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions core/metadata/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
bopts = []snapshots.Opt{
snapshots.WithLabels(snapshots.FilterInheritedLabels(base.Labels)),
}
rerr error
)

if err := update(ctx, s.db, func(tx *bolt.Tx) error {
Expand All @@ -334,12 +335,20 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
// Check if target exists, if so, return already exists
if target != "" {
if tbkt := bkt.Bucket([]byte(target)); tbkt != nil {
return fmt.Errorf("target snapshot %q: %w", target, errdefs.ErrAlreadyExists)
rerr = fmt.Errorf("target snapshot %q: %w", target, errdefs.ErrAlreadyExists)
if err := addSnapshotLease(ctx, tx, s.name, target); err != nil {
return err
}
return nil
}
}

if bbkt := bkt.Bucket([]byte(key)); bbkt != nil {
return fmt.Errorf("snapshot %q: %w", key, errdefs.ErrAlreadyExists)
rerr = fmt.Errorf("snapshot %q: %w", key, errdefs.ErrAlreadyExists)
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
return nil
}

if parent != "" {
Expand All @@ -360,11 +369,14 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
}); err != nil {
return nil, err
}
// Already exists and lease successfully added in transaction
if rerr != nil {
return nil, rerr
}

var (
m []mount.Mount
created string
rerr error
)
if readonly {
m, err = s.Snapshotter.View(ctx, bkey, bparent, bopts...)
Expand Down Expand Up @@ -527,24 +539,28 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}

var bname string
var (
bname string
rerr error
)
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return fmt.Errorf("can not find snapshotter %q: %w",
s.name, errdefs.ErrNotFound)
}

if err := addSnapshotLease(ctx, tx, s.name, name); err != nil {
return err
}
bbkt, err := bkt.CreateBucket([]byte(name))
if err != nil {
if err == bolt.ErrBucketExists {
err = fmt.Errorf("snapshot %q: %w", name, errdefs.ErrAlreadyExists)
rerr = fmt.Errorf("snapshot %q: %w", name, errdefs.ErrAlreadyExists)
return nil
}
return err
}
if err := addSnapshotLease(ctx, tx, s.name, name); err != nil {
return err
}

obkt := bkt.Bucket([]byte(key))
if obkt == nil {
Expand Down Expand Up @@ -634,17 +650,19 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}

if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
Snapshotter: s.name,
}); err != nil {
return err
if rerr == nil {
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
Snapshotter: s.name,
}); err != nil {
return err
}
}
}

return nil
return rerr

}

Expand Down

0 comments on commit c7fb8a9

Please sign in to comment.