Skip to content

Commit

Permalink
Add new cli cmd to resurrect stopped beacon processes (#893)
Browse files Browse the repository at this point in the history
* add new cli cnd to resurrect stopped beacon processes
* update protobuf definitions
* apply some improvements on makefile on protobuf generation process
* add test case for new feature
  • Loading branch information
emmanuelm41 committed Jan 4, 2022
1 parent e1ec076 commit ef80ae4
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 218 deletions.
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ CLI_PACKAGE=github.com/drand/drand/cmd/drand-cli
GIT_REVISION := $(shell git rev-parse HEAD)
BUILD_DATE := $(shell date -u +%d/%m/%Y@%H:%M:%S)

PROTOC_VERSION=3.17.3
PROTOC_ZIP=protoc-$(PROTOC_VERSION)-linux-x86_64.zip

drand: build

#################### Lint and fmt process ##################
Expand Down Expand Up @@ -119,11 +122,13 @@ build_docker_dev:
############################################ Deps ############################################

install_deps_linux:
PROTOC_ZIP=protoc-3.14.0-linux-x86_64.zip
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/$PROTOC_ZIP
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/protoc-$(PROTOC_VERSION)-linux-x86_64.zip
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
rm -f $PROTOC_ZIP

install_deps_darwin:
brew install protobuf
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$(PROTOC_VERSION)/protoc-$(PROTOC_VERSION)-osx-x86_64.zip
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
rm -f $PROTOC_ZIP
6 changes: 6 additions & 0 deletions cmd/drand-cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,12 @@ var appCommands = []*cli.Command{
return shareCmd(c)
},
},
{
Name: "reload",
Usage: "Launch a sharing protocol which has been previously stopped",
Flags: toArray(controlFlag, beaconIDFlag),
Action: reloadCmd,
},
{
Name: "follow",
Usage: "follow and store a randomness chain",
Expand Down
126 changes: 117 additions & 9 deletions cmd/drand-cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestStartWithoutGroup(t *testing.T) {

func testStartedDrandFunctional(t *testing.T, ctrlPort, rootPath, address string, group *key.Group, fileStore key.Store, beaconID string) {
testPing(t, ctrlPort)
testStatus(t, ctrlPort)
testStatus(t, ctrlPort, beaconID)
testListSchemes(t, ctrlPort)

require.NoError(t, toml.NewEncoder(os.Stdout).Encode(group))
Expand Down Expand Up @@ -489,12 +489,12 @@ func testPing(t *testing.T, ctrlPort string) {
require.NoError(t, err)
}

func testStatus(t *testing.T, ctrlPort string) {
func testStatus(t *testing.T, ctrlPort, beaconID string) {
var err error

fmt.Println(" + running STATUS command with ", ctrlPort)
fmt.Println(" + running STATUS command with ", ctrlPort, " on beacon [", beaconID, "]")
for i := 0; i < 3; i++ {
status := []string{"drand", "util", "status", "--control", ctrlPort}
status := []string{"drand", "util", "status", "--control", ctrlPort, "--id", beaconID}
err = CLI().Run(status)
if err == nil {
break
Expand Down Expand Up @@ -664,6 +664,64 @@ func getSBFolderStructure() string {
return tmp
}

func TestDrandReloadBeacon(t *testing.T) {
sch := scheme.GetSchemeFromEnv()
beaconID := common.GetBeaconIDFromEnv()

// beacon id need to have a value in order to stop one beacon process
// if beacon id is empty, the id will be "default" internally
if beaconID == "" {
beaconID = common.DefaultBeaconID
}

n := 4
instances, tempPath := launchDrandInstances(t, n)
defer os.RemoveAll(tempPath)

for i, inst := range instances {
if i == 0 {
inst.shareLeader(t, n, n, 2, beaconID, sch)
} else {
inst.share(t, instances[0].addr, beaconID)
}
time.Sleep(500 * time.Millisecond)
}

defer func() {
for _, inst := range instances {
inst.stopAll()
}
}()

time.Sleep(1 * time.Second)

// try to reload a beacon which is already loaded
err := instances[3].reload(beaconID)
require.Error(t, err)

// wait some time to generate some randomness
time.Sleep(1 * time.Minute)

// Stop beacon process... not the entire node
err = instances[3].stop(beaconID)
require.NoError(t, err)

// check the node is still alive
testPing(t, instances[3].ctrlPort)

// reload a beacon
err = instances[3].reload(beaconID)
require.NoError(t, err)

// test beacon process status
testStatus(t, instances[3].ctrlPort, beaconID)

time.Sleep(5 * time.Second)

// test beacon process status
testStatus(t, instances[3].ctrlPort, beaconID)
}

func TestDrandStatus(t *testing.T) {
n := 4
instances, tempPath := launchDrandInstances(t, n)
Expand Down Expand Up @@ -694,7 +752,7 @@ func TestDrandStatus(t *testing.T) {
// stop one and check that all nodes report this node down
toStop := 2
insToStop := instances[toStop]
insToStop.stop()
insToStop.stopAll()

for i, instance := range instances {
if i == toStop {
Expand Down Expand Up @@ -730,11 +788,60 @@ type drandInstance struct {
certsDir string
}

func (d *drandInstance) stop() error {
func (d *drandInstance) stopAll() error {
return CLI().Run([]string{"drand", "stop", "--control", d.ctrlPort})
}

func (d *drandInstance) run(t *testing.T) {
func (d *drandInstance) stop(beaconID string) error {
return CLI().Run([]string{"drand", "stop", "--control", d.ctrlPort, "--id", beaconID})
}

func (d *drandInstance) shareLeader(t *testing.T, nodes, threshold, period int, beaconID string, sch scheme.Scheme) {
shareArgs := []string{
"drand",
"share",
"--leader",
"--nodes", strconv.Itoa(nodes),
"--threshold", strconv.Itoa(threshold),
"--period", fmt.Sprintf("%ds", period),
"--control", d.ctrlPort,
"--scheme", sch.ID,
"--id", beaconID,
}

go func() {
err := CLI().Run(shareArgs)
require.NoError(t, err)
}()
}

func (d *drandInstance) share(t *testing.T, leaderURL, beaconID string) {
shareArgs := []string{
"drand",
"share",
"--connect", leaderURL,
"--control", d.ctrlPort,
"--id", beaconID,
}

go func() {
err := CLI().Run(shareArgs)
require.NoError(t, err)
}()
}

func (d *drandInstance) reload(beaconID string) error {
reloadArgs := []string{
"drand",
"reload",
"--control", d.ctrlPort,
"--id", beaconID,
}

return CLI().Run(reloadArgs)
}

func (d *drandInstance) run(t *testing.T, beaconID string) {
startArgs := []string{
"drand",
"start",
Expand All @@ -753,7 +860,7 @@ func (d *drandInstance) run(t *testing.T) {
}()

// make sure we run each one sequentially
testStatus(t, d.ctrlPort)
testStatus(t, d.ctrlPort, beaconID)
}

//nolint: gocritic
Expand Down Expand Up @@ -806,8 +913,9 @@ func launchDrandInstances(t *testing.T, n int) ([]*drandInstance, string) {
})
}

os.Setenv("DRAND_SHARE_SECRET", "testtesttestesttesttesttestesttesttesttestesttesttesttestest")
for _, instance := range ins {
instance.run(t)
instance.run(t, beaconID)
}
return ins, tmpPath
}
16 changes: 16 additions & 0 deletions cmd/drand-cli/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ func leadShareCmd(c *cli.Context) error {
return groupOut(c, group)
}

func reloadCmd(c *cli.Context) error {
client, err := controlClient(c)
if err != nil {
return err
}

beaconID := getBeaconID(c)
_, err = client.ReloadBeacon(beaconID)
if err != nil {
return fmt.Errorf("could not reload the beacon process [%s]: %s", beaconID, err)
}

fmt.Fprintf(output, "Beacon process [%s] is alive again \n", beaconID)
return nil
}

func reshareCmd(c *cli.Context) error {
if c.Bool(leaderFlag.Name) {
return leadReshareCmd(c)
Expand Down
72 changes: 44 additions & 28 deletions core/drand_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,12 @@ func (dd *DrandDaemon) AddBeaconHandler(beaconID string, bp *BeaconProcess) {
// RemoveBeaconHandler removes a handler linked to beacon with chain hash from http server used to
// expose public services
func (dd *DrandDaemon) RemoveBeaconHandler(beaconID string, bp *BeaconProcess) {
info := chain.NewChainInfo(bp.group)
dd.handler.RemoveBeaconHandler(info.HashString())
if common.IsDefaultBeaconID(beaconID) {
dd.handler.RemoveBeaconHandler(common.DefaultChainHash)
if bp.group != nil {
info := chain.NewChainInfo(bp.group)
dd.handler.RemoveBeaconHandler(info.HashString())
if common.IsDefaultBeaconID(beaconID) {
dd.handler.RemoveBeaconHandler(common.DefaultChainHash)
}
}
}

Expand All @@ -230,40 +232,54 @@ func (dd *DrandDaemon) LoadBeacons(metricsFlag string) error {
}

for beaconID, fs := range stores {
bp, err := dd.InstantiateBeaconProcess(beaconID, fs)
bp, err := dd.LoadBeacon(beaconID, fs)
if err != nil {
fmt.Printf("beacon id [%s]: can't instantiate randomness beacon. err: %s \n", beaconID, err)
return err
}

freshRun, err := bp.Load()
if err != nil {
return err
// Start metrics server
if metricsFlag != "" {
_ = metrics.Start(metricsFlag, pprof.WithProfile(), bp.PeerMetrics)
}
}

return nil
}

func (dd *DrandDaemon) ReloadBeaconFromDisk(beaconID string) (*BeaconProcess, error) {
store := key.NewFileStore(dd.opts.ConfigFolderMB(), beaconID)
return dd.LoadBeacon(beaconID, store)
}

if freshRun {
fmt.Printf("beacon id [%s]: will run as fresh install -> expect to run DKG.\n", beaconID)
} else {
fmt.Printf("beacon id [%s]: will start running randomness beacon.\n", beaconID)
func (dd *DrandDaemon) LoadBeacon(beaconID string, store key.Store) (*BeaconProcess, error) {
bp, err := dd.InstantiateBeaconProcess(beaconID, store)
if err != nil {
fmt.Printf("beacon id [%s]: can't instantiate randomness beacon. err: %s \n", beaconID, err)
return nil, err
}

// Add beacon chain hash as a new valid one
dd.AddNewChainHash(beaconID, bp)
freshRun, err := bp.Load()
if err != nil {
return nil, err
}

// Add beacon handler from chain hash for http server
dd.AddBeaconHandler(beaconID, bp)
if freshRun {
fmt.Printf("beacon id [%s]: will run as fresh install -> expect to run DKG.\n", beaconID)
} else {
fmt.Printf("beacon id [%s]: will start running randomness beacon.\n", beaconID)

// XXX make it configurable so that new share holder can still start if
// nobody started.
// drand.StartBeacon(!c.Bool(pushFlag.Name))
catchup := true
bp.StartBeacon(catchup)
}
// Add beacon chain hash as a new valid one
dd.AddNewChainHash(beaconID, bp)

// Start metrics server
if metricsFlag != "" {
_ = metrics.Start(metricsFlag, pprof.WithProfile(), bp.PeerMetrics)
}
// Add beacon handler from chain has for http server
dd.AddBeaconHandler(beaconID, bp)

// XXX make it configurable so that new share holder can still start if
// nobody started.
// drand.StartBeacon(!c.Bool(pushFlag.Name))
catchup := true
bp.StartBeacon(catchup)
}

return nil
return bp, nil
}
21 changes: 21 additions & 0 deletions core/drand_daemon_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,27 @@ func (dd *DrandDaemon) Shutdown(ctx context.Context, in *drand.ShutdownRequest)
return &drand.ShutdownResponse{Metadata: metadata}, nil
}

// ReloadBeacon
func (dd *DrandDaemon) ReloadBeacon(ctx context.Context, in *drand.ReloadBeaconRequest) (*drand.ReloadBeaconResponse, error) {
beaconID, err := dd.readBeaconID(in.GetMetadata())
if err != nil {
return nil, err
}

_, err = dd.getBeaconProcessByID(beaconID)
if err == nil {
return nil, fmt.Errorf("beacon id [%s] is already running", beaconID)
}

_, err = dd.ReloadBeaconFromDisk(beaconID)
if err != nil {
return nil, err
}

metadata := common.NewMetadata(dd.version.ToProto())
return &drand.ReloadBeaconResponse{Metadata: metadata}, nil
}

// BackupDatabase triggers a backup of the primary database.
func (dd *DrandDaemon) BackupDatabase(ctx context.Context, in *drand.BackupDBRequest) (*drand.BackupDBResponse, error) {
bp, err := dd.getBeaconProcessFromRequest(in.GetMetadata())
Expand Down
10 changes: 10 additions & 0 deletions net/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ func (c *ControlClient) Ping() error {
return err
}

// Reload
func (c *ControlClient) ReloadBeacon(beaconID string) (*control.ReloadBeaconResponse, error) {
metadata := protoCommon.Metadata{
NodeVersion: c.version.ToProto(), BeaconID: beaconID,
}

resp, err := c.client.ReloadBeacon(ctx.Background(), &control.ReloadBeaconRequest{Metadata: &metadata})
return resp, err
}

// Status gets the current daemon status
func (c *ControlClient) Status(beaconID string) (*control.StatusResponse, error) {
metadata := protoCommon.Metadata{
Expand Down
Loading

0 comments on commit ef80ae4

Please sign in to comment.