Skip to content

Commit

Permalink
Slasher proto and function renames (prysmaticlabs#4797)
Browse files Browse the repository at this point in the history
* Rename elements for clarity
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into slasher-renames
* Fix test
* Rename more functions
* Cleanup
* Fix logs
* Merge branch 'master' of https://github.com/prysmaticlabs/Prysm into slasher-renames
* Reorganize and clean up logs
* Address comments
* Add comments
  • Loading branch information
0xKiwi authored and cryptomental committed Feb 28, 2020
1 parent 0abde01 commit 6758a5a
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 356 deletions.
284 changes: 129 additions & 155 deletions proto/slashing/slashing.pb.go

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions proto/slashing/slashing.proto
Expand Up @@ -27,15 +27,19 @@ service Slasher {
rpc AttesterSlashings(SlashingStatusRequest) returns (AttesterSlashingResponse);
}

message ValidatorIDToIdxAtt {
// CompressedIdxAtt is an indexed attestation with the []byte data root
// in place of its AttestationData.
message CompressedIdxAtt {
repeated uint64 indices = 1 ;
bytes data_root = 2;
// 96 bytes aggregate signature.
bytes signature = 3;
}

message ValidatorIDToIdxAttList {
repeated ValidatorIDToIdxAtt indicesList = 1 ;
// CompressedIdxAttList is a list of CompressedIdxAtts used for
// accessing an array from the DB.
message CompressedIdxAttList {
repeated CompressedIdxAtt list = 1;
}

message ProposerSlashingRequest {
Expand Down Expand Up @@ -97,7 +101,7 @@ message SlashingStatusRequest {
// Included slashing proof that has been included in a block.
Included = 2;
// Reverted slashing proof that has been reverted and therefore is relevant again.
Reverted = 3;
Reverted = 3;
}
SlashingStatus status = 1;
}
4 changes: 2 additions & 2 deletions slasher/db/block_header.go
Expand Up @@ -20,9 +20,9 @@ func unmarshalBlockHeader(enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
return protoBlockHeader, nil
}

// BlockHeader accepts an epoch and validator id and returns the corresponding block header array.
// BlockHeaders accepts an epoch and validator id and returns the corresponding block header array.
// Returns nil if the block header for those values does not exist.
func (db *Store) BlockHeader(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
func (db *Store) BlockHeaders(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
var blockHeaders []*ethpb.SignedBeaconBlockHeader
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
Expand Down
12 changes: 6 additions & 6 deletions slasher/db/block_header_test.go
Expand Up @@ -24,7 +24,7 @@ func TestNilDBHistoryBlkHdr(t *testing.T) {
t.Fatal("HasBlockHeader should return false")
}

bPrime, err := db.BlockHeader(epoch, validatorID)
bPrime, err := db.BlockHeaders(epoch, validatorID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestSaveHistoryBlkHdr(t *testing.T) {
t.Fatalf("save block failed: %v", err)
}

bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
}

for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
Expand All @@ -127,7 +127,7 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bh, err := db.BlockHeader(tt.epoch, tt.vID)
bh, err := db.BlockHeaders(tt.epoch, tt.vID)

if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
t.Fatalf("save block header failed: %v", err)
}

bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
Expand All @@ -253,7 +253,7 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
}

for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
Expand Down
16 changes: 9 additions & 7 deletions slasher/db/db.go
Expand Up @@ -12,6 +12,9 @@ import (
)

var log = logrus.WithField("prefix", "slasherDB")

var databaseFileName = "slasher.db"

var d *Store

// Store defines an implementation of the Prysm Database interface
Expand Down Expand Up @@ -53,12 +56,12 @@ func NewDB(dirPath string, cfg *Config) (*Store, error) {
return d, err
}

// ClearDB removes the previously stored directory at the data directory.
// ClearDB removes any previously stored data at the configured data directory.
func (db *Store) ClearDB() error {
if _, err := os.Stat(db.databasePath); os.IsNotExist(err) {
return nil
}
return os.RemoveAll(db.databasePath)
return os.Remove(db.databasePath)
}

// DatabasePath at which this database writes files.
Expand All @@ -82,7 +85,7 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
datafile := path.Join(dirPath, "slasher.db")
datafile := path.Join(dirPath, databaseFileName)
boltDB, err := bolt.Open(datafile, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
if err == bolt.ErrTimeout {
Expand All @@ -103,17 +106,16 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
OnEvict: saveToDB,
})
if err != nil {
errors.Wrap(err, "failed to start span cache")
return nil, err
return nil, errors.Wrap(err, "failed to start span cache")
}
kv := &Store{db: boltDB, databasePath: dirPath, spanCache: spanCache, spanCacheEnabled: cfg.SpanCacheEnabled}
kv := &Store{db: boltDB, databasePath: datafile, spanCache: spanCache, spanCacheEnabled: cfg.SpanCacheEnabled}

if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
historicIndexedAttestationsBucket,
historicBlockHeadersBucket,
indexedAttestationsIndicesBucket,
compressedIdxAttsBucket,
validatorsPublicKeysBucket,
validatorsMinMaxSpanBucket,
slashingBucket,
Expand Down
52 changes: 26 additions & 26 deletions slasher/db/indexed_attestations.go
Expand Up @@ -25,8 +25,8 @@ func unmarshalIdxAtt(enc []byte) (*ethpb.IndexedAttestation, error) {
return protoIdxAtt, nil
}

func unmarshalValIDsToIdxAttList(enc []byte) (*slashpb.ValidatorIDToIdxAttList, error) {
protoIdxAtt := &slashpb.ValidatorIDToIdxAttList{}
func unmarshalCompressedIdxAttList(enc []byte) (*slashpb.CompressedIdxAttList, error) {
protoIdxAtt := &slashpb.CompressedIdxAttList{}
err := proto.Unmarshal(enc, protoIdxAtt)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
Expand All @@ -41,17 +41,17 @@ func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64)
var idxAtts []*ethpb.IndexedAttestation

err := db.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(bytesutil.Bytes8(targetEpoch))
if enc == nil {
return nil
}
idToIdxAttsList, err := unmarshalValIDsToIdxAttList(enc)
idToIdxAttsList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return err
}

for _, idxAtt := range idToIdxAttsList.IndicesList {
for _, idxAtt := range idToIdxAttsList.List {
i := sort.Search(len(idxAtt.Indices), func(i int) bool {
return idxAtt.Indices[i] >= validatorID
})
Expand Down Expand Up @@ -116,7 +116,7 @@ func (db *Store) LatestIndexedAttestationsTargetEpoch() (uint64, error) {
func (db *Store) LatestValidatorIdx() (uint64, error) {
var lt uint64
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(indexedAttestationsIndicesBucket).Cursor()
c := tx.Bucket(compressedIdxAttsBucket).Cursor()
k, _ := c.Last()
if k == nil {
return nil
Expand Down Expand Up @@ -167,16 +167,16 @@ func (db *Store) HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (
var hasAttestation bool
// #nosec G104
err := db.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
if enc == nil {
return nil
}
iList, err := unmarshalValIDsToIdxAttList(enc)
iList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return err
}
for _, idxAtt := range iList.IndicesList {
for _, idxAtt := range iList.List {
i := sort.Search(len(idxAtt.Indices), func(i int) bool {
return idxAtt.Indices[i] >= validatorID
})
Expand Down Expand Up @@ -205,7 +205,7 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
if val != nil {
return nil
}
if err := saveIdxAttIndicesByEpochToDB(idxAttestation, tx); err != nil {
if err := saveCompressedIdxAttToEpochList(idxAttestation, tx); err != nil {
return errors.Wrap(err, "failed to save indices from indexed attestation")
}
if err := bucket.Put(key, enc); err != nil {
Expand All @@ -225,26 +225,26 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
return err
}

func saveIdxAttIndicesByEpochToDB(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
func saveCompressedIdxAttToEpochList(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
dataRoot, err := hashutil.HashProto(idxAttestation.Data)
if err != nil {
return errors.Wrap(err, "failed to hash indexed attestation data.")
}
protoIdxAtt := &slashpb.ValidatorIDToIdxAtt{
protoIdxAtt := &slashpb.CompressedIdxAtt{
Signature: idxAttestation.Signature,
Indices: idxAttestation.AttestingIndices,
DataRoot: dataRoot[:],
}

key := bytesutil.Bytes8(idxAttestation.Data.Target.Epoch)
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
vIdxList, err := unmarshalValIDsToIdxAttList(enc)
compressedIdxAttList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into ValidatorIDToIndexedAttestationList")
return errors.Wrap(err, "failed to decode value into CompressedIdxAtt")
}
vIdxList.IndicesList = append(vIdxList.IndicesList, protoIdxAtt)
enc, err = proto.Marshal(vIdxList)
compressedIdxAttList.List = append(compressedIdxAttList.List, protoIdxAtt)
enc, err = proto.Marshal(compressedIdxAttList)
if err != nil {
return errors.Wrap(err, "failed to marshal")
}
Expand Down Expand Up @@ -281,26 +281,26 @@ func removeIdxAttIndicesByEpochFromDB(idxAttestation *ethpb.IndexedAttestation,
if err != nil {
return err
}
protoIdxAtt := &slashpb.ValidatorIDToIdxAtt{
protoIdxAtt := &slashpb.CompressedIdxAtt{
Signature: idxAttestation.Signature,
Indices: idxAttestation.AttestingIndices,
DataRoot: dataRoot[:],
}
key := bytesutil.Bytes8(idxAttestation.Data.Target.Epoch)
bucket := tx.Bucket(indexedAttestationsIndicesBucket)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
if enc == nil {
return errors.New("requested to delete data that is not present")
}
vIdxList, err := unmarshalValIDsToIdxAttList(enc)
vIdxList, err := unmarshalCompressedIdxAttList(enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into ValidatorIDToIndexedAttestationList")
}
for i, attIdx := range vIdxList.IndicesList {
for i, attIdx := range vIdxList.List {
if reflect.DeepEqual(attIdx, protoIdxAtt) {
copy(vIdxList.IndicesList[i:], vIdxList.IndicesList[i+1:])
vIdxList.IndicesList[len(vIdxList.IndicesList)-1] = nil // or the zero value of T
vIdxList.IndicesList = vIdxList.IndicesList[:len(vIdxList.IndicesList)-1]
copy(vIdxList.List[i:], vIdxList.List[i+1:])
vIdxList.List[len(vIdxList.List)-1] = nil // or the zero value of T
vIdxList.List = vIdxList.List[:len(vIdxList.List)-1]
break
}
}
Expand Down Expand Up @@ -330,8 +330,8 @@ func (db *Store) pruneAttHistory(currentEpoch uint64, historySize uint64) error
}
}

idxBucket := tx.Bucket(indexedAttestationsIndicesBucket)
c = tx.Bucket(indexedAttestationsIndicesBucket).Cursor()
idxBucket := tx.Bucket(compressedIdxAttsBucket)
c = tx.Bucket(compressedIdxAttsBucket).Cursor()
for k, _ := c.First(); k != nil && bytes.Compare(k[:8], max) <= 0; k, _ = c.Next() {
if err := idxBucket.Delete(k); err != nil {
return errors.Wrap(err, "failed to delete indexed attestation from historical bucket")
Expand Down
2 changes: 1 addition & 1 deletion slasher/db/schema.go
Expand Up @@ -11,7 +11,7 @@ var (
historicIndexedAttestationsBucket = []byte("historic-indexed-attestations-bucket")
historicBlockHeadersBucket = []byte("historic-block-headers-bucket")
slashingBucket = []byte("slashing-bucket")
indexedAttestationsIndicesBucket = []byte("indexed-attestations-indices-bucket")
compressedIdxAttsBucket = []byte("compressed-idx-atts-bucket")
validatorsPublicKeysBucket = []byte("validators-public-keys-bucket")
// In order to quickly detect surround and surrounded attestations we need to store
// the min and max span for each validator for each epoch.
Expand Down
2 changes: 1 addition & 1 deletion slasher/rpc/server.go
Expand Up @@ -141,7 +141,7 @@ func (ss *Server) UpdateSpanMaps(ctx context.Context, req *ethpb.IndexedAttestat
func (ss *Server) IsSlashableBlock(ctx context.Context, psr *slashpb.ProposerSlashingRequest) (*slashpb.ProposerSlashingResponse, error) {
//TODO(#3133): add signature validation
epoch := helpers.SlotToEpoch(psr.BlockHeader.Header.Slot)
blockHeaders, err := ss.SlasherDB.BlockHeader(epoch, psr.ValidatorIndex)
blockHeaders, err := ss.SlasherDB.BlockHeaders(epoch, psr.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "slasher service error while trying to retrieve blocks")
}
Expand Down
1 change: 1 addition & 0 deletions slasher/service/BUILD.bazel
Expand Up @@ -14,6 +14,7 @@ go_library(
"//shared/cmd:go_default_library",
"//shared/debug:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/version:go_default_library",
"//slasher/db:go_default_library",
"//slasher/flags:go_default_library",
Expand Down

0 comments on commit 6758a5a

Please sign in to comment.