Skip to content

Commit

Permalink
[OTE-176] Reduce the cost of adding an indexer event. (#1078)
Browse files Browse the repository at this point in the history
Currently we unmarshalled and marshalled all of them which grew linearly with the number of messages so as blocks got larger this became a noticeable cost.
  • Loading branch information
lcwik authored Feb 22, 2024
1 parent dde5e67 commit 524d60f
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 148 deletions.
21 changes: 0 additions & 21 deletions protocol/indexer/common/marshaler.go

This file was deleted.

21 changes: 0 additions & 21 deletions protocol/indexer/common/unmarshaler.go

This file was deleted.

65 changes: 35 additions & 30 deletions protocol/indexer/indexer_manager/events.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package indexer_manager

import (
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"encoding/binary"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
)
Expand All @@ -15,37 +16,44 @@ const (
// TransientStoreKey is the transient store key for indexer events.
TransientStoreKey = "transient_indexer_events"

// IndexerEventsKey is the key to retrieve the indexer events
// within the last block.
IndexerEventsKey = "IndexerEvents"
// IndexerEventsCountKey is the key to retrieve the count of the indexer events
// within the last block. Each individual event is stored at a big endian encoded
// uint32 starting from 0 upto and not including count.
IndexerEventsCountKey = "c"
IndexerEventsPrefix = "e"

ModuleName = "indexer_events"
)

func getIndexerEvents(ctx sdk.Context, storeKey storetypes.StoreKey) []*IndexerTendermintEventWrapper {
// This is necessary to prevent GasConsumed from being incremented when indexer events are recorded.
// Without this, consensus failure would occur due to lastResultsHash mismatch from different gas costs.
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(storeKey)
indexerEventsSliceBytes := store.Get([]byte(IndexerEventsKey))
if indexerEventsSliceBytes == nil {
return []*IndexerTendermintEventWrapper{}
func getIndexerEventsCount(noGasCtx sdk.Context, store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(IndexerEventsCountKey))
if countsBytes == nil {
return 0
}
var events IndexerEventsStoreValue
unmarshaler := &common.UnmarshalerImpl{}
err := unmarshaler.Unmarshal(indexerEventsSliceBytes, &events)
if err != nil {
panic(err)
return binary.BigEndian.Uint32(countsBytes)
}

func getIndexerEvents(noGasCtx sdk.Context, storeKey storetypes.StoreKey) []*IndexerTendermintEventWrapper {
store := noGasCtx.TransientStore(storeKey)
count := getIndexerEventsCount(noGasCtx, store)
events := make([]*IndexerTendermintEventWrapper, count)
store = prefix.NewStore(store, []byte(IndexerEventsPrefix))
for i := uint32(0); i < count; i++ {
var event IndexerTendermintEventWrapper
bytes := store.Get(lib.Uint32ToKey(i))
if err := proto.Unmarshal(bytes, &event); err != nil {
panic(err)
}
events[i] = &event
}
return events.Events
return events
}

// GetBytes returns the marshaled bytes of the event message.
func GetBytes(
eventMessage proto.Message,
) []byte {
marshaler := &common.MarshalerImpl{}
eventMessageBytes, err := marshaler.Marshal(eventMessage)
eventMessageBytes, err := proto.Marshal(eventMessage)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -101,18 +109,15 @@ func addEvent(
storeKey storetypes.StoreKey,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
indexerEvents := getIndexerEvents(noGasCtx, storeKey)
indexerEvents = append(indexerEvents, &event)
newEventsValue := IndexerEventsStoreValue{
Events: indexerEvents,
}
marshaler := &common.MarshalerImpl{}
newEventsValueBytes, err := marshaler.Marshal(&newEventsValue)
store := noGasCtx.TransientStore(storeKey)
count := getIndexerEventsCount(noGasCtx, store)
b, err := proto.Marshal(&event)
if err != nil {
panic(err)
}
store := noGasCtx.TransientStore(storeKey)
store.Set([]byte(IndexerEventsKey), newEventsValueBytes)
store.Set([]byte(IndexerEventsCountKey), lib.Uint32ToKey(count+1))
store = prefix.NewStore(store, []byte(IndexerEventsPrefix))
store.Set(lib.Uint32ToKey(count), b)
}

// clearEvents clears events in the context's transient store of indexer events.
Expand All @@ -122,7 +127,7 @@ func clearEvents(
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(storeKey)
store.Delete([]byte(IndexerEventsKey))
store.Delete([]byte(IndexerEventsCountKey))
}

// produceBlock returns the block. It should only be called in EndBlocker when the
Expand Down
12 changes: 2 additions & 10 deletions protocol/indexer/indexer_manager/on_chain_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package indexer_manager

import (
"fmt"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/cosmos/gogoproto/proto"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
)

Expand All @@ -11,22 +11,14 @@ const (
onChainEventsKafkaKey = "on_chain_events"
)

func marshalIndexerTendermintBlock(
indexerTendermintBlock *IndexerTendermintBlock,
marshaler common.Marshaler,
) ([]byte, error) {
bytes, err := marshaler.Marshal(indexerTendermintBlock)
return bytes, err
}

// CreateIndexerBlockEventMessage creates an on-chain update message for all the Indexer events in a block.
func CreateIndexerBlockEventMessage(
block *IndexerTendermintBlock,
) msgsender.Message {
errMessage := "Error creating on-chain Indexer block event message."
errDetails := fmt.Sprintf("Block: %+v", *block)

update, err := marshalIndexerTendermintBlock(block, &common.MarshalerImpl{})
update, err := proto.Marshal(block)
if err != nil {
panic(fmt.Sprintf("%s %s Err: %+v %s\n", errMessage, createErrMsg, err, errDetails))
}
Expand Down
16 changes: 4 additions & 12 deletions protocol/indexer/off_chain_updates/off_chain_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"crypto/sha256"
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
"github.com/dydxprotocol/v4-chain/protocol/indexer/shared"
Expand Down Expand Up @@ -252,7 +252,7 @@ func newOrderPlaceMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
return proto.Marshal(&update)
}

// newOrderPlaceMessage returns an `OffChainUpdate` struct populated with an `OrderRemove`
Expand All @@ -273,7 +273,7 @@ func newOrderRemoveMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
return proto.Marshal(&update)
}

// NewOrderUpdateMessage returns an `OffChainUpdate` struct populated with an `OrderUpdate`
Expand All @@ -292,15 +292,7 @@ func newOrderUpdateMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
}

func marshalOffchainUpdate(
offChainUpdate OffChainUpdateV1,
marshaler common.Marshaler,
) ([]byte, error) {
updateBytes, err := marshaler.Marshal(&offChainUpdate)
return updateBytes, err
return proto.Marshal(&update)
}

// GetOrderIdHash gets the SHA256 hash of the `IndexerOrderId` mapped from an `OrderId`.
Expand Down
17 changes: 0 additions & 17 deletions protocol/indexer/off_chain_updates/off_chain_updates_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package off_chain_updates

import (
"fmt"
"testing"

errorsmod "cosmossdk.io/errors"
Expand All @@ -10,7 +9,6 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
"github.com/dydxprotocol/v4-chain/protocol/indexer/shared"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
"github.com/dydxprotocol/v4-chain/protocol/testutil/sdk"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand Down Expand Up @@ -273,21 +271,6 @@ func TestNewOrderRemoveMessage(t *testing.T) {
)
}

func TestMarshalOffchainUpdate_MarshalError(t *testing.T) {
expectedError := fmt.Errorf("Marshal error")
mockMarshaller := mocks.Marshaler{}
mockMarshaller.On("Marshal", &offchainUpdateOrderPlace).Return(
[]byte{},
expectedError,
)

updateBytes, err := marshalOffchainUpdate(offchainUpdateOrderPlace, &mockMarshaller)

require.Equal(t, []byte{}, updateBytes)
require.ErrorContains(t, err, expectedError.Error())
require.True(t, mockMarshaller.AssertExpectations(t))
}

func TestGetOrderIdHash(t *testing.T) {
tests := map[string]struct {
orderId clobtypes.OrderId
Expand Down
5 changes: 2 additions & 3 deletions protocol/testing/e2e/trading_rewards/trading_rewards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package trading_rewards_test

import (
sdkmath "cosmossdk.io/math"
"github.com/cosmos/gogoproto/proto"
"math/big"
"testing"
"time"
Expand All @@ -12,7 +13,6 @@ import (
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/dydxprotocol/v4-chain/protocol/dtypes"
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
Expand Down Expand Up @@ -831,9 +831,8 @@ func TestTradingRewards(t *testing.T) {
)
}

unmarshaler := common.UnmarshalerImpl{}
var block indexer_manager.IndexerTendermintBlock
_ = unmarshaler.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
_ = proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
rewards := keepertest.GetTradingRewardEventsFromIndexerTendermintBlock(block)
require.ElementsMatch(t, expectedStateAtBlock.ExpectedTradingRewardEvents, rewards)
}
Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package keeper

import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
"testing"

"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
Expand Down Expand Up @@ -109,9 +109,8 @@ func GetAssetCreateEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeAsset {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var assetEvent indexerevents.AssetCreateEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &assetEvent)
err := proto.Unmarshal(event.DataBytes, &assetEvent)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/testutil/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type callback func(
func initKeepers(t testing.TB, cb callback) sdk.Context {
ctx, stateStore, db := sdktest.NewSdkContextWithMultistore()
// Mount transient store for indexer events, shared by all keepers that emit indexer events.
transientStoreKey := storetypes.NewTransientStoreKey(indexer_manager.IndexerEventsKey)
transientStoreKey := storetypes.NewTransientStoreKey(indexer_manager.TransientStoreKey)
stateStore.MountStoreWithDB(transientStoreKey, storetypes.StoreTypeTransient, db)
cdc := codec.NewProtoCodec(module.InterfaceRegistry)

Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/perpetuals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package keeper
import (
"fmt"
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
"testing"

storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
pricefeedserver_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -195,9 +195,8 @@ func GetLiquidityTierUpsertEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeLiquidityTier {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var liquidityTierEvent indexerevents.LiquidityTierUpsertEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &liquidityTierEvent)
err := proto.Unmarshal(event.DataBytes, &liquidityTierEvent)
if err != nil {
panic(err)
}
Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"testing"

storetypes "cosmossdk.io/store/types"
Expand All @@ -11,7 +12,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/types"
pricefeedserver_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -182,9 +182,8 @@ func getMarketEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeMarket {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var marketEvent indexerevents.MarketEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &marketEvent)
err := proto.Unmarshal(event.DataBytes, &marketEvent)
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 524d60f

Please sign in to comment.