diff --git a/client/schema.go b/client/schema.go index efe0f33..aaee80b 100644 --- a/client/schema.go +++ b/client/schema.go @@ -1,19 +1,25 @@ package client const ( - ActiveStakingQueueName string = "v2_active_staking_queue" - UnbondingStakingQueueName string = "v2_unbonding_staking_queue" + ActiveStakingQueueName string = "v2_active_staking_queue" + UnbondingStakingQueueName string = "v2_unbonding_staking_queue" + WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue" + WithdrawnStakingQueueName string = "v2_withdrawn_staking_queue" ) const ( - ActiveStakingEventType EventType = 1 - UnbondingStakingEventType EventType = 2 + ActiveStakingEventType EventType = 1 + UnbondingStakingEventType EventType = 2 + WithdrawableStakingEventType EventType = 3 + WithdrawnStakingEventType EventType = 4 ) // Event schema versions, only increment when the schema changes const ( - ActiveStakingEventVersion int = 0 - UnbondingStakingEventVersion int = 0 + ActiveStakingEventVersion int = 0 + UnbondingStakingEventVersion int = 0 + WithdrawableStakingEventVersion int = 0 + WithdrawnStakingEventVersion int = 0 ) type EventType int @@ -30,6 +36,7 @@ type StakingEvent struct { StakerBtcPkHex string `json:"staker_btc_pk_hex"` FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` StakingAmount uint64 `json:"staking_amount"` + StateHistory []string `json:"state_history"` } func (e StakingEvent) GetEventType() EventType { @@ -45,6 +52,7 @@ func NewActiveStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: ActiveStakingEventVersion, @@ -53,6 +61,7 @@ func NewActiveStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, } } @@ -61,6 +70,7 @@ func NewUnbondingStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: UnbondingStakingEventVersion, @@ -69,5 +79,42 @@ func NewUnbondingStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, + } +} + +func NewWithdrawableStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, + stateHistory []string, +) StakingEvent { + return StakingEvent{ + SchemaVersion: WithdrawableStakingEventVersion, + EventType: WithdrawableStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + StateHistory: stateHistory, + } +} + +func NewWithdrawnStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, + stateHistory []string, +) StakingEvent { + return StakingEvent{ + SchemaVersion: WithdrawnStakingEventVersion, + EventType: WithdrawnStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + StateHistory: stateHistory, } } diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 04bcb62..2aa0dbb 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -15,9 +15,11 @@ import ( const timeout = 5 * time.Second type QueueManager struct { - ActiveStakingQueue client.QueueClient - UnbondingStakingQueue client.QueueClient - logger *zap.Logger + ActiveStakingQueue client.QueueClient + UnbondingStakingQueue client.QueueClient + WithdrawableStakingQueue client.QueueClient + WithdrawnStakingQueue client.QueueClient + logger *zap.Logger } func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager, error) { @@ -31,10 +33,22 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager return nil, fmt.Errorf("failed to create unbonding staking queue: %w", err) } + withdrawableStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawableStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to create withdrawable staking queue: %w", err) + } + + withdrawnStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawnStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to create withdrawn staking queue: %w", err) + } + return &QueueManager{ - ActiveStakingQueue: activeStakingQueue, - UnbondingStakingQueue: unbondingStakingQueue, - logger: logger.With(zap.String("module", "queue consumer")), + ActiveStakingQueue: activeStakingQueue, + UnbondingStakingQueue: unbondingStakingQueue, + WithdrawableStakingQueue: withdrawableStakingQueue, + WithdrawnStakingQueue: withdrawnStakingQueue, + logger: logger.With(zap.String("module", "queue consumer")), }, nil } @@ -42,14 +56,14 @@ func (qc *QueueManager) Start() error { return nil } -func PushEvent[T any](queueClient client.QueueClient, ev T) error { +func pushEvent[T any](ctx context.Context, queueClient client.QueueClient, ev T) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err } messageBody := string(jsonBytes) - err = queueClient.SendMessage(context.TODO(), messageBody) + err = queueClient.SendMessage(ctx, messageBody) if err != nil { return fmt.Errorf("failed to push event: %w", err) } @@ -57,37 +71,51 @@ func PushEvent[T any](queueClient client.QueueClient, ev T) error { return nil } -func (qc *QueueManager) PushActiveStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) +func (qc *QueueManager) PushActiveStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + + err := pushEvent(ctx, qc.ActiveStakingQueue, ev) if err != nil { - return err + return fmt.Errorf("failed to push staking event: %w", err) } - messageBody := string(jsonBytes) - qc.logger.Info("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - err = qc.ActiveStakingQueue.SendMessage(context.TODO(), messageBody) + qc.logger.Debug("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + return nil +} + +func (qc *QueueManager) PushUnbondingStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + + err := pushEvent(ctx, qc.UnbondingStakingQueue, ev) if err != nil { return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil } -func (qc *QueueManager) PushUnbondingStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) +func (qc *QueueManager) PushWithdrawableStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + + err := pushEvent(ctx, qc.WithdrawableStakingQueue, ev) if err != nil { - return err + return fmt.Errorf("failed to push staking event: %w", err) } - messageBody := string(jsonBytes) - qc.logger.Info("pushing unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.UnbondingStakingQueue.SendMessage(context.TODO(), messageBody) + qc.logger.Debug("successfully pushed withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + return nil +} + +func (qc *QueueManager) PushWithdrawnStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + + err := pushEvent(ctx, qc.WithdrawnStakingQueue, ev) if err != nil { - return fmt.Errorf("failed to push unbonding staking event: %w", err) + return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil } @@ -98,6 +126,10 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue return qc.ActiveStakingQueue.ReQueueMessage(ctx, message) case client.UnbondingStakingQueueName: return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message) + case client.WithdrawableStakingQueueName: + return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message) + case client.WithdrawnStakingQueueName: + return qc.WithdrawnStakingQueue.ReQueueMessage(ctx, message) default: return fmt.Errorf("unknown queue name: %s", queueName) } @@ -112,6 +144,14 @@ func (qc *QueueManager) Stop() error { return err } + if err := qc.WithdrawableStakingQueue.Stop(); err != nil { + return err + } + + if err := qc.WithdrawnStakingQueue.Stop(); err != nil { + return err + } + return nil } @@ -120,6 +160,8 @@ func (qc *QueueManager) Ping() error { queues := []client.QueueClient{ qc.ActiveStakingQueue, qc.UnbondingStakingQueue, + qc.WithdrawableStakingQueue, + qc.WithdrawnStakingQueue, } for _, queue := range queues {