From ef0a4979647a6bd4319338d33afb09799c75015e Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 1 Feb 2024 10:04:30 +0100 Subject: [PATCH] feat: separating core.ip into core.grpc.ip and core.rpc.ip --- nodebuilder/core/config.go | 21 +++++--- nodebuilder/core/constructors.go | 2 +- nodebuilder/core/flags.go | 86 ++++++++++++++++++++++--------- nodebuilder/state/core.go | 2 +- nodebuilder/tests/helpers_test.go | 6 +-- nodebuilder/tests/swamp/swamp.go | 3 +- state/core_access.go | 13 +++-- state/core_access_test.go | 3 +- state/integration_test.go | 2 +- 9 files changed, 96 insertions(+), 42 deletions(-) diff --git a/nodebuilder/core/config.go b/nodebuilder/core/config.go index bb5eea5b83..f813f3a2c4 100644 --- a/nodebuilder/core/config.go +++ b/nodebuilder/core/config.go @@ -11,7 +11,8 @@ var MetricsEnabled bool // Config combines all configuration fields for managing the relationship with a Core node. type Config struct { - IP string + RPCIP string + GRPCIP string RPCPort string GRPCPort string } @@ -20,7 +21,8 @@ type Config struct { // node's connection to a Celestia-Core endpoint. func DefaultConfig() Config { return Config{ - IP: "", + RPCIP: "", + GRPCIP: "", RPCPort: "26657", GRPCPort: "9090", } @@ -32,11 +34,18 @@ func (cfg *Config) Validate() error { return nil } - ip, err := utils.ValidateAddr(cfg.IP) + rpcIP, err := utils.ValidateAddr(cfg.RPCIP) if err != nil { - return err + return fmt.Errorf("nodebuilder/core: invalid rpc ip: %s", err.Error()) } - cfg.IP = ip + cfg.RPCIP = rpcIP + + grpcIP, err := utils.ValidateAddr(cfg.GRPCIP) + if err != nil { + return fmt.Errorf("nodebuilder/core: invalid grpc ip: %s", err.Error()) + } + cfg.GRPCIP = grpcIP + _, err = strconv.Atoi(cfg.RPCPort) if err != nil { return fmt.Errorf("nodebuilder/core: invalid rpc port: %s", err.Error()) @@ -51,5 +60,5 @@ func (cfg *Config) Validate() error { // IsEndpointConfigured returns whether a core endpoint has been set // on the config (true if set). func (cfg *Config) IsEndpointConfigured() bool { - return cfg.IP != "" + return cfg.RPCIP != "" && cfg.GRPCIP != "" } diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index 53c914a041..189a18c3f8 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -5,5 +5,5 @@ import ( ) func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.RPCPort) + return core.NewRemote(cfg.RPCIP, cfg.RPCPort) } diff --git a/nodebuilder/core/flags.go b/nodebuilder/core/flags.go index 9cbed9b277..41d3c8426e 100644 --- a/nodebuilder/core/flags.go +++ b/nodebuilder/core/flags.go @@ -8,9 +8,11 @@ import ( ) var ( - coreFlag = "core.ip" - coreRPCFlag = "core.rpc.port" - coreGRPCFlag = "core.grpc.port" + ipFlag = "core.ip" + rpcIPFlag = "core.rpc.ip" + grpcIPFlag = "core.grpc.ip" + rpcPortFlag = "core.rpc.port" + grpcPortFlag = "core.grpc.port" ) // Flags gives a set of hardcoded Core flags. @@ -18,43 +20,81 @@ func Flags() *flag.FlagSet { flags := &flag.FlagSet{} flags.String( - coreFlag, + ipFlag, "", - "Indicates node to connect to the given core node. "+ + "Indicates node to connect to the given core node's RPC and gRPC. "+ + "NOTE: If this flag is set, the core.rpc.ip and core.grpc.ip flags cannot be set. "+ "Example: , 127.0.0.1. , subdomain.domain.tld "+ "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", ) flags.String( - coreRPCFlag, + rpcIPFlag, + "", + "Indicates node to connect to the given core node's RPC. "+ + "NOTE: If this flag is set, the core.ip flag cannot be set. "+ + "Example: , 127.0.0.1. , subdomain.domain.tld "+ + "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", + ) + flags.String( + grpcIPFlag, + "", + "Indicates node to connect to the given core node's gRPC. "+ + "NOTE: If this flag is set, the core.ip flag cannot be set. "+ + "Example: , 127.0.0.1. , subdomain.domain.tld "+ + "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", + ) + flags.String( + rpcPortFlag, "26657", - "Set a custom RPC port for the core node connection. The --core.ip flag must also be provided.", + "Set a custom RPC port for the core node connection. The --core.rpc.ip or --core.ip flag must also be provided.", ) flags.String( - coreGRPCFlag, + grpcPortFlag, "9090", - "Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.", + "Set a custom gRPC port for the core node connection. The --core.grpc.ip or --core.ip flag must also be provided.", ) return flags } // ParseFlags parses Core flags from the given cmd and saves them to the passed config. -func ParseFlags( - cmd *cobra.Command, - cfg *Config, -) error { - coreIP := cmd.Flag(coreFlag).Value.String() - if coreIP == "" { - if cmd.Flag(coreGRPCFlag).Changed || cmd.Flag(coreRPCFlag).Changed { - return fmt.Errorf("cannot specify RPC/gRPC ports without specifying an IP address for --core.ip") +func ParseFlags(cmd *cobra.Command, cfg *Config) error { + coreIP := cmd.Flag(ipFlag).Value.String() + coreRPCIP, coreGRPCIP := cmd.Flag(rpcIPFlag).Value.String(), cmd.Flag(grpcIPFlag).Value.String() + + // Check if core.ip is specified along with core.rpc.ip or core.grpc.ip + if coreIP != "" && (cmd.Flag(rpcIPFlag).Changed || cmd.Flag(grpcIPFlag).Changed) { + return fmt.Errorf("cannot specify core.ip and core.rpc.ip or core.grpc.ip flags together") + } + + // Validate IP addresses and port settings + if coreIP == "" { // No core.ip specified + if coreRPCIP == "" { + if cmd.Flag(rpcPortFlag).Changed { + return fmt.Errorf("cannot specify RPC ports without specifying an IP address for --core.rpc.ip") + } + if cmd.Flag(grpcIPFlag).Changed { + return fmt.Errorf("setting gRPC IP requires also specifying an RPC IP. If they are identical use --core.ip instead") + } + } + if coreGRPCIP == "" { + if cmd.Flag(grpcPortFlag).Changed { + return fmt.Errorf("cannot specify gRPC ports without specifying an IP address for --core.grpc.ip") + } + if cmd.Flag(rpcIPFlag).Changed { + return fmt.Errorf("setting RPC IP requires also specifying a gRPC IP. If they are identical use --core.ip instead") + } } - return nil } - rpc := cmd.Flag(coreRPCFlag).Value.String() - grpc := cmd.Flag(coreGRPCFlag).Value.String() + // Assign IP addresses + if coreIP != "" { + cfg.RPCIP, cfg.GRPCIP = coreIP, coreIP + } else { + cfg.RPCIP, cfg.GRPCIP = coreRPCIP, coreGRPCIP + } + + // Assign ports + cfg.RPCPort, cfg.GRPCPort = cmd.Flag(rpcPortFlag).Value.String(), cmd.Flag(grpcPortFlag).Value.String() - cfg.IP = coreIP - cfg.RPCPort = rpc - cfg.GRPCPort = grpc return cfg.Validate() } diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index f8f8508540..06e16b8034 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -20,7 +20,7 @@ func coreAccessor( sync *sync.Syncer[*header.ExtendedHeader], fraudServ libfraud.Service[*header.ExtendedHeader], ) (*state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]) { - ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort) + ca := state.NewCoreAccessor(signer, sync, corecfg.RPCIP, corecfg.GRPCIP, corecfg.RPCPort, corecfg.GRPCPort) return ca, ca, &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ Service: ca, diff --git a/nodebuilder/tests/helpers_test.go b/nodebuilder/tests/helpers_test.go index 1e7f14d823..978b66553d 100644 --- a/nodebuilder/tests/helpers_test.go +++ b/nodebuilder/tests/helpers_test.go @@ -6,12 +6,12 @@ import ( "testing" "time" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/stretchr/testify/require" + "github.com/celestiaorg/celestia-node/api/rpc/client" "github.com/celestiaorg/celestia-node/libs/authtoken" "github.com/celestiaorg/celestia-node/nodebuilder" - - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/stretchr/testify/require" ) func getAdminClient(ctx context.Context, nd *nodebuilder.Node, t *testing.T) *client.Client { diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 9faf69744d..55bc62750f 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -196,7 +196,8 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { ip, port, err := net.SplitHostPort(s.cfg.AppConfig.GRPC.Address) require.NoError(s.t, err) - cfg.Core.IP = ip + cfg.Core.RPCIP = ip + cfg.Core.GRPCIP = ip cfg.Core.GRPCPort = port return cfg } diff --git a/state/core_access.go b/state/core_access.go index c3fbd4836a..bf662a056d 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -58,7 +58,8 @@ type CoreAccessor struct { prt *merkle.ProofRuntime coreConn *grpc.ClientConn - coreIP string + rpcIP string + grpcIP string rpcPort string grpcPort string @@ -79,7 +80,8 @@ type CoreAccessor struct { func NewCoreAccessor( signer *apptypes.KeyringSigner, getter libhead.Head[*header.ExtendedHeader], - coreIP, + rpcIP, + grpcIP, rpcPort string, grpcPort string, ) *CoreAccessor { @@ -90,7 +92,8 @@ func NewCoreAccessor( return &CoreAccessor{ signer: signer, getter: getter, - coreIP: coreIP, + rpcIP: rpcIP, + grpcIP: grpcIP, rpcPort: rpcPort, grpcPort: grpcPort, prt: prt, @@ -104,7 +107,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { ca.ctx, ca.cancel = context.WithCancel(context.Background()) // dial given celestia-core endpoint - endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) + endpoint := fmt.Sprintf("%s:%s", ca.grpcIP, ca.grpcPort) client, err := grpc.DialContext( ctx, endpoint, @@ -121,7 +124,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { stakingCli := stakingtypes.NewQueryClient(ca.coreConn) ca.stakingCli = stakingCli // create ABCI query client - cli, err := http.New(fmt.Sprintf("http://%s:%s", ca.coreIP, ca.rpcPort), "/websocket") + cli, err := http.New(fmt.Sprintf("http://%s:%s", ca.rpcIP, ca.rpcPort), "/websocket") if err != nil { return err } diff --git a/state/core_access_test.go b/state/core_access_test.go index ad7b916ea3..829758fb55 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -37,7 +37,8 @@ func TestSubmitPayForBlob(t *testing.T) { defer cancel() signer := blobtypes.NewKeyringSigner(cctx.Keyring, accounts[0], cctx.ChainID) - ca := NewCoreAccessor(signer, nil, "127.0.0.1", extractPort(rpcAddr), extractPort(grpcAddr)) + coreIP := "127.0.0.1" + ca := NewCoreAccessor(signer, nil, coreIP, coreIP, extractPort(rpcAddr), extractPort(grpcAddr)) // start the accessor err := ca.Start(ctx) require.NoError(t, err) diff --git a/state/integration_test.go b/state/integration_test.go index 193e7bddc7..5fd60c00f3 100644 --- a/state/integration_test.go +++ b/state/integration_test.go @@ -51,7 +51,7 @@ func (s *IntegrationTestSuite) SetupSuite() { s.accounts = cfg.Accounts signer := blobtypes.NewKeyringSigner(s.cctx.Keyring, s.accounts[0], s.cctx.ChainID) - accessor := NewCoreAccessor(signer, localHeader{s.cctx.Client}, "", "", "") + accessor := NewCoreAccessor(signer, localHeader{s.cctx.Client}, "", "", "", "") setClients(accessor, s.cctx.GRPCClient, s.cctx.Client) s.accessor = accessor