-
Notifications
You must be signed in to change notification settings - Fork 26
Add durability for keygen event with jetstream #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR replaces the old JetStream-based StreamPubsub abstraction with a new MessageBroker interface and its JetStream implementation, refactors event consumers/clients to use the broker, and updates application startup to run keygen and signing consumers concurrently.
- Introduced
MessageBrokerandjetStreamBrokerfor publish/subscribe, removedStreamPubsubandjetstreamPubSub - Updated consumers and client initialization to use
MessageBroker(CreateSubscription,PublishMessage) - Refactored
cmd/mpcium/main.goto start keygen and signing consumers in parallel with error handling
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| pkg/messaging/pubsub.go | Removed StreamPubsub and jetstreamPubSub, simplified PublishWithReply |
| pkg/messaging/jetstream_borker.go | New jetStreamBroker implementing MessageBroker with JetStream |
| pkg/eventconsumer/sign_consumer.go | Switched from StreamPubsub to MessageBroker, updated reply logic |
| pkg/eventconsumer/keygen_consumer.go | Switched to MessageBroker, added keygen consumer implementation |
| pkg/eventconsumer/event_consumer.go | Unified reply handling via sendReplyToRemoveMsg |
| pkg/event/sign.go | Removed deprecated MPCSigningEventTopic |
| pkg/event/keygen.go | Added keygen stream and request topic constants |
| pkg/client/client.go | Updated client to initialize and use MessageBroker |
| cmd/mpcium/main.go | Replaced JetStreamPubSub with brokers, added concurrent consumer startup |
Comments suppressed due to low confidence (4)
pkg/eventconsumer/keygen_consumer.go:40
- The
jsSubfield is typed asmessaging.Subscription, butCreateSubscriptionnow returnsmessaging.MessageSubscription. Update the field type for consistency.
jsSub messaging.Subscription
pkg/eventconsumer/sign_consumer.go:161
- The constant
MPCSignEventis undefined (it was removed frompkg/event/sign.go). Replace this with an existing event constant or reintroduce it in theeventpackage.
if err := sc.pubsub.PublishWithReply(MPCSignEvent, replyInbox, msg.Data()); err != nil {
pkg/eventconsumer/keygen_consumer.go:153
- This references
signingResponseTimeout, which is undefined here. It should usekeygenResponseTimeoutdefined at the top of this file.
deadline := time.Now().Add(signingResponseTimeout)
pkg/eventconsumer/keygen_consumer.go:155
- This uses
signingPollingInterval, which does not exist in this context. It should bekeygenPollingInterval.
replyMsg, err := replySub.NextMsg(signingPollingInterval)
e8c1215 to
6b66ae2
Compare
6b66ae2 to
99522c3
Compare
|
bugbot run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: NATS Reply Bug Sends Incorrect Data
The sendReplyToRemoveMsg function, introduced by this commit, incorrectly sends the original NATS message data (natMsg.Data) as a reply. This breaks the reply mechanism for key generation and signing operations, as consumers expecting the operation's result will now receive the original request instead.
pkg/eventconsumer/event_consumer.go#L488-L502
mpcium/pkg/eventconsumer/event_consumer.go
Lines 488 to 502 in 12d87ab
| func (ec *eventConsumer) sendReplyToRemoveMsg(natMsg *nats.Msg) { | |
| msg := natMsg.Data | |
| if natMsg.Reply == "" { | |
| logger.Warn("No reply inbox specified for sign success message", "msg", string(msg)) | |
| return | |
| } | |
| err := ec.pubsub.Publish(natMsg.Reply, msg) | |
| if err != nil { | |
| logger.Error("Failed to reply message", err, "reply", natMsg.Reply) | |
| return | |
| } | |
| } |
pkg/eventconsumer/event_consumer.go#L431-L435
mpcium/pkg/eventconsumer/event_consumer.go
Lines 431 to 435 in 12d87ab
| onSuccess := func(data []byte) { | |
| done() | |
| ec.sendReplyToRemoveMsg(natMsg) | |
| } |
BugBot free trial expires on July 22, 2025
You have used $0.00 of your $2.00 spend limit so far. Manage your spend limit in the Cursor dashboard.
Was this report helpful? Give feedback by reacting with 👍 or 👎
Add durability for keygen event