diff --git a/.circleci/config.yml b/.circleci/config.yml index a179517..4664cdc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -30,7 +30,7 @@ jobs: - run: name: Run tests command: | - make tests + make test workflows: CI: diff --git a/Makefile b/Makefile index d498f45..a3a0bfb 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,8 @@ endif BUILD_TARGETS := build install BUILD_FLAGS := --tags "$(build_tags)" --ldflags '$(ldflags)' -.PHONY: tests +.PHONY: test -tests: +test: ./bin/local-startup.sh; go test -v ./... diff --git a/client/schema.go b/client/schema.go index b1a60c8..0ef11ed 100644 --- a/client/schema.go +++ b/client/schema.go @@ -20,6 +20,17 @@ const ( ConfirmedInfoEventType EventType = 7 ) +// Event schema versions, only increment when the schema changes +const ( + activeEventVersion int = 0 + unbondingEventVersion int = 0 + withdrawEventVersion int = 0 + expiredEventVersion int = 0 + statsEventVersion int = 0 + btcInfoEventVersion int = 0 + confirmedInfoEventVersion int = 0 +) + type EventType int type EventMessage interface { @@ -28,6 +39,7 @@ type EventMessage interface { } type ActiveStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerPkHex string `json:"staker_pk_hex"` @@ -62,6 +74,7 @@ func NewActiveStakingEvent( isOverflow bool, ) ActiveStakingEvent { return ActiveStakingEvent{ + SchemaVersion: activeEventVersion, EventType: ActiveStakingEventType, StakingTxHashHex: stakingTxHashHex, StakerPkHex: stakerPkHex, @@ -77,6 +90,7 @@ func NewActiveStakingEvent( } type UnbondingStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` UnbondingStartHeight uint64 `json:"unbonding_start_height"` @@ -105,6 +119,7 @@ func NewUnbondingStakingEvent( unbondingTxHashHex string, ) UnbondingStakingEvent { return UnbondingStakingEvent{ + SchemaVersion: unbondingEventVersion, EventType: UnbondingStakingEventType, StakingTxHashHex: stakingTxHashHex, UnbondingStartHeight: unbondingStartHeight, @@ -117,6 +132,7 @@ func NewUnbondingStakingEvent( } type WithdrawStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` } @@ -131,12 +147,14 @@ func (e WithdrawStakingEvent) GetStakingTxHashHex() string { func NewWithdrawStakingEvent(stakingTxHashHex string) WithdrawStakingEvent { return WithdrawStakingEvent{ + SchemaVersion: withdrawEventVersion, EventType: WithdrawStakingEventType, StakingTxHashHex: stakingTxHashHex, } } type ExpiredStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` TxType string `json:"tx_type"` @@ -152,6 +170,7 @@ func (e ExpiredStakingEvent) GetStakingTxHashHex() string { func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { return ExpiredStakingEvent{ + SchemaVersion: expiredEventVersion, EventType: ExpiredStakingEventType, StakingTxHashHex: stakingTxHashHex, TxType: txType, @@ -159,6 +178,7 @@ func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStaki } type StatsEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 5. StatsEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerPkHex string `json:"staker_pk_hex"` @@ -183,6 +203,7 @@ func NewStatsEvent( state string, ) StatsEvent { return StatsEvent{ + SchemaVersion: statsEventVersion, EventType: StatsEventType, StakingTxHashHex: stakingTxHashHex, StakerPkHex: stakerPkHex, @@ -193,6 +214,7 @@ func NewStatsEvent( } type BtcInfoEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 6. BtcInfoEventType Height uint64 `json:"height"` ConfirmedTvl uint64 `json:"confirmed_tvl"` @@ -210,6 +232,7 @@ func (e BtcInfoEvent) GetStakingTxHashHex() string { func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { return BtcInfoEvent{ + SchemaVersion: btcInfoEventVersion, EventType: BtcInfoEventType, Height: height, ConfirmedTvl: confirmedTvl, @@ -218,9 +241,10 @@ func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { } type ConfirmedInfoEvent struct { - EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType - Height uint64 `json:"height"` - Tvl uint64 `json:"tvl"` + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType + Height uint64 `json:"height"` + Tvl uint64 `json:"tvl"` } func (e ConfirmedInfoEvent) GetEventType() EventType { @@ -234,8 +258,9 @@ func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { return ConfirmedInfoEvent{ - EventType: ConfirmedInfoEventType, - Height: height, - Tvl: tvl, + SchemaVersion: confirmedInfoEventVersion, + EventType: ConfirmedInfoEventType, + Height: height, + Tvl: tvl, } } diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index c42f9ad..de4a40e 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -77,6 +77,21 @@ func (qc *QueueManager) Start() error { return nil } +func PushEvent[T any](qc *QueueManager, ev T) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + err = qc.StakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push event: %w", err) + } + + return nil +} + func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 2b29202..0d3e534 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -10,6 +10,7 @@ import ( "github.com/babylonlabs-io/staking-queue-client/client" "github.com/babylonlabs-io/staking-queue-client/config" + "github.com/babylonlabs-io/staking-queue-client/queuemngr" ) const ( @@ -66,6 +67,54 @@ func TestPing(t *testing.T) { require.Contains(t, err.Error(), "rabbitMQ connection is closed", "Error message should indicate which queue failed") } +func TestSchemaVersionBackwardsCompatibility(t *testing.T) { + type oldActiveStakingEvent struct { + EventType client.EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerPkHex string `json:"staker_pk_hex"` + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + StakingValue uint64 `json:"staking_value"` + StakingStartHeight uint64 `json:"staking_start_height"` + StakingStartTimestamp int64 `json:"staking_start_timestamp"` + StakingTimeLock uint64 `json:"staking_timelock"` + StakingOutputIndex uint64 `json:"staking_output_index"` + StakingTxHex string `json:"staking_tx_hex"` + IsOverflow bool `json:"is_overflow"` + } + + event := &oldActiveStakingEvent{ + EventType: client.ActiveStakingEventType, + StakingTxHashHex: "0x1234567890abcdef", + StakerPkHex: "0x1234567890abcdef", + FinalityProviderPkHex: "0x1234567890abcdef", + StakingValue: 100, + StakingStartHeight: 100, + StakingStartTimestamp: 100, + StakingTimeLock: 100, + StakingOutputIndex: 100, + StakingTxHex: "0x1234567890abcdef", + IsOverflow: false, + } + queueCfg := config.DefaultQueueConfig() + + testServer := setupTestQueueConsumer(t, queueCfg) + defer testServer.Stop(t) + + queueManager := testServer.QueueManager + stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() + require.NoError(t, err) + + err = queuemngr.PushEvent(queueManager, event) + require.NoError(t, err) + receivedEv := <-stakingEventReceivedChan + var stakingEv client.ActiveStakingEvent + err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) + require.NoError(t, err) + require.Equal(t, event.EventType, stakingEv.GetEventType()) + require.Equal(t, event.StakingTxHashHex, stakingEv.GetStakingTxHashHex()) + require.Equal(t, 0, stakingEv.SchemaVersion) +} + func TestStakingEvent(t *testing.T) { numStakingEvents := 3 activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) @@ -88,6 +137,7 @@ func TestStakingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) + require.Equal(t, 0, stakingEv.SchemaVersion) } } @@ -113,6 +163,7 @@ func TestUnbondingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) require.NoError(t, err) require.Equal(t, ev, &unbondingEv) + require.Equal(t, 0, unbondingEv.SchemaVersion) } } @@ -138,6 +189,7 @@ func TestWithdrawEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &withdrawEv) require.NoError(t, err) require.Equal(t, ev, &withdrawEv) + require.Equal(t, 0, withdrawEv.SchemaVersion) } } @@ -163,6 +215,7 @@ func TestExpiryEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &expiryEvent) require.NoError(t, err) require.Equal(t, ev, &expiryEvent) + require.Equal(t, 0, expiryEvent.SchemaVersion) } } @@ -188,6 +241,7 @@ func TestBtcInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &BtcInfoEvent) require.NoError(t, err) require.Equal(t, ev, &BtcInfoEvent) + require.Equal(t, 0, BtcInfoEvent.SchemaVersion) } } @@ -213,6 +267,7 @@ func TestConfirmedInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &confirmedInfoEvent) require.NoError(t, err) require.Equal(t, ev, &confirmedInfoEvent) + require.Equal(t, 0, confirmedInfoEvent.SchemaVersion) } }