Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions client/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const (

// Event schema versions, only increment when the schema changes
const (
activeEventVersion int = 0
unbondingEventVersion int = 0
withdrawEventVersion int = 0
expiredEventVersion int = 0
statsEventVersion int = 0
btcInfoEventVersion int = 0
confirmedInfoEventVersion int = 0
ActiveEventVersion int = 0
UnbondingEventVersion int = 0
WithdrawEventVersion int = 0
ExpiredEventVersion int = 0
StatsEventVersion int = 1
BtcInfoEventVersion int = 0
ConfirmedInfoEventVersion int = 0
)

type EventType int
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewActiveStakingEvent(
isOverflow bool,
) ActiveStakingEvent {
return ActiveStakingEvent{
SchemaVersion: activeEventVersion,
SchemaVersion: ActiveEventVersion,
EventType: ActiveStakingEventType,
StakingTxHashHex: stakingTxHashHex,
StakerPkHex: stakerPkHex,
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewUnbondingStakingEvent(
unbondingTxHashHex string,
) UnbondingStakingEvent {
return UnbondingStakingEvent{
SchemaVersion: unbondingEventVersion,
SchemaVersion: UnbondingEventVersion,
EventType: UnbondingStakingEventType,
StakingTxHashHex: stakingTxHashHex,
UnbondingStartHeight: unbondingStartHeight,
Expand Down Expand Up @@ -147,7 +147,7 @@ func (e WithdrawStakingEvent) GetStakingTxHashHex() string {

func NewWithdrawStakingEvent(stakingTxHashHex string) WithdrawStakingEvent {
return WithdrawStakingEvent{
SchemaVersion: withdrawEventVersion,
SchemaVersion: WithdrawEventVersion,
EventType: WithdrawStakingEventType,
StakingTxHashHex: stakingTxHashHex,
}
Expand All @@ -170,7 +170,7 @@ func (e ExpiredStakingEvent) GetStakingTxHashHex() string {

func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent {
return ExpiredStakingEvent{
SchemaVersion: expiredEventVersion,
SchemaVersion: ExpiredEventVersion,
EventType: ExpiredStakingEventType,
StakingTxHashHex: stakingTxHashHex,
TxType: txType,
Expand All @@ -185,6 +185,7 @@ type StatsEvent struct {
FinalityProviderPkHex string `json:"finality_provider_pk_hex"`
StakingValue uint64 `json:"staking_value"`
State string `json:"state"`
IsOverflow bool `json:"is_overflow"`
}

func (e StatsEvent) GetEventType() EventType {
Expand All @@ -201,15 +202,17 @@ func NewStatsEvent(
finalityProviderPkHex string,
stakingValue uint64,
state string,
isOverflow bool,
) StatsEvent {
return StatsEvent{
SchemaVersion: statsEventVersion,
SchemaVersion: StatsEventVersion,
EventType: StatsEventType,
StakingTxHashHex: stakingTxHashHex,
StakerPkHex: stakerPkHex,
FinalityProviderPkHex: finalityProviderPkHex,
StakingValue: stakingValue,
State: state,
IsOverflow: isOverflow,
}
}

Expand All @@ -232,7 +235,7 @@ func (e BtcInfoEvent) GetStakingTxHashHex() string {

func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent {
return BtcInfoEvent{
SchemaVersion: btcInfoEventVersion,
SchemaVersion: BtcInfoEventVersion,
EventType: BtcInfoEventType,
Height: height,
ConfirmedTvl: confirmedTvl,
Expand All @@ -258,7 +261,7 @@ func (e ConfirmedInfoEvent) GetStakingTxHashHex() string {

func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent {
return ConfirmedInfoEvent{
SchemaVersion: confirmedInfoEventVersion,
SchemaVersion: ConfirmedInfoEventVersion,
EventType: ConfirmedInfoEventType,
Height: height,
Tvl: tvl,
Expand Down
4 changes: 2 additions & 2 deletions queuemngr/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func (qc *QueueManager) Start() error {
return nil
}

func PushEvent[T any](qc *QueueManager, ev T) error {
func PushEvent[T any](queueClient client.QueueClient, ev T) error {
jsonBytes, err := json.Marshal(ev)
if err != nil {
return err
}
messageBody := string(jsonBytes)

err = qc.StakingQueue.SendMessage(context.TODO(), messageBody)
err = queueClient.SendMessage(context.TODO(), messageBody)
if err != nil {
return fmt.Errorf("failed to push event: %w", err)
}
Expand Down
28 changes: 27 additions & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestSchemaVersionBackwardsCompatibility(t *testing.T) {
stakingEventReceivedChan, err := queueManager.StakingQueue.ReceiveMessages()
require.NoError(t, err)

err = queuemngr.PushEvent(queueManager, event)
err = queuemngr.PushEvent(queueManager.StakingQueue, event)
require.NoError(t, err)
receivedEv := <-stakingEventReceivedChan
var stakingEv client.ActiveStakingEvent
Expand Down Expand Up @@ -193,6 +193,32 @@ func TestWithdrawEvent(t *testing.T) {
}
}

func TestStatsEvent(t *testing.T) {
numEvents := 3
statsEvents := buildNStatsEvents(mockStakerHash, numEvents)
queueCfg := config.DefaultQueueConfig()

testServer := setupTestQueueConsumer(t, queueCfg)
defer testServer.Stop(t)

queueManager := testServer.QueueManager

eventReceivedChan, err := queueManager.StatsQueue.ReceiveMessages()
require.NoError(t, err)

for _, ev := range statsEvents {
err = queuemngr.PushEvent(queueManager.StatsQueue, ev)
require.NoError(t, err)

receivedEv := <-eventReceivedChan
var statsEv client.StatsEvent
err := json.Unmarshal([]byte(receivedEv.Body), &statsEv)
require.NoError(t, err)
require.Equal(t, ev, &statsEv)
require.Equal(t, 1, statsEv.SchemaVersion)
}
}

func TestExpiryEvent(t *testing.T) {
numExpiryEvents := 3
expiryEvents := buildNExpiryEvents(numExpiryEvents)
Expand Down
17 changes: 17 additions & 0 deletions tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@ func buildNExpiryEvents(numOfEvent int) []*client.ExpiredStakingEvent {
return expiryEvents
}

func buildNStatsEvents(stakerHash string, numOfEvent int) []*client.StatsEvent {
var statsEvents []*client.StatsEvent
for i := 0; i < numOfEvent; i++ {
activeStakingEvent := client.NewStatsEvent(
"0x1234567890abcdef"+fmt.Sprint(i),
stakerHash,
"0xabcdef1234567890"+fmt.Sprint(i),
1+uint64(i),
"active",
false,
)

statsEvents = append(statsEvents, &activeStakingEvent)
}
return statsEvents
}

func buildNBtcInfoEvents(numOfEvent int) []*client.BtcInfoEvent {
var btcInfoEvents []*client.BtcInfoEvent
for i := 0; i < numOfEvent; i++ {
Expand Down