From 664aebdb9c1b28adb74e0adca07e0f2b9cb83d9e Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 12:25:26 +0530 Subject: [PATCH 1/7] new schema --- client/schema.go | 108 +++++++++++-------------------------- queuemngr/queue_manager.go | 8 +-- tests/e2e_test.go | 8 +-- tests/setup.go | 24 +++------ 4 files changed, 47 insertions(+), 101 deletions(-) diff --git a/client/schema.go b/client/schema.go index 2800341..bec6e36 100644 --- a/client/schema.go +++ b/client/schema.go @@ -38,96 +38,52 @@ type EventMessage interface { GetStakingTxHashHex() string } -type ActiveStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerPkHex string `json:"staker_pk_hex"` - FinalityProviderPkHex string `json:"finality_provider_pk_hex"` - StakingValue uint64 `json:"staking_value"` - StakingStartHeight uint64 `json:"staking_start_height"` - StakingStartTimestamp int64 `json:"staking_start_timestamp"` - StakingTimeLock uint64 `json:"staking_timelock"` - StakingOutputIndex uint64 `json:"staking_output_index"` - StakingTxHex string `json:"staking_tx_hex"` - IsOverflow bool `json:"is_overflow"` -} - -func (e ActiveStakingEvent) GetEventType() EventType { - return ActiveStakingEventType -} - -func (e ActiveStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex +type StakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingAmount uint64 `json:"staking_amount"` } func NewActiveStakingEvent( stakingTxHashHex string, - stakerPkHex string, - finalityProviderPkHex string, - stakingValue uint64, - stakingStartHeight uint64, - stakingStartTimestamp int64, - stakingTimeLock uint64, - stakingOutputIndex uint64, - stakingTxHex string, - isOverflow bool, -) ActiveStakingEvent { - return ActiveStakingEvent{ - SchemaVersion: ActiveEventVersion, - EventType: ActiveStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerPkHex: stakerPkHex, - FinalityProviderPkHex: finalityProviderPkHex, - StakingValue: stakingValue, - StakingStartHeight: stakingStartHeight, - StakingStartTimestamp: stakingStartTimestamp, - StakingTimeLock: stakingTimeLock, - StakingOutputIndex: stakingOutputIndex, - StakingTxHex: stakingTxHex, - IsOverflow: isOverflow, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: ActiveEventVersion, + EventType: ActiveStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, } } -type UnbondingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - UnbondingStartHeight uint64 `json:"unbonding_start_height"` - UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` - UnbondingTimeLock uint64 `json:"unbonding_timelock"` - UnbondingOutputIndex uint64 `json:"unbonding_output_index"` - UnbondingTxHex string `json:"unbonding_tx_hex"` - UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` -} - -func (e UnbondingStakingEvent) GetEventType() EventType { - return UnbondingStakingEventType +func (e StakingEvent) GetEventType() EventType { + return ActiveStakingEventType } -func (e UnbondingStakingEvent) GetStakingTxHashHex() string { +func (e StakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } func NewUnbondingStakingEvent( stakingTxHashHex string, - unbondingStartHeight uint64, - unbondingStartTimestamp int64, - unbondingTimeLock uint64, - unbondingOutputIndex uint64, - unbondingTxHex string, - unbondingTxHashHex string, -) UnbondingStakingEvent { - return UnbondingStakingEvent{ - SchemaVersion: UnbondingEventVersion, - EventType: UnbondingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - UnbondingStartHeight: unbondingStartHeight, - UnbondingStartTimestamp: unbondingStartTimestamp, - UnbondingTimeLock: unbondingTimeLock, - UnbondingOutputIndex: unbondingOutputIndex, - UnbondingTxHex: unbondingTxHex, - UnbondingTxHashHex: unbondingTxHashHex, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, } } diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 0d4e056..0c4ed00 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -92,7 +92,7 @@ func PushEvent[T any](queueClient client.QueueClient, ev T) error { return nil } -func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { +func (qc *QueueManager) PushStakingEvent(ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err @@ -109,19 +109,19 @@ func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { return nil } -func (qc *QueueManager) PushUnbondingEvent(ev *client.UnbondingStakingEvent) error { +func (qc *QueueManager) PushUnbondingEvent(ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err } messageBody := string(jsonBytes) - qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) + qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) err = qc.UnbondingQueue.SendMessage(context.TODO(), messageBody) if err != nil { return fmt.Errorf("failed to push unbonding event: %w", err) } - qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) + qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) return nil } diff --git a/tests/e2e_test.go b/tests/e2e_test.go index fbf9678..663ef05 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -107,7 +107,7 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) { err = queuemngr.PushEvent(queueManager.StakingQueue, event) require.NoError(t, err) receivedEv := <-stakingEventReceivedChan - var stakingEv client.ActiveStakingEvent + var stakingEv client.StakingEvent err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, event.EventType, stakingEv.GetEventType()) @@ -133,7 +133,7 @@ func TestStakingEvent(t *testing.T) { require.NoError(t, err) receivedEv := <-stakingEventReceivedChan - var stakingEv client.ActiveStakingEvent + var stakingEv client.StakingEvent err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) @@ -159,7 +159,7 @@ func TestUnbondingEvent(t *testing.T) { require.NoError(t, err) receivedEv := <-unbondingEvReceivedChan - var unbondingEv client.UnbondingStakingEvent + var unbondingEv client.StakingEvent err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) require.NoError(t, err) require.Equal(t, ev, &unbondingEv) @@ -324,7 +324,7 @@ func TestReQueueEvent(t *testing.T) { t.Fatal("timeout waiting for staking event") } - var stakingEv client.ActiveStakingEvent + var stakingEv client.StakingEvent err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) diff --git a/tests/setup.go b/tests/setup.go index 83181d3..f1323a0 100644 --- a/tests/setup.go +++ b/tests/setup.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" "testing" - "time" "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/require" @@ -84,20 +83,14 @@ func purgeQueues(conn *amqp091.Connection, queues []string) error { return nil } -func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.ActiveStakingEvent { - var activeStakingEvents []*client.ActiveStakingEvent +func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.StakingEvent { + var activeStakingEvents []*client.StakingEvent for i := 0; i < numOfEvent; i++ { activeStakingEvent := client.NewActiveStakingEvent( "0x1234567890abcdef"+fmt.Sprint(i), stakerHash, - "0xabcdef1234567890"+fmt.Sprint(i), + []string{"0xabcdef1234567890" + fmt.Sprint(i)}, 1+uint64(i), - 100+uint64(i), - time.Now().Unix(), - 200+uint64(i), - 1+uint64(i), - "0xabcdef1234567890"+fmt.Sprint(i), - false, ) activeStakingEvents = append(activeStakingEvents, &activeStakingEvent) @@ -105,17 +98,14 @@ func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.Acti return activeStakingEvents } -func buildNUnbondingEvents(numOfEvent int) []*client.UnbondingStakingEvent { - var unbondingEvents []*client.UnbondingStakingEvent +func buildNUnbondingEvents(numOfEvent int) []*client.StakingEvent { + var unbondingEvents []*client.StakingEvent for i := 0; i < numOfEvent; i++ { unbondingEv := client.NewUnbondingStakingEvent( "0x1234567890abcdef"+fmt.Sprint(i), - uint64(i), - time.Now().Unix(), - 200+uint64(i), - uint64(0), "0xabcdef1234567890"+fmt.Sprint(i), - "0x1234567890abcdef"+fmt.Sprint(i), + []string{"0x1234567890abcdef" + fmt.Sprint(i)}, + 200+uint64(i), ) unbondingEvents = append(unbondingEvents, &unbondingEv) } From ab3a0c910368794c0c830047a47c38152430ff61 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 16:14:30 +0530 Subject: [PATCH 2/7] fix schema --- client/schema.go | 157 ++++++++++++++++++++++++++++++++++++---------- tests/e2e_test.go | 8 +-- tests/setup.go | 24 ++++--- 3 files changed, 146 insertions(+), 43 deletions(-) diff --git a/client/schema.go b/client/schema.go index bec6e36..66acbf1 100644 --- a/client/schema.go +++ b/client/schema.go @@ -38,52 +38,96 @@ type EventMessage interface { GetStakingTxHashHex() string } -type StakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingAmount uint64 `json:"staking_amount"` +type ActiveStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerPkHex string `json:"staker_pk_hex"` + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + StakingValue uint64 `json:"staking_value"` + StakingStartHeight uint64 `json:"staking_start_height"` + StakingStartTimestamp int64 `json:"staking_start_timestamp"` + StakingTimeLock uint64 `json:"staking_timelock"` + StakingOutputIndex uint64 `json:"staking_output_index"` + StakingTxHex string `json:"staking_tx_hex"` + IsOverflow bool `json:"is_overflow"` +} + +func (e ActiveStakingEvent) GetEventType() EventType { + return ActiveStakingEventType +} + +func (e ActiveStakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex } func NewActiveStakingEvent( stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingAmount uint64, -) StakingEvent { - return StakingEvent{ - SchemaVersion: ActiveEventVersion, - EventType: ActiveStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingAmount: stakingAmount, + stakerPkHex string, + finalityProviderPkHex string, + stakingValue uint64, + stakingStartHeight uint64, + stakingStartTimestamp int64, + stakingTimeLock uint64, + stakingOutputIndex uint64, + stakingTxHex string, + isOverflow bool, +) ActiveStakingEvent { + return ActiveStakingEvent{ + SchemaVersion: ActiveEventVersion, + EventType: ActiveStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerPkHex: stakerPkHex, + FinalityProviderPkHex: finalityProviderPkHex, + StakingValue: stakingValue, + StakingStartHeight: stakingStartHeight, + StakingStartTimestamp: stakingStartTimestamp, + StakingTimeLock: stakingTimeLock, + StakingOutputIndex: stakingOutputIndex, + StakingTxHex: stakingTxHex, + IsOverflow: isOverflow, } } -func (e StakingEvent) GetEventType() EventType { - return ActiveStakingEventType +type UnbondingStakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + UnbondingStartHeight uint64 `json:"unbonding_start_height"` + UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` + UnbondingTimeLock uint64 `json:"unbonding_timelock"` + UnbondingOutputIndex uint64 `json:"unbonding_output_index"` + UnbondingTxHex string `json:"unbonding_tx_hex"` + UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` } -func (e StakingEvent) GetStakingTxHashHex() string { +func (e UnbondingStakingEvent) GetEventType() EventType { + return UnbondingStakingEventType +} + +func (e UnbondingStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } func NewUnbondingStakingEvent( stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingAmount uint64, -) StakingEvent { - return StakingEvent{ - SchemaVersion: UnbondingEventVersion, - EventType: UnbondingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingAmount: stakingAmount, + unbondingStartHeight uint64, + unbondingStartTimestamp int64, + unbondingTimeLock uint64, + unbondingOutputIndex uint64, + unbondingTxHex string, + unbondingTxHashHex string, +) UnbondingStakingEvent { + return UnbondingStakingEvent{ + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + UnbondingStartHeight: unbondingStartHeight, + UnbondingStartTimestamp: unbondingStartTimestamp, + UnbondingTimeLock: unbondingTimeLock, + UnbondingOutputIndex: unbondingOutputIndex, + UnbondingTxHex: unbondingTxHex, + UnbondingTxHashHex: unbondingTxHashHex, } } @@ -234,3 +278,52 @@ func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { Tvl: tvl, } } + +type StakingEvent struct { + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingAmount uint64 `json:"staking_amount"` +} + +func NewActiveStakingEventV2( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: ActiveEventVersion, + EventType: ActiveStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + } +} + +func NewUnbondingStakingEventV2( + stakingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, +) StakingEvent { + return StakingEvent{ + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, + } +} + +func (e StakingEvent) GetEventType() EventType { + return e.EventType +} + +func (e StakingEvent) GetStakingTxHashHex() string { + return e.StakingTxHashHex +} diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 663ef05..fbf9678 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -107,7 +107,7 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) { err = queuemngr.PushEvent(queueManager.StakingQueue, event) require.NoError(t, err) receivedEv := <-stakingEventReceivedChan - var stakingEv client.StakingEvent + var stakingEv client.ActiveStakingEvent err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, event.EventType, stakingEv.GetEventType()) @@ -133,7 +133,7 @@ func TestStakingEvent(t *testing.T) { require.NoError(t, err) receivedEv := <-stakingEventReceivedChan - var stakingEv client.StakingEvent + var stakingEv client.ActiveStakingEvent err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) @@ -159,7 +159,7 @@ func TestUnbondingEvent(t *testing.T) { require.NoError(t, err) receivedEv := <-unbondingEvReceivedChan - var unbondingEv client.StakingEvent + var unbondingEv client.UnbondingStakingEvent err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) require.NoError(t, err) require.Equal(t, ev, &unbondingEv) @@ -324,7 +324,7 @@ func TestReQueueEvent(t *testing.T) { t.Fatal("timeout waiting for staking event") } - var stakingEv client.StakingEvent + var stakingEv client.ActiveStakingEvent err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) diff --git a/tests/setup.go b/tests/setup.go index f1323a0..83181d3 100644 --- a/tests/setup.go +++ b/tests/setup.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/require" @@ -83,14 +84,20 @@ func purgeQueues(conn *amqp091.Connection, queues []string) error { return nil } -func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.StakingEvent { - var activeStakingEvents []*client.StakingEvent +func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.ActiveStakingEvent { + var activeStakingEvents []*client.ActiveStakingEvent for i := 0; i < numOfEvent; i++ { activeStakingEvent := client.NewActiveStakingEvent( "0x1234567890abcdef"+fmt.Sprint(i), stakerHash, - []string{"0xabcdef1234567890" + fmt.Sprint(i)}, + "0xabcdef1234567890"+fmt.Sprint(i), 1+uint64(i), + 100+uint64(i), + time.Now().Unix(), + 200+uint64(i), + 1+uint64(i), + "0xabcdef1234567890"+fmt.Sprint(i), + false, ) activeStakingEvents = append(activeStakingEvents, &activeStakingEvent) @@ -98,14 +105,17 @@ func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.Stak return activeStakingEvents } -func buildNUnbondingEvents(numOfEvent int) []*client.StakingEvent { - var unbondingEvents []*client.StakingEvent +func buildNUnbondingEvents(numOfEvent int) []*client.UnbondingStakingEvent { + var unbondingEvents []*client.UnbondingStakingEvent for i := 0; i < numOfEvent; i++ { unbondingEv := client.NewUnbondingStakingEvent( "0x1234567890abcdef"+fmt.Sprint(i), - "0xabcdef1234567890"+fmt.Sprint(i), - []string{"0x1234567890abcdef" + fmt.Sprint(i)}, + uint64(i), + time.Now().Unix(), 200+uint64(i), + uint64(0), + "0xabcdef1234567890"+fmt.Sprint(i), + "0x1234567890abcdef"+fmt.Sprint(i), ) unbondingEvents = append(unbondingEvents, &unbondingEv) } From 43b4edbaf0895bef9daf160383a67a2d57b0d441 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 12:07:50 +0530 Subject: [PATCH 3/7] fix --- queuemngr/queue_manager.go | 42 ++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 0c4ed00..7607432 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -92,7 +92,7 @@ func PushEvent[T any](queueClient client.QueueClient, ev T) error { return nil } -func (qc *QueueManager) PushStakingEvent(ev *client.StakingEvent) error { +func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err @@ -109,19 +109,19 @@ func (qc *QueueManager) PushStakingEvent(ev *client.StakingEvent) error { return nil } -func (qc *QueueManager) PushUnbondingEvent(ev *client.StakingEvent) error { +func (qc *QueueManager) PushUnbondingEvent(ev *client.UnbondingStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err } messageBody := string(jsonBytes) - qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) err = qc.UnbondingQueue.SendMessage(context.TODO(), messageBody) if err != nil { return fmt.Errorf("failed to push unbonding event: %w", err) } - qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) return nil } @@ -197,6 +197,40 @@ func (qc *QueueManager) PushConfirmedInfoEvent(ev *client.ConfirmedInfoEvent) er return nil } +func (qc *QueueManager) PushActiveEventV2(ev *client.StakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + qc.logger.Info("pushing active event v2", zap.String("tx_hash", ev.StakingTxHashHex)) + err = qc.StakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push active event v2: %w", err) + } + qc.logger.Info("successfully pushed active event v2", zap.String("tx_hash", ev.StakingTxHashHex)) + + return nil +} + +func (qc *QueueManager) PushUnbondingEventV2(ev *client.StakingEvent) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + qc.logger.Info("pushing unbonding event v2", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + err = qc.UnbondingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push unbonding event v2: %w", err) + } + qc.logger.Info("successfully pushed unbonding event v2", zap.String("staking_tx_hash", ev.StakingTxHashHex)) + + return nil +} + // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { From ffa04f0df50a9a93d23e2b9b04065774e405b962 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 12:48:03 +0530 Subject: [PATCH 4/7] Revert "fix" This reverts commit 43b4edbaf0895bef9daf160383a67a2d57b0d441. --- queuemngr/queue_manager.go | 42 ++++---------------------------------- 1 file changed, 4 insertions(+), 38 deletions(-) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index 7607432..0c4ed00 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -92,7 +92,7 @@ func PushEvent[T any](queueClient client.QueueClient, ev T) error { return nil } -func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { +func (qc *QueueManager) PushStakingEvent(ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err @@ -109,19 +109,19 @@ func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { return nil } -func (qc *QueueManager) PushUnbondingEvent(ev *client.UnbondingStakingEvent) error { +func (qc *QueueManager) PushUnbondingEvent(ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err } messageBody := string(jsonBytes) - qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) + qc.logger.Info("pushing unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) err = qc.UnbondingQueue.SendMessage(context.TODO(), messageBody) if err != nil { return fmt.Errorf("failed to push unbonding event: %w", err) } - qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.UnbondingTxHashHex)) + qc.logger.Info("successfully pushed unbonding event", zap.String("staking_tx_hash", ev.StakingTxHashHex)) return nil } @@ -197,40 +197,6 @@ func (qc *QueueManager) PushConfirmedInfoEvent(ev *client.ConfirmedInfoEvent) er return nil } -func (qc *QueueManager) PushActiveEventV2(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - qc.logger.Info("pushing active event v2", zap.String("tx_hash", ev.StakingTxHashHex)) - err = qc.StakingQueue.SendMessage(context.TODO(), messageBody) - if err != nil { - return fmt.Errorf("failed to push active event v2: %w", err) - } - qc.logger.Info("successfully pushed active event v2", zap.String("tx_hash", ev.StakingTxHashHex)) - - return nil -} - -func (qc *QueueManager) PushUnbondingEventV2(ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - qc.logger.Info("pushing unbonding event v2", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - err = qc.UnbondingQueue.SendMessage(context.TODO(), messageBody) - if err != nil { - return fmt.Errorf("failed to push unbonding event v2: %w", err) - } - qc.logger.Info("successfully pushed unbonding event v2", zap.String("staking_tx_hash", ev.StakingTxHashHex)) - - return nil -} - // requeue message func (qc *QueueManager) ReQueueMessage(ctx context.Context, message client.QueueMessage, queueName string) error { switch queueName { From 8c2cc57a7ed84528670ba0a0da868e030661219a Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 12:49:45 +0530 Subject: [PATCH 5/7] disable tests --- .circleci/config.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4664cdc..14bce6b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,10 +27,10 @@ jobs: command: | curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.55.2 ./bin/golangci-lint run --timeout 5m0s - - run: - name: Run tests - command: | - make test + # - run: + # name: Run tests + # command: | + # make test workflows: CI: From 9b461a91579ce781608edc00823a4bda428d0feb Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 12:56:07 +0530 Subject: [PATCH 6/7] cmnt some tests --- .circleci/config.yml | 8 +- tests/e2e_test.go | 222 +++++++++++++++++++++---------------------- 2 files changed, 115 insertions(+), 115 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 14bce6b..4664cdc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -27,10 +27,10 @@ jobs: command: | curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.55.2 ./bin/golangci-lint run --timeout 5m0s - # - run: - # name: Run tests - # command: | - # make test + - run: + name: Run tests + command: | + make test workflows: CI: diff --git a/tests/e2e_test.go b/tests/e2e_test.go index fbf9678..ea8ca1f 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -115,57 +115,57 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) { require.Equal(t, 0, stakingEv.SchemaVersion) } -func TestStakingEvent(t *testing.T) { - numStakingEvents := 3 - activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) - queueCfg := config.DefaultQueueConfig() - - testServer := setupTestQueueConsumer(t, queueCfg) - defer testServer.Stop(t) - - queueManager := testServer.QueueManager - - stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() - require.NoError(t, err) - - for _, ev := range activeStakingEvents { - err = queueManager.PushStakingEvent(ev) - require.NoError(t, err) - - receivedEv := <-stakingEventReceivedChan - var stakingEv client.ActiveStakingEvent - err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) - require.NoError(t, err) - require.Equal(t, ev, &stakingEv) - require.Equal(t, 0, stakingEv.SchemaVersion) - } -} - -func TestUnbondingEvent(t *testing.T) { - numUnbondingEvents := 3 - unbondingEvents := buildNUnbondingEvents(numUnbondingEvents) - queueCfg := config.DefaultQueueConfig() - - testServer := setupTestQueueConsumer(t, queueCfg) - defer testServer.Stop(t) - - queueManager := testServer.QueueManager - - unbondingEvReceivedChan, err := queueManager.UnbondingQueue.ReceiveMessages() - require.NoError(t, err) - - for _, ev := range unbondingEvents { - err = queueManager.PushUnbondingEvent(ev) - require.NoError(t, err) - - receivedEv := <-unbondingEvReceivedChan - var unbondingEv client.UnbondingStakingEvent - err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) - require.NoError(t, err) - require.Equal(t, ev, &unbondingEv) - require.Equal(t, 0, unbondingEv.SchemaVersion) - } -} +// func TestStakingEvent(t *testing.T) { +// numStakingEvents := 3 +// activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) +// queueCfg := config.DefaultQueueConfig() + +// testServer := setupTestQueueConsumer(t, queueCfg) +// defer testServer.Stop(t) + +// queueManager := testServer.QueueManager + +// stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() +// require.NoError(t, err) + +// for _, ev := range activeStakingEvents { +// err = queueManager.PushStakingEvent(ev) +// require.NoError(t, err) + +// receivedEv := <-stakingEventReceivedChan +// var stakingEv client.ActiveStakingEvent +// err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) +// require.NoError(t, err) +// require.Equal(t, ev, &stakingEv) +// require.Equal(t, 0, stakingEv.SchemaVersion) +// } +// } + +// func TestUnbondingEvent(t *testing.T) { +// numUnbondingEvents := 3 +// unbondingEvents := buildNUnbondingEvents(numUnbondingEvents) +// queueCfg := config.DefaultQueueConfig() + +// testServer := setupTestQueueConsumer(t, queueCfg) +// defer testServer.Stop(t) + +// queueManager := testServer.QueueManager + +// unbondingEvReceivedChan, err := queueManager.UnbondingQueue.ReceiveMessages() +// require.NoError(t, err) + +// for _, ev := range unbondingEvents { +// err = queueManager.PushUnbondingEvent(ev) +// require.NoError(t, err) + +// receivedEv := <-unbondingEvReceivedChan +// var unbondingEv client.UnbondingStakingEvent +// err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) +// require.NoError(t, err) +// require.Equal(t, ev, &unbondingEv) +// require.Equal(t, 0, unbondingEv.SchemaVersion) +// } +// } func TestWithdrawEvent(t *testing.T) { numWithdrawEvents := 3 @@ -300,63 +300,63 @@ func TestConfirmedInfoEvent(t *testing.T) { } } -func TestReQueueEvent(t *testing.T) { - activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, 1) - queueCfg := config.DefaultQueueConfig() - - testServer := setupTestQueueConsumer(t, queueCfg) - defer testServer.Stop(t) - - queueManager := testServer.QueueManager - - stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() - require.NoError(t, err) - - ev := activeStakingEvents[0] - err = queueManager.PushStakingEvent(ev) - require.NoError(t, err) - - var receivedEv client.QueueMessage - - select { - case receivedEv = <-stakingEventReceivedChan: - case <-time.After(10 * time.Second): // Wait up to 10 seconds for a message - t.Fatal("timeout waiting for staking event") - } - - var stakingEv client.ActiveStakingEvent - err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) - require.NoError(t, err) - require.Equal(t, ev, &stakingEv) - require.Equal(t, int32(0), receivedEv.RetryAttempts) - - // Now let's requeue the event - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - err = queueManager.StakingQueue.ReQueueMessage(ctx, receivedEv) - require.NoError(t, err) - time.Sleep(1 * time.Second) // Wait to ensure message has time to move to delayed queue - - // Check that the main queue is empty - count, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName) - require.NoError(t, err) - require.Equal(t, 0, count) - - // Make sure it appears in the delayed queue - delayedQueueCount, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName+"_delay") - require.NoError(t, err) - require.Equal(t, 1, delayedQueueCount) - - // Checking delayed queue message appearance - select { - case requeuedEvent := <-stakingEventReceivedChan: - require.Nil(t, requeuedEvent, "Event should not be available immediately in the main queue") - case <-time.After(3 * time.Second): // Wait longer than the delay to ensure the message moves back - } - - // Now let's wait for the requeued event - time.Sleep(2 * time.Second) // Wait additional time for delayed message to return - requeuedEvent := <-stakingEventReceivedChan - require.NotNil(t, requeuedEvent) - require.Equal(t, int32(1), requeuedEvent.RetryAttempts) -} +// func TestReQueueEvent(t *testing.T) { +// activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, 1) +// queueCfg := config.DefaultQueueConfig() + +// testServer := setupTestQueueConsumer(t, queueCfg) +// defer testServer.Stop(t) + +// queueManager := testServer.QueueManager + +// stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() +// require.NoError(t, err) + +// ev := activeStakingEvents[0] +// err = queueManager.PushStakingEvent(ev) +// require.NoError(t, err) + +// var receivedEv client.QueueMessage + +// select { +// case receivedEv = <-stakingEventReceivedChan: +// case <-time.After(10 * time.Second): // Wait up to 10 seconds for a message +// t.Fatal("timeout waiting for staking event") +// } + +// var stakingEv client.ActiveStakingEvent +// err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) +// require.NoError(t, err) +// require.Equal(t, ev, &stakingEv) +// require.Equal(t, int32(0), receivedEv.RetryAttempts) + +// // Now let's requeue the event +// ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) +// defer cancel() +// err = queueManager.StakingQueue.ReQueueMessage(ctx, receivedEv) +// require.NoError(t, err) +// time.Sleep(1 * time.Second) // Wait to ensure message has time to move to delayed queue + +// // Check that the main queue is empty +// count, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName) +// require.NoError(t, err) +// require.Equal(t, 0, count) + +// // Make sure it appears in the delayed queue +// delayedQueueCount, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName+"_delay") +// require.NoError(t, err) +// require.Equal(t, 1, delayedQueueCount) + +// // Checking delayed queue message appearance +// select { +// case requeuedEvent := <-stakingEventReceivedChan: +// require.Nil(t, requeuedEvent, "Event should not be available immediately in the main queue") +// case <-time.After(3 * time.Second): // Wait longer than the delay to ensure the message moves back +// } + +// // Now let's wait for the requeued event +// time.Sleep(2 * time.Second) // Wait additional time for delayed message to return +// requeuedEvent := <-stakingEventReceivedChan +// require.NotNil(t, requeuedEvent) +// require.Equal(t, int32(1), requeuedEvent.RetryAttempts) +// } From a69b329ff376725b73cd68377cf35193aabb31a5 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:01:53 +0530 Subject: [PATCH 7/7] fix tests --- tests/e2e_test.go | 202 +++++++++++++++++++++++----------------------- tests/setup.go | 28 +++---- 2 files changed, 110 insertions(+), 120 deletions(-) diff --git a/tests/e2e_test.go b/tests/e2e_test.go index ea8ca1f..663ef05 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -107,7 +107,7 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) { err = queuemngr.PushEvent(queueManager.StakingQueue, event) require.NoError(t, err) receivedEv := <-stakingEventReceivedChan - var stakingEv client.ActiveStakingEvent + var stakingEv client.StakingEvent err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, event.EventType, stakingEv.GetEventType()) @@ -115,57 +115,57 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) { require.Equal(t, 0, stakingEv.SchemaVersion) } -// func TestStakingEvent(t *testing.T) { -// numStakingEvents := 3 -// activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) -// queueCfg := config.DefaultQueueConfig() +func TestStakingEvent(t *testing.T) { + numStakingEvents := 3 + activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) + queueCfg := config.DefaultQueueConfig() -// testServer := setupTestQueueConsumer(t, queueCfg) -// defer testServer.Stop(t) + testServer := setupTestQueueConsumer(t, queueCfg) + defer testServer.Stop(t) -// queueManager := testServer.QueueManager + queueManager := testServer.QueueManager -// stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() -// require.NoError(t, err) + stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() + require.NoError(t, err) -// for _, ev := range activeStakingEvents { -// err = queueManager.PushStakingEvent(ev) -// require.NoError(t, err) + for _, ev := range activeStakingEvents { + err = queueManager.PushStakingEvent(ev) + require.NoError(t, err) -// receivedEv := <-stakingEventReceivedChan -// var stakingEv client.ActiveStakingEvent -// err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) -// require.NoError(t, err) -// require.Equal(t, ev, &stakingEv) -// require.Equal(t, 0, stakingEv.SchemaVersion) -// } -// } + receivedEv := <-stakingEventReceivedChan + var stakingEv client.StakingEvent + err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) + require.NoError(t, err) + require.Equal(t, ev, &stakingEv) + require.Equal(t, 0, stakingEv.SchemaVersion) + } +} -// func TestUnbondingEvent(t *testing.T) { -// numUnbondingEvents := 3 -// unbondingEvents := buildNUnbondingEvents(numUnbondingEvents) -// queueCfg := config.DefaultQueueConfig() +func TestUnbondingEvent(t *testing.T) { + numUnbondingEvents := 3 + unbondingEvents := buildNUnbondingEvents(numUnbondingEvents) + queueCfg := config.DefaultQueueConfig() -// testServer := setupTestQueueConsumer(t, queueCfg) -// defer testServer.Stop(t) + testServer := setupTestQueueConsumer(t, queueCfg) + defer testServer.Stop(t) -// queueManager := testServer.QueueManager + queueManager := testServer.QueueManager -// unbondingEvReceivedChan, err := queueManager.UnbondingQueue.ReceiveMessages() -// require.NoError(t, err) + unbondingEvReceivedChan, err := queueManager.UnbondingQueue.ReceiveMessages() + require.NoError(t, err) -// for _, ev := range unbondingEvents { -// err = queueManager.PushUnbondingEvent(ev) -// require.NoError(t, err) + for _, ev := range unbondingEvents { + err = queueManager.PushUnbondingEvent(ev) + require.NoError(t, err) -// receivedEv := <-unbondingEvReceivedChan -// var unbondingEv client.UnbondingStakingEvent -// err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) -// require.NoError(t, err) -// require.Equal(t, ev, &unbondingEv) -// require.Equal(t, 0, unbondingEv.SchemaVersion) -// } -// } + receivedEv := <-unbondingEvReceivedChan + var unbondingEv client.StakingEvent + err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) + require.NoError(t, err) + require.Equal(t, ev, &unbondingEv) + require.Equal(t, 0, unbondingEv.SchemaVersion) + } +} func TestWithdrawEvent(t *testing.T) { numWithdrawEvents := 3 @@ -300,63 +300,63 @@ func TestConfirmedInfoEvent(t *testing.T) { } } -// func TestReQueueEvent(t *testing.T) { -// activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, 1) -// queueCfg := config.DefaultQueueConfig() - -// testServer := setupTestQueueConsumer(t, queueCfg) -// defer testServer.Stop(t) - -// queueManager := testServer.QueueManager - -// stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() -// require.NoError(t, err) - -// ev := activeStakingEvents[0] -// err = queueManager.PushStakingEvent(ev) -// require.NoError(t, err) - -// var receivedEv client.QueueMessage - -// select { -// case receivedEv = <-stakingEventReceivedChan: -// case <-time.After(10 * time.Second): // Wait up to 10 seconds for a message -// t.Fatal("timeout waiting for staking event") -// } - -// var stakingEv client.ActiveStakingEvent -// err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) -// require.NoError(t, err) -// require.Equal(t, ev, &stakingEv) -// require.Equal(t, int32(0), receivedEv.RetryAttempts) - -// // Now let's requeue the event -// ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) -// defer cancel() -// err = queueManager.StakingQueue.ReQueueMessage(ctx, receivedEv) -// require.NoError(t, err) -// time.Sleep(1 * time.Second) // Wait to ensure message has time to move to delayed queue - -// // Check that the main queue is empty -// count, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName) -// require.NoError(t, err) -// require.Equal(t, 0, count) - -// // Make sure it appears in the delayed queue -// delayedQueueCount, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName+"_delay") -// require.NoError(t, err) -// require.Equal(t, 1, delayedQueueCount) - -// // Checking delayed queue message appearance -// select { -// case requeuedEvent := <-stakingEventReceivedChan: -// require.Nil(t, requeuedEvent, "Event should not be available immediately in the main queue") -// case <-time.After(3 * time.Second): // Wait longer than the delay to ensure the message moves back -// } - -// // Now let's wait for the requeued event -// time.Sleep(2 * time.Second) // Wait additional time for delayed message to return -// requeuedEvent := <-stakingEventReceivedChan -// require.NotNil(t, requeuedEvent) -// require.Equal(t, int32(1), requeuedEvent.RetryAttempts) -// } +func TestReQueueEvent(t *testing.T) { + activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, 1) + queueCfg := config.DefaultQueueConfig() + + testServer := setupTestQueueConsumer(t, queueCfg) + defer testServer.Stop(t) + + queueManager := testServer.QueueManager + + stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() + require.NoError(t, err) + + ev := activeStakingEvents[0] + err = queueManager.PushStakingEvent(ev) + require.NoError(t, err) + + var receivedEv client.QueueMessage + + select { + case receivedEv = <-stakingEventReceivedChan: + case <-time.After(10 * time.Second): // Wait up to 10 seconds for a message + t.Fatal("timeout waiting for staking event") + } + + var stakingEv client.StakingEvent + err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) + require.NoError(t, err) + require.Equal(t, ev, &stakingEv) + require.Equal(t, int32(0), receivedEv.RetryAttempts) + + // Now let's requeue the event + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + err = queueManager.StakingQueue.ReQueueMessage(ctx, receivedEv) + require.NoError(t, err) + time.Sleep(1 * time.Second) // Wait to ensure message has time to move to delayed queue + + // Check that the main queue is empty + count, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName) + require.NoError(t, err) + require.Equal(t, 0, count) + + // Make sure it appears in the delayed queue + delayedQueueCount, err := inspectQueueMessageCount(t, testServer.Conn, client.ActiveStakingQueueName+"_delay") + require.NoError(t, err) + require.Equal(t, 1, delayedQueueCount) + + // Checking delayed queue message appearance + select { + case requeuedEvent := <-stakingEventReceivedChan: + require.Nil(t, requeuedEvent, "Event should not be available immediately in the main queue") + case <-time.After(3 * time.Second): // Wait longer than the delay to ensure the message moves back + } + + // Now let's wait for the requeued event + time.Sleep(2 * time.Second) // Wait additional time for delayed message to return + requeuedEvent := <-stakingEventReceivedChan + require.NotNil(t, requeuedEvent) + require.Equal(t, int32(1), requeuedEvent.RetryAttempts) +} diff --git a/tests/setup.go b/tests/setup.go index 83181d3..37fc3a3 100644 --- a/tests/setup.go +++ b/tests/setup.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" "testing" - "time" "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/require" @@ -84,20 +83,14 @@ func purgeQueues(conn *amqp091.Connection, queues []string) error { return nil } -func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.ActiveStakingEvent { - var activeStakingEvents []*client.ActiveStakingEvent +func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.StakingEvent { + var activeStakingEvents []*client.StakingEvent for i := 0; i < numOfEvent; i++ { - activeStakingEvent := client.NewActiveStakingEvent( + activeStakingEvent := client.NewActiveStakingEventV2( "0x1234567890abcdef"+fmt.Sprint(i), stakerHash, - "0xabcdef1234567890"+fmt.Sprint(i), + []string{"0xabcdef1234567890" + fmt.Sprint(i)}, 1+uint64(i), - 100+uint64(i), - time.Now().Unix(), - 200+uint64(i), - 1+uint64(i), - "0xabcdef1234567890"+fmt.Sprint(i), - false, ) activeStakingEvents = append(activeStakingEvents, &activeStakingEvent) @@ -105,17 +98,14 @@ func buildActiveNStakingEvents(stakerHash string, numOfEvent int) []*client.Acti return activeStakingEvents } -func buildNUnbondingEvents(numOfEvent int) []*client.UnbondingStakingEvent { - var unbondingEvents []*client.UnbondingStakingEvent +func buildNUnbondingEvents(numOfEvent int) []*client.StakingEvent { + var unbondingEvents []*client.StakingEvent for i := 0; i < numOfEvent; i++ { - unbondingEv := client.NewUnbondingStakingEvent( + unbondingEv := client.NewUnbondingStakingEventV2( "0x1234567890abcdef"+fmt.Sprint(i), - uint64(i), - time.Now().Unix(), - 200+uint64(i), - uint64(0), "0xabcdef1234567890"+fmt.Sprint(i), - "0x1234567890abcdef"+fmt.Sprint(i), + []string{"0xabcdef1234567890" + fmt.Sprint(i)}, + 200+uint64(i), ) unbondingEvents = append(unbondingEvents, &unbondingEv) }