Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

chore: better resources closing for orchestrator and relayer #638

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions cmd/blobstream/orchestrator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,30 @@ func Start() *cobra.Command {
defer cancel()

stopFuncs := make([]func() error, 0)
defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure)
stopFuncs = append(stopFuncs, stops...)
tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure)
stopFuncs = append(stopFuncs, storeStops...)
if err != nil {
return err
}

s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
HasDataStore: true,
BadgerOptions: store.DefaultBadgerOptions(config.Home),
HasSignatureStore: false,
HasEVMKeyStore: true,
HasP2PKeyStore: true,
})
stopFuncs = append(stopFuncs, stops...)
if err != nil {
stopFuncs = append(stopFuncs, storeStops...)
rach-id marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Expand All @@ -110,25 +118,14 @@ func Start() *cobra.Command {
return err
}
stopFuncs = append(stopFuncs, func() error { return dht.Close() })
stopFuncs = append(stopFuncs, storeStops...)

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)

defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

// creating the broadcaster
broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT)
if err != nil {
return err
}

// creating the orchestrator
orch := orchestrator.New(
Expand Down
25 changes: 12 additions & 13 deletions cmd/blobstream/relayer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,28 @@ func Start() *cobra.Command {
defer cancel()

stopFuncs := make([]func() error, 0)
defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure)
stopFuncs = append(stopFuncs, stops...)
tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure)
stopFuncs = append(stopFuncs, storeStops...)
if err != nil {
return err
}

s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
HasDataStore: true,
BadgerOptions: store.DefaultBadgerOptions(config.Home),
HasSignatureStore: true,
HasEVMKeyStore: true,
HasP2PKeyStore: true,
})
stopFuncs = append(stopFuncs, stops...)
if err != nil {
return err
}
Expand All @@ -163,20 +170,12 @@ func Start() *cobra.Command {
return err
}
stopFuncs = append(stopFuncs, func() error { return dht.Close() })
stopFuncs = append(stopFuncs, storeStops...)

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)

defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

// connecting to a Blobstream contract
ethClient, err := ethclient.Dial(config.EvmRPC)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
goerrors "errors"
"fmt"
"math/big"
"strconv"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
orch.Logger.Error("error listening to new attestations", "err", err)
cancel()
}
orch.Logger.Error("stopping listening to new attestations")
orch.Logger.Info("stopping listening to new attestations")
}()

// go routine for processing nonces
Expand All @@ -94,7 +95,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
orch.Logger.Error("error processing attestations", "err", err)
cancel()
}
orch.Logger.Error("stopping processing attestations")
orch.Logger.Info("stopping processing attestations")
}()

// go routine for handling the previous attestation nonces
Expand Down Expand Up @@ -137,8 +138,11 @@ func (orch Orchestrator) StartNewEventsListener(
for {
select {
case <-signalChan:
return ErrSignalChanNotif
return nil
case <-ctx.Done():
if goerrors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
case <-ticker.C:
running := orch.TmQuerier.IsRunning(ctx)
Expand Down Expand Up @@ -217,7 +221,7 @@ func (orch Orchestrator) EnqueueMissingEvents(
for i := uint64(0); i < latestNonce-uint64(earliestAttestationNonce)+1; i++ {
select {
case <-signalChan:
return ErrSignalChanNotif
return nil
case <-ctx.Done():
return ctx.Err()
default:
Expand Down
Loading