Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OTE-176] Reduce the cost of adding an indexer event. #1078

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions protocol/indexer/common/marshaler.go

This file was deleted.

21 changes: 0 additions & 21 deletions protocol/indexer/common/unmarshaler.go

This file was deleted.

65 changes: 35 additions & 30 deletions protocol/indexer/indexer_manager/events.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package indexer_manager

import (
"cosmossdk.io/store/prefix"
storetypes "cosmossdk.io/store/types"
"encoding/binary"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
)
Expand All @@ -15,37 +16,44 @@ const (
// TransientStoreKey is the transient store key for indexer events.
TransientStoreKey = "transient_indexer_events"

// IndexerEventsKey is the key to retrieve the indexer events
// within the last block.
IndexerEventsKey = "IndexerEvents"
// IndexerEventsCountKey is the key to retrieve the count of the indexer events
// within the last block. Each individual event is stored at a big endian encoded
// uint32 starting from 0 upto and not including count.
IndexerEventsCountKey = "c"
IndexerEventsPrefix = "e"

ModuleName = "indexer_events"
)

func getIndexerEvents(ctx sdk.Context, storeKey storetypes.StoreKey) []*IndexerTendermintEventWrapper {
// This is necessary to prevent GasConsumed from being incremented when indexer events are recorded.
// Without this, consensus failure would occur due to lastResultsHash mismatch from different gas costs.
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(storeKey)
indexerEventsSliceBytes := store.Get([]byte(IndexerEventsKey))
if indexerEventsSliceBytes == nil {
return []*IndexerTendermintEventWrapper{}
func getIndexerEventsCount(noGasCtx sdk.Context, store storetypes.KVStore) uint32 {
countsBytes := store.Get([]byte(IndexerEventsCountKey))
if countsBytes == nil {
return 0
}
var events IndexerEventsStoreValue
unmarshaler := &common.UnmarshalerImpl{}
err := unmarshaler.Unmarshal(indexerEventsSliceBytes, &events)
if err != nil {
panic(err)
return binary.BigEndian.Uint32(countsBytes)
}

func getIndexerEvents(noGasCtx sdk.Context, storeKey storetypes.StoreKey) []*IndexerTendermintEventWrapper {
store := noGasCtx.TransientStore(storeKey)
count := getIndexerEventsCount(noGasCtx, store)
events := make([]*IndexerTendermintEventWrapper, count)
store = prefix.NewStore(store, []byte(IndexerEventsPrefix))
for i := uint32(0); i < count; i++ {
var event IndexerTendermintEventWrapper
bytes := store.Get(lib.Uint32ToKey(i))
if err := proto.Unmarshal(bytes, &event); err != nil {
panic(err)
}
events[i] = &event
}
return events.Events
return events
}

// GetBytes returns the marshaled bytes of the event message.
func GetBytes(
eventMessage proto.Message,
) []byte {
marshaler := &common.MarshalerImpl{}
eventMessageBytes, err := marshaler.Marshal(eventMessage)
eventMessageBytes, err := proto.Marshal(eventMessage)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -101,18 +109,15 @@ func addEvent(
storeKey storetypes.StoreKey,
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
indexerEvents := getIndexerEvents(noGasCtx, storeKey)
indexerEvents = append(indexerEvents, &event)
newEventsValue := IndexerEventsStoreValue{
Events: indexerEvents,
}
marshaler := &common.MarshalerImpl{}
newEventsValueBytes, err := marshaler.Marshal(&newEventsValue)
store := noGasCtx.TransientStore(storeKey)
count := getIndexerEventsCount(noGasCtx, store)
b, err := proto.Marshal(&event)
if err != nil {
panic(err)
}
store := noGasCtx.TransientStore(storeKey)
store.Set([]byte(IndexerEventsKey), newEventsValueBytes)
store.Set([]byte(IndexerEventsCountKey), lib.Uint32ToKey(count+1))
store = prefix.NewStore(store, []byte(IndexerEventsPrefix))
store.Set(lib.Uint32ToKey(count), b)
}

// clearEvents clears events in the context's transient store of indexer events.
Expand All @@ -122,7 +127,7 @@ func clearEvents(
) {
noGasCtx := ctx.WithGasMeter(ante_types.NewFreeInfiniteGasMeter())
store := noGasCtx.TransientStore(storeKey)
store.Delete([]byte(IndexerEventsKey))
store.Delete([]byte(IndexerEventsCountKey))
}

// produceBlock returns the block. It should only be called in EndBlocker when the
Expand Down
12 changes: 2 additions & 10 deletions protocol/indexer/indexer_manager/on_chain_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package indexer_manager

import (
"fmt"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/cosmos/gogoproto/proto"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
)

Expand All @@ -11,22 +11,14 @@ const (
onChainEventsKafkaKey = "on_chain_events"
)

func marshalIndexerTendermintBlock(
indexerTendermintBlock *IndexerTendermintBlock,
marshaler common.Marshaler,
) ([]byte, error) {
bytes, err := marshaler.Marshal(indexerTendermintBlock)
return bytes, err
}

// CreateIndexerBlockEventMessage creates an on-chain update message for all the Indexer events in a block.
func CreateIndexerBlockEventMessage(
block *IndexerTendermintBlock,
) msgsender.Message {
errMessage := "Error creating on-chain Indexer block event message."
errDetails := fmt.Sprintf("Block: %+v", *block)

update, err := marshalIndexerTendermintBlock(block, &common.MarshalerImpl{})
update, err := proto.Marshal(block)
if err != nil {
panic(fmt.Sprintf("%s %s Err: %+v %s\n", errMessage, createErrMsg, err, errDetails))
}
Expand Down
16 changes: 4 additions & 12 deletions protocol/indexer/off_chain_updates/off_chain_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"crypto/sha256"
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
"github.com/dydxprotocol/v4-chain/protocol/indexer/shared"
Expand Down Expand Up @@ -252,7 +252,7 @@ func newOrderPlaceMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
return proto.Marshal(&update)
}

// newOrderPlaceMessage returns an `OffChainUpdate` struct populated with an `OrderRemove`
Expand All @@ -273,7 +273,7 @@ func newOrderRemoveMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
return proto.Marshal(&update)
}

// NewOrderUpdateMessage returns an `OffChainUpdate` struct populated with an `OrderUpdate`
Expand All @@ -292,15 +292,7 @@ func newOrderUpdateMessage(
},
},
}
return marshalOffchainUpdate(update, &common.MarshalerImpl{})
}

func marshalOffchainUpdate(
offChainUpdate OffChainUpdateV1,
marshaler common.Marshaler,
) ([]byte, error) {
updateBytes, err := marshaler.Marshal(&offChainUpdate)
return updateBytes, err
return proto.Marshal(&update)
}

// GetOrderIdHash gets the SHA256 hash of the `IndexerOrderId` mapped from an `OrderId`.
Expand Down
17 changes: 0 additions & 17 deletions protocol/indexer/off_chain_updates/off_chain_updates_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package off_chain_updates

import (
"fmt"
"testing"

errorsmod "cosmossdk.io/errors"
Expand All @@ -10,7 +9,6 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
"github.com/dydxprotocol/v4-chain/protocol/indexer/shared"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
"github.com/dydxprotocol/v4-chain/protocol/testutil/sdk"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand Down Expand Up @@ -273,21 +271,6 @@ func TestNewOrderRemoveMessage(t *testing.T) {
)
}

func TestMarshalOffchainUpdate_MarshalError(t *testing.T) {
expectedError := fmt.Errorf("Marshal error")
mockMarshaller := mocks.Marshaler{}
mockMarshaller.On("Marshal", &offchainUpdateOrderPlace).Return(
[]byte{},
expectedError,
)

updateBytes, err := marshalOffchainUpdate(offchainUpdateOrderPlace, &mockMarshaller)

require.Equal(t, []byte{}, updateBytes)
require.ErrorContains(t, err, expectedError.Error())
require.True(t, mockMarshaller.AssertExpectations(t))
}

func TestGetOrderIdHash(t *testing.T) {
tests := map[string]struct {
orderId clobtypes.OrderId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package trading_rewards_test

import (
sdkmath "cosmossdk.io/math"
"github.com/cosmos/gogoproto/proto"
"math/big"
"testing"
"time"
Expand All @@ -12,7 +13,6 @@ import (
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/dydxprotocol/v4-chain/protocol/dtypes"
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
Expand Down Expand Up @@ -831,9 +831,8 @@ func TestTradingRewards(t *testing.T) {
)
}

unmarshaler := common.UnmarshalerImpl{}
var block indexer_manager.IndexerTendermintBlock
_ = unmarshaler.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
_ = proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The replacement of a custom unmarshalling implementation with proto.Unmarshal() is a significant improvement in terms of both performance and code maintainability. By using a standard library method, the code becomes more readable, easier to maintain, and potentially more efficient. However, it's crucial to ensure that all data structures being unmarshalled are compatible with the proto package and that this change does not introduce any regressions in how data is interpreted or processed.

One minor suggestion is to add error handling for the proto.Unmarshal() call. Ignoring the error returned by this function can lead to hard-to-debug issues if the unmarshalling fails for any reason (e.g., due to incompatible data structures or corrupted data). Proper error handling would improve the robustness of the test code.

- _ = proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
+ err := proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
+ require.NoError(t, err, "Failed to unmarshal indexer tendermint block")

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
_ = proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
err := proto.Unmarshal(msgSender.GetOnchainMessages()[0].Value, &block)
require.NoError(t, err, "Failed to unmarshal indexer tendermint block")

rewards := keepertest.GetTradingRewardEventsFromIndexerTendermintBlock(block)
require.ElementsMatch(t, expectedStateAtBlock.ExpectedTradingRewardEvents, rewards)
}
Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package keeper

import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
"testing"

"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
Expand Down Expand Up @@ -109,9 +109,8 @@ func GetAssetCreateEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeAsset {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var assetEvent indexerevents.AssetCreateEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &assetEvent)
err := proto.Unmarshal(event.DataBytes, &assetEvent)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/testutil/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type callback func(
func initKeepers(t testing.TB, cb callback) sdk.Context {
ctx, stateStore, db := sdktest.NewSdkContextWithMultistore()
// Mount transient store for indexer events, shared by all keepers that emit indexer events.
transientStoreKey := storetypes.NewTransientStoreKey(indexer_manager.IndexerEventsKey)
transientStoreKey := storetypes.NewTransientStoreKey(indexer_manager.TransientStoreKey)
stateStore.MountStoreWithDB(transientStoreKey, storetypes.StoreTypeTransient, db)
cdc := codec.NewProtoCodec(module.InterfaceRegistry)

Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/perpetuals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package keeper
import (
"fmt"
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
"testing"

storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
pricefeedserver_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -195,9 +195,8 @@ func GetLiquidityTierUpsertEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeLiquidityTier {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var liquidityTierEvent indexerevents.LiquidityTierUpsertEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &liquidityTierEvent)
err := proto.Unmarshal(event.DataBytes, &liquidityTierEvent)
if err != nil {
panic(err)
}
Expand Down
5 changes: 2 additions & 3 deletions protocol/testutil/keeper/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keeper

import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"testing"

storetypes "cosmossdk.io/store/types"
Expand All @@ -11,7 +12,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
pricefeed_types "github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/types"
pricefeedserver_types "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/pricefeed"
"github.com/dydxprotocol/v4-chain/protocol/indexer/common"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
Expand Down Expand Up @@ -182,9 +182,8 @@ func getMarketEventsFromIndexerBlock(
if event.Subtype != indexerevents.SubtypeMarket {
continue
}
unmarshaler := common.UnmarshalerImpl{}
var marketEvent indexerevents.MarketEventV1
err := unmarshaler.Unmarshal(event.DataBytes, &marketEvent)
err := proto.Unmarshal(event.DataBytes, &marketEvent)
if err != nil {
panic(err)
}
Expand Down
Loading
Loading