diff --git a/indexer/packages/postgres/__tests__/lib/parent-subaccount-helpers.ts b/indexer/packages/postgres/__tests__/lib/parent-subaccount-helpers.ts new file mode 100644 index 0000000000..b31d98ba98 --- /dev/null +++ b/indexer/packages/postgres/__tests__/lib/parent-subaccount-helpers.ts @@ -0,0 +1,15 @@ +import { + getParentSubaccountNum, +} from '../../src/lib/parent-subaccount-helpers'; + +describe('getParentSubaccountNum', () => { + it('Gets the parent subaccount number from a child subaccount number', () => { + expect(getParentSubaccountNum(0)).toEqual(0); + expect(getParentSubaccountNum(128)).toEqual(0); + expect(getParentSubaccountNum(128 * 999 - 1)).toEqual(127); + }); + + it('Throws an error if the child subaccount number is greater than the max child subaccount number', () => { + expect(() => getParentSubaccountNum(128001)).toThrowError('Child subaccount number must be less than or equal to 128000'); + }); +}); diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 9d10366af3..e846aace61 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -48,6 +48,7 @@ export * as uuid from './helpers/uuid'; export * as protocolTranslations from './lib/protocol-translations'; export * as orderTranslations from './lib/order-translations'; export * as apiTranslations from './lib/api-translations'; +export * as parentSubaccountHelpers from './lib/parent-subaccount-helpers'; export * as dbHelpers from './helpers/db-helpers'; export * as storeHelpers from './helpers/stores-helpers'; diff --git a/indexer/packages/postgres/src/lib/parent-subaccount-helpers.ts b/indexer/packages/postgres/src/lib/parent-subaccount-helpers.ts new file mode 100644 index 0000000000..b58ebee12c --- /dev/null +++ b/indexer/packages/postgres/src/lib/parent-subaccount-helpers.ts @@ -0,0 +1,11 @@ +import { + CHILD_SUBACCOUNT_MULTIPLIER, + MAX_PARENT_SUBACCOUNTS, +} from '../constants'; + +export function getParentSubaccountNum(childSubaccountNum: number): number { + if (childSubaccountNum > MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER) { + throw new Error(`Child subaccount number must be less than or equal to ${MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER}`); + } + return childSubaccountNum % MAX_PARENT_SUBACCOUNTS; +} diff --git a/indexer/services/comlink/__tests__/lib/helpers.test.ts b/indexer/services/comlink/__tests__/lib/helpers.test.ts index 9f6728b522..b2444d96f1 100644 --- a/indexer/services/comlink/__tests__/lib/helpers.test.ts +++ b/indexer/services/comlink/__tests__/lib/helpers.test.ts @@ -39,7 +39,8 @@ import { getSignedNotionalAndRisk, getTotalUnsettledFunding, getPerpetualPositionsWithUpdatedFunding, - initializePerpetualPositionsWithFunding, getChildSubaccountNums, getParentSubaccountNum, + initializePerpetualPositionsWithFunding, + getChildSubaccountNums, } from '../../src/lib/helpers'; import _ from 'lodash'; import Big from 'big.js'; @@ -720,18 +721,4 @@ describe('helpers', () => { expect(() => getChildSubaccountNums(128)).toThrowError('Parent subaccount number must be less than 128'); }); }); - - describe('getParentSubaccountNum', () => { - it('Gets the parent subaccount number from a child subaccount number', () => { - expect(getParentSubaccountNum(0)).toEqual(0); - expect(getParentSubaccountNum(128)).toEqual(0); - expect(getParentSubaccountNum(128 * 999 - 1)).toEqual(127); - }); - }); - - describe('getParentSubaccountNum', () => { - it('Throws an error if the child subaccount number is greater than the max child subaccount number', () => { - expect(() => getParentSubaccountNum(128001)).toThrowError('Child subaccount number must be less than 128000'); - }); - }); }); diff --git a/indexer/services/comlink/src/lib/helpers.ts b/indexer/services/comlink/src/lib/helpers.ts index eab74bf228..a40275aad9 100644 --- a/indexer/services/comlink/src/lib/helpers.ts +++ b/indexer/services/comlink/src/lib/helpers.ts @@ -525,16 +525,3 @@ export function getChildSubaccountIds(address: string, parentSubaccountNum: numb (subaccountNumber: number): string => SubaccountTable.uuid(address, subaccountNumber), ); } - -/** - * Gets the parent subaccount number from a child subaccount number - * Parent subaccount = childSubaccount % 128 - * @param childSubaccountNum - * @returns - */ -export function getParentSubaccountNum(childSubaccountNum: number): number { - if (childSubaccountNum > MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER) { - throw new Error(`Child subaccount number must be less than ${MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER}`); - } - return childSubaccountNum % MAX_PARENT_SUBACCOUNTS; -} diff --git a/indexer/services/comlink/src/request-helpers/request-transformer.ts b/indexer/services/comlink/src/request-helpers/request-transformer.ts index c173cb9b64..3c7012b275 100644 --- a/indexer/services/comlink/src/request-helpers/request-transformer.ts +++ b/indexer/services/comlink/src/request-helpers/request-transformer.ts @@ -26,13 +26,13 @@ import { TradingRewardFromDatabase, TransferFromDatabase, TransferType, + parentSubaccountHelpers, } from '@dydxprotocol-indexer/postgres'; import { OrderbookLevels, PriceLevel } from '@dydxprotocol-indexer/redis'; import { RedisOrder } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; -import { getParentSubaccountNum } from '../lib/helpers'; import { AssetById, AssetPositionResponseObject, @@ -244,12 +244,15 @@ export function transferToParentSubaccountResponseObject( const senderParentSubaccountNum = transfer.senderWalletAddress ? undefined - : getParentSubaccountNum(subaccountMap[transfer.senderSubaccountId!].subaccountNumber, + : parentSubaccountHelpers.getParentSubaccountNum( + subaccountMap[transfer.senderSubaccountId!].subaccountNumber, ); const recipientParentSubaccountNum = transfer.recipientWalletAddress ? undefined - : getParentSubaccountNum(subaccountMap[transfer.recipientSubaccountId!].subaccountNumber); + : parentSubaccountHelpers.getParentSubaccountNum( + subaccountMap[transfer.recipientSubaccountId!].subaccountNumber, + ); // Determine transfer type based on parent subaccount number. let transferType: TransferType = TransferType.TRANSFER_IN; diff --git a/indexer/services/socks/__tests__/constants.ts b/indexer/services/socks/__tests__/constants.ts index 10b9361330..8d78444bc3 100644 --- a/indexer/services/socks/__tests__/constants.ts +++ b/indexer/services/socks/__tests__/constants.ts @@ -5,6 +5,9 @@ import { SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, TRADES_WEBSOCKET_MESSAGE_VERSION, } from '@dydxprotocol-indexer/kafka'; +import { + MAX_PARENT_SUBACCOUNTS, +} from '@dydxprotocol-indexer/postgres'; import { CandleMessage, CandleMessage_Resolution, @@ -28,10 +31,20 @@ export const defaultTxIndex: number = 1; export const defaultEventIndex: number = 3; export const defaultOwner: string = 'owner'; export const defaultAccNumber: number = 4; +export const defaultChildAccNumber: number = defaultAccNumber + MAX_PARENT_SUBACCOUNTS; +export const defaultChildAccNumber2: number = defaultAccNumber + 2 * MAX_PARENT_SUBACCOUNTS; export const defaultSubaccountId: SubaccountId = { owner: defaultOwner, number: defaultAccNumber, }; +export const defaultChildSubaccountId: SubaccountId = { + owner: defaultOwner, + number: defaultChildAccNumber, +}; +export const defaultChildSubaccountId2: SubaccountId = { + owner: defaultOwner, + number: defaultChildAccNumber2, +}; export const defaultContents: Object = { prop: 'property', field: 'field', @@ -82,3 +95,10 @@ export const tradesMessage: TradeMessage = { contents: defaultContentsString, version: TRADES_WEBSOCKET_MESSAGE_VERSION, }; + +export const childSubaccountMessage: SubaccountMessage = { + ...commonMsgProps, + subaccountId: defaultChildSubaccountId, + contents: defaultContentsString, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, +}; diff --git a/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts b/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts index 1c2be43da2..7415f12f18 100644 --- a/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts +++ b/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts @@ -1,4 +1,4 @@ -import { getChannel, getMessageToForward } from '../../src/helpers/from-kafka-helpers'; +import { getChannels, getMessageToForward } from '../../src/helpers/from-kafka-helpers'; import { InvalidForwardMessageError, InvalidTopicError } from '../../src/lib/errors'; import { Channel, @@ -17,7 +17,9 @@ import { marketsMessage, orderbookMessage, subaccountMessage, + childSubaccountMessage, tradesMessage, + defaultChildAccNumber, } from '../constants'; import { KafkaMessage } from 'kafkajs'; import { createKafkaMessage } from './kafka'; @@ -39,17 +41,20 @@ import { describe('from-kafka-helpers', () => { describe('getChannel', () => { it.each([ - [WebsocketTopics.TO_WEBSOCKETS_CANDLES, Channel.V4_CANDLES], - [WebsocketTopics.TO_WEBSOCKETS_MARKETS, Channel.V4_MARKETS], - [WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS, Channel.V4_ORDERBOOK], - [WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, Channel.V4_ACCOUNTS], - [WebsocketTopics.TO_WEBSOCKETS_TRADES, Channel.V4_TRADES], - ])('gets correct channel for topic %s', (topic: WebsocketTopics, channel: Channel) => { - expect(getChannel(topic)).toEqual(channel); + [WebsocketTopics.TO_WEBSOCKETS_CANDLES, [Channel.V4_CANDLES]], + [WebsocketTopics.TO_WEBSOCKETS_MARKETS, [Channel.V4_MARKETS]], + [WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS, [Channel.V4_ORDERBOOK]], + [ + WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, + [Channel.V4_ACCOUNTS, Channel.V4_PARENT_ACCOUNTS], + ], + [WebsocketTopics.TO_WEBSOCKETS_TRADES, [Channel.V4_TRADES]], + ])('gets correct channel for topic %s', (topic: WebsocketTopics, channels: Channel[]) => { + expect(getChannels(topic)).toEqual(channels); }); it('throws InvalidTopicError for invalid topic', () => { - expect(() => { getChannel(invalidTopic); }).toThrow(new InvalidTopicError(invalidTopic)); + expect(() => { getChannels(invalidTopic); }).toThrow(new InvalidTopicError(invalidTopic)); }); }); @@ -132,6 +137,22 @@ describe('from-kafka-helpers', () => { expect(messageToForward.contents).toEqual(defaultContents); }); + it('gets correct MessageToForward for subaccount message for parent subaccount channel', () => { + const message: KafkaMessage = createKafkaMessage( + Buffer.from(Uint8Array.from(SubaccountMessage.encode(childSubaccountMessage).finish())), + ); + const messageToForward: MessageToForward = getMessageToForward( + Channel.V4_PARENT_ACCOUNTS, + message, + ); + + expect(messageToForward.channel).toEqual(Channel.V4_PARENT_ACCOUNTS); + expect(messageToForward.id).toEqual(`${defaultOwner}/${defaultAccNumber}`); + expect(messageToForward.contents).toEqual(defaultContents); + expect(messageToForward.subaccountNumber).toBeDefined(); + expect(messageToForward.subaccountNumber).toEqual(defaultChildAccNumber); + }); + it('throws InvalidForwardMessageError for empty message', () => { const message: KafkaMessage = createKafkaMessage(null); diff --git a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts index 89b24c91d7..22bab44d5b 100644 --- a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts +++ b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts @@ -32,6 +32,10 @@ import { dbHelpers, testMocks, perpetualMarketRefresher } from '@dydxprotocol-in import { btcClobPairId, btcTicker, + defaultChildAccNumber, + defaultChildAccNumber2, + defaultChildSubaccountId, + defaultChildSubaccountId2, defaultSubaccountId, ethClobPairId, ethTicker, @@ -64,6 +68,16 @@ describe('message-forwarder', () => { version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, }; + const childSubaccountMessage: SubaccountMessage = { + ...baseSubaccountMessage, + subaccountId: defaultChildSubaccountId, + }; + + const childSubaccount2Message: SubaccountMessage = { + ...baseSubaccountMessage, + subaccountId: defaultChildSubaccountId2, + }; + const btcTradesMessages: TradeMessage[] = [ { ...baseTradeMessage, @@ -123,6 +137,36 @@ describe('message-forwarder', () => { }, ]; + const childSubaccountMessages: SubaccountMessage[] = [ + { + ...childSubaccountMessage, + contents: JSON.stringify({ val: '1' }), + }, + { + ...childSubaccountMessage, + contents: JSON.stringify({ val: '2' }), + }, + ]; + + const childSubaccount2Messages: SubaccountMessage[] = [ + { + ...childSubaccount2Message, + contents: JSON.stringify({ val: '3' }), + }, + { + ...childSubaccount2Message, + contents: JSON.stringify({ val: '4' }), + }, + ]; + + // Interleave messages of different child subaccounts + const allChildSubaccountMessages: SubaccountMessage[] = [ + childSubaccountMessages[0], + childSubaccount2Messages[0], + childSubaccountMessages[1], + childSubaccount2Messages[1], + ]; + const mockAxiosResponse: Object = { a: 'b' }; const subaccountInitialMessage: Object = { ...mockAxiosResponse, @@ -160,14 +204,14 @@ describe('message-forwarder', () => { await dbHelpers.teardown(); }); - beforeEach(async () => { + beforeEach(() => { jest.clearAllMocks(); - config.WS_PORT += 1; + // Increment port with a large number to ensure it's not used for any other service. + config.WS_PORT += 1679; WS_HOST = `ws://localhost:${config.WS_PORT}`; wss = new Wss(); - await wss.start(); subscriptions = new Subscriptions(); index = new Index(wss, subscriptions); (axiosRequest as jest.Mock).mockImplementation(() => (JSON.stringify(mockAxiosResponse))); @@ -330,6 +374,94 @@ describe('message-forwarder', () => { }); }); + it('Batch sends subaccount messages to parent subaccount channel', (done: jest.DoneCallback) => { + const channel: Channel = Channel.V4_PARENT_ACCOUNTS; + const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`; + + const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index); + subscriptions.start(messageForwarder.forwardToClient); + messageForwarder.start(); + + const ws = new WebSocket(WS_HOST); + let connectionId: string; + + ws.on(WebsocketEvents.MESSAGE, async (message) => { + const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage; + if (msg.message_id === 0) { + connectionId = msg.connection_id; + } + + if (msg.message_id === 1) { + // Check that the initial message is correct. + checkInitialMessage( + msg as SubscribedMessage, + connectionId, + channel, + id, + subaccountInitialMessage, + ); + + // await each message to ensure they are sent in order + for (const subaccountMessage of allChildSubaccountMessages) { + await producer.send({ + topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: [{ + value: Buffer.from( + Uint8Array.from( + SubaccountMessage.encode(subaccountMessage).finish(), + ), + ), + partition: 0, + timestamp: `${Date.now()}`, + }], + }); + } + } + + if (msg.message_id === 2) { + const batchMsg: ChannelBatchDataMessage = JSON.parse( + message.toString(), + ) as ChannelBatchDataMessage; + + checkBatchMessage( + batchMsg, + connectionId, + channel, + id, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + childSubaccountMessages, + defaultChildAccNumber, + ); + } + + if (msg.message_id === 3) { + const batchMsg: ChannelBatchDataMessage = JSON.parse( + message.toString(), + ) as ChannelBatchDataMessage; + + checkBatchMessage( + batchMsg, + connectionId, + channel, + id, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + childSubaccount2Messages, + defaultChildAccNumber2, + ); + done(); + } + }); + + ws.on('open', () => { + ws.send(JSON.stringify({ + type: IncomingMessageType.SUBSCRIBE, + channel, + id, + batched: true, + })); + }); + }); + it('forwards messages', (done: jest.DoneCallback) => { const channel: Channel = Channel.V4_TRADES; const id: string = ethTicker; @@ -426,6 +558,7 @@ function checkBatchMessage( id: string, version: string, expectedMessages: {contents: string}[], + subaccountNumber?: number, ): void { expect(batchMsg.connection_id).toBe(connectionId); expect(batchMsg.type).toBe(OutgoingMessageType.CHANNEL_BATCH_DATA); @@ -433,6 +566,7 @@ function checkBatchMessage( expect(batchMsg.id).toBe(id); expect(batchMsg.contents.length).toBe(expectedMessages.length); expect(batchMsg.version).toBe(version); + expect(batchMsg.subaccountNumber).toBe(subaccountNumber); batchMsg.contents.forEach( (individualMessage: Object, idx: number) => { expect(individualMessage).toEqual(JSON.parse(expectedMessages[idx].contents)); diff --git a/indexer/services/socks/__tests__/lib/subscriptions.test.ts b/indexer/services/socks/__tests__/lib/subscriptions.test.ts index 3d31f00f1d..5a375d7cc6 100644 --- a/indexer/services/socks/__tests__/lib/subscriptions.test.ts +++ b/indexer/services/socks/__tests__/lib/subscriptions.test.ts @@ -4,7 +4,7 @@ import { Subscriptions } from '../../src/lib/subscription'; import { sendMessage, sendMessageString } from '../../src/helpers/wss'; import { RateLimiter } from '../../src/lib/rate-limit'; import { - dbHelpers, testMocks, perpetualMarketRefresher, CandleResolution, + dbHelpers, testMocks, perpetualMarketRefresher, CandleResolution, MAX_PARENT_SUBACCOUNTS, } from '@dydxprotocol-indexer/postgres'; import { btcTicker, invalidChannel, invalidTicker } from '../constants'; import { axiosRequest } from '../../src/lib/axios'; @@ -35,6 +35,7 @@ describe('Subscriptions', () => { [Channel.V4_MARKETS]: defaultId, [Channel.V4_ORDERBOOK]: btcTicker, [Channel.V4_TRADES]: btcTicker, + [Channel.V4_PARENT_ACCOUNTS]: mockSubaccountId, }; const invalidIdsMap: Record, string[]> = { [Channel.V4_ACCOUNTS]: [invalidTicker], @@ -45,6 +46,7 @@ describe('Subscriptions', () => { ], [Channel.V4_ORDERBOOK]: [invalidTicker], [Channel.V4_TRADES]: [invalidTicker], + [Channel.V4_PARENT_ACCOUNTS]: [`address/${MAX_PARENT_SUBACCOUNTS}`], }; const initialResponseUrlPatterns: Record = { [Channel.V4_ACCOUNTS]: [ @@ -55,6 +57,10 @@ describe('Subscriptions', () => { [Channel.V4_MARKETS]: ['/v4/perpetualMarkets'], [Channel.V4_ORDERBOOK]: ['/v4/orderbooks/perpetualMarket/.+'], [Channel.V4_TRADES]: ['/v4/trades/perpetualMarket/.+'], + [Channel.V4_PARENT_ACCOUNTS]: [ + '/v4/addresses/.+/parentSubaccountNumber/.+', + '/v4/orders/parentSubaccountNumber?.+OPEN,UNTRIGGERED,BEST_EFFORT_OPENED', + ], }; const initialMessage: Object = { a: 'b' }; const country: string = 'AR'; @@ -90,6 +96,7 @@ describe('Subscriptions', () => { [Channel.V4_MARKETS, validIds[Channel.V4_MARKETS]], [Channel.V4_ORDERBOOK, validIds[Channel.V4_ORDERBOOK]], [Channel.V4_TRADES, validIds[Channel.V4_TRADES]], + [Channel.V4_PARENT_ACCOUNTS, validIds[Channel.V4_PARENT_ACCOUNTS]], ])('handles valid subscription request to channel %s', async ( channel: Channel, id: string, @@ -136,6 +143,7 @@ describe('Subscriptions', () => { [Channel.V4_CANDLES, invalidIdsMap[Channel.V4_CANDLES]], [Channel.V4_ORDERBOOK, invalidIdsMap[Channel.V4_ORDERBOOK]], [Channel.V4_TRADES, invalidIdsMap[Channel.V4_TRADES]], + [Channel.V4_PARENT_ACCOUNTS, invalidIdsMap[Channel.V4_PARENT_ACCOUNTS]], ])('sends error message if invalid subscription request to channel %s', async ( channel: Channel, invalidIds: string[], diff --git a/indexer/services/socks/package.json b/indexer/services/socks/package.json index 1e86abea75..9b068a0742 100644 --- a/indexer/services/socks/package.json +++ b/indexer/services/socks/package.json @@ -11,7 +11,7 @@ "coverage": "pnpm test -- --coverage", "lint": "eslint --ext .ts,.js .", "lint:fix": "eslint --ext .ts,.js . --fix", - "test": "NODE_ENV=test jest --runInBand --forceExit" + "test": "NODE_ENV=test jest --maxWorkers 1 --forceExit" }, "author": "", "license": "AGPL-3.0", diff --git a/indexer/services/socks/src/helpers/from-kafka-helpers.ts b/indexer/services/socks/src/helpers/from-kafka-helpers.ts index ac432050db..6577712435 100644 --- a/indexer/services/socks/src/helpers/from-kafka-helpers.ts +++ b/indexer/services/socks/src/helpers/from-kafka-helpers.ts @@ -2,6 +2,7 @@ import { logger } from '@dydxprotocol-indexer/base'; import { perpetualMarketRefresher, PROTO_TO_CANDLE_RESOLUTION, + parentSubaccountHelpers, } from '@dydxprotocol-indexer/postgres'; import { CandleMessage, @@ -16,7 +17,7 @@ import { TOPIC_TO_CHANNEL, V4_MARKETS_ID } from '../lib/constants'; import { InvalidForwardMessageError, InvalidTopicError } from '../lib/errors'; import { Channel, MessageToForward, WebsocketTopics } from '../types'; -export function getChannel(topic: string): Channel | undefined { +export function getChannels(topic: string): Channel[] { if (!Object.values(WebsocketTopics) .some((topicName: string) => { return topicName === topic; })) { throw new InvalidTopicError(topic); @@ -84,6 +85,16 @@ export function getMessageToForward( version: tradeMessage.version, }; } + case Channel.V4_PARENT_ACCOUNTS: { + const subaccountMessage: SubaccountMessage = SubaccountMessage.decode(messageBinary); + return { + channel, + id: getParentSubaccountMessageId(subaccountMessage), + subaccountNumber: subaccountMessage.subaccountId!.number, + contents: JSON.parse(subaccountMessage.contents), + version: subaccountMessage.version, + }; + } default: throw new InvalidForwardMessageError(`Unknown channel: ${channel}`); } @@ -102,6 +113,13 @@ function getSubaccountMessageId(subaccountMessage: SubaccountMessage): string { return `${subaccountMessage.subaccountId!.owner}/${subaccountMessage.subaccountId!.number}`; } +function getParentSubaccountMessageId(subaccountMessage: SubaccountMessage): string { + const parentSubaccountNumber: number = parentSubaccountHelpers.getParentSubaccountNum( + subaccountMessage.subaccountId!.number, + ); + return `${subaccountMessage.subaccountId!.owner}/${parentSubaccountNumber}`; +} + function getCandleMessageId(candleMessage: CandleMessage): string { const ticker: string = getTickerOrThrow(candleMessage.clobPairId); if (candleMessage.resolution === CandleMessage_Resolution.UNRECOGNIZED) { diff --git a/indexer/services/socks/src/helpers/message.ts b/indexer/services/socks/src/helpers/message.ts index 53b7f1c59f..2bec8327d8 100644 --- a/indexer/services/socks/src/helpers/message.ts +++ b/indexer/services/socks/src/helpers/message.ts @@ -38,6 +38,7 @@ export function createChannelDataMessage( version: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any contents: any, + subaccountNumber?: number, ): ChannelDataMessage { if (channel === Channel.V4_MARKETS) { return { @@ -58,6 +59,7 @@ export function createChannelDataMessage( channel, version, contents, + subaccountNumber, }; } @@ -69,6 +71,7 @@ export function createChannelBatchDataMessage( version: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any contents: any[], + subaccountNumber?: number, ): ChannelBatchDataMessage { if (channel === Channel.V4_MARKETS) { return { @@ -89,6 +92,7 @@ export function createChannelBatchDataMessage( channel, version, contents, + subaccountNumber, }; } diff --git a/indexer/services/socks/src/lib/constants.ts b/indexer/services/socks/src/lib/constants.ts index fa7885e952..acf32e73f8 100644 --- a/indexer/services/socks/src/lib/constants.ts +++ b/indexer/services/socks/src/lib/constants.ts @@ -20,12 +20,12 @@ export const WEBSOCKET_NOT_OPEN: string = 'ws not open'; export const V4_MARKETS_ID: string = 'v4_markets'; -export const TOPIC_TO_CHANNEL: Record = { - [WebsocketTopics.TO_WEBSOCKETS_CANDLES]: Channel.V4_CANDLES, - [WebsocketTopics.TO_WEBSOCKETS_MARKETS]: Channel.V4_MARKETS, - [WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS]: Channel.V4_ORDERBOOK, - [WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS]: Channel.V4_ACCOUNTS, - [WebsocketTopics.TO_WEBSOCKETS_TRADES]: Channel.V4_TRADES, +export const TOPIC_TO_CHANNEL: Record = { + [WebsocketTopics.TO_WEBSOCKETS_CANDLES]: [Channel.V4_CANDLES], + [WebsocketTopics.TO_WEBSOCKETS_MARKETS]: [Channel.V4_MARKETS], + [WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS]: [Channel.V4_ORDERBOOK], + [WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS]: [Channel.V4_ACCOUNTS, Channel.V4_PARENT_ACCOUNTS], + [WebsocketTopics.TO_WEBSOCKETS_TRADES]: [Channel.V4_TRADES], }; export const MAX_TIMEOUT_INTEGER: number = 2147483647; diff --git a/indexer/services/socks/src/lib/message-forwarder.ts b/indexer/services/socks/src/lib/message-forwarder.ts index abe08d8c57..f088ecc1e8 100644 --- a/indexer/services/socks/src/lib/message-forwarder.ts +++ b/indexer/services/socks/src/lib/message-forwarder.ts @@ -11,7 +11,7 @@ import _ from 'lodash'; import config from '../config'; import { - getChannel, + getChannels, getMessageToForward, } from '../helpers/from-kafka-helpers'; import { @@ -35,6 +35,7 @@ const BUFFER_KEY_SEPARATOR: string = ':'; type VersionedContents = { contents: string; version: string; + subaccountNumber?: number; }; export class MessageForwarder { @@ -102,8 +103,8 @@ export class MessageForwarder { offset: message.offset, }; - const channel: Channel | undefined = getChannel(topic); - if (channel === undefined) { + const channels: Channel[] = getChannels(topic); + if (channels.length === 0) { logger.error({ ...errProps, at: loggerAt, @@ -111,46 +112,48 @@ export class MessageForwarder { }); return; } - errProps.channel = channel; + errProps.channels = channels; - let messageToForward: MessageToForward; - try { - messageToForward = getMessageToForward(channel, message); - } catch (error) { - logger.error({ - ...errProps, - at: loggerAt, - message: 'Failed to get message to forward from kafka message', - kafkaMessage: safeJsonStringify(message), - error, - }); - return; - } - - const startForwardMessage: number = Date.now(); - this.forwardMessage(messageToForward); - const end: number = Date.now(); - stats.timing( - `${config.SERVICE_NAME}.forward_message`, - end - startForwardMessage, - config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, - { - topic, - channel: String(channel), - }, - ); + for (const channel of channels) { + let messageToForward: MessageToForward; + try { + messageToForward = getMessageToForward(channel, message); + } catch (error) { + logger.error({ + ...errProps, + at: loggerAt, + message: 'Failed to get message to forward from kafka message', + kafkaMessage: safeJsonStringify(message), + error, + }); + return; + } - const originalMessageTimestamp = message.headers?.message_received_timestamp; - if (originalMessageTimestamp !== undefined) { + const startForwardMessage: number = Date.now(); + this.forwardMessage(messageToForward); + const end: number = Date.now(); stats.timing( - `${config.SERVICE_NAME}.message_time_since_received`, - startForwardMessage - Number(originalMessageTimestamp), - STATS_NO_SAMPLING, + `${config.SERVICE_NAME}.forward_message`, + end - startForwardMessage, + config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE, { topic, - event_type: String(message.headers?.event_type), + channel: String(channel), }, ); + + const originalMessageTimestamp = message.headers?.message_received_timestamp; + if (originalMessageTimestamp !== undefined) { + stats.timing( + `${config.SERVICE_NAME}.message_time_since_received`, + startForwardMessage - Number(originalMessageTimestamp), + STATS_NO_SAMPLING, + { + topic, + event_type: String(message.headers?.event_type), + }, + ); + } } } @@ -209,6 +212,7 @@ export class MessageForwarder { this.messageBuffer[bufferKey].push({ contents: message.contents, version: message.version, + subaccountNumber: message.subaccountNumber, } as VersionedContents); forwardedToSubscribers = true; } @@ -261,28 +265,12 @@ export class MessageForwarder { .batchedSubscriptions[channelString][id]; batchedSubscribers.forEach( (batchedSubscriber: SubscriptionInfo) => { - const batchedVersionedMessages: _.Dictionary = _.groupBy( + this.forwardBatchedVersionedMessagesBySubaccountNumber( batchedMessages, - (c) => c.version, + batchedSubscriber, + channel, + id, ); - _.forEach(batchedVersionedMessages, (msgs, version) => { - try { - this.forwardToClientBatch( - msgs, - batchedSubscriber.connectionId, - channel, - id, - version, - ); - } catch (error) { - logger.error({ - at: 'message-forwarder#forwardBatchedMessages', - message: error.message, - connectionId: batchedSubscriber.connectionId, - error, - }); - } - }); }, ); } @@ -291,12 +279,53 @@ export class MessageForwarder { this.messageBuffer = {}; } + private forwardBatchedVersionedMessagesBySubaccountNumber( + batchedMessages: VersionedContents[], + batchedSubscriber: SubscriptionInfo, + channel: Channel, + id: string, + ): void { + const batchedVersionedMessages: _.Dictionary = _.groupBy( + batchedMessages, + (c) => c.version, + ); + _.forEach(batchedVersionedMessages, (versionedMsgs, version) => { + const batchedMessagesBySubaccountNumber: _.Dictionary = _.groupBy( + versionedMsgs, + (c) => c.subaccountNumber, + ); + _.forEach(batchedMessagesBySubaccountNumber, (msgs, subaccountNumberKey) => { + const subaccountNumber: number | undefined = Number.isNaN(Number(subaccountNumberKey)) + ? undefined + : Number(subaccountNumberKey); + try { + this.forwardToClientBatch( + msgs, + batchedSubscriber.connectionId, + channel, + id, + version, + subaccountNumber, + ); + } catch (error) { + logger.error({ + at: 'message-forwarder#forwardBatchedMessages', + message: error.message, + connectionId: batchedSubscriber.connectionId, + error, + }); + } + }); + }); + } + public forwardToClientBatch( batchedMessages: VersionedContents[], connectionId: string, channel: Channel, id: string, version: string, + subaccountNumber?: number, ): void { const connection: Connection = this.index.connections[connectionId]; if (!connection) { @@ -322,6 +351,7 @@ export class MessageForwarder { id, version, batchedMessages.map((c) => c.contents), + subaccountNumber, ), ); } @@ -371,6 +401,7 @@ export class MessageForwarder { message.id, message.version, message.contents, + message.subaccountNumber, ), ); return 1; diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index 7940dd3f33..a89c4261d5 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -3,7 +3,9 @@ import { logger, stats, } from '@dydxprotocol-indexer/base'; -import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; +import { + CHILD_SUBACCOUNT_MULTIPLIER, CandleResolution, MAX_PARENT_SUBACCOUNTS, perpetualMarketRefresher, +} from '@dydxprotocol-indexer/postgres'; import WebSocket from 'ws'; import config from '../config'; @@ -378,17 +380,10 @@ export class Subscriptions { } switch (channel) { case (Channel.V4_ACCOUNTS): { - if (id === undefined) { - return false; - } - const parts: string[] = id.split('/'); - - // Id for subaccounts channel should be of the format {address}/{subaccountNumber} - if (parts.length !== 2) { - return false; - } - - return true; + return this.validateSubaccountChannelId( + id, + MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER, + ); } case (Channel.V4_MARKETS): { return true; @@ -417,6 +412,9 @@ export class Subscriptions { return resolution !== undefined; } + case (Channel.V4_PARENT_ACCOUNTS): { + return this.validateSubaccountChannelId(id, MAX_PARENT_SUBACCOUNTS); + } default: { throw new InvalidChannelError(channel); } @@ -434,6 +432,27 @@ export class Subscriptions { return id ?? V4_MARKETS_ID; } + private validateSubaccountChannelId(id?: string, maxSubaccountNumber?: number): boolean { + if (id === undefined) { + return false; + } + // Id for subaccounts channel should be of the format {address}/{subaccountNumber} + const parts: string[] = id.split('/'); + if (parts.length !== 2) { + return false; + } + + if (Number.isNaN(Number(parts[1]))) { + return false; + } + + if (maxSubaccountNumber !== undefined && Number(Number(parts[1]) >= maxSubaccountNumber)) { + return false; + } + + return true; + } + /** * Gets the initial response endpoint for a subscription based on the channel and id. * @param channel Channel to get the initial response endpoint for. @@ -547,6 +566,71 @@ export class Subscriptions { } } + private async getInitialResponseForParentSubaccountSubscription( + id?: string, + country?: string, + ): Promise { + if (id === undefined) { + throw new Error('Invalid undefined id'); + } + + try { + const { + address, + subaccountNumber, + } : { + address: string, + subaccountNumber: string, + } = this.parseSubaccountChannelId(id); + + const [ + subaccountsResponse, + ordersResponse, + ]: [ + string, + string, + ] = await Promise.all([ + axiosRequest({ + method: RequestMethod.GET, + url: `${COMLINK_URL}/v4/addresses/${address}/parentSubaccountNumber/${subaccountNumber}`, + timeout: config.INITIAL_GET_TIMEOUT_MS, + headers: { + 'cf-ipcountry': country, + }, + transformResponse: (res) => res, + }), + // TODO(DEC-1462): Use the /active-orders endpoint once it's added. + axiosRequest({ + method: RequestMethod.GET, + url: `${COMLINK_URL}/v4/orders/parentSubaccountNumber?address=${address}&subaccountNumber=${subaccountNumber}&status=OPEN,UNTRIGGERED,BEST_EFFORT_OPENED`, + timeout: config.INITIAL_GET_TIMEOUT_MS, + headers: { + 'cf-ipcountry': country, + }, + transformResponse: (res) => res, + }), + ]); + + return JSON.stringify({ + ...JSON.parse(subaccountsResponse), + orders: JSON.parse(ordersResponse), + }); + } catch (error) { + // The subaccounts API endpoint returns a 404 for subaccounts that are not indexed, however + // such subaccounts can be subscribed to and events can be sent when the subaccounts are + // indexed to an existing subscription. + if (error instanceof AxiosSafeServerError && (error as AxiosSafeServerError).status === 404) { + return EMPTY_INITIAL_RESPONSE; + } + // 403 indicates a blocked address. Throw a specific error for blocked addresses with a + // specific error message detailing why the subscription failed due to a blocked address. + if (error instanceof AxiosSafeServerError && (error as AxiosSafeServerError).status === 403) { + throw new BlockedError(); + } + throw error; + } + } + private parseSubaccountChannelId(id: string): { address: string, subaccountNumber: string, @@ -585,6 +669,9 @@ export class Subscriptions { if (channel === Channel.V4_ACCOUNTS) { return this.getInitialResponseForSubaccountSubscription(id, country); } + if (channel === Channel.V4_PARENT_ACCOUNTS) { + return this.getInitialResponseForParentSubaccountSubscription(id, country); + } const endpoint: string | undefined = this.getInitialEndpointForSubscription(channel, id); // If no endpoint exists, return an empty initial response. if (endpoint === undefined) { diff --git a/indexer/services/socks/src/types.ts b/indexer/services/socks/src/types.ts index c4074bbda9..0f66f7784e 100644 --- a/indexer/services/socks/src/types.ts +++ b/indexer/services/socks/src/types.ts @@ -21,6 +21,7 @@ export enum Channel { V4_TRADES = 'v4_trades', V4_MARKETS = 'v4_markets', V4_CANDLES = 'v4_candles', + V4_PARENT_ACCOUNTS = 'v4_parent_subaccounts', } export const ALL_CHANNELS = Object.values(Channel); @@ -85,6 +86,7 @@ export interface ChannelDataMessage extends OutgoingMessage { channel: Channel; id?: string; version: string; + subaccountNumber?: number; } export interface ChannelBatchDataMessage extends OutgoingMessage { @@ -93,6 +95,7 @@ export interface ChannelBatchDataMessage extends OutgoingMessage { channel: Channel; id?: string; version: string; + subaccountNumber?: number; } export interface ConnectedMessage extends OutgoingMessage {} @@ -128,6 +131,7 @@ export interface MessageToForward { // eslint-disable-next-line @typescript-eslint/no-explicit-any contents: any; version: string; + subaccountNumber?: number; } export interface ResponseWithBody extends express.Response {