From f561d31efa79aa5a34c6afe2fead2e3ba8da269d Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Fri, 9 Aug 2024 14:16:20 +1000 Subject: [PATCH 1/4] feat: event schema to have versions --- client/schema.go | 66 ++++++++++++++++++++++++++++++++++++++++++----- tests/e2e_test.go | 6 +++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/client/schema.go b/client/schema.go index b1a60c8..863e9da 100644 --- a/client/schema.go +++ b/client/schema.go @@ -20,14 +20,27 @@ const ( ConfirmedInfoEventType EventType = 7 ) +// Event schema versions, only increment when the schema changes +const ( + activeEventVersion int = 0 + unbondingEventVersion int = 0 + withdrawEventVersion int = 0 + expiredEventVersion int = 0 + statsEventVersion int = 0 + btcInfoEventVersion int = 0 + confirmedInfoEventVersion int = 0 +) + type EventType int type EventMessage interface { GetEventType() EventType GetStakingTxHashHex() string + GetEventSchemaVersion() int } type ActiveStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerPkHex string `json:"staker_pk_hex"` @@ -49,6 +62,10 @@ func (e ActiveStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } +func (e ActiveStakingEvent) GetEventSchemaVersion() int { + return e.SchemaVersion +} + func NewActiveStakingEvent( stakingTxHashHex string, stakerPkHex string, @@ -62,6 +79,7 @@ func NewActiveStakingEvent( isOverflow bool, ) ActiveStakingEvent { return ActiveStakingEvent{ + SchemaVersion: activeEventVersion, EventType: ActiveStakingEventType, StakingTxHashHex: stakingTxHashHex, StakerPkHex: stakerPkHex, @@ -77,6 +95,7 @@ func NewActiveStakingEvent( } type UnbondingStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` UnbondingStartHeight uint64 `json:"unbonding_start_height"` @@ -95,6 +114,10 @@ func (e UnbondingStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } +func (e UnbondingStakingEvent) GetEventSchemaVersion() int { + return e.SchemaVersion +} + func NewUnbondingStakingEvent( stakingTxHashHex string, unbondingStartHeight uint64, @@ -105,6 +128,7 @@ func NewUnbondingStakingEvent( unbondingTxHashHex string, ) UnbondingStakingEvent { return UnbondingStakingEvent{ + SchemaVersion: unbondingEventVersion, EventType: UnbondingStakingEventType, StakingTxHashHex: stakingTxHashHex, UnbondingStartHeight: unbondingStartHeight, @@ -117,6 +141,7 @@ func NewUnbondingStakingEvent( } type WithdrawStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` } @@ -129,14 +154,20 @@ func (e WithdrawStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } +func (e WithdrawStakingEvent) GetEventSchemaVersion() int { + return withdrawEventVersion +} + func NewWithdrawStakingEvent(stakingTxHashHex string) WithdrawStakingEvent { return WithdrawStakingEvent{ + SchemaVersion: withdrawEventVersion, EventType: WithdrawStakingEventType, StakingTxHashHex: stakingTxHashHex, } } type ExpiredStakingEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` TxType string `json:"tx_type"` @@ -150,8 +181,13 @@ func (e ExpiredStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } +func (e ExpiredStakingEvent) GetEventSchemaVersion() int { + return expiredEventVersion +} + func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { return ExpiredStakingEvent{ + SchemaVersion: expiredEventVersion, EventType: ExpiredStakingEventType, StakingTxHashHex: stakingTxHashHex, TxType: txType, @@ -159,6 +195,7 @@ func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStaki } type StatsEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 5. StatsEventType StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerPkHex string `json:"staker_pk_hex"` @@ -175,6 +212,10 @@ func (e StatsEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } +func (e StatsEvent) GetEventSchemaVersion() int { + return statsEventVersion +} + func NewStatsEvent( stakingTxHashHex string, stakerPkHex string, @@ -183,6 +224,7 @@ func NewStatsEvent( state string, ) StatsEvent { return StatsEvent{ + SchemaVersion: statsEventVersion, EventType: StatsEventType, StakingTxHashHex: stakingTxHashHex, StakerPkHex: stakerPkHex, @@ -193,6 +235,7 @@ func NewStatsEvent( } type BtcInfoEvent struct { + SchemaVersion int `json:"schema_version"` EventType EventType `json:"event_type"` // always 6. BtcInfoEventType Height uint64 `json:"height"` ConfirmedTvl uint64 `json:"confirmed_tvl"` @@ -208,8 +251,13 @@ func (e BtcInfoEvent) GetStakingTxHashHex() string { return "" } +func (e BtcInfoEvent) GetEventSchemaVersion() int { + return btcInfoEventVersion +} + func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { return BtcInfoEvent{ + SchemaVersion: btcInfoEventVersion, EventType: BtcInfoEventType, Height: height, ConfirmedTvl: confirmedTvl, @@ -218,9 +266,10 @@ func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { } type ConfirmedInfoEvent struct { - EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType - Height uint64 `json:"height"` - Tvl uint64 `json:"tvl"` + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType + Height uint64 `json:"height"` + Tvl uint64 `json:"tvl"` } func (e ConfirmedInfoEvent) GetEventType() EventType { @@ -232,10 +281,15 @@ func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { return "" } +func (e ConfirmedInfoEvent) GetEventSchemaVersion() int { + return confirmedInfoEventVersion +} + func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { return ConfirmedInfoEvent{ - EventType: ConfirmedInfoEventType, - Height: height, - Tvl: tvl, + SchemaVersion: confirmedInfoEventVersion, + EventType: ConfirmedInfoEventType, + Height: height, + Tvl: tvl, } } diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 2b29202..0be00ec 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -88,6 +88,7 @@ func TestStakingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) + require.Equal(t, 0, stakingEv.GetEventSchemaVersion()) } } @@ -113,6 +114,7 @@ func TestUnbondingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) require.NoError(t, err) require.Equal(t, ev, &unbondingEv) + require.Equal(t, 0, unbondingEv.GetEventSchemaVersion()) } } @@ -138,6 +140,7 @@ func TestWithdrawEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &withdrawEv) require.NoError(t, err) require.Equal(t, ev, &withdrawEv) + require.Equal(t, 0, withdrawEv.GetEventSchemaVersion()) } } @@ -163,6 +166,7 @@ func TestExpiryEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &expiryEvent) require.NoError(t, err) require.Equal(t, ev, &expiryEvent) + require.Equal(t, 0, expiryEvent.GetEventSchemaVersion()) } } @@ -188,6 +192,7 @@ func TestBtcInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &BtcInfoEvent) require.NoError(t, err) require.Equal(t, ev, &BtcInfoEvent) + require.Equal(t, 0, BtcInfoEvent.GetEventSchemaVersion()) } } @@ -213,6 +218,7 @@ func TestConfirmedInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &confirmedInfoEvent) require.NoError(t, err) require.Equal(t, ev, &confirmedInfoEvent) + require.Equal(t, 0, confirmedInfoEvent.GetEventSchemaVersion()) } } From 6f180b65e5e795181793e37efd81309f47ebba6e Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Fri, 9 Aug 2024 15:28:21 +1000 Subject: [PATCH 2/4] remove get schema version method --- client/schema.go | 29 ----------------------------- tests/e2e_test.go | 12 ++++++------ 2 files changed, 6 insertions(+), 35 deletions(-) diff --git a/client/schema.go b/client/schema.go index 863e9da..0ef11ed 100644 --- a/client/schema.go +++ b/client/schema.go @@ -36,7 +36,6 @@ type EventType int type EventMessage interface { GetEventType() EventType GetStakingTxHashHex() string - GetEventSchemaVersion() int } type ActiveStakingEvent struct { @@ -62,10 +61,6 @@ func (e ActiveStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } -func (e ActiveStakingEvent) GetEventSchemaVersion() int { - return e.SchemaVersion -} - func NewActiveStakingEvent( stakingTxHashHex string, stakerPkHex string, @@ -114,10 +109,6 @@ func (e UnbondingStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } -func (e UnbondingStakingEvent) GetEventSchemaVersion() int { - return e.SchemaVersion -} - func NewUnbondingStakingEvent( stakingTxHashHex string, unbondingStartHeight uint64, @@ -154,10 +145,6 @@ func (e WithdrawStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } -func (e WithdrawStakingEvent) GetEventSchemaVersion() int { - return withdrawEventVersion -} - func NewWithdrawStakingEvent(stakingTxHashHex string) WithdrawStakingEvent { return WithdrawStakingEvent{ SchemaVersion: withdrawEventVersion, @@ -181,10 +168,6 @@ func (e ExpiredStakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } -func (e ExpiredStakingEvent) GetEventSchemaVersion() int { - return expiredEventVersion -} - func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { return ExpiredStakingEvent{ SchemaVersion: expiredEventVersion, @@ -212,10 +195,6 @@ func (e StatsEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } -func (e StatsEvent) GetEventSchemaVersion() int { - return statsEventVersion -} - func NewStatsEvent( stakingTxHashHex string, stakerPkHex string, @@ -251,10 +230,6 @@ func (e BtcInfoEvent) GetStakingTxHashHex() string { return "" } -func (e BtcInfoEvent) GetEventSchemaVersion() int { - return btcInfoEventVersion -} - func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { return BtcInfoEvent{ SchemaVersion: btcInfoEventVersion, @@ -281,10 +256,6 @@ func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { return "" } -func (e ConfirmedInfoEvent) GetEventSchemaVersion() int { - return confirmedInfoEventVersion -} - func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { return ConfirmedInfoEvent{ SchemaVersion: confirmedInfoEventVersion, diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 0be00ec..27ed963 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -88,7 +88,7 @@ func TestStakingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv) require.NoError(t, err) require.Equal(t, ev, &stakingEv) - require.Equal(t, 0, stakingEv.GetEventSchemaVersion()) + require.Equal(t, 0, stakingEv.SchemaVersion) } } @@ -114,7 +114,7 @@ func TestUnbondingEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv) require.NoError(t, err) require.Equal(t, ev, &unbondingEv) - require.Equal(t, 0, unbondingEv.GetEventSchemaVersion()) + require.Equal(t, 0, unbondingEv.SchemaVersion) } } @@ -140,7 +140,7 @@ func TestWithdrawEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &withdrawEv) require.NoError(t, err) require.Equal(t, ev, &withdrawEv) - require.Equal(t, 0, withdrawEv.GetEventSchemaVersion()) + require.Equal(t, 0, withdrawEv.SchemaVersion) } } @@ -166,7 +166,7 @@ func TestExpiryEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &expiryEvent) require.NoError(t, err) require.Equal(t, ev, &expiryEvent) - require.Equal(t, 0, expiryEvent.GetEventSchemaVersion()) + require.Equal(t, 0, expiryEvent.SchemaVersion) } } @@ -192,7 +192,7 @@ func TestBtcInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &BtcInfoEvent) require.NoError(t, err) require.Equal(t, ev, &BtcInfoEvent) - require.Equal(t, 0, BtcInfoEvent.GetEventSchemaVersion()) + require.Equal(t, 0, BtcInfoEvent.SchemaVersion) } } @@ -218,7 +218,7 @@ func TestConfirmedInfoEvent(t *testing.T) { err := json.Unmarshal([]byte(receivedEv.Body), &confirmedInfoEvent) require.NoError(t, err) require.Equal(t, ev, &confirmedInfoEvent) - require.Equal(t, 0, confirmedInfoEvent.GetEventSchemaVersion()) + require.Equal(t, 0, confirmedInfoEvent.SchemaVersion) } } From 5e9ae4b487aabb38dc3ffd953bd93b627b72dabf Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Fri, 9 Aug 2024 16:02:13 +1000 Subject: [PATCH 3/4] add schema version backwards compatibility test --- queuemngr/queue_manager.go | 15 ++++++++++++ tests/e2e_test.go | 49 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/queuemngr/queue_manager.go b/queuemngr/queue_manager.go index c42f9ad..de4a40e 100644 --- a/queuemngr/queue_manager.go +++ b/queuemngr/queue_manager.go @@ -77,6 +77,21 @@ func (qc *QueueManager) Start() error { return nil } +func PushEvent[T any](qc *QueueManager, ev T) error { + jsonBytes, err := json.Marshal(ev) + if err != nil { + return err + } + messageBody := string(jsonBytes) + + err = qc.StakingQueue.SendMessage(context.TODO(), messageBody) + if err != nil { + return fmt.Errorf("failed to push event: %w", err) + } + + return nil +} + func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 27ed963..0d3e534 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -10,6 +10,7 @@ import ( "github.com/babylonlabs-io/staking-queue-client/client" "github.com/babylonlabs-io/staking-queue-client/config" + "github.com/babylonlabs-io/staking-queue-client/queuemngr" ) const ( @@ -66,6 +67,54 @@ func TestPing(t *testing.T) { require.Contains(t, err.Error(), "rabbitMQ connection is closed", "Error message should indicate which queue failed") } +func TestSchemaVersionBackwardsCompatibility(t *testing.T) { + type oldActiveStakingEvent struct { + EventType client.EventType `json:"event_type"` + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerPkHex string `json:"staker_pk_hex"` + FinalityProviderPkHex string `json:"finality_provider_pk_hex"` + StakingValue uint64 `json:"staking_value"` + StakingStartHeight uint64 `json:"staking_start_height"` + StakingStartTimestamp int64 `json:"staking_start_timestamp"` + StakingTimeLock uint64 `json:"staking_timelock"` + StakingOutputIndex uint64 `json:"staking_output_index"` + StakingTxHex string `json:"staking_tx_hex"` + IsOverflow bool `json:"is_overflow"` + } + + event := &oldActiveStakingEvent{ + EventType: client.ActiveStakingEventType, + StakingTxHashHex: "0x1234567890abcdef", + StakerPkHex: "0x1234567890abcdef", + FinalityProviderPkHex: "0x1234567890abcdef", + StakingValue: 100, + StakingStartHeight: 100, + StakingStartTimestamp: 100, + StakingTimeLock: 100, + StakingOutputIndex: 100, + StakingTxHex: "0x1234567890abcdef", + IsOverflow: false, + } + queueCfg := config.DefaultQueueConfig() + + testServer := setupTestQueueConsumer(t, queueCfg) + defer testServer.Stop(t) + + queueManager := testServer.QueueManager + stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages() + require.NoError(t, err) + + err = queuemngr.PushEvent(queueManager, event) + require.NoError(t, err) + receivedEv := <-stakingEventReceivedChan + var stakingEv client.ActiveStakingEvent + err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv) + require.NoError(t, err) + require.Equal(t, event.EventType, stakingEv.GetEventType()) + require.Equal(t, event.StakingTxHashHex, stakingEv.GetStakingTxHashHex()) + require.Equal(t, 0, stakingEv.SchemaVersion) +} + func TestStakingEvent(t *testing.T) { numStakingEvents := 3 activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents) From dca157e99146e62f9d2f24b1936d563c444551f0 Mon Sep 17 00:00:00 2001 From: wjrjerome Date: Fri, 9 Aug 2024 16:05:02 +1000 Subject: [PATCH 4/4] rename to make test instead of tests --- .circleci/config.yml | 2 +- Makefile | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a179517..4664cdc 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -30,7 +30,7 @@ jobs: - run: name: Run tests command: | - make tests + make test workflows: CI: diff --git a/Makefile b/Makefile index d498f45..a3a0bfb 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,8 @@ endif BUILD_TARGETS := build install BUILD_FLAGS := --tags "$(build_tags)" --ldflags '$(ldflags)' -.PHONY: tests +.PHONY: test -tests: +test: ./bin/local-startup.sh; go test -v ./...