Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- run:
name: Run tests
command: |
make tests
make test

workflows:
CI:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ endif
BUILD_TARGETS := build install
BUILD_FLAGS := --tags "$(build_tags)" --ldflags '$(ldflags)'

.PHONY: tests
.PHONY: test

tests:
test:
./bin/local-startup.sh;
go test -v ./...
37 changes: 31 additions & 6 deletions client/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ const (
ConfirmedInfoEventType EventType = 7
)

// Event schema versions, only increment when the schema changes
const (
activeEventVersion int = 0
unbondingEventVersion int = 0
withdrawEventVersion int = 0
expiredEventVersion int = 0
statsEventVersion int = 0
btcInfoEventVersion int = 0
confirmedInfoEventVersion int = 0
)

type EventType int

type EventMessage interface {
Expand All @@ -28,6 +39,7 @@ type EventMessage interface {
}

type ActiveStakingEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType
StakingTxHashHex string `json:"staking_tx_hash_hex"`
StakerPkHex string `json:"staker_pk_hex"`
Expand Down Expand Up @@ -62,6 +74,7 @@ func NewActiveStakingEvent(
isOverflow bool,
) ActiveStakingEvent {
return ActiveStakingEvent{
SchemaVersion: activeEventVersion,
EventType: ActiveStakingEventType,
StakingTxHashHex: stakingTxHashHex,
StakerPkHex: stakerPkHex,
Expand All @@ -77,6 +90,7 @@ func NewActiveStakingEvent(
}

type UnbondingStakingEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType
StakingTxHashHex string `json:"staking_tx_hash_hex"`
UnbondingStartHeight uint64 `json:"unbonding_start_height"`
Expand Down Expand Up @@ -105,6 +119,7 @@ func NewUnbondingStakingEvent(
unbondingTxHashHex string,
) UnbondingStakingEvent {
return UnbondingStakingEvent{
SchemaVersion: unbondingEventVersion,
EventType: UnbondingStakingEventType,
StakingTxHashHex: stakingTxHashHex,
UnbondingStartHeight: unbondingStartHeight,
Expand All @@ -117,6 +132,7 @@ func NewUnbondingStakingEvent(
}

type WithdrawStakingEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType
StakingTxHashHex string `json:"staking_tx_hash_hex"`
}
Expand All @@ -131,12 +147,14 @@ func (e WithdrawStakingEvent) GetStakingTxHashHex() string {

func NewWithdrawStakingEvent(stakingTxHashHex string) WithdrawStakingEvent {
return WithdrawStakingEvent{
SchemaVersion: withdrawEventVersion,
EventType: WithdrawStakingEventType,
StakingTxHashHex: stakingTxHashHex,
}
}

type ExpiredStakingEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType
StakingTxHashHex string `json:"staking_tx_hash_hex"`
TxType string `json:"tx_type"`
Expand All @@ -152,13 +170,15 @@ func (e ExpiredStakingEvent) GetStakingTxHashHex() string {

func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent {
return ExpiredStakingEvent{
SchemaVersion: expiredEventVersion,
EventType: ExpiredStakingEventType,
StakingTxHashHex: stakingTxHashHex,
TxType: txType,
}
}

type StatsEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 5. StatsEventType
StakingTxHashHex string `json:"staking_tx_hash_hex"`
StakerPkHex string `json:"staker_pk_hex"`
Expand All @@ -183,6 +203,7 @@ func NewStatsEvent(
state string,
) StatsEvent {
return StatsEvent{
SchemaVersion: statsEventVersion,
EventType: StatsEventType,
StakingTxHashHex: stakingTxHashHex,
StakerPkHex: stakerPkHex,
Expand All @@ -193,6 +214,7 @@ func NewStatsEvent(
}

type BtcInfoEvent struct {
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 6. BtcInfoEventType
Height uint64 `json:"height"`
ConfirmedTvl uint64 `json:"confirmed_tvl"`
Expand All @@ -210,6 +232,7 @@ func (e BtcInfoEvent) GetStakingTxHashHex() string {

func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent {
return BtcInfoEvent{
SchemaVersion: btcInfoEventVersion,
EventType: BtcInfoEventType,
Height: height,
ConfirmedTvl: confirmedTvl,
Expand All @@ -218,9 +241,10 @@ func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent {
}

type ConfirmedInfoEvent struct {
EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType
Height uint64 `json:"height"`
Tvl uint64 `json:"tvl"`
SchemaVersion int `json:"schema_version"`
EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType
Height uint64 `json:"height"`
Tvl uint64 `json:"tvl"`
}

func (e ConfirmedInfoEvent) GetEventType() EventType {
Expand All @@ -234,8 +258,9 @@ func (e ConfirmedInfoEvent) GetStakingTxHashHex() string {

func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent {
return ConfirmedInfoEvent{
EventType: ConfirmedInfoEventType,
Height: height,
Tvl: tvl,
SchemaVersion: confirmedInfoEventVersion,
EventType: ConfirmedInfoEventType,
Height: height,
Tvl: tvl,
}
}
15 changes: 15 additions & 0 deletions queuemngr/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ func (qc *QueueManager) Start() error {
return nil
}

func PushEvent[T any](qc *QueueManager, ev T) error {
jsonBytes, err := json.Marshal(ev)
if err != nil {
return err
}
messageBody := string(jsonBytes)

err = qc.StakingQueue.SendMessage(context.TODO(), messageBody)
if err != nil {
return fmt.Errorf("failed to push event: %w", err)
}

return nil
}

func (qc *QueueManager) PushStakingEvent(ev *client.ActiveStakingEvent) error {
jsonBytes, err := json.Marshal(ev)
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/babylonlabs-io/staking-queue-client/client"
"github.com/babylonlabs-io/staking-queue-client/config"
"github.com/babylonlabs-io/staking-queue-client/queuemngr"
)

const (
Expand Down Expand Up @@ -66,6 +67,54 @@ func TestPing(t *testing.T) {
require.Contains(t, err.Error(), "rabbitMQ connection is closed", "Error message should indicate which queue failed")
}

func TestSchemaVersionBackwardsCompatibility(t *testing.T) {
type oldActiveStakingEvent struct {
EventType client.EventType `json:"event_type"`
StakingTxHashHex string `json:"staking_tx_hash_hex"`
StakerPkHex string `json:"staker_pk_hex"`
FinalityProviderPkHex string `json:"finality_provider_pk_hex"`
StakingValue uint64 `json:"staking_value"`
StakingStartHeight uint64 `json:"staking_start_height"`
StakingStartTimestamp int64 `json:"staking_start_timestamp"`
StakingTimeLock uint64 `json:"staking_timelock"`
StakingOutputIndex uint64 `json:"staking_output_index"`
StakingTxHex string `json:"staking_tx_hex"`
IsOverflow bool `json:"is_overflow"`
}

event := &oldActiveStakingEvent{
EventType: client.ActiveStakingEventType,
StakingTxHashHex: "0x1234567890abcdef",
StakerPkHex: "0x1234567890abcdef",
FinalityProviderPkHex: "0x1234567890abcdef",
StakingValue: 100,
StakingStartHeight: 100,
StakingStartTimestamp: 100,
StakingTimeLock: 100,
StakingOutputIndex: 100,
StakingTxHex: "0x1234567890abcdef",
IsOverflow: false,
}
queueCfg := config.DefaultQueueConfig()

testServer := setupTestQueueConsumer(t, queueCfg)
defer testServer.Stop(t)

queueManager := testServer.QueueManager
stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages()
require.NoError(t, err)

err = queuemngr.PushEvent(queueManager, event)
require.NoError(t, err)
receivedEv := <-stakingEventReceivedChan
var stakingEv client.ActiveStakingEvent
err = json.Unmarshal([]byte(receivedEv.Body), &stakingEv)
require.NoError(t, err)
require.Equal(t, event.EventType, stakingEv.GetEventType())
require.Equal(t, event.StakingTxHashHex, stakingEv.GetStakingTxHashHex())
require.Equal(t, 0, stakingEv.SchemaVersion)
}

func TestStakingEvent(t *testing.T) {
numStakingEvents := 3
activeStakingEvents := buildActiveNStakingEvents(mockStakerHash, numStakingEvents)
Expand All @@ -88,6 +137,7 @@ func TestStakingEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &stakingEv)
require.NoError(t, err)
require.Equal(t, ev, &stakingEv)
require.Equal(t, 0, stakingEv.SchemaVersion)
}
}

Expand All @@ -113,6 +163,7 @@ func TestUnbondingEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &unbondingEv)
require.NoError(t, err)
require.Equal(t, ev, &unbondingEv)
require.Equal(t, 0, unbondingEv.SchemaVersion)
}
}

Expand All @@ -138,6 +189,7 @@ func TestWithdrawEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &withdrawEv)
require.NoError(t, err)
require.Equal(t, ev, &withdrawEv)
require.Equal(t, 0, withdrawEv.SchemaVersion)
}
}

Expand All @@ -163,6 +215,7 @@ func TestExpiryEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &expiryEvent)
require.NoError(t, err)
require.Equal(t, ev, &expiryEvent)
require.Equal(t, 0, expiryEvent.SchemaVersion)
}
}

Expand All @@ -188,6 +241,7 @@ func TestBtcInfoEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &BtcInfoEvent)
require.NoError(t, err)
require.Equal(t, ev, &BtcInfoEvent)
require.Equal(t, 0, BtcInfoEvent.SchemaVersion)
}
}

Expand All @@ -213,6 +267,7 @@ func TestConfirmedInfoEvent(t *testing.T) {
err := json.Unmarshal([]byte(receivedEv.Body), &confirmedInfoEvent)
require.NoError(t, err)
require.Equal(t, ev, &confirmedInfoEvent)
require.Equal(t, 0, confirmedInfoEvent.SchemaVersion)
}
}

Expand Down