Skip to content

Commit

Permalink
[Backport v4.x] backport full node streaming to v4.x branch (#1270)
Browse files Browse the repository at this point in the history
* [CT-645] Move off chain updates and v1 to a different package (#1131)

* [CT-645] Add protos for orderbook stream query service

* move removal reasons to a separate package

* [CT-645] Add protos for orderbook stream query service (#1133)

* [CT-645] Add protos for orderbook stream query service

* make update not nullable

* fix build

* [CT-644] instantiate grpc stream manager (#1134)

* [CT-644] instantiate grpc stream manager

* update type

* update channel type

* [CT-646] stream offchain updates through stream manager (#1138)

* [CT-646] stream offchain updates through stream manager

* comments

* fix lint

* get rid of finished

* comments

* comments

* [CT-652] add command line flag for full node streaming (#1145)

* [CT-647] construct the initial orderbook snapshot (#1147)

* [CT-647] construct the initial orderbook snapshot

* [CT-647] initialize new streams and send orderbook snapshot (#1152)

* [CT-647] initialize new streams and send orderbook snapshot

* use sync once

* comments

* [CT-700] separate indexer and grpc streaming events (#1209)

* [CT-700] separate indexer and grpc streaming events

* fix tests

* comments

* update

* [CT-700] only send response when there is at least one update (#1216)

* [CT-712] send order update when short term order state fill amounts are pruned (#1241)

* [CT-712] send fill amount updates for reverted operations (#1240)

* [CT-723] add block number + stage to grpc updates (#1252)

* [CT-723] add block number + stage to grpc updates

* add indexer changes

* [CT-727] avoid state reads when sending updates (#1261)
  • Loading branch information
jayy04 committed Mar 28, 2024
1 parent 295c487 commit a87b05b
Show file tree
Hide file tree
Showing 57 changed files with 2,264 additions and 496 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Rpc } from "../../helpers";
import * as _m0 from "protobufjs/minimal";
import { QueryClient, createProtobufRpcClient } from "@cosmjs/stargate";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse } from "./query";
import { QueryGetClobPairRequest, QueryClobPairResponse, QueryAllClobPairRequest, QueryClobPairAllResponse, MevNodeToNodeCalculationRequest, MevNodeToNodeCalculationResponse, QueryEquityTierLimitConfigurationRequest, QueryEquityTierLimitConfigurationResponse, QueryBlockRateLimitConfigurationRequest, QueryBlockRateLimitConfigurationResponse, QueryLiquidationsConfigurationRequest, QueryLiquidationsConfigurationResponse, StreamOrderbookUpdatesRequest, StreamOrderbookUpdatesResponse } from "./query";
/** Query defines the gRPC querier service. */

export interface Query {
Expand All @@ -22,6 +22,9 @@ export interface Query {
/** Queries LiquidationsConfiguration. */

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse>;
/** Streams orderbook updates. */

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse>;
}
export class QueryClientImpl implements Query {
private readonly rpc: Rpc;
Expand All @@ -34,6 +37,7 @@ export class QueryClientImpl implements Query {
this.equityTierLimitConfiguration = this.equityTierLimitConfiguration.bind(this);
this.blockRateLimitConfiguration = this.blockRateLimitConfiguration.bind(this);
this.liquidationsConfiguration = this.liquidationsConfiguration.bind(this);
this.streamOrderbookUpdates = this.streamOrderbookUpdates.bind(this);
}

clobPair(request: QueryGetClobPairRequest): Promise<QueryClobPairResponse> {
Expand Down Expand Up @@ -74,6 +78,12 @@ export class QueryClientImpl implements Query {
return promise.then(data => QueryLiquidationsConfigurationResponse.decode(new _m0.Reader(data)));
}

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
const data = StreamOrderbookUpdatesRequest.encode(request).finish();
const promise = this.rpc.request("dydxprotocol.clob.Query", "StreamOrderbookUpdates", data);
return promise.then(data => StreamOrderbookUpdatesResponse.decode(new _m0.Reader(data)));
}

}
export const createRpcQueryExtension = (base: QueryClient) => {
const rpc = createProtobufRpcClient(base);
Expand Down Expand Up @@ -101,6 +111,10 @@ export const createRpcQueryExtension = (base: QueryClient) => {

liquidationsConfiguration(request?: QueryLiquidationsConfigurationRequest): Promise<QueryLiquidationsConfigurationResponse> {
return queryService.liquidationsConfiguration(request);
},

streamOrderbookUpdates(request: StreamOrderbookUpdatesRequest): Promise<StreamOrderbookUpdatesResponse> {
return queryService.streamOrderbookUpdates(request);
}

};
Expand Down
203 changes: 203 additions & 0 deletions indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ClobPair, ClobPairSDKType } from "./clob_pair";
import { EquityTierLimitConfiguration, EquityTierLimitConfigurationSDKType } from "./equity_tier_limit_config";
import { BlockRateLimitConfiguration, BlockRateLimitConfigurationSDKType } from "./block_rate_limit_config";
import { LiquidationsConfig, LiquidationsConfigSDKType } from "./liquidations_config";
import { OffChainUpdateV1, OffChainUpdateV1SDKType } from "../indexer/off_chain_updates/off_chain_updates";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial, Long } from "../../helpers";
/** QueryGetClobPairRequest is request type for the ClobPair method. */
Expand Down Expand Up @@ -198,6 +199,76 @@ export interface QueryLiquidationsConfigurationResponse {
export interface QueryLiquidationsConfigurationResponseSDKType {
liquidations_config?: LiquidationsConfigSDKType;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequest {
/** Clob pair ids to stream orderbook updates for. */
clobPairId: number[];
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesRequestSDKType {
/** Clob pair ids to stream orderbook updates for. */
clob_pair_id: number[];
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponse {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

blockHeight: number;
/** Exec mode of the updates. */

execMode: number;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
* StreamOrderbookUpdates method.
*/

export interface StreamOrderbookUpdatesResponseSDKType {
/** Orderbook updates for the clob pair. */
updates: OffChainUpdateV1SDKType[];
/**
* Snapshot indicates if the response is from a snapshot of the orderbook.
* This is true for the initial response and false for all subsequent updates.
* Note that if the snapshot is true, then all previous entries should be
* discarded and the orderbook should be resynced.
*/

snapshot: boolean;
/**
* ---Additional fields used to debug issues---
* Block height of the updates.
*/

block_height: number;
/** Exec mode of the updates. */

exec_mode: number;
}

function createBaseQueryGetClobPairRequest(): QueryGetClobPairRequest {
return {
Expand Down Expand Up @@ -789,4 +860,136 @@ export const QueryLiquidationsConfigurationResponse = {
return message;
}

};

function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesRequest {
return {
clobPairId: []
};
}

export const StreamOrderbookUpdatesRequest = {
encode(message: StreamOrderbookUpdatesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
writer.uint32(10).fork();

for (const v of message.clobPairId) {
writer.uint32(v);
}

writer.ldelim();
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesRequest();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
if ((tag & 7) === 2) {
const end2 = reader.uint32() + reader.pos;

while (reader.pos < end2) {
message.clobPairId.push(reader.uint32());
}
} else {
message.clobPairId.push(reader.uint32());
}

break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesRequest>): StreamOrderbookUpdatesRequest {
const message = createBaseStreamOrderbookUpdatesRequest();
message.clobPairId = object.clobPairId?.map(e => e) || [];
return message;
}

};

function createBaseStreamOrderbookUpdatesResponse(): StreamOrderbookUpdatesResponse {
return {
updates: [],
snapshot: false,
blockHeight: 0,
execMode: 0
};
}

export const StreamOrderbookUpdatesResponse = {
encode(message: StreamOrderbookUpdatesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.updates) {
OffChainUpdateV1.encode(v!, writer.uint32(10).fork()).ldelim();
}

if (message.snapshot === true) {
writer.uint32(16).bool(message.snapshot);
}

if (message.blockHeight !== 0) {
writer.uint32(24).uint32(message.blockHeight);
}

if (message.execMode !== 0) {
writer.uint32(32).uint32(message.execMode);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): StreamOrderbookUpdatesResponse {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseStreamOrderbookUpdatesResponse();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.updates.push(OffChainUpdateV1.decode(reader, reader.uint32()));
break;

case 2:
message.snapshot = reader.bool();
break;

case 3:
message.blockHeight = reader.uint32();
break;

case 4:
message.execMode = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<StreamOrderbookUpdatesResponse>): StreamOrderbookUpdatesResponse {
const message = createBaseStreamOrderbookUpdatesResponse();
message.updates = object.updates?.map(e => OffChainUpdateV1.fromPartial(e)) || [];
message.snapshot = object.snapshot ?? false;
message.blockHeight = object.blockHeight ?? 0;
message.execMode = object.execMode ?? 0;
return message;
}

};
35 changes: 35 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "dydxprotocol/clob/clob_pair.proto";
import "dydxprotocol/clob/equity_tier_limit_config.proto";
import "dydxprotocol/clob/liquidations_config.proto";
import "dydxprotocol/clob/mev.proto";
import "dydxprotocol/indexer/off_chain_updates/off_chain_updates.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/x/clob/types";

Expand Down Expand Up @@ -50,6 +51,12 @@ service Query {
returns (QueryLiquidationsConfigurationResponse) {
option (google.api.http).get = "/dydxprotocol/clob/liquidations_config";
}

// GRPC Streams

// Streams orderbook updates.
rpc StreamOrderbookUpdates(StreamOrderbookUpdatesRequest)
returns (stream StreamOrderbookUpdatesResponse);
}

// QueryGetClobPairRequest is request type for the ClobPair method.
Expand Down Expand Up @@ -126,3 +133,31 @@ message QueryLiquidationsConfigurationRequest {}
message QueryLiquidationsConfigurationResponse {
LiquidationsConfig liquidations_config = 1 [ (gogoproto.nullable) = false ];
}

// StreamOrderbookUpdatesRequest is a request message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesRequest {
// Clob pair ids to stream orderbook updates for.
repeated uint32 clob_pair_id = 1;
}

// StreamOrderbookUpdatesResponse is a response message for the
// StreamOrderbookUpdates method.
message StreamOrderbookUpdatesResponse {
// Orderbook updates for the clob pair.
repeated dydxprotocol.indexer.off_chain_updates.OffChainUpdateV1 updates = 1
[ (gogoproto.nullable) = false ];

// Snapshot indicates if the response is from a snapshot of the orderbook.
// This is true for the initial response and false for all subsequent updates.
// Note that if the snapshot is true, then all previous entries should be
// discarded and the orderbook should be resynced.
bool snapshot = 2;

// ---Additional fields used to debug issues---
// Block height of the updates.
uint32 block_height = 3;

// Exec mode of the updates.
uint32 exec_mode = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.off_chain_updates;
import "dydxprotocol/indexer/shared/removal_reason.proto";
import "dydxprotocol/indexer/protocol/v1/clob.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types";

// Do not make any breaking changes to these protos, a new version should be
// created if a breaking change is needed.
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/clob.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "dydxprotocol/indexer/protocol/v1/subaccount.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the clob
// module for use to send Indexer specific messages. Do not make any breaking
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/protocol/v1/subaccount.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dydxprotocol.indexer.protocol.v1;
import "cosmos_proto/cosmos.proto";
import "gogoproto/gogo.proto";

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types";

// Initial copy of protos from dYdX chain application state protos for the
// subaccount module for use to send Indexer specific messages. Do not make any
Expand Down
2 changes: 1 addition & 1 deletion proto/dydxprotocol/indexer/shared/removal_reason.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
syntax = "proto3";
package dydxprotocol.indexer.shared;

option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared";
option go_package = "github.com/dydxprotocol/v4-chain/protocol/indexer/shared/types";

// TODO(DEC-869): Update reasons/statuses for Advanced Orders.

Expand Down
Loading

0 comments on commit a87b05b

Please sign in to comment.