Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/cascadekit/signatures.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import (

"github.com/LumeraProtocol/supernode/v2/pkg/codec"
"github.com/LumeraProtocol/supernode/v2/pkg/errors"

actionkeeper "github.com/LumeraProtocol/lumera/x/action/v1/keeper"

keyringpkg "github.com/LumeraProtocol/supernode/v2/pkg/keyring"

sdkkeyring "github.com/cosmos/cosmos-sdk/crypto/keyring"
)

// Signer is a function that signs the provided message and returns the raw signature bytes.
Expand Down Expand Up @@ -83,3 +89,43 @@ func CreateSignatures(layout codec.Layout, signer Signer, ic, max uint32) (index
}
return indexSignatureFormat, indexIDs, nil
}

// adr36SignerForKeyring creates a signer that signs ADR-36 doc bytes
// for the given signer address. The "msg" we pass in is the *message*
// (layoutB64, indexJSON, etc.), and this helper wraps it into ADR-36.
func adr36SignerForKeyring(
kr sdkkeyring.Keyring,
keyName string,
signerAddr string,
) Signer {
return func(msg []byte) ([]byte, error) {
// msg is the cleartext message we want to sign (e.g., layoutB64 or index JSON string)
dataB64 := base64.StdEncoding.EncodeToString(msg)

// Build ADR-36 sign bytes: signerAddr + base64(message)
doc, err := actionkeeper.MakeADR36AminoSignBytes(signerAddr, dataB64)
if err != nil {
return nil, err
}

// Now sign the ADR-36 doc bytes with the keyring (direct secp256k1)
return keyringpkg.SignBytes(kr, keyName, doc)
}
}

func CreateSignaturesWithKeyringADR36(
layout codec.Layout,
kr sdkkeyring.Keyring,
keyName string,
ic, max uint32,
) (string, []string, error) {
// Resolve signer bech32 address from keyring
addr, err := keyringpkg.GetAddress(kr, keyName)
if err != nil {
return "", nil, fmt.Errorf("resolve signer address: %w", err)
}

signer := adr36SignerForKeyring(kr, keyName, addr.String())

return CreateSignatures(layout, signer, ic, max)
}
18 changes: 17 additions & 1 deletion pkg/cascadekit/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,23 @@ func VerifyStringRawOrADR36(message string, sigB64 string, signer string, verify

// VerifyIndex verifies the creator's signature over indexB64 (string), using the given verifier.
func VerifyIndex(indexB64 string, sigB64 string, signer string, verify Verifier) error {
return VerifyStringRawOrADR36(indexB64, sigB64, signer, verify)
// 1) Legacy: message = indexB64
if err := VerifyStringRawOrADR36(indexB64, sigB64, signer, verify); err == nil {
return nil
}

// 2) JS-style: message = index JSON string (decoded from indexB64)
raw, err := base64.StdEncoding.DecodeString(indexB64)
if err != nil {
return fmt.Errorf("invalid indexB64: %w", err)
}
indexJSON := string(raw)

if err := VerifyStringRawOrADR36(indexJSON, sigB64, signer, verify); err == nil {
return nil
}

return fmt.Errorf("signature verification failed for both legacy and ADR-36 index schemes")
}

// VerifyLayout verifies the layout signature over base64(JSON(layout)) bytes.
Expand Down
8 changes: 8 additions & 0 deletions pkg/keyring/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,11 @@ func SignBytes(kr sdkkeyring.Keyring, name string, bz []byte) ([]byte, error) {
sig, _, err := kr.SignByAddress(addr, bz, signing.SignMode_SIGN_MODE_DIRECT)
return sig, err
}

func GetAddress(kr sdkkeyring.Keyring, name string) (types.AccAddress, error) {
rec, err := kr.Key(name)
if err != nil {
return nil, err
}
return rec.GetAddress()
}
28 changes: 26 additions & 2 deletions pkg/net/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials"
"github.com/LumeraProtocol/supernode/v2/pkg/random"
"github.com/LumeraProtocol/supernode/v2/sdk/log"
)

const (
Expand All @@ -29,7 +30,7 @@ const (

const (
defaultTimeout = 30 * time.Second
defaultConnWaitTime = 10 * time.Second
defaultConnWaitTime = 12 * time.Second
defaultRetryWaitTime = 1 * time.Second
maxRetries = 3

Expand Down Expand Up @@ -69,6 +70,7 @@ type Client struct {
creds credentials.TransportCredentials
builder DialOptionBuilder
connHandler ConnectionHandler
logger log.Logger
}

// ClientOptions contains options for creating a new client
Expand Down Expand Up @@ -97,6 +99,8 @@ type ClientOptions struct {
UserAgent string // User-Agent header value for all requests
Authority string // Value to use as the :authority pseudo-header
MinConnectTimeout time.Duration // Minimum time to attempt connection before failing

Logger log.Logger
}

// Exponential backoff configuration
Expand Down Expand Up @@ -213,16 +217,36 @@ var waitForConnection = func(ctx context.Context, conn ClientConn, timeout time.

for {
state := conn.GetState()

// Debug log for every state observation
logtrace.Debug(timeoutCtx, "gRPC connection state",
logtrace.Fields{
"state": state.String(),
},
)

switch state {
case connectivity.Ready:
logtrace.Debug(timeoutCtx, "gRPC connection is READY", nil)
return nil

case connectivity.Shutdown:
logtrace.Error(timeoutCtx, "gRPC connection is SHUTDOWN", nil)
return fmt.Errorf("grpc connection is shutdown")

case connectivity.TransientFailure:
logtrace.Error(timeoutCtx, "gRPC connection in TRANSIENT_FAILURE", nil)
return fmt.Errorf("grpc connection is in transient failure")

default:
// For Idle and Connecting states, wait for state change
// Idle / Connecting wait for state change
if !conn.WaitForStateChange(timeoutCtx, state) {
logtrace.Error(timeoutCtx, "Timeout waiting for gRPC connection state change",
logtrace.Fields{
"last_state": state.String(),
"timeout": timeout.String(),
},
)
return fmt.Errorf("timeout waiting for grpc connection state change")
}
}
Expand Down
14 changes: 9 additions & 5 deletions sdk/action/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,15 @@ func (c *ClientImpl) BuildCascadeMetadataFromFile(ctx context.Context, filePath
rnd, _ := crand.Int(crand.Reader, big.NewInt(100))
ic := uint32(rnd.Int64() + 1) // 1..100
// Create signatures from the layout struct
indexSignatureFormat, _, err := cascadekit.CreateSignaturesWithKeyring(layout, c.keyring, c.config.Account.KeyName, ic, max)
if err != nil {
return actiontypes.CascadeMetadata{}, "", "", fmt.Errorf("create signatures: %w", err)
}

// get bech32 address for this key

indexSignatureFormat, _, err := cascadekit.CreateSignaturesWithKeyringADR36(
layout,
c.keyring,
c.config.Account.KeyName,
ic,
max,
)
// Compute data hash (blake3) as base64 using a streaming file hash to avoid loading entire file
h, err := utils.Blake3HashFile(filePath)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions sdk/adapters/lumera/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/LumeraProtocol/supernode/v2/sdk/log"
Expand Down Expand Up @@ -404,8 +405,8 @@ func toSdkSupernodes(resp *sntypes.QueryGetTopSuperNodesForBlockResponse) []Supe
}

result = append(result, Supernode{
CosmosAddress: sn.SupernodeAccount,
GrpcEndpoint: ipAddress,
CosmosAddress: strings.TrimSpace(sn.SupernodeAccount),
GrpcEndpoint: strings.TrimSpace(ipAddress),
State: SUPERNODE_STATE_ACTIVE,
})
}
Expand Down
13 changes: 13 additions & 0 deletions sdk/adapters/lumera/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@ type Supernode struct {
GrpcEndpoint string // Network endpoint for gRPC communication
State SUPERNODE_STATE // Current state of the supernode
}

func (s Supernodes) String() string {
result := "["
for i, sn := range s {
result += sn.CosmosAddress + "@" + sn.GrpcEndpoint

if i < len(s)-1 {
result += ", "
}
}
result += "]"
return result
}
1 change: 1 addition & 0 deletions sdk/net/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewClientFactory(ctx context.Context, logger log.Logger, keyring keyring.Ke
// Increase per-stream window to provide headroom for first data chunk + events
opts.InitialWindowSize = 12 * 1024 * 1024 // 8MB per-stream window
opts.InitialConnWindowSize = 64 * 1024 * 1024 // 64MB per-connection window
opts.Logger = logger

return &ClientFactory{
logger: logger,
Expand Down
12 changes: 6 additions & 6 deletions sdk/net/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring.
targetSupernode.GrpcEndpoint,
)

logger.Info(ctx, "Connecting to supernode securely", "endpoint", targetSupernode.GrpcEndpoint, "target_id", targetSupernode.CosmosAddress, "local_id", factoryConfig.LocalCosmosAddress, "peer_type", factoryConfig.PeerType)
logger.Debug(ctx, "Preparing to connect to supernode securely",
"endpoint", targetSupernode.GrpcEndpoint, "target_id", targetSupernode.CosmosAddress,
"local_id", factoryConfig.LocalCosmosAddress, "peer_type", factoryConfig.PeerType)

// Use provided client options or defaults
options := clientOptions
Expand All @@ -93,7 +95,7 @@ func NewSupernodeClient(ctx context.Context, logger log.Logger, keyring keyring.
targetSupernode.CosmosAddress, err)
}

logger.Info(ctx, "Connected to supernode securely", "address", targetSupernode.CosmosAddress, "endpoint", targetSupernode.GrpcEndpoint)
logger.Debug(ctx, "Connected to supernode securely", "address", targetSupernode.CosmosAddress, "endpoint", targetSupernode.GrpcEndpoint)

// Create service clients
cascadeClient := supernodeservice.NewCascadeAdapter(
Expand All @@ -116,9 +118,7 @@ func (c *supernodeClient) RegisterCascade(ctx context.Context, in *supernodeserv
if err != nil {
return nil, fmt.Errorf("cascade registration failed: %w", err)
}

c.logger.Info(ctx, "Cascade registered successfully",
"actionID", in.ActionID, "taskId", in.TaskId)
c.logger.Info(ctx, "Cascade Registration request sent to supernode", "actionID", in.ActionID, "taskId", in.TaskId)

return resp, nil
}
Expand All @@ -140,7 +140,7 @@ func (c *supernodeClient) GetSupernodeStatus(ctx context.Context) (*pb.StatusRes
return nil, fmt.Errorf("failed to get supernode status: %w", err)
}

c.logger.Debug(ctx, "Supernode status retrieved successfully")
c.logger.Debug(ctx, "Supernode status retrieved successfully", "response", resp.String())
return resp, nil
}

Expand Down
21 changes: 21 additions & 0 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (t *CascadeTask) Run(ctx context.Context) error {
return err
}

t.logger.Debug(ctx, "Fetched supernodes",
map[string]interface{}{
"count": len(supernodes),
"list": supernodes.String(),
},
)

// 2 - Pre-filter: balance & health concurrently -> XOR rank, then hand over
originalCount := len(supernodes)
supernodes, preClients := t.filterEligibleSupernodesParallel(ctx, supernodes)
Expand Down Expand Up @@ -161,9 +168,23 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.
// Use ctx directly; per-phase timers are applied inside the adapter
resp, err := client.RegisterCascade(ctx, req)
if err != nil {
t.logger.Error(ctx, "RegisterCascade RPC failed",
map[string]interface{}{
"supernode": sn.GrpcEndpoint,
"address": sn.CosmosAddress,
"error": err.Error(),
},
)
return fmt.Errorf("upload to %s: %w", sn.CosmosAddress, err)
}
if !resp.Success {
t.logger.Error(ctx, "RegisterCascade RPC rejected",
map[string]interface{}{
"supernode": sn.GrpcEndpoint,
"address": sn.CosmosAddress,
"message": resp.Message,
},
)
return fmt.Errorf("upload rejected by %s: %s", sn.CosmosAddress, resp.Message)
}

Expand Down
Loading