Skip to content

Commit

Permalink
Create cron to remove signatures with status unknown (#49)
Browse files Browse the repository at this point in the history
* create type network with enum

* add cron remove unknown

* refactor get active validators

* return unknown validators

* rename to update signatures status

* rename to lower letter

* do not update status
  • Loading branch information
pablomendezroyo committed May 17, 2024
1 parent aa0185e commit 9cb6be4
Show file tree
Hide file tree
Showing 16 changed files with 359 additions and 253 deletions.
4 changes: 4 additions & 0 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func main() {
c.AddFunc("@daily", func() {
// there are 24 * 30 = 720 hours in 30 days
apiCron.RemoveOldSignatures(dbCollection, 720)

})
c.AddFunc("* * * * *", func() {
apiCron.UpdateSignaturesStatus(dbCollection, config.BeaconNodeURLs)
})
c.Start()

Expand Down
5 changes: 3 additions & 2 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/routes"
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"go.mongodb.org/mongo-driver/mongo"
)
Expand All @@ -15,11 +16,11 @@ type httpApi struct {
port string
dbClient *mongo.Client
dbCollection *mongo.Collection
beaconNodeUrls map[string]string
beaconNodeUrls map[types.Network]string
}

// create a new api instance
func NewApi(port string, dbClient *mongo.Client, dbCollection *mongo.Collection, beaconNodeUrls map[string]string) *httpApi {
func NewApi(port string, dbClient *mongo.Client, dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) *httpApi {
return &httpApi{
port: port,
dbClient: dbClient,
Expand Down
113 changes: 64 additions & 49 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,102 +13,117 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

// Posting a new singature consists in the following steps:
// 1. Decode and validate
// 2. Get active validators
// 3. Validate signature and insert into MongoDB
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[string]string) {
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) {
logger.Debug("Received new POST '/newSignature' request")

// Parse request body
var requests []types.SignatureRequest
err := json.NewDecoder(r.Body).Decode(&requests)
if err != nil {
logger.Error("Failed to decode request body: " + err.Error())
respondError(w, http.StatusBadRequest, "Invalid request format")
return
}
// Decode and validate incoming requests
validRequests, err := validation.ValidateAndDecodeRequests(requests)
if err != nil {

// Parse and validate request body
if err := json.NewDecoder(r.Body).Decode(&requests); err != nil {
logger.Error("Failed to decode request body: " + err.Error())
respondError(w, http.StatusBadRequest, "Invalid request format")
return
}
// Respond with an error if no valid requests were found
if len(validRequests) == 0 {

requestsValidatedAndDecoded, err := validation.ValidateAndDecodeRequests(requests)
if err != nil || len(requestsValidatedAndDecoded) == 0 {
logger.Error("Failed to validate and decode requests: " + err.Error())
respondError(w, http.StatusBadRequest, "No valid requests")
return
}

// Get active validators from the network, get the network from the first item in the array
beaconNodeUrl, ok := beaconNodeUrls[validRequests[0].Network]
// Check network validity
network := requestsValidatedAndDecoded[0].Network
beaconNodeUrl, ok := beaconNodeUrls[network]
if !ok {
respondError(w, http.StatusBadRequest, "Invalid network")
return
}

activeValidators, err := validation.GetActiveValidators(validRequests, beaconNodeUrl)
// Get active validators
pubkeys := getPubkeys(requestsValidatedAndDecoded)
validatorsStatusMap, err := validation.GetValidatorsStatus(pubkeys, beaconNodeUrl)
if err != nil {
logger.Error("Failed to get active validators: " + err.Error())
respondError(w, http.StatusInternalServerError, "Failed to get active validators")
respondError(w, http.StatusInternalServerError, "Failed to get active validators: "+err.Error())
return
}
if len(activeValidators) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in network")

// Filter and verify signatures
validSignatures := filterAndVerifySignatures(requestsValidatedAndDecoded, validatorsStatusMap)
if len(validSignatures) == 0 {
respondError(w, http.StatusBadRequest, "No valid signatures")
return
}

// Insert valid signatures into MongoDB
if err := insertSignaturesIntoDB(validSignatures, dbCollection); err != nil {
logger.Error("Failed to insert signatures into MongoDB: " + err.Error())
respondError(w, http.StatusInternalServerError, "Failed to insert signatures into MongoDB")
return
}

validSignatures := []types.SignatureRequestDecodedWithActive{}
for _, req := range activeValidators {
isValidSignature, err := validation.VerifySignature(req)
if err != nil {
logger.Error("Failed to validate signature: " + err.Error())
respondOK(w, "Finished processing signatures")
}

func getPubkeys(requests []types.SignatureRequestDecoded) []string {
pubkeys := make([]string, len(requests))
for i, req := range requests {
pubkeys[i] = req.Pubkey
}
return pubkeys
}

func filterAndVerifySignatures(requests []types.SignatureRequestDecoded, validatorsStatusMap map[string]types.Status) []types.SignatureRequestDecodedWithStatus {
validSignatures := []types.SignatureRequestDecodedWithStatus{}
for _, req := range requests {
status, ok := validatorsStatusMap[req.Pubkey]
if !ok {
logger.Warn("Validator not found: " + req.Pubkey)
continue
}
if !isValidSignature {
logger.Warn("Invalid signature: " + req.Signature)
if status == types.Inactive {
logger.Warn("Inactive validator: " + req.Pubkey)
continue
}
validSignatures = append(validSignatures, req)
}
// Respond with an error if no valid signatures were found
if len(validSignatures) == 0 {
respondError(w, http.StatusBadRequest, "No valid signatures")
return
reqWithStatus := types.SignatureRequestDecodedWithStatus{
SignatureRequestDecoded: req,
Status: status,
}
if isValid, err := validation.VerifySignature(reqWithStatus); err == nil && isValid {
validSignatures = append(validSignatures, reqWithStatus)
} else {
logger.Warn("Invalid signature: " + req.Signature)
}
}
return validSignatures
}

// Iterate over all valid signatures and insert the signature
for _, req := range validSignatures {
func insertSignaturesIntoDB(signatures []types.SignatureRequestDecodedWithStatus, dbCollection *mongo.Collection) error {
for _, req := range signatures {
filter := bson.M{
"pubkey": req.Pubkey,
"tag": req.Tag,
"network": req.Network,
}
update := bson.M{
"$set": bson.M{
"status": req.Status, // Only save the last status
},
"$setOnInsert": bson.M{"status": req.Status}, // do not update status if already exists
"$push": bson.M{
"entries": bson.M{
"payload": req.Payload,
"signature": req.Signature,
"decodedPayload": bson.M{
"type": req.DecodedPayload.Type,
"platform": req.DecodedPayload.Platform,
"timestamp": req.DecodedPayload.Timestamp, // Needed to filter out old signatures
"timestamp": req.DecodedPayload.Timestamp,
},
},
},
}
options := options.Update().SetUpsert(true)
_, err := dbCollection.UpdateOne(context.TODO(), filter, update, options)
if err != nil {
logger.Error("Failed to insert signature into MongoDB: " + err.Error())
continue
if _, err := dbCollection.UpdateOne(context.Background(), filter, update, options); err != nil {
return err
}
logger.Debug("New Signature " + req.Signature + " inserted into MongoDB")
}

respondOK(w, "Finished processing signatures")
return nil
}
3 changes: 2 additions & 1 deletion listener/internal/api/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/handlers"
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
"github.com/gorilla/mux"
"go.mongodb.org/mongo-driver/mongo"
)

func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[string]string) *mux.Router {
func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) *mux.Router {
r := mux.NewRouter()

// Define routes
Expand Down
51 changes: 38 additions & 13 deletions listener/internal/api/types/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
package types

// In sync with brain
type Network string // "mainnet" | "holesky" | "gnosis" | "lukso"

const (
Mainnet Network = "mainnet"
Holesky Network = "holesky"
Gnosis Network = "gnosis"
Lukso Network = "lukso"
)

// In sync with brain
// @see https://github.com/dappnode/StakingBrain/blob/0aaeefa8aec1b21ba2f2882cb444747419a3ff5d/packages/common/src/types/db/types.ts#L27
type Tag string // "obol" | "diva" | "ssv" | "rocketpool" | "stakewise" | "stakehouse" | "solo" | "stader"

const (
Obol Tag = "obol"
Diva Tag = "diva"
Ssv Tag = "ssv"
Rocketpool Tag = "rocketpool"
Stakewise Tag = "stakewise"
Stakehouse Tag = "stakehouse"
Solo Tag = "solo"
Stader Tag = "stader"
)

type SignatureRequest struct {
Payload string `json:"payload"`
Pubkey string `json:"pubkey"`
Signature string `json:"signature"`
Network string `json:"network"`
Tag string `json:"tag"`
Payload string `json:"payload"`
Pubkey string `json:"pubkey"`
Signature string `json:"signature"`
Network Network `json:"network"`
Tag string `json:"tag"`
}

type DecodedPayload struct {
Type string `json:"type"`
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
}

type SignatureRequestDecoded struct {
Expand All @@ -19,16 +50,10 @@ type Status string
const (
Unknown Status = "unknown"
Active Status = "active"
Inactive Status = "inactive"
Inactive Status = "inactive" // means any response from beacon node that is not active
)

type SignatureRequestDecodedWithActive struct {
type SignatureRequestDecodedWithStatus struct {
SignatureRequestDecoded
Status Status `json:"status"` // "unknown" | "active" | "inactive"
}

type DecodedPayload struct {
Type string `json:"type"`
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
}
Loading

0 comments on commit 9cb6be4

Please sign in to comment.