/
checkpoint.go
137 lines (124 loc) · 4.64 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package kv
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
var errMissingStateForCheckpoint = errors.New("missing state summary for checkpoint root")
// JustifiedCheckpoint returns the latest justified checkpoint in beacon chain.
func (s *Store) JustifiedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.JustifiedCheckpoint")
defer span.End()
var checkpoint *ethpb.Checkpoint
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(checkpointBucket)
enc := bkt.Get(justifiedCheckpointKey)
if enc == nil {
checkpoint = ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
return nil
}
checkpoint = ðpb.Checkpoint{}
return decode(ctx, enc, checkpoint)
})
return checkpoint, err
}
// FinalizedCheckpoint returns the latest finalized checkpoint in beacon chain.
func (s *Store) FinalizedCheckpoint(ctx context.Context) (*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.FinalizedCheckpoint")
defer span.End()
var checkpoint *ethpb.Checkpoint
err := s.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(checkpointBucket)
enc := bkt.Get(finalizedCheckpointKey)
if enc == nil {
checkpoint = ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
return nil
}
checkpoint = ðpb.Checkpoint{}
return decode(ctx, enc, checkpoint)
})
return checkpoint, err
}
// SaveJustifiedCheckpoint saves justified checkpoint in beacon chain.
func (s *Store) SaveJustifiedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveJustifiedCheckpoint")
defer span.End()
return s.saveCheckpoint(ctx, justifiedCheckpointKey, checkpoint)
}
// SaveFinalizedCheckpoint saves finalized checkpoint in beacon chain.
func (s *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveFinalizedCheckpoint")
defer span.End()
enc, err := encode(ctx, checkpoint)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
log.Warnf("Recovering state summary for finalized root: %#x", bytesutil.Trunc(checkpoint.Root))
if err := recoverStateSummary(ctx, tx, checkpoint.Root); err != nil {
return errors.Wrapf(errMissingStateForCheckpoint, "could not save finalized checkpoint, finalized root: %#x", bytesutil.Trunc(checkpoint.Root))
}
}
if err := bucket.Put(finalizedCheckpointKey, enc); err != nil {
return err
}
return s.updateFinalizedBlockRoots(ctx, tx, checkpoint)
})
tracing.AnnotateError(span, err)
return err
}
func (s *Store) saveCheckpoint(ctx context.Context, key []byte, checkpoint *ethpb.Checkpoint) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.saveCheckpoint")
defer span.End()
enc, err := encode(ctx, checkpoint)
if err != nil {
tracing.AnnotateError(span, err)
return err
}
hasStateSummary := s.HasStateSummary(ctx, bytesutil.ToBytes32(checkpoint.Root))
err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(checkpointBucket)
hasStateInDB := tx.Bucket(stateBucket).Get(checkpoint.Root) != nil
if !(hasStateInDB || hasStateSummary) {
log.WithField("root", fmt.Sprintf("%#x", bytesutil.Trunc(checkpoint.Root))).Warn("Recovering state summary")
if err := recoverStateSummary(ctx, tx, checkpoint.Root); err != nil {
return errMissingStateForCheckpoint
}
}
return bucket.Put(key, enc)
})
tracing.AnnotateError(span, err)
return err
}
// Recovers and saves state summary for a given root if the root has a block in the DB.
func recoverStateSummary(ctx context.Context, tx *bolt.Tx, root []byte) error {
blkBucket := tx.Bucket(blocksBucket)
blkEnc := blkBucket.Get(root)
if blkEnc == nil {
return fmt.Errorf("nil block, root: %#x", bytesutil.Trunc(root))
}
blk, err := unmarshalBlock(ctx, blkEnc)
if err != nil {
return errors.Wrapf(err, "Could not unmarshal block: %#x", bytesutil.Trunc(root))
}
summaryEnc, err := encode(ctx, ðpb.StateSummary{
Slot: blk.Block().Slot(),
Root: root,
})
if err != nil {
return err
}
summaryBucket := tx.Bucket(stateBucket)
return summaryBucket.Put(root, summaryEnc)
}