Skip to content

Commit

Permalink
[CT-644] instantiate grpc stream manager (#1134)
Browse files Browse the repository at this point in the history
* [CT-644] instantiate grpc stream manager

* update type

* update channel type
  • Loading branch information
jayy04 committed Mar 15, 2024
1 parent 4714d8e commit 936ae97
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 16 deletions.
1 change: 1 addition & 0 deletions protocol/app/ante_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func newTestHandlerOptions() HandlerOptions {
nil,
nil,
nil,
nil,
flags.GetDefaultClobFlags(),
rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](),
rate_limit.NewNoOpRateLimiter[*types.MsgCancelOrder](),
Expand Down
27 changes: 25 additions & 2 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
)

var (
Expand Down Expand Up @@ -284,8 +288,9 @@ type App struct {
// module configurator
configurator module.Configurator

IndexerEventManager indexer_manager.IndexerEventManager
Server *daemonserver.Server
IndexerEventManager indexer_manager.IndexerEventManager
GrpcStreamingManager streamingtypes.GrpcStreamingManager
Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -593,6 +598,9 @@ func New(
tkeys[indexer_manager.TransientStoreKey],
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger)

timeProvider := &timelib.TimeProviderImpl{}

app.EpochsKeeper = *epochsmodulekeeper.NewKeeper(
Expand Down Expand Up @@ -899,6 +907,7 @@ func New(
app.StatsKeeper,
app.RewardsKeeper,
app.IndexerEventManager,
app.GrpcStreamingManager,
txConfig.TxDecoder(),
clobFlags,
rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](),
Expand Down Expand Up @@ -1592,3 +1601,17 @@ func getIndexerFromOptions(
}
return indexerMessageSender, indexerFlags
}

// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
appFlags flags.Flags,
appOpts servertypes.AppOptions,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
}
38 changes: 38 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
33 changes: 33 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil)

type NoopGrpcStreamingManager struct{}

func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager {
return &NoopGrpcStreamingManager{}
}

func (sm *NoopGrpcStreamingManager) Enabled() bool {
return false
}

func (sm *NoopGrpcStreamingManager) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
19 changes: 19 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package types

import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

type GrpcStreamingManager interface {
Enabled() bool

// L3+ Orderbook updates.
Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
}
26 changes: 12 additions & 14 deletions protocol/testutil/keeper/clob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,35 @@ package keeper
import (
"testing"

"github.com/dydxprotocol/v4-chain/protocol/lib"

indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob"
delaymsgmoduletypes "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg/types"
"github.com/stretchr/testify/require"

"github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit"

"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"

db "github.com/cometbft/cometbft-db"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
liquidationtypes "github.com/dydxprotocol/v4-chain/protocol/daemons/server/types/liquidations"
indexerevents "github.com/dydxprotocol/v4-chain/protocol/indexer/events"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
asskeeper "github.com/dydxprotocol/v4-chain/protocol/x/assets/keeper"
blocktimekeeper "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/keeper"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/keeper"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
delaymsgmoduletypes "github.com/dydxprotocol/v4-chain/protocol/x/delaymsg/types"
feetierskeeper "github.com/dydxprotocol/v4-chain/protocol/x/feetiers/keeper"
perpkeeper "github.com/dydxprotocol/v4-chain/protocol/x/perpetuals/keeper"
priceskeeper "github.com/dydxprotocol/v4-chain/protocol/x/prices/keeper"
rewardskeeper "github.com/dydxprotocol/v4-chain/protocol/x/rewards/keeper"
statskeeper "github.com/dydxprotocol/v4-chain/protocol/x/stats/keeper"
subkeeper "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/keeper"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/require"
)

type ClobKeepersTestContext struct {
Expand Down Expand Up @@ -214,6 +211,7 @@ func createClobKeeper(
statsKeeper,
rewardsKeeper,
indexerEventManager,
streaming.NewNoopGrpcStreamingManager(),
constants.TestEncodingCfg.TxConfig.TxDecoder(),
flags.GetDefaultClobFlags(),
rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](),
Expand Down
8 changes: 8 additions & 0 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit"

"github.com/cometbft/cometbft/libs/log"
Expand Down Expand Up @@ -43,6 +44,7 @@ type (
statsKeeper types.StatsKeeper
rewardsKeeper types.RewardsKeeper
indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.GrpcStreamingManager

memStoreInitialized *atomic.Bool

Expand Down Expand Up @@ -83,6 +85,7 @@ func NewKeeper(
statsKeeper types.StatsKeeper,
rewardsKeeper types.RewardsKeeper,
indexerEventManager indexer_manager.IndexerEventManager,
grpcStreamingManager streamingtypes.GrpcStreamingManager,
txDecoder sdk.TxDecoder,
clobFlags flags.ClobFlags,
placeOrderRateLimiter rate_limit.RateLimiter[*types.MsgPlaceOrder],
Expand All @@ -107,6 +110,7 @@ func NewKeeper(
statsKeeper: statsKeeper,
rewardsKeeper: rewardsKeeper,
indexerEventManager: indexerEventManager,
streamingManager: grpcStreamingManager,
memStoreInitialized: &atomic.Bool{},
txDecoder: txDecoder,
mevTelemetryConfig: MevTelemetryConfig{
Expand Down Expand Up @@ -136,6 +140,10 @@ func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager {
return k.indexerEventManager
}

func (k Keeper) GetGrpcStreamingManager() streamingtypes.GrpcStreamingManager {
return k.streamingManager
}

func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With(
sdklog.ModuleKey, "x/clob",
Expand Down

0 comments on commit 936ae97

Please sign in to comment.