-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CT-644] instantiate grpc stream manager #1134
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -198,6 +198,10 @@ import ( | |
"github.com/dydxprotocol/v4-chain/protocol/indexer" | ||
"github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" | ||
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender" | ||
|
||
// Grpc Streaming | ||
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" | ||
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" | ||
) | ||
|
||
var ( | ||
|
@@ -298,8 +302,9 @@ type App struct { | |
// module configurator | ||
configurator module.Configurator | ||
|
||
IndexerEventManager indexer_manager.IndexerEventManager | ||
Server *daemonserver.Server | ||
IndexerEventManager indexer_manager.IndexerEventManager | ||
GrpcStreamingManager streamingtypes.GrpcStreamingManager | ||
Server *daemonserver.Server | ||
|
||
// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a | ||
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is | ||
|
@@ -679,6 +684,9 @@ func New( | |
tkeys[indexer_manager.TransientStoreKey], | ||
indexerFlags.SendOffchainData, | ||
) | ||
|
||
app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger) | ||
|
||
timeProvider := &timelib.TimeProviderImpl{} | ||
|
||
app.EpochsKeeper = *epochsmodulekeeper.NewKeeper( | ||
|
@@ -976,6 +984,7 @@ func New( | |
app.StatsKeeper, | ||
app.RewardsKeeper, | ||
app.IndexerEventManager, | ||
app.GrpcStreamingManager, | ||
txConfig.TxDecoder(), | ||
clobFlags, | ||
rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](), | ||
|
@@ -1740,3 +1749,17 @@ func getIndexerFromOptions( | |
} | ||
return indexerMessageSender, indexerFlags | ||
} | ||
|
||
// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified | ||
// options. This function will default to returning a no-op instance. | ||
func getGrpcStreamingManagerFromOptions( | ||
appFlags flags.Flags, | ||
appOpts servertypes.AppOptions, | ||
logger log.Logger, | ||
) (manager streamingtypes.GrpcStreamingManager) { | ||
// TODO(CT-625): add command line flags for full node streaming. | ||
if appFlags.NonValidatingFullNode { | ||
return streaming.NewGrpcStreamingManager() | ||
} | ||
return streaming.NewNoopGrpcStreamingManager() | ||
} | ||
Comment on lines
+1752
to
+1765
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function Consider implementing the flag-based logic for initializing the |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package grpc | ||
|
||
import ( | ||
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" | ||
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" | ||
) | ||
|
||
var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) | ||
|
||
// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. | ||
type GrpcStreamingManagerImpl struct { | ||
} | ||
|
||
func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { | ||
return &GrpcStreamingManagerImpl{} | ||
} | ||
|
||
func (sm *GrpcStreamingManagerImpl) Enabled() bool { | ||
return true | ||
} | ||
|
||
// Subscribe subscribes to the orderbook updates stream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add a comment about lifecycles of the req, srv, finished variables. i think there's a stream and a chan going on so not too sure when the finished chan returns a bool. |
||
func (sm *GrpcStreamingManagerImpl) Subscribe( | ||
req clobtypes.StreamOrderbookUpdatesRequest, | ||
srv clobtypes.Query_StreamOrderbookUpdatesServer, | ||
) ( | ||
finished chan<- bool, | ||
err error, | ||
) { | ||
return nil, nil | ||
} | ||
|
||
// SendOrderbookUpdates groups updates by their clob pair ids and | ||
// sends messages to the subscribers. | ||
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( | ||
updates *clobtypes.OffchainUpdates, | ||
) { | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package grpc | ||
|
||
import ( | ||
"github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" | ||
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" | ||
) | ||
|
||
var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil) | ||
|
||
type NoopGrpcStreamingManager struct{} | ||
|
||
func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager { | ||
return &NoopGrpcStreamingManager{} | ||
} | ||
|
||
func (sm *NoopGrpcStreamingManager) Enabled() bool { | ||
return false | ||
} | ||
|
||
func (sm *NoopGrpcStreamingManager) Subscribe( | ||
req clobtypes.StreamOrderbookUpdatesRequest, | ||
srv clobtypes.Query_StreamOrderbookUpdatesServer, | ||
) ( | ||
finished chan<- bool, | ||
err error, | ||
) { | ||
return nil, nil | ||
} | ||
|
||
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( | ||
updates *clobtypes.OffchainUpdates, | ||
) { | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package types | ||
|
||
import ( | ||
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" | ||
) | ||
|
||
type GrpcStreamingManager interface { | ||
Enabled() bool | ||
|
||
// L3+ Orderbook updates. | ||
Subscribe( | ||
req clobtypes.StreamOrderbookUpdatesRequest, | ||
srv clobtypes.Query_StreamOrderbookUpdatesServer, | ||
) ( | ||
finished chan<- bool, | ||
err error, | ||
) | ||
SendOrderbookUpdates(*clobtypes.OffchainUpdates) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initialization of
GrpcStreamingManager
usinggetGrpcStreamingManagerFromOptions
is a flexible approach that allows for configuration-based instantiation. However, it's important to ensure that the options and flags used for determining the manager type are clearly documented and validated. Additionally, consider adding error handling or logging withingetGrpcStreamingManagerFromOptions
to handle potential issues during initialization.Committable suggestion