Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-644] instantiate grpc stream manager #1134

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer"
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
)

var (
Expand Down Expand Up @@ -298,8 +302,9 @@ type App struct {
// module configurator
configurator module.Configurator

IndexerEventManager indexer_manager.IndexerEventManager
Server *daemonserver.Server
IndexerEventManager indexer_manager.IndexerEventManager
GrpcStreamingManager streamingtypes.GrpcStreamingManager
Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -679,6 +684,9 @@ func New(
tkeys[indexer_manager.TransientStoreKey],
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger)

Comment on lines +688 to +689
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initialization of GrpcStreamingManager using getGrpcStreamingManagerFromOptions is a flexible approach that allows for configuration-based instantiation. However, it's important to ensure that the options and flags used for determining the manager type are clearly documented and validated. Additionally, consider adding error handling or logging within getGrpcStreamingManagerFromOptions to handle potential issues during initialization.

- return streaming.NewGrpcStreamingManager()
+ manager, err := streaming.NewGrpcStreamingManager()
+ if err != nil {
+     logger.Error("Failed to initialize GrpcStreamingManager", "error", err)
+     return nil
+ }
+ return manager

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger)

timeProvider := &timelib.TimeProviderImpl{}

app.EpochsKeeper = *epochsmodulekeeper.NewKeeper(
Expand Down Expand Up @@ -976,6 +984,7 @@ func New(
app.StatsKeeper,
app.RewardsKeeper,
app.IndexerEventManager,
app.GrpcStreamingManager,
txConfig.TxDecoder(),
clobFlags,
rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](),
Expand Down Expand Up @@ -1740,3 +1749,17 @@ func getIndexerFromOptions(
}
return indexerMessageSender, indexerFlags
}

// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
appFlags flags.Flags,
appOpts servertypes.AppOptions,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
// TODO(CT-625): add command line flags for full node streaming.
if appFlags.NonValidatingFullNode {
return streaming.NewGrpcStreamingManager()
}
return streaming.NewNoopGrpcStreamingManager()
}
Comment on lines +1752 to +1765
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function getGrpcStreamingManagerFromOptions correctly returns a GrpcStreamingManager instance based on application flags. However, the current implementation defaults to a no-op manager without considering any specific flags for enabling streaming. This might be intentional for a phased rollout or testing, but it's crucial to implement and document the flag-based logic for enabling the streaming manager in future iterations. Additionally, the TODO comment about adding command-line flags should be addressed to provide clarity on the intended usage and configuration options.

Consider implementing the flag-based logic for initializing the GrpcStreamingManager and removing the TODO comment once resolved.

38 changes: 38 additions & 0 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)

// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions.
type GrpcStreamingManagerImpl struct {
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
return &GrpcStreamingManagerImpl{}
}

func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

// Subscribe subscribes to the orderbook updates stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a comment about lifecycles of the req, srv, finished variables. i think there's a stream and a chan going on so not too sure when the finished chan returns a bool.

func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
33 changes: 33 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package grpc

import (
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil)

type NoopGrpcStreamingManager struct{}

func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager {
return &NoopGrpcStreamingManager{}
}

func (sm *NoopGrpcStreamingManager) Enabled() bool {
return false
}

func (sm *NoopGrpcStreamingManager) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
) {
return nil, nil
}

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
) {
}
19 changes: 19 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package types

import (
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
)

type GrpcStreamingManager interface {
Enabled() bool

// L3+ Orderbook updates.
Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
srv clobtypes.Query_StreamOrderbookUpdatesServer,
) (
finished chan bool,
err error,
)
SendOrderbookUpdates(*clobtypes.OffchainUpdates)
}
2 changes: 2 additions & 0 deletions protocol/testutil/keeper/clob.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/mocks"
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
asskeeper "github.com/dydxprotocol/v4-chain/protocol/x/assets/keeper"
Expand Down Expand Up @@ -214,6 +215,7 @@ func createClobKeeper(
statsKeeper,
rewardsKeeper,
indexerEventManager,
streaming.NewNoopGrpcStreamingManager(),
constants.TestEncodingCfg.TxConfig.TxDecoder(),
flags.GetDefaultClobFlags(),
rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](),
Expand Down
27 changes: 18 additions & 9 deletions protocol/x/clob/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit"
"github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
Expand All @@ -31,16 +32,18 @@ type (
UntriggeredConditionalOrders map[types.ClobPairId]*UntriggeredConditionalOrders
PerpetualIdToClobPairId map[uint32][]types.ClobPairId

subaccountsKeeper types.SubaccountsKeeper
assetsKeeper types.AssetsKeeper
bankKeeper types.BankKeeper
blockTimeKeeper types.BlockTimeKeeper
feeTiersKeeper types.FeeTiersKeeper
perpetualsKeeper types.PerpetualsKeeper
pricesKeeper types.PricesKeeper
statsKeeper types.StatsKeeper
rewardsKeeper types.RewardsKeeper
subaccountsKeeper types.SubaccountsKeeper
assetsKeeper types.AssetsKeeper
bankKeeper types.BankKeeper
blockTimeKeeper types.BlockTimeKeeper
feeTiersKeeper types.FeeTiersKeeper
perpetualsKeeper types.PerpetualsKeeper
pricesKeeper types.PricesKeeper
statsKeeper types.StatsKeeper
rewardsKeeper types.RewardsKeeper

indexerEventManager indexer_manager.IndexerEventManager
streamingManager streamingtypes.GrpcStreamingManager

memStoreInitialized *atomic.Bool

Expand Down Expand Up @@ -82,6 +85,7 @@ func NewKeeper(
statsKeeper types.StatsKeeper,
rewardsKeeper types.RewardsKeeper,
indexerEventManager indexer_manager.IndexerEventManager,
grpcStreamingManager streamingtypes.GrpcStreamingManager,
txDecoder sdk.TxDecoder,
clobFlags flags.ClobFlags,
placeOrderRateLimiter rate_limit.RateLimiter[*types.MsgPlaceOrder],
Expand All @@ -107,6 +111,7 @@ func NewKeeper(
statsKeeper: statsKeeper,
rewardsKeeper: rewardsKeeper,
indexerEventManager: indexerEventManager,
streamingManager: grpcStreamingManager,
memStoreInitialized: &atomic.Bool{},
txDecoder: txDecoder,
mevTelemetryConfig: MevTelemetryConfig{
Expand Down Expand Up @@ -136,6 +141,10 @@ func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager {
return k.indexerEventManager
}

func (k Keeper) GetGrpcStreamingManager() streamingtypes.GrpcStreamingManager {
return k.streamingManager
}

func (k Keeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger().With(
log.ModuleKey, "x/clob",
Expand Down
Loading