Skip to content

Commit

Permalink
[Backport v3.x] backport full node streaming - stateful order updates (
Browse files Browse the repository at this point in the history
…#1269)

* [CT-700] separate indexer and grpc streaming events (#1209)

* [CT-700] separate indexer and grpc streaming events

* fix tests

* comments

* update

* [CT-700] only send response when there is at least one update (#1216)

* [CT-712] send order update when short term order state fill amounts are pruned (#1241)

* [CT-712] send fill amount updates for reverted operations (#1240)

* [CT-723] add block number + stage to grpc updates (#1252)

* [CT-723] add block number + stage to grpc updates

* add indexer changes

* [CT-727] avoid state reads when sending updates (#1261)

* fix lint|

* [CT-712] send updates for both normal order matches and liquidation (#1280)

* fix test

* fix test

* update type
  • Loading branch information
jayy04 authored Apr 1, 2024
1 parent 2a47cf5 commit dc1387d
Show file tree
Hide file tree
Showing 25 changed files with 518 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ export interface StreamOrderbookUpdatesResponse {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand All @@ -250,6 +259,15 @@ export interface StreamOrderbookUpdatesResponseSDKType {
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
Expand Down Expand Up @@ -904,7 +922,9 @@ export const StreamOrderbookUpdatesRequest = {
function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

Expand All @@ -918,6 +938,14 @@ export const StreamOrderbookUpdatesResponse = {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

Expand All @@ -938,6 +966,14 @@ export const StreamOrderbookUpdatesResponse = {
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -951,6 +987,8 @@ export const StreamOrderbookUpdatesResponse = {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

Expand Down
7 changes: 7 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ message StreamOrderbookUpdatesResponse {
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
5 changes: 2 additions & 3 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,8 @@ func New(
clobFlags := clobflags.GetClobFlagValuesFromOptions(appOpts)
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(
app.IndexerEventManager.Enabled() || app.GrpcStreamingManager.Enabled(),
)
memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled())

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand Down
8 changes: 8 additions & 0 deletions protocol/lib/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ package lib

import (
"fmt"

"github.com/cometbft/cometbft/crypto/tmhash"
)

// Custom exec modes
const (
ExecModeBeginBlock = uint32(100)
ExecModeEndBlock = uint32(101)
ExecModePrepareCheckState = uint32(102)
)

type TxHash string

func GetTxHash(tx []byte) TxHash {
Expand Down
5 changes: 5 additions & 0 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions protocol/mocks/MemClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 17 additions & 9 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(
func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
Expand Down Expand Up @@ -103,17 +105,23 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updates,
Snapshot: snapshot,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
break
}
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: updatesToSend,
Snapshot: snapshot,
BlockHeight: blockHeight,
ExecMode: execMode,
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions protocol/streaming/grpc/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
updates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
) {
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/streaming/grpc/types/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ type GrpcStreamingManager interface {
SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
snapshot bool,
blockHeight uint32,
execMode uint32,
)
}
7 changes: 7 additions & 0 deletions protocol/testutil/memclob/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,10 @@ func (f *FakeMemClobKeeper) ValidateSubaccountEquityTierLimitForNewOrder(ctx sdk
func (f *FakeMemClobKeeper) Logger(ctx sdk.Context) log.Logger {
return ctx.Logger()
}

func (f *FakeMemClobKeeper) SendOrderbookUpdates(
ctx sdk.Context,
offchainUpdates *types.OffchainUpdates,
snapshot bool,
) {
}
44 changes: 44 additions & 0 deletions protocol/x/clob/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func BeginBlocker(
ctx sdk.Context,
keeper types.ClobKeeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModeBeginBlock)
// Initialize the set of process proposer match events for the next block effectively
// removing any events that occurred in the last block.
keeper.MustSetProcessProposerMatchesEvents(
Expand All @@ -35,6 +36,8 @@ func EndBlocker(
ctx sdk.Context,
keeper keeper.Keeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModeEndBlock)

processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx)

// Prune any fill amounts from state which are now past their `pruneableBlockHeight`.
Expand Down Expand Up @@ -117,6 +120,8 @@ func PrepareCheckState(
ctx sdk.Context,
keeper *keeper.Keeper,
) {
ctx = ctx.WithValue("ExecMode", lib.ExecModePrepareCheckState)

// Get the events generated from processing the matches in the latest block.
processProposerMatchesEvents := keeper.GetProcessProposerMatchesEvents(ctx)
if ctx.BlockHeight() != int64(processProposerMatchesEvents.BlockHeight) {
Expand Down Expand Up @@ -152,6 +157,45 @@ func PrepareCheckState(
offchainUpdates,
)

// For orders that are filled in the last block, send an orderbook update to the grpc streams.
if keeper.GetGrpcStreamingManager().Enabled() {
allUpdates := types.NewOffchainUpdates()
orderIdsToSend := make(map[types.OrderId]bool)

// Send an update for reverted local operations.
for _, operation := range localValidatorOperationsQueue {
if match := operation.GetMatch(); match != nil {
// For normal order matches, we send an update for the taker and maker orders.
if matchedOrders := match.GetMatchOrders(); matchedOrders != nil {
orderIdsToSend[matchedOrders.TakerOrderId] = true
for _, fill := range matchedOrders.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
// For liquidation matches, we send an update for the maker orders.
if matchedLiquidation := match.GetMatchPerpetualLiquidation(); matchedLiquidation != nil {
for _, fill := range matchedLiquidation.Fills {
orderIdsToSend[fill.MakerOrderId] = true
}
}
}
}

// Send an update for orders that were proposed.
for _, orderId := range processProposerMatchesEvents.OrderIdsFilledInLastBlock {
orderIdsToSend[orderId] = true
}

// Send update.
for orderId := range orderIdsToSend {
if _, exists := keeper.MemClob.GetOrder(ctx, orderId); exists {
orderbookUpdate := keeper.MemClob.GetOrderbookUpdatesForOrderUpdate(ctx, orderId)
allUpdates.Append(orderbookUpdate)
}
}
keeper.SendOrderbookUpdates(ctx, allUpdates, false)
}

// 3. Place all stateful order placements included in the last block on the memclob.
// Note telemetry is measured outside of the function call because `PlaceStatefulOrdersFromLastBlock`
// is called within `PlaceConditionalOrdersTriggeredInLastBlock`.
Expand Down
6 changes: 3 additions & 3 deletions protocol/x/clob/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestEndBlocker_Failure(t *testing.T) {

for _, orderId := range tc.expiredStatefulOrderIds {
mockIndexerEventManager.On("AddTxnEvent",
ctx,
mock.Anything,
indexerevents.SubtypeStatefulOrder,
indexerevents.StatefulOrderEventVersion,
indexer_manager.GetBytes(
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestEndBlocker_Success(t *testing.T) {
// Assert that the indexer events for Expired Stateful Orders were emitted.
for _, orderId := range tc.expectedProcessProposerMatchesEvents.ExpiredStatefulOrderIds {
mockIndexerEventManager.On("AddTxnEvent",
ctx,
mock.Anything,
indexerevents.SubtypeStatefulOrder,
indexerevents.StatefulOrderEventVersion,
indexer_manager.GetBytes(
Expand All @@ -779,7 +779,7 @@ func TestEndBlocker_Success(t *testing.T) {
// Assert that the indexer events for triggered conditional orders were emitted.
for _, orderId := range tc.expectedTriggeredConditionalOrderIds {
mockIndexerEventManager.On("AddTxnEvent",
ctx,
mock.Anything,
indexerevents.SubtypeStatefulOrder,
indexerevents.StatefulOrderEventVersion,
indexer_manager.GetBytes(
Expand Down
Loading

0 comments on commit dc1387d

Please sign in to comment.