From dccfba39b5a8d8981ff3859d63755a4e4d3f3246 Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Wed, 1 Oct 2025 00:15:44 +0500 Subject: [PATCH] discovery: peers+balance --- pkg/lumera/client.go | 26 +++++++++++++++ pkg/lumera/interface.go | 2 ++ pkg/lumera/lumera_mock.go | 15 +++++++++ pkg/lumera/modules/bank/impl.go | 30 +++++++++++++++++ pkg/lumera/modules/bank/interface.go | 18 ++++++++++ pkg/testutil/lumera.go | 24 ++++++++++++-- sdk/task/helpers.go | 47 +------------------------- sdk/task/manager.go | 12 ++----- sdk/task/task.go | 49 +++++++++++++++++++++++++--- 9 files changed, 160 insertions(+), 63 deletions(-) create mode 100644 pkg/lumera/modules/bank/impl.go create mode 100644 pkg/lumera/modules/bank/interface.go diff --git a/pkg/lumera/client.go b/pkg/lumera/client.go index bac35d68..2e25877c 100644 --- a/pkg/lumera/client.go +++ b/pkg/lumera/client.go @@ -2,10 +2,12 @@ package lumera import ( "context" + "fmt" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action_msg" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/auth" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/bank" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" @@ -16,6 +18,7 @@ type lumeraClient struct { authMod auth.Module actionMod action.Module actionMsgMod action_msg.Module + bankMod bank.Module supernodeMod supernode.Module txMod tx.Module nodeMod node.Module @@ -53,12 +56,30 @@ func newClient(ctx context.Context, cfg *Config) (Client, error) { return nil, err } + bankModule, err := bank.NewModule(conn.GetConn()) + if err != nil { + conn.Close() + return nil, err + } + nodeModule, err := node.NewModule(conn.GetConn(), cfg.keyring) if err != nil { conn.Close() return nil, err } + // Preflight: verify configured ChainID matches node's reported network + if nodeInfo, nerr := nodeModule.GetNodeInfo(ctx); nerr != nil { + conn.Close() + return nil, fmt.Errorf("failed to get node info for chain verification: %w", nerr) + } else if nodeInfo != nil && nodeInfo.DefaultNodeInfo != nil { + // Cosmos SDK exposes chain-id in DefaultNodeInfo.Network + if reported := nodeInfo.DefaultNodeInfo.Network; reported != "" && reported != cfg.ChainID { + conn.Close() + return nil, fmt.Errorf("chain ID mismatch: configured=%s node=%s", cfg.ChainID, reported) + } + } + actionMsgModule, err := action_msg.NewModule( conn.GetConn(), authModule, // For account info @@ -77,6 +98,7 @@ func newClient(ctx context.Context, cfg *Config) (Client, error) { authMod: authModule, actionMod: actionModule, actionMsgMod: actionMsgModule, + bankMod: bankModule, supernodeMod: supernodeModule, txMod: txModule, nodeMod: nodeModule, @@ -96,6 +118,10 @@ func (c *lumeraClient) ActionMsg() action_msg.Module { return c.actionMsgMod } +func (c *lumeraClient) Bank() bank.Module { + return c.bankMod +} + func (c *lumeraClient) SuperNode() supernode.Module { return c.supernodeMod } diff --git a/pkg/lumera/interface.go b/pkg/lumera/interface.go index eba47684..2fb25c13 100644 --- a/pkg/lumera/interface.go +++ b/pkg/lumera/interface.go @@ -7,6 +7,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action_msg" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/auth" + "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/bank" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" @@ -18,6 +19,7 @@ type Client interface { Action() action.Module ActionMsg() action_msg.Module SuperNode() supernode.Module + Bank() bank.Module Tx() tx.Module Node() node.Module diff --git a/pkg/lumera/lumera_mock.go b/pkg/lumera/lumera_mock.go index 25d30789..e19ddfdb 100644 --- a/pkg/lumera/lumera_mock.go +++ b/pkg/lumera/lumera_mock.go @@ -15,6 +15,7 @@ import ( action "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action" action_msg "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action_msg" auth "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/auth" + bank "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/bank" node "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" supernode "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" tx "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" @@ -87,6 +88,20 @@ func (mr *MockClientMockRecorder) Auth() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Auth", reflect.TypeOf((*MockClient)(nil).Auth)) } +// Bank mocks base method. +func (m *MockClient) Bank() bank.Module { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Bank") + ret0, _ := ret[0].(bank.Module) + return ret0 +} + +// Bank indicates an expected call of Bank. +func (mr *MockClientMockRecorder) Bank() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bank", reflect.TypeOf((*MockClient)(nil).Bank)) +} + // Close mocks base method. func (m *MockClient) Close() error { m.ctrl.T.Helper() diff --git a/pkg/lumera/modules/bank/impl.go b/pkg/lumera/modules/bank/impl.go new file mode 100644 index 00000000..157eb97f --- /dev/null +++ b/pkg/lumera/modules/bank/impl.go @@ -0,0 +1,30 @@ +package bank + +import ( + "context" + "fmt" + + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "google.golang.org/grpc" +) + +type module struct { + client banktypes.QueryClient +} + +func newModule(conn *grpc.ClientConn) (Module, error) { + if conn == nil { + return nil, fmt.Errorf("connection cannot be nil") + } + return &module{client: banktypes.NewQueryClient(conn)}, nil +} + +func (m *module) Balance(ctx context.Context, address string, denom string) (*banktypes.QueryBalanceResponse, error) { + if address == "" { + return nil, fmt.Errorf("address cannot be empty") + } + if denom == "" { + return nil, fmt.Errorf("denom cannot be empty") + } + return m.client.Balance(ctx, &banktypes.QueryBalanceRequest{Address: address, Denom: denom}) +} diff --git a/pkg/lumera/modules/bank/interface.go b/pkg/lumera/modules/bank/interface.go new file mode 100644 index 00000000..b88093cf --- /dev/null +++ b/pkg/lumera/modules/bank/interface.go @@ -0,0 +1,18 @@ +//go:generate mockgen -destination=bank_mock.go -package=bank -source=interface.go +package bank + +import ( + "context" + + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "google.golang.org/grpc" +) + +// Module provides access to Cosmos SDK bank queries. +type Module interface { + // Balance returns the balance for a specific denom at an address. + Balance(ctx context.Context, address string, denom string) (*banktypes.QueryBalanceResponse, error) +} + +// NewModule constructs a bank Module backed by the given gRPC connection. +func NewModule(conn *grpc.ClientConn) (Module, error) { return newModule(conn) } diff --git a/pkg/testutil/lumera.go b/pkg/testutil/lumera.go index 3f556a97..a4d09814 100644 --- a/pkg/testutil/lumera.go +++ b/pkg/testutil/lumera.go @@ -9,15 +9,18 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/action_msg" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/auth" + bankmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/bank" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" + sdkmath "cosmossdk.io/math" cmtservice "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/crypto/keyring" sdktypes "github.com/cosmos/cosmos-sdk/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" ) // MockLumeraClient implements the lumera.Client interface for testing purposes @@ -25,6 +28,7 @@ type MockLumeraClient struct { authMod *MockAuthModule actionMod *MockActionModule actionMsgMod *MockActionMsgModule + bankMod *MockBankModule supernodeMod *MockSupernodeModule txMod *MockTxModule nodeMod *MockNodeModule @@ -36,6 +40,7 @@ type MockLumeraClient struct { func NewMockLumeraClient(kr keyring.Keyring, addresses []string) (lumera.Client, error) { actionMod := &MockActionModule{} actionMsgMod := &MockActionMsgModule{} + bankMod := &MockBankModule{} supernodeMod := &MockSupernodeModule{addresses: addresses} txMod := &MockTxModule{} nodeMod := &MockNodeModule{} @@ -44,6 +49,7 @@ func NewMockLumeraClient(kr keyring.Keyring, addresses []string) (lumera.Client, authMod: &MockAuthModule{}, actionMod: actionMod, actionMsgMod: actionMsgMod, + bankMod: bankMod, supernodeMod: supernodeMod, txMod: txMod, nodeMod: nodeMod, @@ -67,6 +73,11 @@ func (c *MockLumeraClient) ActionMsg() action_msg.Module { return c.actionMsgMod } +// Bank returns the Bank module client +func (c *MockLumeraClient) Bank() bankmod.Module { + return c.bankMod +} + // SuperNode returns the SuperNode module client func (c *MockLumeraClient) SuperNode() supernode.Module { return c.supernodeMod @@ -87,6 +98,15 @@ func (c *MockLumeraClient) Close() error { return nil } +// MockBankModule implements the bank.Module interface for testing +type MockBankModule struct{} + +// Balance returns a positive balance for any address/denom to pass checks by default +func (m *MockBankModule) Balance(ctx context.Context, address string, denom string) (*banktypes.QueryBalanceResponse, error) { + // Return >= 1 LUME in micro units to satisfy threshold checks + return &banktypes.QueryBalanceResponse{Balance: &sdktypes.Coin{Denom: denom, Amount: sdkmath.NewInt(1_000_000)}}, nil +} + // MockAuthModule implements the auth.Module interface for testing type MockAuthModule struct{} @@ -124,8 +144,8 @@ type MockActionMsgModule struct{} // RequestAction mocks the behavior of requesting an action. func (m *MockActionMsgModule) RequestAction(ctx context.Context, actionType, metadata, price, expirationTime string) (*sdktx.BroadcastTxResponse, error) { - // Mock implementation returns success with empty result - return &sdktx.BroadcastTxResponse{}, nil + // Mock implementation returns success with empty result + return &sdktx.BroadcastTxResponse{}, nil } // FinalizeCascadeAction implements the required method from action_msg.Module interface diff --git a/sdk/task/helpers.go b/sdk/task/helpers.go index f887aeb2..2ea8bcaa 100644 --- a/sdk/task/helpers.go +++ b/sdk/task/helpers.go @@ -3,21 +3,16 @@ package task import ( "context" "encoding/base64" - "errors" "fmt" "os" "path/filepath" "strings" "github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera" - snsvc "github.com/LumeraProtocol/supernode/v2/sdk/adapters/supernodeservice" - "github.com/LumeraProtocol/supernode/v2/sdk/net" ) const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit -var ErrNoPeersConnected = errors.New("no P2P peers connected on available supernodes") - // ValidateFileSize checks if a file size is within the allowed 1GB limit func ValidateFileSize(filePath string) error { fileInfo, err := os.Stat(filePath) @@ -105,47 +100,7 @@ func (m *ManagerImpl) validateSignature(ctx context.Context, action lumera.Actio return nil } -// checkSupernodesPeerConnectivity verifies that at least one supernode has P2P peers connected -func (m *ManagerImpl) checkSupernodesPeerConnectivity(ctx context.Context, blockHeight int64) error { - // Fetch supernodes for the action's block height - supernodes, err := m.lumeraClient.GetSupernodes(ctx, blockHeight) - if err != nil { - return fmt.Errorf("failed to get supernodes: %w", err) - } - - if len(supernodes) == 0 { - return fmt.Errorf("no supernodes available for block height %d", blockHeight) - } - - // Check each supernode for peer connectivity - factoryCfg := net.FactoryConfig{ - LocalCosmosAddress: m.config.Account.LocalCosmosAddress, - PeerType: m.config.Account.PeerType, - } - clientFactory := net.NewClientFactory(ctx, m.logger, m.keyring, m.lumeraClient, factoryCfg) - - for _, sn := range supernodes { - client, err := clientFactory.CreateClient(ctx, sn) - if err != nil { - continue // Skip this supernode if we can't connect - } - - // Request peer info and P2P metrics to assess connectivity - ctxWithMetrics := snsvc.WithIncludeP2PMetrics(ctx) - status, err := client.GetSupernodeStatus(ctxWithMetrics) - client.Close(ctx) - if err != nil { - continue // Skip this supernode if we can't get status - } - - // Check if this supernode has peers - if status.Network.PeersCount > 1 { - return nil // Found at least one supernode with peers - } - } - - return ErrNoPeersConnected -} +// (Removed) Peers connectivity preflight is now enforced during discovery in isServing. func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID string) (lumera.Action, error) { action, err := m.lumeraClient.GetAction(ctx, actionID) diff --git a/sdk/task/manager.go b/sdk/task/manager.go index 052088f3..c5a65bf4 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -107,11 +107,7 @@ func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, ac return "", err } - // Check peer connectivity before creating task - if err := m.checkSupernodesPeerConnectivity(taskCtx, action.Height); err != nil { - cancel() // Clean up if peer check fails - return "", err - } + // Peer connectivity is now validated during discovery health checks taskID := uuid.New().String()[:8] @@ -280,11 +276,7 @@ func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, o return "", fmt.Errorf("no filename found in cascade metadata") } - // Check peer connectivity before creating task - if err := m.checkSupernodesPeerConnectivity(taskCtx, action.Height); err != nil { - cancel() // Clean up if peer check fails - return "", err - } + // Peer connectivity is now validated during discovery health checks // Ensure the output path includes the correct filename finalOutputPath := path.Join(outputDir, action.ID, metadata.FileName) diff --git a/sdk/task/task.go b/sdk/task/task.go index 976725a0..97295902 100644 --- a/sdk/task/task.go +++ b/sdk/task/task.go @@ -6,9 +6,13 @@ import ( "fmt" "sync" + sdkmath "cosmossdk.io/math" "github.com/LumeraProtocol/supernode/v2/pkg/errgroup" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + plumera "github.com/LumeraProtocol/supernode/v2/pkg/lumera" + txmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/tx" "github.com/LumeraProtocol/supernode/v2/sdk/adapters/lumera" + snsvc "github.com/LumeraProtocol/supernode/v2/sdk/adapters/supernodeservice" "github.com/LumeraProtocol/supernode/v2/sdk/config" "github.com/LumeraProtocol/supernode/v2/sdk/event" "github.com/LumeraProtocol/supernode/v2/sdk/log" @@ -85,10 +89,6 @@ func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Su return nil, errors.New("no supernodes found") } - if len(sns) > 10 { - sns = sns[:10] - } - // Keep only SERVING nodes (done in parallel – keeps latency flat) healthy := make(lumera.Supernodes, 0, len(sns)) eg, ctx := errgroup.WithContext(ctx) @@ -131,6 +131,45 @@ func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool { } defer client.Close(ctx) + // First check gRPC health resp, err := client.HealthCheck(ctx) - return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING + if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return false + } + + // Then check P2P peers count via status (include P2P metrics) + status, err := client.GetSupernodeStatus(snsvc.WithIncludeP2PMetrics(ctx)) + if err != nil { + return false + } + if status.Network.PeersCount <= 1 { + return false + } + + // Finally, ensure the supernode account has a positive balance in the default fee denom. + // Use pkg/lumera to query bank balance from the chain. + cfg, err := plumera.NewConfig(t.config.Lumera.GRPCAddr, t.config.Lumera.ChainID, t.config.Account.KeyName, t.keyring) + if err != nil { + logtrace.Debug(ctx, "Failed to build lumera client config for balance check", logtrace.Fields{"error": err.Error()}) + return false + } + lc, err := plumera.NewClient(ctx, cfg) + if err != nil { + logtrace.Debug(ctx, "Failed to create lumera client for balance check", logtrace.Fields{"error": err.Error()}) + return false + } + defer lc.Close() + + denom := txmod.DefaultFeeDenom // base denom (micro), e.g., "ulume" + bal, err := lc.Bank().Balance(ctx, sn.CosmosAddress, denom) + if err != nil || bal == nil || bal.Balance == nil { + return false + } + // Require at least 1 LUME = 10^6 micro (ulume) + min := sdkmath.NewInt(1_000_000) + if bal.Balance.Amount.LT(min) { + return false + } + + return true }