Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Slinky full integration PR #1141

Merged
merged 19 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 169 additions & 54 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ import (
"github.com/cosmos/ibc-go/modules/capability"
capabilitykeeper "github.com/cosmos/ibc-go/modules/capability/keeper"
capabilitytypes "github.com/cosmos/ibc-go/modules/capability/types"
"github.com/dydxprotocol/v4-chain/protocol/daemons/configs"
"github.com/gorilla/mux"
"github.com/rakyll/statik/fs"
"github.com/spf13/cast"
"go.uber.org/zap"
"google.golang.org/grpc"

// App
Comment on lines 89 to 98
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 NOTE
This review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [738-738]

Given the static analysis hint and the context of the PR, it's crucial to ensure secure communication for gRPC servers. The setup for the gRPC server lacks encryption, which could expose sensitive data or allow unauthorized access. Consider using SSL/TLS credentials for a secure connection.

- app.Server = daemonserver.NewServer(logger, grpc.NewServer(), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)
+ app.Server = daemonserver.NewServer(logger, grpc.NewServer(grpc.Creds(credentials.NewServerTLSFromFile("cert.pem", "cert.key"))), &daemontypes.FileHandlerImpl{}, daemonFlags.Shared.SocketAddress)

Expand All @@ -103,6 +103,7 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/app/prepare/prices"
"github.com/dydxprotocol/v4-chain/protocol/app/process"

"github.com/dydxprotocol/v4-chain/protocol/app/vote_extensions"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
timelib "github.com/dydxprotocol/v4-chain/protocol/lib/time"
Expand All @@ -113,6 +114,7 @@ import (

// Daemons
bridgeclient "github.com/dydxprotocol/v4-chain/protocol/daemons/bridge/client"
"github.com/dydxprotocol/v4-chain/protocol/daemons/configs"
daemonflags "github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
liquidationclient "github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/client"
metricsclient "github.com/dydxprotocol/v4-chain/protocol/daemons/metrics/client"
Expand Down Expand Up @@ -205,9 +207,16 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"

// Slinky
slinkyproposals "github.com/skip-mev/slinky/abci/proposals"
"github.com/skip-mev/slinky/abci/strategies/aggregator"
compression "github.com/skip-mev/slinky/abci/strategies/codec"
"github.com/skip-mev/slinky/abci/strategies/currencypair"
"github.com/skip-mev/slinky/abci/ve"
oracleconfig "github.com/skip-mev/slinky/oracle/config"
"github.com/skip-mev/slinky/pkg/math/voteweighted"
oracleclient "github.com/skip-mev/slinky/service/clients/oracle"
servicemetrics "github.com/skip-mev/slinky/service/metrics"
promserver "github.com/skip-mev/slinky/service/servers/prometheus"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
Expand Down Expand Up @@ -327,11 +336,13 @@ type App struct {
PriceFeedClient *pricefeedclient.Client
LiquidationsClient *liquidationclient.Client
BridgeClient *bridgeclient.Client
SlinkyClient *slinkyclient.Client

DaemonHealthMonitor *daemonservertypes.HealthMonitor

// Slinky
SlinkyClient *slinkyclient.Client
oraclePrometheusServer *promserver.PrometheusServer
oracleMetrics servicemetrics.Metrics
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -448,6 +459,7 @@ func New(
return nil
},
)
app.initOracleMetrics(appOpts)

app.ParamsKeeper = initParamsKeeper(appCodec, cdc, keys[paramstypes.StoreKey], tkeys[paramstypes.TStoreKey])

Expand Down Expand Up @@ -1359,51 +1371,10 @@ func New(
app.SetPrecommiter(app.Precommitter)
app.SetPrepareCheckStater(app.PrepareCheckStater)

// PrepareProposal setup.
priceUpdateGenerator := prices.NewDefaultPriceUpdateGenerator(app.PricesKeeper)
if appFlags.NonValidatingFullNode {
app.SetPrepareProposal(prepare.FullNodePrepareProposalHandler())
} else {
app.SetPrepareProposal(
prepare.PrepareProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.PerpetualsKeeper,
priceUpdateGenerator,
),
)
}

// ProcessProposal setup.
priceUpdateDecoder := process.NewDefaultUpdateMarketPriceTxDecoder(app.PricesKeeper, app.txConfig.TxDecoder())
if appFlags.NonValidatingFullNode {
// Note: If the command-line flag `--non-validating-full-node` is enabled, this node will use
// an implementation of `ProcessProposal` which always returns `abci.ResponseProcessProposal_ACCEPT`.
// Full-nodes do not participate in consensus, and therefore should not participate in voting / `ProcessProposal`.
app.SetProcessProposal(
process.FullNodeProcessProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.StakingKeeper,
app.PerpetualsKeeper,
priceUpdateDecoder,
),
)
} else {
app.SetProcessProposal(
process.ProcessProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.StakingKeeper,
app.PerpetualsKeeper,
app.PricesKeeper,
priceUpdateDecoder,
),
)
}
// ProposalHandler setup.
prepareProposalHandler, processProposalHandler := app.createProposalHandlers(appFlags, txConfig, appOpts)
app.SetPrepareProposal(prepareProposalHandler)
app.SetProcessProposal(processProposalHandler)

// Note that panics from out of gas errors won't get logged, since the `OutOfGasMiddleware` is added in front of this,
// so error will get handled by that middleware and subsequent middlewares won't get executed.
Expand Down Expand Up @@ -1454,27 +1425,168 @@ func New(
}

func (app *App) initSlinkySidecarClient(appOpts servertypes.AppOptions) oracleclient.OracleClient {
// Slinky setup
// Create the oracle service.
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
panic(err)
}
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
if err != nil {
panic(err)
}
// Create the oracle service.
slinkyClient, err := oracleclient.NewClientFromConfig(
cfg,
app.Logger().With("client", "oracle"),
oracleMetrics,
app.oracleMetrics,
Comment on lines +1428 to +1436
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The creation of the oracleClient within initSlinkySidecarClient function does not handle potential errors gracefully. It's important to ensure that the application can recover or exit cleanly in case of configuration errors.

- slinkyClient, err := oracleclient.NewClientFromConfig(cfg, app.Logger().With("client", "oracle"), app.oracleMetrics)
- if err != nil {
-     panic(err)
- }
+ slinkyClient, err := oracleclient.NewClientFromConfig(cfg, app.Logger().With("client", "oracle"), app.oracleMetrics)
+ if err != nil {
+     app.Logger().Error("Failed to create oracle client from config", "error", err)
+     return nil // Or handle the error as appropriate for your application's error handling strategy
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// Create the oracle service.
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
panic(err)
}
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
if err != nil {
panic(err)
}
// Create the oracle service.
slinkyClient, err := oracleclient.NewClientFromConfig(
cfg,
app.Logger().With("client", "oracle"),
oracleMetrics,
app.oracleMetrics,
// Create the oracle service.
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
panic(err)
}
slinkyClient, err := oracleclient.NewClientFromConfig(
cfg,
app.Logger().With("client", "oracle"),
app.oracleMetrics,
if err != nil {
app.Logger().Error("Failed to create oracle client from config", "error", err)
return nil // Or handle the error as appropriate for your application's error handling strategy
}

)
if err != nil {
panic(err)
}
return slinkyClient
}

func (app *App) createProposalHandlers(
appFlags flags.Flags,
txConfig client.TxConfig,
appOpts servertypes.AppOptions,
) (sdk.PrepareProposalHandler, sdk.ProcessProposalHandler) {
var priceUpdateDecoder process.UpdateMarketPriceTxDecoder = process.NewDefaultUpdateMarketPriceTxDecoder(
app.PricesKeeper, app.txConfig.TxDecoder())
// If the node is a NonValidatingFullNode, we don't need to run any oracle code
// Note: If the command-line flag `--non-validating-full-node` is enabled, this node will use
// an implementation of `ProcessProposal` which always returns `abci.ResponseProcessProposal_ACCEPT`.
// Full-nodes do not participate in consensus, and therefore should not participate in voting / `ProcessProposal`.
if appFlags.NonValidatingFullNode {
if app.oracleMetrics == nil {
app.oracleMetrics = servicemetrics.NewNopMetrics()
}
return prepare.FullNodePrepareProposalHandler(), process.FullNodeProcessProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.StakingKeeper,
app.PerpetualsKeeper,
priceUpdateDecoder,
)
}
strategy := currencypair.NewDefaultCurrencyPairStrategy(app.PricesKeeper)
var priceUpdateGenerator prices.PriceUpdateGenerator = prices.NewDefaultPriceUpdateGenerator(app.PricesKeeper)

veCodec := compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
)
extCommitCodec := compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZLibCompressor(),
)

// Set Price Update Generators/Decoders for Slinky
if appFlags.VEOracleEnabled {
priceUpdateGenerator = prices.NewSlinkyPriceUpdateGenerator(
aggregator.NewDefaultVoteAggregator(
app.Logger(),
voteweighted.MedianFromContext(
app.Logger(),
app.StakingKeeper,
voteweighted.DefaultPowerThreshold,
),
strategy,
),
extCommitCodec,
veCodec,
strategy,
)
priceUpdateDecoder = process.NewSlinkyMarketPriceDecoder(
priceUpdateDecoder,
priceUpdateGenerator,
)
}
// Generate the dydx handlers
dydxPrepareProposalHandler := prepare.PrepareProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.PerpetualsKeeper,
priceUpdateGenerator,
)

// ProcessProposal setup.
var dydxProcessProposalHandler = process.ProcessProposalHandler(
txConfig,
app.BridgeKeeper,
app.ClobKeeper,
app.StakingKeeper,
app.PerpetualsKeeper,
app.PricesKeeper,
priceUpdateDecoder,
)

// Wrap dydx handlers with slinky handlers
if appFlags.VEOracleEnabled {
app.initOracle(priceUpdateDecoder)
proposalHandler := slinkyproposals.NewProposalHandler(
app.Logger(),
dydxPrepareProposalHandler,
dydxProcessProposalHandler,
ve.NewDefaultValidateVoteExtensionsFn(app.ChainID(), app.StakingKeeper),
veCodec,
extCommitCodec,
strategy,
app.oracleMetrics,
slinkyproposals.RetainOracleDataInWrappedProposalHandler(),
)
return proposalHandler.PrepareProposalHandler(), proposalHandler.ProcessProposalHandler()
}
return dydxPrepareProposalHandler, dydxProcessProposalHandler
}

func (app *App) initOracle(pricesTxDecoder process.UpdateMarketPriceTxDecoder) {
// Vote Extension setup.
slinkyVoteExtensionsHandler := ve.NewVoteExtensionHandler(
app.Logger(),
vote_extensions.NewOraclePrices(app.PricesKeeper),
time.Second,
currencypair.NewDefaultCurrencyPairStrategy(app.PricesKeeper),
compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
),
app.PreBlocker,
app.oracleMetrics,
)

dydxExtendVoteHandler := vote_extensions.ExtendVoteHandler{
SlinkyExtendVoteHandler: slinkyVoteExtensionsHandler.ExtendVoteHandler(),
PricesTxDecoder: pricesTxDecoder,
PricesKeeper: app.PricesKeeper,
}

app.SetExtendVoteHandler(dydxExtendVoteHandler.ExtendVoteHandler())
app.SetVerifyVoteExtensionHandler(slinkyVoteExtensionsHandler.VerifyVoteExtensionHandler())
}

func (app *App) initOracleMetrics(appOpts servertypes.AppOptions) {
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
panic(err)
}
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
if err != nil {
panic(err)
}
// run prometheus metrics
if cfg.MetricsEnabled {
promLogger, err := zap.NewProduction()
if err != nil {
panic(err)
}
app.oraclePrometheusServer, err = promserver.NewPrometheusServer(cfg.PrometheusServerAddress, promLogger)
if err != nil {
panic(err)
}
// start the prometheus server
go app.oraclePrometheusServer.Start()
}
app.oracleMetrics = oracleMetrics
}

// RegisterDaemonWithHealthMonitor registers a daemon service with the update monitor, which will commence monitoring
// the health of the daemon. If the daemon does not register, the method will panic.
func (app *App) RegisterDaemonWithHealthMonitor(
Expand Down Expand Up @@ -1749,6 +1861,9 @@ func (app *App) setAnteHandler(txConfig client.TxConfig) {
// Close invokes an ordered shutdown of routines.
func (app *App) Close() error {
app.BaseApp.Close()
if app.oraclePrometheusServer != nil {
app.oraclePrometheusServer.Close()
}
Comment on lines +1864 to +1866
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing the oraclePrometheusServer without checking if an error occurred during the shutdown can lead to unnoticed issues. It's a good practice to log or handle errors even during shutdown processes.

- app.oraclePrometheusServer.Close()
+ if err := app.oraclePrometheusServer.Close(); err != nil {
+     app.Logger().Error("Failed to close oracle Prometheus server", "error", err)
+ }

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if app.oraclePrometheusServer != nil {
app.oraclePrometheusServer.Close()
}
if app.oraclePrometheusServer != nil {
if err := app.oraclePrometheusServer.Close(); err != nil {
app.Logger().Error("Failed to close oracle Prometheus server", "error", err)
}
}

return app.closeOnce()
}

Expand Down
1 change: 1 addition & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestAppIsFullyInitialized(t *testing.T) {
"LiquidationsClient",
"BridgeClient",
"SlinkyClient",
"oraclePrometheusServer",

// Any default constructed type can be considered initialized if the default is what is
// expected. getUninitializedStructFields relies on fields being the non-default and
Expand Down
17 changes: 17 additions & 0 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Flags struct {

// Grpc Streaming
GrpcStreamingEnabled bool
VEOracleEnabled bool // Slinky Vote Extensions
}

// List of CLI flags.
Expand All @@ -37,6 +38,9 @@ const (

// Grpc Streaming
GrpcStreamingEnabled = "grpc-streaming-enabled"

// Slinky VEs enabled
VEOracleEnabled = "slinky-vote-extension-oracle-enabled"
)

// Default values.
Expand All @@ -47,6 +51,7 @@ const (
DefaultDdErrorTrackingFormat = false

DefaultGrpcStreamingEnabled = false
DefaultVEOracleEnabled = true
)

// AddFlagsToCmd adds flags to app initialization.
Expand Down Expand Up @@ -80,6 +85,11 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingEnabled,
"Whether to enable grpc streaming for full nodes",
)
cmd.Flags().Bool(
VEOracleEnabled,
DefaultVEOracleEnabled,
"Whether to run on-chain oracle via slinky vote extensions",
)
}

// Validate checks that the flags are valid.
Expand Down Expand Up @@ -119,6 +129,7 @@ func GetFlagValuesFromOptions(
GrpcEnable: true,

GrpcStreamingEnabled: DefaultGrpcStreamingEnabled,
VEOracleEnabled: true,
}

// Populate the flags if they exist.
Expand Down Expand Up @@ -164,5 +175,11 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(VEOracleEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.VEOracleEnabled = v
}
}

return result
}
2 changes: 1 addition & 1 deletion protocol/app/prepare/full_node_prepare_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ func FullNodePrepareProposalHandler() sdk.PrepareProposalHandler {
recordErrorMetricsWithLabel(metrics.PrepareProposalTxs)

// Return an empty response if the node is running in full-node mode so that the proposal fails.
return &EmptyResponse, nil
return &abci.ResponsePrepareProposal{Txs: [][]byte{}}, nil
}
}
Loading
Loading