From 2b3e155e25a7e8bec988297a2620fc7bd7213ade Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 2 Jan 2025 16:00:40 +0530 Subject: [PATCH 1/9] add withdrawable queue --- client/schema.go | 31 +++++++++++++++++++++------ queuemngr/queue_manager.go | 43 ++++++++++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/client/schema.go b/client/schema.go index efe0f33..ad975fa 100644 --- a/client/schema.go +++ b/client/schema.go @@ -1,19 +1,22 @@ package client const ( - ActiveStakingQueueName string = "v2_active_staking_queue" - UnbondingStakingQueueName string = "v2_unbonding_staking_queue" + ActiveStakingQueueName string = "v2_active_staking_queue" + UnbondingStakingQueueName string = "v2_unbonding_staking_queue" + WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue" ) const ( - ActiveStakingEventType EventType = 1 - UnbondingStakingEventType EventType = 2 + ActiveStakingEventType EventType = 1 + UnbondingStakingEventType EventType = 2 + WithdrawableStakingEventType EventType = 3 ) // Event schema versions, only increment when the schema changes const ( - ActiveStakingEventVersion int = 0 - UnbondingStakingEventVersion int = 0 + ActiveStakingEventVersion int = 0 + UnbondingStakingEventVersion int = 0 + WithdrawableStakingEventVersion int = 0 ) type EventType int @@ -71,3 +74,19 @@ func NewUnbondingStakingEvent( StakingAmount: stakingAmount, } } + +func NewWithdrawableStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: WithdrawableStakingEventVersion, + EventType: WithdrawableStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + } +} diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 04bcb62..1fe6c36 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -15,9 +15,10 @@ import ( const timeout = 5 * time.Second type QueueManager struct { - ActiveStakingQueue client.QueueClient - UnbondingStakingQueue client.QueueClient - logger *zap.Logger + ActiveStakingQueue client.QueueClient + UnbondingStakingQueue client.QueueClient + WithdrawableStakingQueue client.QueueClient + logger *zap.Logger } func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager, error) { @@ -31,10 +32,16 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager return nil, fmt.Errorf("failed to create unbonding staking queue: %w", err) } + withdrawableStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawableStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to create withdrawable staking queue: %w", err) + } + return &QueueManager{ - ActiveStakingQueue: activeStakingQueue, - UnbondingStakingQueue: unbondingStakingQueue, - logger: logger.With(zap.String("module", "queue consumer")), + ActiveStakingQueue: activeStakingQueue, + UnbondingStakingQueue: unbondingStakingQueue, + WithdrawableStakingQueue: withdrawableStakingQueue, + logger: logger.With(zap.String("module", "queue consumer")), }, nil } @@ -91,6 +98,23 @@ func (qc *QueueManager) PushUnbondingStakingEvent(ev *client.StakingEvent) error return nil } +func (qc *QueueManager) PushWithdrawableStakingEvent(ev *client.StakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + qc.logger.Info("pushing withdrawable staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + err = qc.WithdrawableStakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push withdrawable staking event: %w", err) + } + qc.logger.Info("successfully pushed withdrawable staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + + return nil +} + // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { @@ -98,6 +122,8 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue return qc.ActiveStakingQueue.ReQueueMessage(ctx, message) case client.UnbondingStakingQueueName: return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message) + case client.WithdrawableStakingQueueName: + return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message) default: return fmt.Errorf("unknown queue name: %s", queueName) } @@ -112,6 +138,10 @@ func (qc *QueueManager) Stop() error { return err } + if err := qc.WithdrawableStakingQueue.Stop(); err != nil { + return err + } + return nil } @@ -120,6 +150,7 @@ func (qc *QueueManager) Ping() error { queues := []client.QueueClient{ qc.ActiveStakingQueue, qc.UnbondingStakingQueue, + qc.WithdrawableStakingQueue, } for _, queue := range queues { From 8697bb820a22ba128cfcb5a7999282af83456143 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 2 Jan 2025 17:18:00 +0530 Subject: [PATCH 2/9] withdrawn queue --- client/schema.go | 19 +++++++++++++++++++ queuemngr/queue_manager.go | 7 +++++++ 2 files changed, 26 insertions(+) diff --git a/client/schema.go b/client/schema.go index ad975fa..0a37109 100644 --- a/client/schema.go +++ b/client/schema.go @@ -4,12 +4,14 @@ const ( ActiveStakingQueueName string = "v2_active_staking_queue" UnbondingStakingQueueName string = "v2_unbonding_staking_queue" WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue" + WithdrawnStakingQueueName string = "v2_withdrawn_staking_queue" ) const ( ActiveStakingEventType EventType = 1 UnbondingStakingEventType EventType = 2 WithdrawableStakingEventType EventType = 3 + WithdrawnStakingEventType EventType = 4 ) // Event schema versions, only increment when the schema changes @@ -17,6 +19,7 @@ const ( ActiveStakingEventVersion int = 0 UnbondingStakingEventVersion int = 0 WithdrawableStakingEventVersion int = 0 + WithdrawnStakingEventVersion int = 0 ) type EventType int @@ -90,3 +93,19 @@ func NewWithdrawableStakingEvent( StakingAmount: stakingAmount, } } + +func NewWithdrawnStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: WithdrawnStakingEventVersion, + EventType: WithdrawnStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + } +} diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 1fe6c36..9d2ef2b 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -18,6 +18,7 @@ type QueueManager struct { ActiveStakingQueue client.QueueClient UnbondingStakingQueue client.QueueClient WithdrawableStakingQueue client.QueueClient + WithdrawnStakingQueue client.QueueClient logger *zap.Logger } @@ -37,10 +38,16 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager return nil, fmt.Errorf("failed to create withdrawable staking queue: %w", err) } + withdrawnStakingQueue, err := client.NewQueueClient(cfg, client.WithdrawnStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to create withdrawn staking queue: %w", err) + } + return &QueueManager{ ActiveStakingQueue: activeStakingQueue, UnbondingStakingQueue: unbondingStakingQueue, WithdrawableStakingQueue: withdrawableStakingQueue, + WithdrawnStakingQueue: withdrawnStakingQueue, logger: logger.With(zap.String("module", "queue consumer")), }, nil } From 0354abfd5eece749ed2226235b2c6991b674d939 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 2 Jan 2025 18:10:54 +0530 Subject: [PATCH 3/9] fix --- queuemngr/queue_manager.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 9d2ef2b..886104e 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -122,6 +122,23 @@ func (qc *QueueManager) PushWithdrawableStakingEvent(ev *client.StakingEvent) er return nil } +func (qc *QueueManager) PushWithdrawnStakingEvent(ev *client.StakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + qc.logger.Info("pushing withdrawn staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + err = qc.WithdrawnStakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push withdrawn staking event: %w", err) + } + qc.logger.Info("successfully pushed withdrawn staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + + return nil +} + // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { From 2875aea182ab88689b5753b8355876b5ef80ce6b Mon Sep 17 00:00:00 2001 From: Gurjot Date: Tue, 14 Jan 2025 13:05:44 +0530 Subject: [PATCH 4/9] add state record --- client/schema.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/client/schema.go b/client/schema.go index 0a37109..887f18d 100644 --- a/client/schema.go +++ b/client/schema.go @@ -29,13 +29,19 @@ type EventMessage interface { GetStakingTxHashHex() string } +type StateRecord struct { + State string `json:"state"` + SubState string `json:"sub_state,omitempty"` +} + type StakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingAmount uint64 `json:"staking_amount"` + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingAmount uint64 `json:"staking_amount"` + StateHistory []StateRecord `json:"state_history"` } func (e StakingEvent) GetEventType() EventType { @@ -51,6 +57,7 @@ func NewActiveStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []StateRecord, ) StakingEvent { return StakingEvent{ SchemaVersion: ActiveStakingEventVersion, @@ -59,6 +66,7 @@ func NewActiveStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, } } @@ -67,6 +75,7 @@ func NewUnbondingStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []StateRecord, ) StakingEvent { return StakingEvent{ SchemaVersion: UnbondingStakingEventVersion, @@ -75,6 +84,7 @@ func NewUnbondingStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, } } @@ -83,6 +93,7 @@ func NewWithdrawableStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []StateRecord, ) StakingEvent { return StakingEvent{ SchemaVersion: WithdrawableStakingEventVersion, @@ -91,6 +102,7 @@ func NewWithdrawableStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, } } @@ -99,6 +111,7 @@ func NewWithdrawnStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, + stateHistory []StateRecord, ) StakingEvent { return StakingEvent{ SchemaVersion: WithdrawnStakingEventVersion, @@ -107,5 +120,6 @@ func NewWithdrawnStakingEvent( StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, StakingAmount: stakingAmount, + StateHistory: stateHistory, } } From 51b7de17249965c2e88dfb2a7a3efde854c656a4 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Tue, 14 Jan 2025 13:48:36 +0530 Subject: [PATCH 5/9] fix state history --- client/schema.go | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/client/schema.go b/client/schema.go index 887f18d..aaee80b 100644 --- a/client/schema.go +++ b/client/schema.go @@ -29,19 +29,14 @@ type EventMessage interface { GetStakingTxHashHex() string } -type StateRecord struct { - State string `json:"state"` - SubState string `json:"sub_state,omitempty"` -} - type StakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingAmount uint64 `json:"staking_amount"` - StateHistory []StateRecord `json:"state_history"` + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingAmount uint64 `json:"staking_amount"` + StateHistory []string `json:"state_history"` } func (e StakingEvent) GetEventType() EventType { @@ -57,7 +52,7 @@ func NewActiveStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, - stateHistory []StateRecord, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: ActiveStakingEventVersion, @@ -75,7 +70,7 @@ func NewUnbondingStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, - stateHistory []StateRecord, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: UnbondingStakingEventVersion, @@ -93,7 +88,7 @@ func NewWithdrawableStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, - stateHistory []StateRecord, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: WithdrawableStakingEventVersion, @@ -111,7 +106,7 @@ func NewWithdrawnStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, - stateHistory []StateRecord, + stateHistory []string, ) StakingEvent { return StakingEvent{ SchemaVersion: WithdrawnStakingEventVersion, From e7e0311408d57b89367e1a9a16624dd339e78d16 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Wed, 15 Jan 2025 00:56:04 +0530 Subject: [PATCH 6/9] slashed queue --- client/schema.go | 21 +++++++++++++++++++++ queuemngr/queue_manager.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/client/schema.go b/client/schema.go index aaee80b..f85e559 100644 --- a/client/schema.go +++ b/client/schema.go @@ -5,6 +5,7 @@ const ( UnbondingStakingQueueName string = "v2_unbonding_staking_queue" WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue" WithdrawnStakingQueueName string = "v2_withdrawn_staking_queue" + SlashedStakingQueueName string = "v2_slashed_staking_queue" ) const ( @@ -12,6 +13,7 @@ const ( UnbondingStakingEventType EventType = 2 WithdrawableStakingEventType EventType = 3 WithdrawnStakingEventType EventType = 4 + SlashedStakingEventType EventType = 5 ) // Event schema versions, only increment when the schema changes @@ -20,6 +22,7 @@ const ( UnbondingStakingEventVersion int = 0 WithdrawableStakingEventVersion int = 0 WithdrawnStakingEventVersion int = 0 + SlashedStakingEventVersion int = 0 ) type EventType int @@ -118,3 +121,21 @@ func NewWithdrawnStakingEvent( StateHistory: stateHistory, } } + +func NewSlashedStakingEvent( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, + stateHistory []string, +) StakingEvent { + return StakingEvent{ + SchemaVersion: SlashedStakingEventVersion, + EventType: SlashedStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + StateHistory: stateHistory, + } +} diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 886104e..08cd3a5 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -19,6 +19,7 @@ type QueueManager struct { UnbondingStakingQueue client.QueueClient WithdrawableStakingQueue client.QueueClient WithdrawnStakingQueue client.QueueClient + SlashedStakingQueue client.QueueClient logger *zap.Logger } @@ -43,11 +44,17 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager return nil, fmt.Errorf("failed to create withdrawn staking queue: %w", err) } + slashedStakingQueue, err := client.NewQueueClient(cfg, client.SlashedStakingQueueName) + if err != nil { + return nil, fmt.Errorf("failed to create slashed staking queue: %w", err) + } + return &QueueManager{ ActiveStakingQueue: activeStakingQueue, UnbondingStakingQueue: unbondingStakingQueue, WithdrawableStakingQueue: withdrawableStakingQueue, WithdrawnStakingQueue: withdrawnStakingQueue, + SlashedStakingQueue: slashedStakingQueue, logger: logger.With(zap.String("module", "queue consumer")), }, nil } @@ -139,6 +146,23 @@ func (qc *QueueManager) PushWithdrawnStakingEvent(ev *client.StakingEvent) error return nil } +func (qc *QueueManager) PushSlashedStakingEvent(ev *client.StakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + qc.logger.Info("pushing slashed staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + err = qc.SlashedStakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push slashed staking event: %w", err) + } + qc.logger.Info("successfully pushed slashed staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + + return nil +} + // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { @@ -148,6 +172,10 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message) case client.WithdrawableStakingQueueName: return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message) + case client.WithdrawnStakingQueueName: + return qc.WithdrawnStakingQueue.ReQueueMessage(ctx, message) + case client.SlashedStakingQueueName: + return qc.SlashedStakingQueue.ReQueueMessage(ctx, message) default: return fmt.Errorf("unknown queue name: %s", queueName) } @@ -166,6 +194,14 @@ func (qc *QueueManager) Stop() error { return err } + if err := qc.WithdrawnStakingQueue.Stop(); err != nil { + return err + } + + if err := qc.SlashedStakingQueue.Stop(); err != nil { + return err + } + return nil } From 6623b6c8e0f59529d978e99bbf42e1bd71628dda Mon Sep 17 00:00:00 2001 From: Gurjot Date: Wed, 15 Jan 2025 11:10:31 +0530 Subject: [PATCH 7/9] Revert "slashed queue" This reverts commit e7e0311408d57b89367e1a9a16624dd339e78d16. --- client/schema.go | 21 --------------------- queuemngr/queue_manager.go | 36 ------------------------------------ 2 files changed, 57 deletions(-) diff --git a/client/schema.go b/client/schema.go index f85e559..aaee80b 100644 --- a/client/schema.go +++ b/client/schema.go @@ -5,7 +5,6 @@ const ( UnbondingStakingQueueName string = "v2_unbonding_staking_queue" WithdrawableStakingQueueName string = "v2_withdrawable_staking_queue" WithdrawnStakingQueueName string = "v2_withdrawn_staking_queue" - SlashedStakingQueueName string = "v2_slashed_staking_queue" ) const ( @@ -13,7 +12,6 @@ const ( UnbondingStakingEventType EventType = 2 WithdrawableStakingEventType EventType = 3 WithdrawnStakingEventType EventType = 4 - SlashedStakingEventType EventType = 5 ) // Event schema versions, only increment when the schema changes @@ -22,7 +20,6 @@ const ( UnbondingStakingEventVersion int = 0 WithdrawableStakingEventVersion int = 0 WithdrawnStakingEventVersion int = 0 - SlashedStakingEventVersion int = 0 ) type EventType int @@ -121,21 +118,3 @@ func NewWithdrawnStakingEvent( StateHistory: stateHistory, } } - -func NewSlashedStakingEvent( - stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingAmount uint64, - stateHistory []string, -) StakingEvent { - return StakingEvent{ - SchemaVersion: SlashedStakingEventVersion, - EventType: SlashedStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingAmount: stakingAmount, - StateHistory: stateHistory, - } -} diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 08cd3a5..886104e 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -19,7 +19,6 @@ type QueueManager struct { UnbondingStakingQueue client.QueueClient WithdrawableStakingQueue client.QueueClient WithdrawnStakingQueue client.QueueClient - SlashedStakingQueue client.QueueClient logger *zap.Logger } @@ -44,17 +43,11 @@ func NewQueueManager(cfg *config.QueueConfig, logger *zap.Logger) (*QueueManager return nil, fmt.Errorf("failed to create withdrawn staking queue: %w", err) } - slashedStakingQueue, err := client.NewQueueClient(cfg, client.SlashedStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to create slashed staking queue: %w", err) - } - return &QueueManager{ ActiveStakingQueue: activeStakingQueue, UnbondingStakingQueue: unbondingStakingQueue, WithdrawableStakingQueue: withdrawableStakingQueue, WithdrawnStakingQueue: withdrawnStakingQueue, - SlashedStakingQueue: slashedStakingQueue, logger: logger.With(zap.String("module", "queue consumer")), }, nil } @@ -146,23 +139,6 @@ func (qc *QueueManager) PushWithdrawnStakingEvent(ev *client.StakingEvent) error return nil } -func (qc *QueueManager) PushSlashedStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - qc.logger.Info("pushing slashed staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.SlashedStakingQueue.SendMessage(context.TODO(), messageBody) - if err != nil { - return fmt.Errorf("failed to push slashed staking event: %w", err) - } - qc.logger.Info("successfully pushed slashed staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - - return nil -} - // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { @@ -172,10 +148,6 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message) case client.WithdrawableStakingQueueName: return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message) - case client.WithdrawnStakingQueueName: - return qc.WithdrawnStakingQueue.ReQueueMessage(ctx, message) - case client.SlashedStakingQueueName: - return qc.SlashedStakingQueue.ReQueueMessage(ctx, message) default: return fmt.Errorf("unknown queue name: %s", queueName) } @@ -194,14 +166,6 @@ func (qc *QueueManager) Stop() error { return err } - if err := qc.WithdrawnStakingQueue.Stop(); err != nil { - return err - } - - if err := qc.SlashedStakingQueue.Stop(); err != nil { - return err - } - return nil } From cf0594017f62d4a897ef41f9c91c62465a8a9d79 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Wed, 15 Jan 2025 11:50:58 +0530 Subject: [PATCH 8/9] fix --- queuemngr/queue_manager.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 886104e..f38ccdf 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -148,6 +148,8 @@ func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.Queue return qc.UnbondingStakingQueue.ReQueueMessage(ctx, message) case client.WithdrawableStakingQueueName: return qc.WithdrawableStakingQueue.ReQueueMessage(ctx, message) + case client.WithdrawnStakingQueueName: + return qc.WithdrawnStakingQueue.ReQueueMessage(ctx, message) default: return fmt.Errorf("unknown queue name: %s", queueName) } @@ -166,6 +168,10 @@ func (qc *QueueManager) Stop() error { return err } + if err := qc.WithdrawnStakingQueue.Stop(); err != nil { + return err + } + return nil } @@ -175,6 +181,7 @@ func (qc *QueueManager) Ping() error { qc.ActiveStakingQueue, qc.UnbondingStakingQueue, qc.WithdrawableStakingQueue, + qc.WithdrawnStakingQueue, } for _, queue := range queues { From 7f9e78b1745d01f7d95fb41d44c1457a96dd8223 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Wed, 15 Jan 2025 16:47:20 +0530 Subject: [PATCH 9/9] pr comments --- queuemngr/queue_manager.go | 62 +++++++++++++------------------------- 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index f38ccdf..2aa0dbb 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -56,14 +56,14 @@ func (qc *QueueManager) Start() error { return nil } -func PushEvent[T any](queueClient client.QueueClient, ev T) error { +func pushEvent[T any](ctx context.Context, queueClient client.QueueClient, ev T) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err } messageBody := string(jsonBytes) - err = queueClient.SendMessage(context.TODO(), messageBody) + err = queueClient.SendMessage(ctx, messageBody) if err != nil { return fmt.Errorf("failed to push event: %w", err) } @@ -71,71 +71,51 @@ func PushEvent[T any](queueClient client.QueueClient, ev T) error { return nil } -func (qc *QueueManager) PushActiveStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) +func (qc *QueueManager) PushActiveStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - qc.logger.Info("pushing active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - err = qc.ActiveStakingQueue.SendMessage(context.TODO(), messageBody) + err := pushEvent(ctx, qc.ActiveStakingQueue, ev) if err != nil { return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed active staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil } -func (qc *QueueManager) PushUnbondingStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) +func (qc *QueueManager) PushUnbondingStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - qc.logger.Info("pushing unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.UnbondingStakingQueue.SendMessage(context.TODO(), messageBody) + err := pushEvent(ctx, qc.UnbondingStakingQueue, ev) if err != nil { - return fmt.Errorf("failed to push unbonding staking event: %w", err) + return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed unbonding staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed unbonding staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil } -func (qc *QueueManager) PushWithdrawableStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) +func (qc *QueueManager) PushWithdrawableStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - qc.logger.Info("pushing withdrawable staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.WithdrawableStakingQueue.SendMessage(context.TODO(), messageBody) + err := pushEvent(ctx, qc.WithdrawableStakingQueue, ev) if err != nil { - return fmt.Errorf("failed to push withdrawable staking event: %w", err) + return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed withdrawable staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed withdrawable staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil } -func (qc *QueueManager) PushWithdrawnStakingEvent(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) +func (qc *QueueManager) PushWithdrawnStakingEvent(ctx context.Context, ev *client.StakingEvent) error { + qc.logger.Debug("pushing withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex)) - qc.logger.Info("pushing withdrawn staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.WithdrawnStakingQueue.SendMessage(context.TODO(), messageBody) + err := pushEvent(ctx, qc.WithdrawnStakingQueue, ev) if err != nil { - return fmt.Errorf("failed to push withdrawn staking event: %w", err) + return fmt.Errorf("failed to push staking event: %w", err) } - qc.logger.Info("successfully pushed withdrawn staking event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Debug("successfully pushed withdrawn staking event", zap.String("tx_hash", ev.StakingTxHashHex)) return nil }