Skip to content

Commit

Permalink
Add chain hash support on CLI graph client (#892)
Browse files Browse the repository at this point in the history
* add chain hash support on grpc CLI clients

* rename maps on drand daemon

* refactor beacon id reading functions to make them more standard
  • Loading branch information
emmanuelm41 committed Dec 31, 2021
1 parent e26d09e commit e1ec076
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 32 deletions.
38 changes: 36 additions & 2 deletions core/drand_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type DrandDaemon struct {
initialStores map[string]*key.Store
beaconProcesses map[string]*BeaconProcess
chainHashes map[string]string

privGateway *net.PrivateGateway
pubGateway *net.PublicGateway
Expand Down Expand Up @@ -55,6 +56,7 @@ func NewDrandDaemon(c *Config) (*DrandDaemon, error) {
version: common.GetAppVersion(),
initialStores: make(map[string]*key.Store),
beaconProcesses: make(map[string]*BeaconProcess),
chainHashes: make(map[string]string),
}

// Add callback to registera new handler for http server after finishing DKG successfully
Expand All @@ -69,6 +71,7 @@ func NewDrandDaemon(c *Config) (*DrandDaemon, error) {
drandDaemon.state.Unlock()

if isPresent {
drandDaemon.AddNewChainHash(beaconID, bp)
drandDaemon.AddBeaconHandler(beaconID, bp)
}
}
Expand All @@ -81,7 +84,12 @@ func NewDrandDaemon(c *Config) (*DrandDaemon, error) {
}

func (dd *DrandDaemon) RemoteStatus(ctx context.Context, request *drand.RemoteStatusRequest) (*drand.RemoteStatusResponse, error) {
bp, _, err := dd.getBeaconProcess(request.Metadata)
beaconID, err := dd.readBeaconID(request.Metadata)
if err != nil {
return nil, err
}

bp, err := dd.getBeaconProcessByID(beaconID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,21 +166,44 @@ func (dd *DrandDaemon) InstantiateBeaconProcess(beaconID string, store key.Store
return bp, nil
}

func (dd *DrandDaemon) AddNewChainHash(beaconID string, bp *BeaconProcess) {
dd.state.Lock()
chainHash := chain.NewChainInfo(bp.group).HashString()
dd.chainHashes[chainHash] = beaconID
if common.IsDefaultBeaconID(beaconID) {
dd.chainHashes[common.DefaultChainHash] = beaconID
}
dd.state.Unlock()
}

// RemoveBeaconProcess remove a BeaconProcess linked to beacon with id 'beaconID'
func (dd *DrandDaemon) RemoveBeaconProcess(beaconID string) {
func (dd *DrandDaemon) RemoveBeaconProcess(beaconID string, bp *BeaconProcess) {
if beaconID == "" {
beaconID = common.DefaultBeaconID
}

chainHash := ""
if bp.group != nil {
info := chain.NewChainInfo(bp.group)
chainHash = info.HashString()
}

dd.state.Lock()

delete(dd.beaconProcesses, beaconID)
delete(dd.chainHashes, chainHash)
if common.IsDefaultBeaconID(beaconID) {
delete(dd.chainHashes, common.DefaultChainHash)
}

dd.state.Unlock()
}

// AddBeaconHandler adds a handler linked to beacon with chain hash from http server used to
// expose public services
func (dd *DrandDaemon) AddBeaconHandler(beaconID string, bp *BeaconProcess) {
info := chain.NewChainInfo(bp.group)

bh := dd.handler.RegisterNewBeaconHandler(&drandProxy{bp}, info.HashString())
if common.IsDefaultBeaconID(beaconID) {
dd.handler.RegisterDefaultBeaconHandler(bh)
Expand Down Expand Up @@ -215,6 +246,9 @@ func (dd *DrandDaemon) LoadBeacons(metricsFlag string) error {
} else {
fmt.Printf("beacon id [%s]: will start running randomness beacon.\n", beaconID)

// Add beacon chain hash as a new valid one
dd.AddNewChainHash(beaconID, bp)

// Add beacon handler from chain hash for http server
dd.AddBeaconHandler(beaconID, bp)

Expand Down
37 changes: 26 additions & 11 deletions core/drand_daemon_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import (
// the DKG protocol to finish. If the request specifies this node is a leader,
// it starts the DKG protocol.
func (dd *DrandDaemon) InitDKG(c context.Context, in *drand.InitDKGPacket) (*drand.GroupPacket, error) {
bp, beaconID, err := dd.getBeaconProcess(in.GetMetadata())
beaconID, err := dd.readBeaconID(in.GetMetadata())
if err != nil {
return nil, err
}

bp, err := dd.getBeaconProcessByID(beaconID)
if err != nil {
store, isStoreLoaded := dd.initialStores[beaconID]
if !isStoreLoaded {
Expand All @@ -39,7 +44,12 @@ func (dd *DrandDaemon) InitDKG(c context.Context, in *drand.InitDKGPacket) (*dra
// InitReshare receives information about the old and new group from which to
// operate the resharing protocol.
func (dd *DrandDaemon) InitReshare(ctx context.Context, in *drand.InitResharePacket) (*drand.GroupPacket, error) {
bp, beaconID, err := dd.getBeaconProcess(in.GetMetadata())
beaconID, err := dd.readBeaconID(in.GetMetadata())
if err != nil {
return nil, err
}

bp, err := dd.getBeaconProcessByID(beaconID)
if err != nil {
store, isStoreLoaded := dd.initialStores[beaconID]
if !isStoreLoaded {
Expand Down Expand Up @@ -68,7 +78,7 @@ func (dd *DrandDaemon) PingPong(ctx context.Context, in *drand.Ping) (*drand.Pon

// Status responds with the actual status of drand process
func (dd *DrandDaemon) Status(ctx context.Context, in *drand.StatusRequest) (*drand.StatusResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -84,7 +94,7 @@ func (dd *DrandDaemon) ListSchemes(ctx context.Context, in *drand.ListSchemesReq

// Share is a functionality of Control Service defined in protobuf/control that requests the private share of the drand node running locally
func (dd *DrandDaemon) Share(ctx context.Context, in *drand.ShareRequest) (*drand.ShareResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -95,7 +105,7 @@ func (dd *DrandDaemon) Share(ctx context.Context, in *drand.ShareRequest) (*dran
// PublicKey is a functionality of Control Service defined in protobuf/control
// that requests the long term public key of the drand node running locally
func (dd *DrandDaemon) PublicKey(ctx context.Context, in *drand.PublicKeyRequest) (*drand.PublicKeyResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -106,7 +116,7 @@ func (dd *DrandDaemon) PublicKey(ctx context.Context, in *drand.PublicKeyRequest
// PrivateKey is a functionality of Control Service defined in protobuf/control
// that requests the long term private key of the drand node running locally
func (dd *DrandDaemon) PrivateKey(ctx context.Context, in *drand.PrivateKeyRequest) (*drand.PrivateKeyResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -116,7 +126,7 @@ func (dd *DrandDaemon) PrivateKey(ctx context.Context, in *drand.PrivateKeyReque

// GroupFile replies with the distributed key in the response
func (dd *DrandDaemon) GroupFile(ctx context.Context, in *drand.GroupRequest) (*drand.GroupPacket, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -130,15 +140,20 @@ func (dd *DrandDaemon) Shutdown(ctx context.Context, in *drand.ShutdownRequest)
if in.GetMetadata().GetBeaconID() == "" {
dd.Stop(ctx)
} else {
bp, beaconID, err := dd.getBeaconProcess(in.GetMetadata())
beaconID, err := dd.readBeaconID(in.GetMetadata())
if err != nil {
return nil, err
}

bp, err := dd.getBeaconProcessByID(beaconID)
if err != nil {
return nil, err
}

bp.Stop(ctx)

dd.RemoveBeaconHandler(beaconID, bp)
dd.RemoveBeaconProcess(beaconID)
dd.RemoveBeaconProcess(beaconID, bp)
}

metadata := common.NewMetadata(dd.version.ToProto())
Expand All @@ -147,7 +162,7 @@ func (dd *DrandDaemon) Shutdown(ctx context.Context, in *drand.ShutdownRequest)

// BackupDatabase triggers a backup of the primary database.
func (dd *DrandDaemon) BackupDatabase(ctx context.Context, in *drand.BackupDBRequest) (*drand.BackupDBResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -156,7 +171,7 @@ func (dd *DrandDaemon) BackupDatabase(ctx context.Context, in *drand.BackupDBReq
}

func (dd *DrandDaemon) StartFollowChain(in *drand.StartFollowRequest, stream drand.Control_StartFollowChainServer) error {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return err
}
Expand Down
46 changes: 38 additions & 8 deletions core/drand_daemon_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,52 @@ import (
"fmt"

"github.com/drand/drand/common"

protoCommon "github.com/drand/drand/protobuf/common"
)

func (dd *DrandDaemon) getBeaconProcess(metadata *protoCommon.Metadata) (*BeaconProcess, string, error) {
beaconID := ""
if beaconID = metadata.GetBeaconID(); beaconID == "" {
beaconID = common.DefaultBeaconID
func (dd *DrandDaemon) readBeaconID(metadata *protoCommon.Metadata) (string, error) {
rcvBeaconID := metadata.GetBeaconID()

if chainHashHex := metadata.GetChainHash(); len(chainHashHex) != 0 {
chainHash := fmt.Sprintf("%x", chainHashHex)

dd.state.Lock()
beaconIDByHash, isChainHashFound := dd.chainHashes[chainHash]
dd.state.Unlock()

if isChainHashFound {
if rcvBeaconID != "" && rcvBeaconID != beaconIDByHash {
return "", fmt.Errorf("invalid chain hash")
}
rcvBeaconID = beaconIDByHash
}
}

if rcvBeaconID == "" {
rcvBeaconID = common.DefaultBeaconID
}

return rcvBeaconID, nil
}

func (dd *DrandDaemon) getBeaconProcessByID(beaconID string) (*BeaconProcess, error) {
dd.state.Lock()
bp, isBpCreated := dd.beaconProcesses[beaconID]
bp, isBeaconIDFound := dd.beaconProcesses[beaconID]
dd.state.Unlock()

if !isBpCreated {
return nil, beaconID, fmt.Errorf("beacon id [%s] is not running", beaconID)
if isBeaconIDFound {
return bp, nil
}

return nil, fmt.Errorf("beacon id [%s] is not running", beaconID)
}

func (dd *DrandDaemon) getBeaconProcessFromRequest(metadata *protoCommon.Metadata) (*BeaconProcess, error) {
beaconID, err := dd.readBeaconID(metadata)
if err != nil {
return nil, err
}

return bp, beaconID, nil
return dd.getBeaconProcessByID(beaconID)
}
22 changes: 11 additions & 11 deletions core/drand_daemon_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// BroadcastDKG is the public method to call during a DKG protocol.
func (dd *DrandDaemon) BroadcastDKG(c context.Context, in *drand.DKGPacket) (*drand.Empty, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -19,7 +19,7 @@ func (dd *DrandDaemon) BroadcastDKG(c context.Context, in *drand.DKGPacket) (*dr
// PartialBeacon receives a beacon generation request and answers
// with the partial signature from this drand node.
func (dd *DrandDaemon) PartialBeacon(c context.Context, in *drand.PartialBeaconPacket) (*drand.Empty, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -30,7 +30,7 @@ func (dd *DrandDaemon) PartialBeacon(c context.Context, in *drand.PartialBeaconP
// PublicRand returns a public random beacon according to the request. If the Round
// field is 0, then it returns the last one generated.
func (dd *DrandDaemon) PublicRand(c context.Context, in *drand.PublicRandRequest) (*drand.PublicRandResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -40,7 +40,7 @@ func (dd *DrandDaemon) PublicRand(c context.Context, in *drand.PublicRandRequest

// PublicRandStream exports a stream of new beacons as they are generated over gRPC
func (dd *DrandDaemon) PublicRandStream(in *drand.PublicRandRequest, stream drand.Public_PublicRandStreamServer) error {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return err
}
Expand All @@ -50,7 +50,7 @@ func (dd *DrandDaemon) PublicRandStream(in *drand.PublicRandRequest, stream dran

// PrivateRand returns an ECIES encrypted random blob of 32 bytes from /dev/urandom
func (dd *DrandDaemon) PrivateRand(c context.Context, in *drand.PrivateRandRequest) (*drand.PrivateRandResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -60,7 +60,7 @@ func (dd *DrandDaemon) PrivateRand(c context.Context, in *drand.PrivateRandReque

// Home provides the address the local node is listening
func (dd *DrandDaemon) Home(c context.Context, in *drand.HomeRequest) (*drand.HomeResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -70,7 +70,7 @@ func (dd *DrandDaemon) Home(c context.Context, in *drand.HomeRequest) (*drand.Ho

// ChainInfo replies with the chain information this node participates to
func (dd *DrandDaemon) ChainInfo(ctx context.Context, in *drand.ChainInfoRequest) (*drand.ChainInfoPacket, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -80,7 +80,7 @@ func (dd *DrandDaemon) ChainInfo(ctx context.Context, in *drand.ChainInfoRequest

// SignalDKGParticipant receives a dkg signal packet from another member
func (dd *DrandDaemon) SignalDKGParticipant(ctx context.Context, in *drand.SignalDKGPacket) (*drand.Empty, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -90,7 +90,7 @@ func (dd *DrandDaemon) SignalDKGParticipant(ctx context.Context, in *drand.Signa

// PushDKGInfo triggers sending DKG info to other members
func (dd *DrandDaemon) PushDKGInfo(ctx context.Context, in *drand.DKGInfoPacket) (*drand.Empty, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand All @@ -101,7 +101,7 @@ func (dd *DrandDaemon) PushDKGInfo(ctx context.Context, in *drand.DKGInfoPacket)
// SyncChain is a inter-node protocol that replies to a syncing request from a
// given round
func (dd *DrandDaemon) SyncChain(in *drand.SyncRequest, stream drand.Protocol_SyncChainServer) error {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return err
}
Expand All @@ -111,7 +111,7 @@ func (dd *DrandDaemon) SyncChain(in *drand.SyncRequest, stream drand.Protocol_Sy

// GetIdentity returns the identity of this drand node
func (dd *DrandDaemon) GetIdentity(ctx context.Context, in *drand.IdentityRequest) (*drand.IdentityResponse, error) {
bp, _, err := dd.getBeaconProcess(in.GetMetadata())
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e1ec076

Please sign in to comment.