Skip to content

Commit

Permalink
perf: optimize slinky_adapter via in-memory cache [SKI-13] (#1182)
Browse files Browse the repository at this point in the history
* add caching logic for all currency-pairs

* reference lib/slinky

* linting

* evict market-params with updated pair from cache

* make cpCache thread-safe + load all cps on app-start

* lint
  • Loading branch information
nivasan1 committed Mar 22, 2024
1 parent 52a5783 commit 952ac60
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 8 deletions.
14 changes: 14 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,9 @@ func New(

// Hydrate the keeper in-memory data structures.
app.hydrateKeeperInMemoryDataStructures()

// load the x/prices keeper currency-pair ID cache
app.loadCurrencyPairIDsForMarkets()
}
app.initializeRateLimiters()

Expand Down Expand Up @@ -1610,6 +1613,17 @@ func (app *App) DisableHealthMonitorForTesting() {
app.DaemonHealthMonitor.DisableForTesting()
}

// loadCurrencyPairIDsForMarkets loads the currency pair IDs for the markets from the x/prices state.
func (app *App) loadCurrencyPairIDsForMarkets() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
// We use this to load the `currencyPairIDs` with market-params from the
// x/prices state according to the underlying `rootMultiStore`.
uncachedCtx := app.BaseApp.NewUncachedContext(true, tmproto.Header{})

// Load the currency pair IDs for the markets from the x/prices state.
app.PricesKeeper.LoadCurrencyPairIDCache(uncachedCtx)
}

// hydrateMemStores hydrates the memStores used for caching state.
func (app *App) hydrateMemStores() {
// Create an `uncachedCtx` where the underlying MultiStore is the `rootMultiStore`.
Expand Down
73 changes: 73 additions & 0 deletions protocol/x/prices/keeper/currency_pair_id_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package keeper

import (
"sync"
)

// CurrencyPairIDCache handles the caching logic of currency-pairs to their corresponding IDs. This
// data-structure is thread-safe, allowing concurrent reads + synchronized writes.
type CurrencyPairIDCache struct {
// ID -> CurrencyPair
idToCurrencyPair map[uint64]string
// CurrencyPair -> ID
currencyPairToID map[string]uint64
// lock
sync.RWMutex
}

// NewCurrencyPairIDCache creates a new CurrencyPairIDCache
func NewCurrencyPairIDCache() *CurrencyPairIDCache {
return &CurrencyPairIDCache{
idToCurrencyPair: make(map[uint64]string),
currencyPairToID: make(map[string]uint64),
}
}

// AddCurrencyPair adds a currency pair to the cache. This method takes out a write lock on the cache
// or blocks until one is available before updating the cache.
func (c *CurrencyPairIDCache) AddCurrencyPair(id uint64, currencyPair string) {
// acquire write lock
c.Lock()
defer c.Unlock()

// update cache
c.idToCurrencyPair[id] = currencyPair
c.currencyPairToID[currencyPair] = id
}

// GetCurrencyPairFromID returns the currency pair from the cache
func (c *CurrencyPairIDCache) GetCurrencyPairFromID(id uint64) (string, bool) {
// acquire read lock
c.RLock()
defer c.RUnlock()

// get currency pair from cache
currencyPair, found := c.idToCurrencyPair[id]
return currencyPair, found
}

// GetIDForCurrencyPair returns the ID for the currency pair from the cache
func (c *CurrencyPairIDCache) GetIDForCurrencyPair(currencyPair string) (uint64, bool) {
// acquire read lock
c.RLock()
defer c.RUnlock()

// get ID for currency pair from cache
id, found := c.currencyPairToID[currencyPair]
return id, found
}

// Remove removes the currency-pair (by ID) from the cache. This method takes out a write lock on the
// cache or blocks until one is available before updating the cache.
func (c *CurrencyPairIDCache) Remove(id uint64) {
// acquire write lock
c.Lock()
defer c.Unlock()

// remove currency pair from cache
currencyPair, found := c.idToCurrencyPair[id]
if found {
delete(c.idToCurrencyPair, id)
delete(c.currencyPairToID, currencyPair)
}
}
2 changes: 2 additions & 0 deletions protocol/x/prices/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type (
indexerEventManager indexer_manager.IndexerEventManager
marketToCreatedAt map[uint32]time.Time
authorities map[string]struct{}
currencyPairIDCache *CurrencyPairIDCache
}
)

Expand All @@ -45,6 +46,7 @@ func NewKeeper(
indexerEventManager: indexerEventManager,
marketToCreatedAt: map[uint32]time.Time{},
authorities: lib.UniqueSliceToSet(authorities),
currencyPairIDCache: NewCurrencyPairIDCache(),
}
}

Expand Down
10 changes: 10 additions & 0 deletions protocol/x/prices/keeper/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/slinky"
"github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
)

Expand Down Expand Up @@ -55,6 +56,15 @@ func (k Keeper) CreateMarket(
marketPriceStore := k.getMarketPriceStore(ctx)
marketPriceStore.Set(lib.Uint32ToKey(marketPrice.Id), priceBytes)

// add the pair to the currency-pair-id cache
cp, err := slinky.MarketPairToCurrencyPair(marketParam.Pair)
if err != nil {
k.Logger(ctx).Error("failed to add currency pair to cache", "pair", marketParam.Pair, "err", err)
} else {
// add the pair to the currency-pair-id cache
k.currencyPairIDCache.AddCurrencyPair(uint64(marketParam.Id), cp.String())
}

// Generate indexer event.
k.GetIndexerEventManager().AddTxnEvent(
ctx,
Expand Down
35 changes: 35 additions & 0 deletions protocol/x/prices/keeper/market_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

errorsmod "cosmossdk.io/errors"
"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/metrics"
"github.com/dydxprotocol/v4-chain/protocol/lib/slinky"

"cosmossdk.io/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -54,6 +55,20 @@ func (k Keeper) ModifyMarketParam(
b := k.cdc.MustMarshal(&updatedMarketParam)
marketParamStore.Set(lib.Uint32ToKey(updatedMarketParam.Id), b)

// if the market pair has been changed, we need to update the in-memory market pair cache
if existingParam.Pair != updatedMarketParam.Pair {
// remove the old cache entry
k.currencyPairIDCache.Remove(uint64(existingParam.Id))

// add the new cache entry
cp, err := slinky.MarketPairToCurrencyPair(updatedMarketParam.Pair)
if err == nil {
k.currencyPairIDCache.AddCurrencyPair(uint64(updatedMarketParam.Id), cp.String())
} else {
k.Logger(ctx).Error("failed to add currency pair to cache", "pair", updatedMarketParam.Pair)
}
}

// Generate indexer event.
k.GetIndexerEventManager().AddTxnEvent(
ctx,
Expand Down Expand Up @@ -92,6 +107,26 @@ func (k Keeper) GetMarketParam(
return market, true
}

// LoadCurrencyPairIDCache loads the currency pair id cache from the store.
func (k Keeper) LoadCurrencyPairIDCache(ctx sdk.Context) {
marketParamStore := k.getMarketParamStore(ctx)

iterator := marketParamStore.Iterator(nil, nil)
defer iterator.Close()

for ; iterator.Valid(); iterator.Next() {
marketParam := types.MarketParam{}
k.cdc.MustUnmarshal(iterator.Value(), &marketParam)

cp, err := slinky.MarketPairToCurrencyPair(marketParam.Pair)
if err == nil {
k.currencyPairIDCache.AddCurrencyPair(uint64(marketParam.Id), cp.String())
} else {
k.Logger(ctx).Error("failed to add currency pair to cache", "pair", marketParam.Pair)
}
}
}

// GetAllMarketParams returns all market params.
func (k Keeper) GetAllMarketParams(ctx sdk.Context) []types.MarketParam {
marketParamStore := k.getMarketParamStore(ctx)
Expand Down
57 changes: 57 additions & 0 deletions protocol/x/prices/keeper/market_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/dydxprotocol/v4-chain/protocol/daemons/pricefeed/metrics"
"github.com/dydxprotocol/v4-chain/protocol/lib/slinky"

errorsmod "cosmossdk.io/errors"

Expand Down Expand Up @@ -45,6 +46,62 @@ func TestModifyMarketParam(t *testing.T) {
}
}

func TestModifyMarketParamUpdatesCache(t *testing.T) {
ctx, keeper, _, _, mockTimeProvider := keepertest.PricesKeepers(t)
mockTimeProvider.On("Now").Return(constants.TimeT)
ctx = ctx.WithTxBytes(constants.TestTxBytes)

id := uint32(1)
oldParam := types.MarketParam{
Id: id,
Pair: "foo-bar",
MinExchanges: uint32(2),
Exponent: 8,
MinPriceChangePpm: uint32(50),
ExchangeConfigJson: `{"id":"1"}`,
}
mp, err := keeper.CreateMarket(ctx, oldParam, types.MarketPrice{
Id: id,
Exponent: 8,
Price: 1,
})
require.NoError(t, err)

// check that the existing entry exists
cp, err := slinky.MarketPairToCurrencyPair(mp.Pair)
require.NoError(t, err)

// check that the existing entry exists
cpID, found := keeper.GetIDForCurrencyPair(ctx, cp)
require.True(t, found)
require.Equal(t, uint64(id), cpID)

// modify the market param
newParam, err := keeper.ModifyMarketParam(
ctx,
types.MarketParam{
Id: id,
Pair: "bar-foo",
MinExchanges: uint32(2),
Exponent: 8,
MinPriceChangePpm: uint32(50),
ExchangeConfigJson: `{"id":"1"}`,
},
)
require.NoError(t, err)

// check that the existing entry does not exist
_, found = keeper.GetIDForCurrencyPair(ctx, cp)
require.False(t, found)

// check that the new entry exists
cp, err = slinky.MarketPairToCurrencyPair(newParam.Pair)
require.NoError(t, err)
cpID, found = keeper.GetIDForCurrencyPair(ctx, cp)
require.True(t, found)
require.Equal(t, uint64(id), cpID)
}

func TestModifyMarketParam_Errors(t *testing.T) {
validExchangeConfigJson := `{"exchanges":[{"exchangeName":"Binance","ticker":"BTCUSDT"}]}`
tests := map[string]struct {
Expand Down
38 changes: 30 additions & 8 deletions protocol/x/prices/keeper/slinky_adapter.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package keeper

import (
"fmt"
"strings"

"cosmossdk.io/math"
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/slinky"
slinkytypes "github.com/skip-mev/slinky/pkg/types"
oracletypes "github.com/skip-mev/slinky/x/oracle/types"

"github.com/dydxprotocol/v4-chain/protocol/lib/slinky"
"strings"
)

/*
Expand All @@ -21,41 +19,65 @@ import (
*/

func (k Keeper) GetCurrencyPairFromID(ctx sdk.Context, id uint64) (cp slinkytypes.CurrencyPair, found bool) {
// check in the keeper's cache first
pair, found := k.currencyPairIDCache.GetCurrencyPairFromID(id)
if found {
cp, err := slinkytypes.CurrencyPairFromString(pair)
if err != nil {
k.Logger(ctx).Error("CurrencyPairFromString", "error", err)
return cp, false
}
return cp, true
}

mp, found := k.GetMarketParam(ctx, uint32(id))
if !found {
return cp, false
}
cp, err := slinky.MarketPairToCurrencyPair(mp.Pair)
pair = mp.Pair

cp, err := slinky.MarketPairToCurrencyPair(pair)
if err != nil {
k.Logger(ctx).Error("CurrencyPairFromString", "error", err)
return cp, false
}

return cp, true
}

func (k Keeper) GetIDForCurrencyPair(ctx sdk.Context, cp slinkytypes.CurrencyPair) (uint64, bool) {
// check in the keeper's cache first
id, found := k.currencyPairIDCache.GetIDForCurrencyPair(cp.String())
if found {
return id, true
}

// if not found, iterate through all market params and find the id
mps := k.GetAllMarketParams(ctx)
for _, mp := range mps {
mpCp, err := slinky.MarketPairToCurrencyPair(mp.Pair)
if err != nil {
k.Logger(ctx).Error("market param pair invalid format", "pair", mp.Pair)
continue
}

// compare the currency pairs to the one that we're looking for
if strings.EqualFold(mpCp.String(), cp.String()) {
return uint64(mp.Id), true
}
}

return 0, false
}

func (k Keeper) GetPriceForCurrencyPair(ctx sdk.Context, cp slinkytypes.CurrencyPair) (oracletypes.QuotePrice, error) {
id, found := k.GetIDForCurrencyPair(ctx, cp)
if !found {
return oracletypes.QuotePrice{}, fmt.Errorf("id for currency pair %s not found", cp.String())
return oracletypes.QuotePrice{}, fmt.Errorf("currency pair %s not found", cp.String())
}
mp, err := k.GetMarketPrice(ctx, uint32(id))
if err != nil {
return oracletypes.QuotePrice{}, fmt.Errorf("market price not found for currency pair %s", cp.String())
return oracletypes.QuotePrice{}, fmt.Errorf("currency pair %s not found", cp.String())
}
return oracletypes.QuotePrice{
Price: math.NewIntFromUint64(mp.Price),
Expand Down

0 comments on commit 952ac60

Please sign in to comment.