Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions cmd/mpcium/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -122,11 +123,17 @@ func runNode(ctx context.Context, c *cli.Command) error {
defer natsConn.Close()

pubsub := messaging.NewNATSPubSub(natsConn)
signingStream, err := messaging.NewJetStreamPubSub(natsConn, event.SigningPublisherStream, []string{
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
event.KeygenRequestTopic,
})
if err != nil {
logger.Fatal("Failed to create keygen jetstream broker", err)
}
signingBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.SigningPublisherStream, []string{
event.SigningRequestTopic,
})
if err != nil {
logger.Fatal("Failed to create JetStream PubSub", err)
logger.Fatal("Failed to create signing jetstream broker", err)
}

directMessaging := messaging.NewNatsDirectMessaging(natsConn)
Expand Down Expand Up @@ -178,7 +185,8 @@ func runNode(ctx context.Context, c *cli.Command) error {

timeoutConsumer.Run()
defer timeoutConsumer.Close()
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub, peerRegistry)
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry)
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry)

// Make the node ready before starting the signing consumer
if err := peerRegistry.Ready(); err != nil {
Expand All @@ -193,12 +201,54 @@ func runNode(ctx context.Context, c *cli.Command) error {
<-sigChan
logger.Warn("Shutdown signal received, canceling context...")
cancel()

// Gracefully close consumers
if err := keygenConsumer.Close(); err != nil {
logger.Error("Failed to close keygen consumer", err)
}
if err := signingConsumer.Close(); err != nil {
logger.Error("Failed to close signing consumer", err)
}
}()

if err := signingConsumer.Run(appContext); err != nil {
logger.Error("error running consumer:", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 2)

wg.Add(1)
go func() {
defer wg.Done()
if err := keygenConsumer.Run(appContext); err != nil {
logger.Error("error running keygen consumer", err)
errChan <- fmt.Errorf("keygen consumer error: %w", err)
return
}
logger.Info("Keygen consumer finished successfully")
}()

wg.Add(1)
go func() {
defer wg.Done()
if err := signingConsumer.Run(appContext); err != nil {
logger.Error("error running signing consumer", err)
errChan <- fmt.Errorf("signing consumer error: %w", err)
return
}
logger.Info("Signing consumer finished successfully")
}()

go func() {
wg.Wait()
logger.Info("All consumers have finished")
close(errChan)
}()

for err := range errChan {
if err != nil {
logger.Error("Consumer error received", err)
cancel()
return err
}
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions examples/generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func main() {

// STEP 2: Register the result handler AFTER all walletIDs are stored
err = mpcClient.OnWalletCreationResult(func(event event.KeygenResultEvent) {
logger.Info("Received wallet creation result", "event", event)
now := time.Now()
startTimeAny, ok := walletStartTimes.Load(event.WalletID)
if ok {
Expand Down
21 changes: 15 additions & 6 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"crypto/ed25519"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -36,7 +37,8 @@ type MPCClient interface {
}

type mpcClient struct {
signingStream messaging.StreamPubsub
signingBroker messaging.MessageBroker
keygenBroker messaging.MessageBroker
pubsub messaging.PubSub
genKeySuccessQueue messaging.MessageQueue
signResultQueue messaging.MessageQueue
Expand Down Expand Up @@ -115,11 +117,17 @@ func NewMPCClient(opts Options) MPCClient {
priv := ed25519.NewKeyFromSeed(privSeed)

// 2) Create the PubSub for both publish & subscribe
signingStream, err := messaging.NewJetStreamPubSub(opts.NatsConn, "mpc-signing", []string{
signingBroker, err := messaging.NewJetStreamBroker(context.Background(), opts.NatsConn, "mpc-signing", []string{
"mpc.signing_request.*",
})
if err != nil {
logger.Fatal("Failed to create JetStream PubSub", err)
logger.Fatal("Failed to create signing jetstream broker", err)
}
keygenBroker, err := messaging.NewJetStreamBroker(context.Background(), opts.NatsConn, "mpc-keygen", []string{
"mpc.keygen_request.*",
})
if err != nil {
logger.Fatal("Failed to create keygen jetstream broker", err)
}

pubsub := messaging.NewNATSPubSub(opts.NatsConn)
Expand All @@ -135,7 +143,8 @@ func NewMPCClient(opts Options) MPCClient {
reshareSuccessQueue := manager.NewMessageQueue("mpc_reshare_result")

return &mpcClient{
signingStream: signingStream,
signingBroker: signingBroker,
keygenBroker: keygenBroker,
pubsub: pubsub,
genKeySuccessQueue: genKeySuccessQueue,
signResultQueue: signResultQueue,
Expand Down Expand Up @@ -186,7 +195,7 @@ func (c *mpcClient) CreateWallet(walletID string) error {
return fmt.Errorf("CreateWallet: marshal error: %w", err)
}

if err := c.pubsub.Publish(eventconsumer.MPCGenerateEvent, bytes); err != nil {
if err := c.keygenBroker.PublishMessage(context.Background(), event.KeygenRequestTopic, bytes); err != nil {
return fmt.Errorf("CreateWallet: publish error: %w", err)
}
return nil
Expand Down Expand Up @@ -226,7 +235,7 @@ func (c *mpcClient) SignTransaction(msg *types.SignTxMessage) error {
return fmt.Errorf("SignTransaction: marshal error: %w", err)
}

if err := c.signingStream.Publish(event.SigningRequestEventTopic, bytes); err != nil {
if err := c.signingBroker.PublishMessage(context.Background(), event.SigningRequestEventTopic, bytes); err != nil {
return fmt.Errorf("SignTransaction: publish error: %w", err)
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/event/keygen.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package event

const (
KeygenBrokerStream = "mpc-keygen"
KeygenConsumerStream = "mpc-keygen-consumer"
KeygenRequestTopic = "mpc.keygen_request.*"
)

type KeygenResultEvent struct {
WalletID string `json:"wallet_id"`
ECDSAPubKey []byte `json:"ecdsa_pub_key"`
Expand Down
1 change: 0 additions & 1 deletion pkg/event/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const (
SigningRequestTopic = "mpc.signing_request.*"
SigningResultTopic = "mpc.mpc_signing_result.*"
SigningResultCompleteTopic = "mpc.mpc_signing_result.complete"
MPCSigningEventTopic = "mpc:sign"
SigningRequestEventTopic = "mpc.signing_request.event"
)

Expand Down
54 changes: 34 additions & 20 deletions pkg/eventconsumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,27 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
var msg types.GenerateKeyMessage
if err := json.Unmarshal(raw, &msg); err != nil {
logger.Error("Failed to unmarshal keygen message", err)
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to unmarshal keygen message")
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to unmarshal keygen message", natMsg)
return
}

if err := ec.identityStore.VerifyInitiatorMessage(&msg); err != nil {
logger.Error("Failed to verify initiator message", err)
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to verify initiator message")
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to verify initiator message", natMsg)
return
}

walletID := msg.WalletID
ecdsaSession, err := ec.node.CreateKeyGenSession(mpc.SessionTypeECDSA, walletID, ec.mpcThreshold, ec.genKeyResultQueue)
if err != nil {
logger.Error("Failed to create ECDSA key generation session", err, "walletID", walletID)
ec.handleKeygenSessionError(walletID, err, "Failed to create ECDSA key generation session")
ec.handleKeygenSessionError(walletID, err, "Failed to create ECDSA key generation session", natMsg)
return
}
eddsaSession, err := ec.node.CreateKeyGenSession(mpc.SessionTypeEDDSA, walletID, ec.mpcThreshold, ec.genKeyResultQueue)
if err != nil {
logger.Error("Failed to create EdDSA key generation session", err, "walletID", walletID)
ec.handleKeygenSessionError(walletID, err, "Failed to create EdDSA key generation session")
ec.handleKeygenSessionError(walletID, err, "Failed to create EdDSA key generation session", natMsg)
return
}
ecdsaSession.Init()
Expand All @@ -167,7 +167,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
successEvent.ECDSAPubKey = ecdsaSession.GetPubKeyResult()
case err := <-ecdsaSession.ErrChan():
logger.Error("ECDSA keygen session error", err)
ec.handleKeygenSessionError(walletID, err, "ECDSA keygen session error")
ec.handleKeygenSessionError(walletID, err, "ECDSA keygen session error", natMsg)
errorChan <- err
doneEcdsa()
}
Expand All @@ -179,7 +179,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
successEvent.EDDSAPubKey = eddsaSession.GetPubKeyResult()
case err := <-eddsaSession.ErrChan():
logger.Error("EdDSA keygen session error", err)
ec.handleKeygenSessionError(walletID, err, "EdDSA keygen session error")
ec.handleKeygenSessionError(walletID, err, "EdDSA keygen session error", natMsg)
errorChan <- err
doneEddsa()
}
Expand Down Expand Up @@ -213,28 +213,29 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
case <-baseCtx.Done():
// timeout occurred
logger.Warn("Key generation timed out", "walletID", walletID, "timeout", KeyGenTimeOut)
ec.handleKeygenSessionError(walletID, fmt.Errorf("keygen session timed out after %v", KeyGenTimeOut), "Key generation timed out")
ec.handleKeygenSessionError(walletID, fmt.Errorf("keygen session timed out after %v", KeyGenTimeOut), "Key generation timed out", natMsg)
return
}

payload, err := json.Marshal(successEvent)
if err != nil {
logger.Error("Failed to marshal keygen success event", err)
ec.handleKeygenSessionError(walletID, err, "Failed to marshal keygen success event")
ec.handleKeygenSessionError(walletID, err, "Failed to marshal keygen success event", natMsg)
return
}

key := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, walletID)
if err := ec.genKeyResultQueue.Enqueue(key, payload, &messaging.EnqueueOptions{IdempotententKey: key}); err != nil {
logger.Error("Failed to publish key generation success message", err)
ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message")
ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message", natMsg)
return
}
ec.sendReplyToRemoveMsg(natMsg)
logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID)
}

// handleKeygenSessionError handles errors that occur during key generation
func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, contextMsg string) {
func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, contextMsg string, natMsg *nats.Msg) {
fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err)
errorCode := event.GetErrorCodeFromError(err)

Expand Down Expand Up @@ -270,6 +271,7 @@ func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, co
"payload", string(keygenResultBytes),
)
}
ec.sendReplyToRemoveMsg(natMsg)
}

func (ec *eventConsumer) startKeyGenEventWorker() {
Expand Down Expand Up @@ -368,6 +370,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
msg.NetworkInternalCode,
err,
"Failed to create signing session",
natMsg,
)
return
}
Expand All @@ -386,6 +389,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
msg.NetworkInternalCode,
err,
"Failed to init signing session",
natMsg,
)
return
}
Expand All @@ -407,6 +411,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
msg.NetworkInternalCode,
err,
"Failed to sign tx",
natMsg,
)
return
}
Expand All @@ -426,14 +431,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {

onSuccess := func(data []byte) {
done()
if natMsg.Reply != "" {
err = ec.pubsub.Publish(natMsg.Reply, data)
if err != nil {
logger.Error("Failed to publish reply", err)
} else {
logger.Info("Reply to the original message", "reply", natMsg.Reply)
}
}
ec.sendReplyToRemoveMsg(natMsg)
}
go session.Sign(onSuccess)
})
Expand All @@ -445,7 +443,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {

return nil
}
func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkInternalCode string, err error, contextMsg string) {
func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkInternalCode string, err error, contextMsg string, natMsg *nats.Msg) {
fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err)
errorCode := event.GetErrorCodeFromError(err)

Expand Down Expand Up @@ -475,7 +473,6 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
)
return
}

err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{
IdempotententKey: txID,
})
Expand All @@ -486,7 +483,24 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
"payload", string(signingResultBytes),
)
}
ec.sendReplyToRemoveMsg(natMsg)
}

func (ec *eventConsumer) sendReplyToRemoveMsg(natMsg *nats.Msg) {
msg := natMsg.Data

if natMsg.Reply == "" {
logger.Warn("No reply inbox specified for sign success message", "msg", string(msg))
return
}

err := ec.pubsub.Publish(natMsg.Reply, msg)
if err != nil {
logger.Error("Failed to reply message", err, "reply", natMsg.Reply)
return
}
}

func (ec *eventConsumer) consumeReshareEvent() error {
sub, err := ec.pubsub.Subscribe(MPCReshareEvent, func(natMsg *nats.Msg) {
var msg types.ResharingMessage
Expand Down
Loading
Loading