From ea3fce96703decba331604f01f4b3e2cce184e3b Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Feb 2024 10:36:32 -0500 Subject: [PATCH 1/5] remove geoblocking from socks endpoints --- .../socks/__tests__/websocket/index.test.ts | 47 ------------------- .../socks/src/helpers/header-utils.ts | 8 ++++ indexer/services/socks/src/websocket/index.ts | 10 +--- 3 files changed, 10 insertions(+), 55 deletions(-) create mode 100644 indexer/services/socks/src/helpers/header-utils.ts diff --git a/indexer/services/socks/__tests__/websocket/index.test.ts b/indexer/services/socks/__tests__/websocket/index.test.ts index 06dde78ca0..2520144f56 100644 --- a/indexer/services/socks/__tests__/websocket/index.test.ts +++ b/indexer/services/socks/__tests__/websocket/index.test.ts @@ -14,15 +14,12 @@ import { } from '../../src/types'; import { InvalidMessageHandler } from '../../src/lib/invalid-message'; import { PingHandler } from '../../src/lib/ping'; -import config from '../../src/config'; -import { isRestrictedCountryHeaders, COUNTRY_HEADER_KEY } from '@dydxprotocol-indexer/compliance'; jest.mock('uuid'); jest.mock('../../src/helpers/wss'); jest.mock('../../src/lib/subscription'); jest.mock('../../src/lib/invalid-message'); jest.mock('../../src/lib/ping'); -jest.mock('@dydxprotocol-indexer/compliance'); describe('Index', () => { let index: Index; @@ -32,12 +29,10 @@ describe('Index', () => { let mockConnect: (ws: WebSocket, req: IncomingMessage) => void; let wsOnSpy: jest.SpyInstance; let wsPingSpy: jest.SpyInstance; - let wsTerminateSpy: jest.SpyInstance; let invalidMsgHandlerSpy: jest.SpyInstance; let pingHandlerSpy: jest.SpyInstance; const connectionId: string = 'conId'; - const defaultGeoblockingEnabled: boolean = config.INDEXER_LEVEL_GEOBLOCKING_ENABLED; const countryCode: string = 'AR'; beforeAll(() => { @@ -58,7 +53,6 @@ describe('Index', () => { websocket = new WebSocket(null); wsOnSpy = jest.spyOn(websocket, 'on'); wsPingSpy = jest.spyOn(websocket, 'ping').mockImplementation(jest.fn()); - wsTerminateSpy = jest.spyOn(websocket, 'terminate').mockImplementation(jest.fn()); mockWss.onConnection = jest.fn().mockImplementation( (cb: (ws: WebSocket, req: IncomingMessage) => void) => { mockConnect = cb; @@ -97,46 +91,6 @@ describe('Index', () => { }), ); }); - - describe('geoblocking', () => { - const isRestrictedCountrySpy: jest.Mock = isRestrictedCountryHeaders as unknown as jest.Mock; - - beforeAll(() => { - config.INDEXER_LEVEL_GEOBLOCKING_ENABLED = true; - }); - - afterAll(() => { - config.INDEXER_LEVEL_GEOBLOCKING_ENABLED = defaultGeoblockingEnabled; - }); - - it('rejects connection if from restricted country', () => { - jest.spyOn(websocket, 'terminate').mockImplementation(jest.fn()); - // restricted country headers - isRestrictedCountrySpy.mockReturnValue(true); - - const message: IncomingMessage = new IncomingMessage(new Socket()); - mockConnect(websocket, message); - expect(websocket.terminate).toHaveBeenCalled(); - expect(Object.keys(index.connections)).toHaveLength(0); - expect(wsOnSpy).not.toHaveBeenCalled(); - expect(wsTerminateSpy).toHaveBeenCalled(); - expect(sendMessage).not.toHaveBeenCalled(); - }); - - it('does not reject connection if from restricted country', () => { - (v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId); - // non-restricted country headers - isRestrictedCountrySpy.mockReturnValue(false); - - const message: IncomingMessage = new IncomingMessage(new Socket()); - mockConnect(websocket, message); - - // Test that the connection is tracked. - expect(index.connections[connectionId]).not.toBeUndefined(); - expect(index.connections[connectionId].ws).toEqual(websocket); - expect(index.connections[connectionId].messageId).toEqual(0); - }); - }); }); describe('handlers', () => { @@ -144,7 +98,6 @@ describe('Index', () => { // Connect to the index before starting each test. (v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId); const incomingMessage: IncomingMessage = new IncomingMessage(new Socket()); - incomingMessage.headers[COUNTRY_HEADER_KEY] = countryCode; mockConnect(websocket, incomingMessage); }); diff --git a/indexer/services/socks/src/helpers/header-utils.ts b/indexer/services/socks/src/helpers/header-utils.ts new file mode 100644 index 0000000000..0ce77bd07f --- /dev/null +++ b/indexer/services/socks/src/helpers/header-utils.ts @@ -0,0 +1,8 @@ +import { CountryHeaders } from '@dydxprotocol-indexer/compliance'; + +import { IncomingMessage } from '../types'; + +export function getCountry(req: IncomingMessage): string | undefined { + const countryHeaders: CountryHeaders = req.headers as CountryHeaders; + return countryHeaders['cf-ipcountry']; +} diff --git a/indexer/services/socks/src/websocket/index.ts b/indexer/services/socks/src/websocket/index.ts index 944e4243b9..003532c2ee 100644 --- a/indexer/services/socks/src/websocket/index.ts +++ b/indexer/services/socks/src/websocket/index.ts @@ -5,6 +5,7 @@ import { v4 as uuidv4 } from 'uuid'; import WebSocket from 'ws'; import config from '../config'; +import { getCountry } from '../helpers/header-utils'; import { createErrorMessage, createConnectedMessage, @@ -26,7 +27,6 @@ import { ALL_CHANNELS, WebsocketEvents, } from '../types'; -import { CountryRestrictor } from './restrict-countries'; const HEARTBEAT_INTERVAL_MS: number = config.WS_HEARTBEAT_INTERVAL_MS; const HEARTBEAT_TIMEOUT_MS: number = config.WS_HEARTBEAT_TIMEOUT_MS; @@ -42,7 +42,6 @@ export class Index { // Handlers for pings and invalid messages. private pingHandler: PingHandler; private invalidMessageHandler: InvalidMessageHandler; - private countryRestrictor: CountryRestrictor; constructor(wss: Wss, subscriptions: Subscriptions) { this.wss = wss; @@ -50,7 +49,6 @@ export class Index { this.subscriptions = subscriptions; this.pingHandler = new PingHandler(); this.invalidMessageHandler = new InvalidMessageHandler(); - this.countryRestrictor = new CountryRestrictor(); // Attach the new connection handler to the websocket server. this.wss.onConnection((ws: WebSocket, req: IncomingMessage) => this.onConnection(ws, req)); @@ -99,17 +97,13 @@ export class Index { * @param req HTTP request accompanying new connection request. */ private onConnection(ws: WebSocket, req: IncomingMessage): void { - // Terminate the connection if the connection requestion originated from a restricted country - if (this.countryRestrictor.isRestrictedCountry(req)) { - return ws.terminate(); - } const connectionId: string = uuidv4(); this.connections[connectionId] = { ws, messageId: 0, - countryCode: this.countryRestrictor.getCountry(req), + countryCode: getCountry(req), }; const numConcurrentConnections: number = Object.keys(this.connections).length; From 71f98d6a7ca06224b090e1213397fd951e37c715 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Feb 2024 10:46:25 -0500 Subject: [PATCH 2/5] fix unit tests --- indexer/services/socks/__tests__/websocket/index.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indexer/services/socks/__tests__/websocket/index.test.ts b/indexer/services/socks/__tests__/websocket/index.test.ts index 2520144f56..092f8dd664 100644 --- a/indexer/services/socks/__tests__/websocket/index.test.ts +++ b/indexer/services/socks/__tests__/websocket/index.test.ts @@ -14,6 +14,7 @@ import { } from '../../src/types'; import { InvalidMessageHandler } from '../../src/lib/invalid-message'; import { PingHandler } from '../../src/lib/ping'; +import { COUNTRY_HEADER_KEY } from '@dydxprotocol-indexer/compliance'; jest.mock('uuid'); jest.mock('../../src/helpers/wss'); @@ -98,6 +99,7 @@ describe('Index', () => { // Connect to the index before starting each test. (v4 as unknown as jest.Mock).mockReturnValueOnce(connectionId); const incomingMessage: IncomingMessage = new IncomingMessage(new Socket()); + incomingMessage.headers[COUNTRY_HEADER_KEY] = countryCode; mockConnect(websocket, incomingMessage); }); From 9d850b7e72ed4f5d17d338df3cb2f224c48dbc66 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 21 Feb 2024 11:43:59 -0500 Subject: [PATCH 3/5] wip --- .../socks/__tests__/lib/subscriptions.test.ts | 10 --------- .../services/socks/src/lib/subscription.ts | 16 ++------------ .../socks/src/websocket/restrict-countries.ts | 21 ------------------- 3 files changed, 2 insertions(+), 45 deletions(-) delete mode 100644 indexer/services/socks/src/websocket/restrict-countries.ts diff --git a/indexer/services/socks/__tests__/lib/subscriptions.test.ts b/indexer/services/socks/__tests__/lib/subscriptions.test.ts index 04fdf84587..2899a62ef6 100644 --- a/indexer/services/socks/__tests__/lib/subscriptions.test.ts +++ b/indexer/services/socks/__tests__/lib/subscriptions.test.ts @@ -10,7 +10,6 @@ import { btcTicker, invalidChannel, invalidTicker } from '../constants'; import { axiosRequest } from '../../src/lib/axios'; import { AxiosSafeServerError, makeAxiosSafeServerError } from '@dydxprotocol-indexer/base'; import { BlockedError } from '../../src/lib/errors'; -import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance'; jest.mock('ws'); jest.mock('../../src/helpers/wss'); @@ -83,9 +82,6 @@ describe('Subscriptions', () => { axiosRequestMock = (axiosRequest as jest.Mock); axiosRequestMock.mockClear(); axiosRequestMock.mockImplementation(() => (JSON.stringify(initialMessage))); - (isRestrictedCountry as jest.Mock).mockImplementation((country: string): boolean => { - return country === restrictedCountry; - }); }); describe('subscribe', () => { @@ -106,7 +102,6 @@ describe('Subscriptions', () => { initialMsgId, id, false, - nonRestrictedCountry, ); expect(sendMessageStringMock).toHaveBeenCalledTimes(1); @@ -150,7 +145,6 @@ describe('Subscriptions', () => { initialMsgId, id, false, - nonRestrictedCountry, ); expect(sendMessageMock).toHaveBeenCalledTimes(1); @@ -179,7 +173,6 @@ describe('Subscriptions', () => { initialMsgId, defaultId, false, - nonRestrictedCountry, ); }, ).rejects.toEqual(new Error(`Invalid channel: ${invalidChannel}`)); @@ -194,7 +187,6 @@ describe('Subscriptions', () => { initialMsgId, mockSubaccountId, false, - nonRestrictedCountry, ); expect(sendMessageMock).toHaveBeenCalledTimes(1); @@ -217,7 +209,6 @@ describe('Subscriptions', () => { initialMsgId, mockSubaccountId, false, - nonRestrictedCountry, ); expect(sendMessageMock).toHaveBeenCalledTimes(1); @@ -386,7 +377,6 @@ describe('Subscriptions', () => { initialMsgId, validIds[channel], false, - nonRestrictedCountry, ); })); diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index 02aaff590a..0c7b1837fb 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -3,7 +3,6 @@ import { logger, stats, } from '@dydxprotocol-indexer/base'; -import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance'; import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; import WebSocket from 'ws'; @@ -78,7 +77,6 @@ export class Subscriptions { messageId: number, id?: string, batched?: boolean, - country?: string, ): Promise { if (this.forwardMessage === undefined) { throw new Error('Unexpected error, subscription object is uninitialized.'); @@ -131,7 +129,7 @@ export class Subscriptions { let initialResponse: string; const startGetInitialResponse: number = Date.now(); try { - initialResponse = await this.getInitialResponsesForChannels(channel, id, country); + initialResponse = await this.getInitialResponsesForChannels(channel, id); } catch (error) { logger.info({ at: 'Subscription#subscribe', @@ -485,19 +483,10 @@ export class Subscriptions { private async getInitialResponseForSubaccountSubscription( id?: string, - country?: string, ): Promise { if (id === undefined) { throw new Error('Invalid undefined id'); } - - // TODO(IND-508): Change this to match technical spec for persistent geo-blocking. This may - // either have to replicate any blocking logic added on comlink, or re-direct to comlink to - // determine if subscribing to a specific subaccount is blocked. - if (country !== undefined && isRestrictedCountry(country)) { - throw new BlockedError(); - } - try { const { address, @@ -582,10 +571,9 @@ export class Subscriptions { private async getInitialResponsesForChannels( channel: Channel, id?: string, - country?: string, ): Promise { if (channel === Channel.V4_ACCOUNTS) { - return this.getInitialResponseForSubaccountSubscription(id, country); + return this.getInitialResponseForSubaccountSubscription(id); } const endpoint: string | undefined = this.getInitialEndpointForSubscription(channel, id); // If no endpoint exists, return an empty initial response. diff --git a/indexer/services/socks/src/websocket/restrict-countries.ts b/indexer/services/socks/src/websocket/restrict-countries.ts deleted file mode 100644 index f079c90660..0000000000 --- a/indexer/services/socks/src/websocket/restrict-countries.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { - CountryHeaders, - isRestrictedCountryHeaders, -} from '@dydxprotocol-indexer/compliance'; - -import { IncomingMessage } from '../types'; - -export class CountryRestrictor { - public isRestrictedCountry(req: IncomingMessage): boolean { - if (isRestrictedCountryHeaders(req.headers as CountryHeaders)) { - return true; - } - - return false; - } - - public getCountry(req: IncomingMessage): string | undefined { - const countryHeaders: CountryHeaders = req.headers as CountryHeaders; - return countryHeaders['cf-ipcountry']; - } -} From 8b1f4c08c1a34b4d4bf31fee8bdcdcc911e6a347 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 21 Feb 2024 11:54:50 -0500 Subject: [PATCH 4/5] pass country header to comlink --- .../socks/__tests__/lib/subscriptions.test.ts | 29 +++---------------- .../services/socks/src/lib/subscription.ts | 18 ++++++++++-- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/indexer/services/socks/__tests__/lib/subscriptions.test.ts b/indexer/services/socks/__tests__/lib/subscriptions.test.ts index 2899a62ef6..57af5b2706 100644 --- a/indexer/services/socks/__tests__/lib/subscriptions.test.ts +++ b/indexer/services/socks/__tests__/lib/subscriptions.test.ts @@ -102,6 +102,7 @@ describe('Subscriptions', () => { initialMsgId, id, false, + nonRestrictedCountry, ); expect(sendMessageStringMock).toHaveBeenCalledTimes(1); @@ -121,6 +122,9 @@ describe('Subscriptions', () => { for (const urlPattern of urlPatterns) { expect(axiosRequestMock).toHaveBeenCalledWith(expect.objectContaining({ url: expect.stringMatching(RegExp(urlPattern)), + headers: { + 'cf-ipcountry': nonRestrictedCountry, + }, })); } } else { @@ -260,31 +264,6 @@ describe('Subscriptions', () => { expect(subscriptions.subscriptionLists[connectionId]).toBeUndefined(); }); - it('sends blocked error if subscribing to subaccount from restricted country', async () => { - const expectedError: BlockedError = new BlockedError(); - await subscriptions.subscribe( - mockWs, - Channel.V4_ACCOUNTS, - connectionId, - initialMsgId, - mockSubaccountId, - false, - restrictedCountry, - ); - - expect(sendMessageMock).toHaveBeenCalledTimes(1); - expect(sendMessageMock).toHaveBeenCalledWith( - mockWs, - connectionId, - expect.objectContaining({ - connection_id: connectionId, - type: 'error', - message: expectedError.message, - })); - expect(subscriptions.subscriptions[Channel.V4_ACCOUNTS]).toBeUndefined(); - expect(subscriptions.subscriptionLists[connectionId]).toBeUndefined(); - }); - it('sends empty contents if initial message request fails with 404 for accounts', async () => { axiosRequestMock.mockImplementation(() => { return Promise.reject(makeAxiosSafeServerError(404, '', '')); diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index 0c7b1837fb..ac46466caa 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -3,6 +3,7 @@ import { logger, stats, } from '@dydxprotocol-indexer/base'; +import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance'; import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; import WebSocket from 'ws'; @@ -77,6 +78,7 @@ export class Subscriptions { messageId: number, id?: string, batched?: boolean, + country?: string, ): Promise { if (this.forwardMessage === undefined) { throw new Error('Unexpected error, subscription object is uninitialized.'); @@ -129,7 +131,7 @@ export class Subscriptions { let initialResponse: string; const startGetInitialResponse: number = Date.now(); try { - initialResponse = await this.getInitialResponsesForChannels(channel, id); + initialResponse = await this.getInitialResponsesForChannels(channel, id, country); } catch (error) { logger.info({ at: 'Subscription#subscribe', @@ -483,10 +485,12 @@ export class Subscriptions { private async getInitialResponseForSubaccountSubscription( id?: string, + country?: string, ): Promise { if (id === undefined) { throw new Error('Invalid undefined id'); } + try { const { address, @@ -507,6 +511,9 @@ export class Subscriptions { method: RequestMethod.GET, url: `${COMLINK_URL}/v4/addresses/${address}/subaccountNumber/${subaccountNumber}`, timeout: config.INITIAL_GET_TIMEOUT_MS, + headers: { + 'cf-ipcountry': country, + }, transformResponse: (res) => res, }), // TODO(DEC-1462): Use the /active-orders endpoint once it's added. @@ -514,6 +521,9 @@ export class Subscriptions { method: RequestMethod.GET, url: `${COMLINK_URL}/v4/orders?address=${address}&subaccountNumber=${subaccountNumber}&status=OPEN,UNTRIGGERED,BEST_EFFORT_OPENED`, timeout: config.INITIAL_GET_TIMEOUT_MS, + headers: { + 'cf-ipcountry': country, + }, transformResponse: (res) => res, }), ]); @@ -571,9 +581,10 @@ export class Subscriptions { private async getInitialResponsesForChannels( channel: Channel, id?: string, + country?: string, ): Promise { if (channel === Channel.V4_ACCOUNTS) { - return this.getInitialResponseForSubaccountSubscription(id); + return this.getInitialResponseForSubaccountSubscription(id, country); } const endpoint: string | undefined = this.getInitialEndpointForSubscription(channel, id); // If no endpoint exists, return an empty initial response. @@ -585,6 +596,9 @@ export class Subscriptions { method: RequestMethod.GET, url: endpoint, timeout: config.INITIAL_GET_TIMEOUT_MS, + headers: { + 'cf-ipcountry': country, + }, transformResponse: (res) => res, // Disables JSON parsing }); } From 25f7f074c71b2c81ce4ae37e74fa8f0732226045 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 21 Feb 2024 12:44:06 -0500 Subject: [PATCH 5/5] lint --- .../socks/__tests__/lib/subscriptions.test.ts | 15 +++++++-------- indexer/services/socks/src/lib/subscription.ts | 1 - 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/indexer/services/socks/__tests__/lib/subscriptions.test.ts b/indexer/services/socks/__tests__/lib/subscriptions.test.ts index 57af5b2706..3d31f00f1d 100644 --- a/indexer/services/socks/__tests__/lib/subscriptions.test.ts +++ b/indexer/services/socks/__tests__/lib/subscriptions.test.ts @@ -57,8 +57,7 @@ describe('Subscriptions', () => { [Channel.V4_TRADES]: ['/v4/trades/perpetualMarket/.+'], }; const initialMessage: Object = { a: 'b' }; - const restrictedCountry: string = 'US'; - const nonRestrictedCountry: string = 'AR'; + const country: string = 'AR'; beforeAll(async () => { await dbHelpers.migrate(); @@ -102,7 +101,7 @@ describe('Subscriptions', () => { initialMsgId, id, false, - nonRestrictedCountry, + country, ); expect(sendMessageStringMock).toHaveBeenCalledTimes(1); @@ -123,7 +122,7 @@ describe('Subscriptions', () => { expect(axiosRequestMock).toHaveBeenCalledWith(expect.objectContaining({ url: expect.stringMatching(RegExp(urlPattern)), headers: { - 'cf-ipcountry': nonRestrictedCountry, + 'cf-ipcountry': country, }, })); } @@ -248,7 +247,7 @@ describe('Subscriptions', () => { initialMsgId, mockSubaccountId, false, - nonRestrictedCountry, + country, ); expect(sendMessageMock).toHaveBeenCalledTimes(1); @@ -275,7 +274,7 @@ describe('Subscriptions', () => { initialMsgId, mockSubaccountId, false, - nonRestrictedCountry, + country, ); expect(sendMessageStringMock).toHaveBeenCalledTimes(1); @@ -312,7 +311,7 @@ describe('Subscriptions', () => { initialMsgId, id, false, - nonRestrictedCountry, + country, ); subscriptions.unsubscribe( connectionId, @@ -332,7 +331,7 @@ describe('Subscriptions', () => { initialMsgId, mockSubaccountId, false, - nonRestrictedCountry, + country, ); subscriptions.unsubscribe( connectionId, diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index ac46466caa..7940dd3f33 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -3,7 +3,6 @@ import { logger, stats, } from '@dydxprotocol-indexer/base'; -import { isRestrictedCountry } from '@dydxprotocol-indexer/compliance'; import { CandleResolution, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; import WebSocket from 'ws';