Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 53 additions & 6 deletions client/schema.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package client

const (
ActiveStakingQueueName string = "v2_active_staking_queue"
UnbondingStakingQueueName string = "v2_unbonding_staking_queue"
ActiveStakingQueueName string = "v2_active_staking_queue"
UnbondingStakingQueueName string = "v2_unbonding_staking_queue"
WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue"
WithdrawnStakingQueueName string = "v2_withdrawn_staking_queue"
)

const (
ActiveStakingEventType EventType = 1
UnbondingStakingEventType EventType = 2
ActiveStakingEventType EventType = 1
UnbondingStakingEventType EventType = 2
WithdrawableStakingEventType EventType = 3
WithdrawnStakingEventType EventType = 4
)

// Event schema versions, only increment when the schema changes
const (
ActiveStakingEventVersion int = 0
UnbondingStakingEventVersion int = 0
ActiveStakingEventVersion int = 0
UnbondingStakingEventVersion int = 0
WithdrawableStakingEventVersion int = 0
WithdrawnStakingEventVersion int = 0
)

type EventType int
Expand All @@ -30,6 +36,7 @@ type StakingEvent struct {
StakerBtcPkHex string `json:"staker_btc_pk_hex"`
FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"`
StakingAmount uint64 `json:"staking_amount"`
StateHistory []string `json:"state_history"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I think about it, the more it feels like passing everything down via msg might have been the wrong decision.

Perhaps passing down just a staking transaction hash would be sufficient and letting the API handle the lookup instead. 🤔

I’m trying to approach this from an extensibility perspective—considering whether this approach is flexible enough to support new features in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm good question - if we let api query the db on every event, it would cause load on db no?
we don't have any db cache or something in api so all the queries would go to mongo.

so basically it boils down to
queue bandwidth(storage) vs db query load 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue bandwidth is fine since we’re not passing large-sized content. This is more of an extensibility concern.

For instance, if we add new metadata to the database, we can avoid modifying the queue code and instead rely on the API(or other) service to handle its own lookups.

While database load is a valid concern, MongoDB performs some internal caching for recently accessed documents. As long as the data has been accessed recently, the performance should remain reasonable.

In any case, this is neither a concern nor a blocker for this PR, in my opinion. It’s something we can revisit in the future, @kirugan, if there’s a need to redesign this communication pattern.

}

func (e StakingEvent) GetEventType() EventType {
Expand All @@ -45,6 +52,7 @@ func NewActiveStakingEvent(
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
stateHistory []string,
) StakingEvent {
return StakingEvent{
SchemaVersion: ActiveStakingEventVersion,
Expand All @@ -53,6 +61,7 @@ func NewActiveStakingEvent(
StakerBtcPkHex: stakerBtcPkHex,
FinalityProviderBtcPksHex: finalityProviderBtcPksHex,
StakingAmount: stakingAmount,
StateHistory: stateHistory,
}
}

Expand All @@ -61,6 +70,7 @@ func NewUnbondingStakingEvent(
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
stateHistory []string,
) StakingEvent {
return StakingEvent{
SchemaVersion: UnbondingStakingEventVersion,
Expand All @@ -69,5 +79,42 @@ func NewUnbondingStakingEvent(
StakerBtcPkHex: stakerBtcPkHex,
FinalityProviderBtcPksHex: finalityProviderBtcPksHex,
StakingAmount: stakingAmount,
StateHistory: stateHistory,
}
}

func NewWithdrawableStakingEvent(
stakingTxHashHex string,
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
stateHistory []string,
) StakingEvent {
return StakingEvent{
SchemaVersion: WithdrawableStakingEventVersion,
EventType: WithdrawableStakingEventType,
StakingTxHashHex: stakingTxHashHex,
StakerBtcPkHex: stakerBtcPkHex,
FinalityProviderBtcPksHex: finalityProviderBtcPksHex,
StakingAmount: stakingAmount,
StateHistory: stateHistory,
}
}

func NewWithdrawnStakingEvent(
stakingTxHashHex string,
stakerBtcPkHex string,
finalityProviderBtcPksHex []string,
stakingAmount uint64,
stateHistory []string,
) StakingEvent {
return StakingEvent{
SchemaVersion: WithdrawnStakingEventVersion,
EventType: WithdrawnStakingEventType,
StakingTxHashHex: stakingTxHashHex,
StakerBtcPkHex: stakerBtcPkHex,
FinalityProviderBtcPksHex: finalityProviderBtcPksHex,
StakingAmount: stakingAmount,
StateHistory: stateHistory,
}
}
88 changes: 65 additions & 23 deletions queuemngr/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
const timeout = 5 * time.Second

type QueueManager struct {
ActiveStakingQueue client.QueueClient
UnbondingStakingQueue client.QueueClient
logger *zap.Logger
ActiveStakingQueue client.QueueClient
UnbondingStakingQueue client.QueueClient
WithdrawableStakingQueue client.QueueClient
WithdrawnStakingQueue client.QueueClient
logger *zap.Logger
}

func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager, error) {
Expand All @@ -31,63 +33,89 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager
return nil, fmt.Errorf("failed to create unbonding staking queue: %w", err)
}

withdrawableStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawableStakingQueueName)
if err != nil {
return nil, fmt.Errorf("failed to create withdrawable staking queue: %w", err)
}

withdrawnStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawnStakingQueueName)
if err != nil {
return nil, fmt.Errorf("failed to create withdrawn staking queue: %w", err)
}

return &QueueManager{
ActiveStakingQueue: activeStakingQueue,
UnbondingStakingQueue: unbondingStakingQueue,
logger: logger.With(zap.String("module", "queue consumer")),
ActiveStakingQueue: activeStakingQueue,
UnbondingStakingQueue: unbondingStakingQueue,
WithdrawableStakingQueue: withdrawableStakingQueue,
WithdrawnStakingQueue: withdrawnStakingQueue,
logger: logger.With(zap.String("module", "queue consumer")),
}, nil
}

func (qc *QueueManager) Start() error {
return nil
}

func PushEvent[T any](queueClient client.QueueClient, ev T) error {
func pushEvent[T any](ctx context.Context, queueClient client.QueueClient, ev T) error {
jsonBytes, err := json.Marshal(ev)
if err != nil {
return err
}
messageBody := string(jsonBytes)

err = queueClient.SendMessage(context.TODO(), messageBody)
err = queueClient.SendMessage(ctx, messageBody)
if err != nil {
return fmt.Errorf("failed to push event: %w", err)
}

return nil
}

func (qc *QueueManager) PushActiveStakingEvent(ev *client.StakingEvent) error {
jsonBytes, err := json.Marshal(ev)
func (qc *QueueManager) PushActiveStakingEvent(ctx context.Context, ev *client.StakingEvent) error {
qc.logger.Debug("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex))

err := pushEvent(ctx, qc.ActiveStakingQueue, ev)
if err != nil {
return err
return fmt.Errorf("failed to push staking event: %w", err)
}
messageBody := string(jsonBytes)

qc.logger.Info("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex))
err = qc.ActiveStakingQueue.SendMessage(context.TODO(), messageBody)
qc.logger.Debug("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex))
return nil
}

func (qc *QueueManager) PushUnbondingStakingEvent(ctx context.Context, ev *client.StakingEvent) error {
qc.logger.Debug("pushing unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex))

err := pushEvent(ctx, qc.UnbondingStakingQueue, ev)
if err != nil {
return fmt.Errorf("failed to push staking event: %w", err)
}
qc.logger.Info("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex))

qc.logger.Debug("successfully pushed unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex))
return nil
}

func (qc *QueueManager) PushUnbondingStakingEvent(ev *client.StakingEvent) error {
jsonBytes, err := json.Marshal(ev)
func (qc *QueueManager) PushWithdrawableStakingEvent(ctx context.Context, ev *client.StakingEvent) error {
qc.logger.Debug("pushing withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex))

err := pushEvent(ctx, qc.WithdrawableStakingQueue, ev)
if err != nil {
return err
return fmt.Errorf("failed to push staking event: %w", err)
}
messageBody := string(jsonBytes)

qc.logger.Info("pushing unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex))
err = qc.UnbondingStakingQueue.SendMessage(context.TODO(), messageBody)
qc.logger.Debug("successfully pushed withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex))
return nil
}

func (qc *QueueManager) PushWithdrawnStakingEvent(ctx context.Context, ev *client.StakingEvent) error {
qc.logger.Debug("pushing withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex))

err := pushEvent(ctx, qc.WithdrawnStakingQueue, ev)
if err != nil {
return fmt.Errorf("failed to push unbonding staking event: %w", err)
return fmt.Errorf("failed to push staking event: %w", err)
}
qc.logger.Info("successfully pushed unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex))

qc.logger.Debug("successfully pushed withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex))
return nil
}

Expand All @@ -98,6 +126,10 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue
return qc.ActiveStakingQueue.ReQueueMessage(ctx, message)
case client.UnbondingStakingQueueName:
return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message)
case client.WithdrawableStakingQueueName:
return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message)
case client.WithdrawnStakingQueueName:
return qc.WithdrawnStakingQueue.ReQueueMessage(ctx, message)
default:
return fmt.Errorf("unknown queue name: %s", queueName)
}
Expand All @@ -112,6 +144,14 @@ func (qc *QueueManager) Stop() error {
return err
}

if err := qc.WithdrawableStakingQueue.Stop(); err != nil {
return err
}

if err := qc.WithdrawnStakingQueue.Stop(); err != nil {
return err
}

return nil
}

Expand All @@ -120,6 +160,8 @@ func (qc *QueueManager) Ping() error {
queues := []client.QueueClient{
qc.ActiveStakingQueue,
qc.UnbondingStakingQueue,
qc.WithdrawableStakingQueue,
qc.WithdrawnStakingQueue,
}

for _, queue := range queues {
Expand Down