Skip to content

Commit

Permalink
Update to Lightstreamer API to bring in Market price subscription and…
Browse files Browse the repository at this point in the history
… associated test cases. (#613)

Co-authored-by: Brookie Cribb <brookie.cribb@gmail.com>
  • Loading branch information
Brookie144 and Brookie Cribb committed May 6, 2024
1 parent 756e7a8 commit 4fdb71e
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 3 deletions.
5 changes: 4 additions & 1 deletion src/demo/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {initClient} from './initClient';
import {ChartResolution} from '../lightstreamer/interfaces';
import {CandleStick, TickPrice} from '../market';
import {CandleStick, TickPrice, MarketUpdates} from '../market';
import {tradeSubscriptionUpdate} from '../dealing';
import {AccountUpdate} from '../account';

Expand All @@ -22,6 +22,9 @@ async function main(): Promise<void> {
client.stream.subscribeAccount((accountId: string, accountUpdate: AccountUpdate) => {
console.info('Streaming API Event (subscribeAccount) : ', accountId, accountUpdate);
});
client.stream.subscribeMarketUpdates(['IX.D.NIKKEI.IFA.IP'], (epic: string, marketUpdate: MarketUpdates) => {
console.info('Streaming API Event (MarketUpdated) : ', epic, marketUpdate);
});
}

main().catch(error => {
Expand Down
65 changes: 65 additions & 0 deletions src/lightstreamer/LightstreamerAPI.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,69 @@ describe('LightstreamerAPI', () => {
});
});
});
describe('subscribeMarketUpdate', () => {
it('can subscribe and re-subscribe to market update data', async () => {
nock(APIClient.URL_DEMO)
.post(LoginAPI.URL.SESSION)
.query(true)
.reply(
200,
JSON.stringify({
accountId: 'ABC123',
clientId: '133721337',
lightstreamerEndpoint: 'https://demo-apd.marketdatasystems.com',
oauthToken: {
access_token: '6ba8e2bd-1337-40e5-9299-68f60474f986',
expires_in: '60',
refresh_token: '83c056b8-1337-46d3-821d-92a1dffd7f1e',
scope: 'profile',
token_type: 'Bearer',
},
timezoneOffset: 1,
})
);

nock(APIClient.URL_DEMO)
.get(LoginAPI.URL.SESSION + '?fetchSessionTokens=true')
.reply(
200,
JSON.stringify({
accountId: 'ABC123',
clientId: '133721337',
currency: 'EUR',
lightstreamerEndpoint: 'https://demo-apd.marketdatasystems.com',
locale: 'de_DE',
timezoneOffset: 1,
}),
{
cst: 'a608da13371337e4f600bfa82e3ea43520eb664f22ce18b15a36879bd0eb28CU01113',
'x-security-token': '5e6843dea133713375fe000a5e8b5ec6da09946c0e97bd557f13a6d699CD01113',
}
);

await global.client.rest.login.createSession('test-user', 'test-password');

global.client.stream.subscribeMarketUpdates(['CS.D.BITCOIN.TODAY.IP'], (epic, MarketUpdate) => {
expect(epic).toBe('CS.D.BITCOIN.TODAY.IP');
expect(MarketUpdate.BID).toBe(30);
});

const listeners = global.client.stream.marketUpdateSubscription!.getListeners()[0];
if (listeners.onItemUpdate) {
listeners.onItemUpdate({
getItemName() {
return 'MARKET:CS.D.BITCOIN.TODAY.IP';
},
getValue() {
return '30';
},
} as unknown as ItemUpdate);
}

global.client.stream.subscribeMarketUpdates(['CS.D.BITCOIN.TODAY.IP'], (epic, MarketUpdate) => {
expect(epic).toBe('CS.D.BITCOIN.TODAY.IP');
expect(MarketUpdate.BID).toBe(30);
});
});
});
});
60 changes: 58 additions & 2 deletions src/lightstreamer/LightstreamerAPI.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import {ItemUpdate, LightstreamerClient, Subscription} from 'lightstreamer-client-node';
import {DateTime} from 'luxon';
import {Authorization} from '../client';
import {CandleStick, TickPrice} from '../market';
import {CandleStick, TickPrice, MarketUpdates} from '../market';
import {AccountUpdate} from '../account';
import {ChartFields, ChartResolution, AccountFields, ChartTickFields, TradeSubTypes} from './interfaces';
import {
ChartFields,
ChartResolution,
AccountFields,
ChartTickFields,
TradeSubTypes,
MarketUpdateFields,
} from './interfaces';
import {tradeSubscriptionUpdate} from '../dealing';

export class LightstreamerAPI {
Expand All @@ -13,6 +20,7 @@ export class LightstreamerAPI {
accountSubscription?: Subscription;
orderSubscription?: Subscription;
tradeSubscription?: Subscription;
marketUpdateSubscription?: Subscription;

constructor(private readonly auth: Authorization) {}

Expand Down Expand Up @@ -242,4 +250,52 @@ export class LightstreamerAPI {
lightstream.subscribe(this.tradeSubscription);
return lightstream;
}
subscribeMarketUpdates(
epicList: string[],
onMarketUpdate: (epic: string, item: MarketUpdates) => void
): LightstreamerClient {
const lightstream = this.createLightStream();

const fields = [
MarketUpdateFields.BID,
MarketUpdateFields.OFFER,
MarketUpdateFields.HIGH,
MarketUpdateFields.LOW,
MarketUpdateFields.MID_OPEN,
MarketUpdateFields.CHANGE,
MarketUpdateFields.CHANGE_PCT,
MarketUpdateFields.MARKET_DELAY,
MarketUpdateFields.MARKET_STATE,
MarketUpdateFields.UPDATE_TIME,
];

if (this.marketUpdateSubscription) {
lightstream.unsubscribe(this.marketUpdateSubscription);
}
const epics = epicList.map(x => `MARKET:${x}`);
this.marketUpdateSubscription = new Subscription('MERGE', epics, fields);

this.marketUpdateSubscription.addListener({
onItemUpdate: (item: ItemUpdate) => {
const epic = item.getItemName().split(':')[1];
const UpdateResponse: MarketUpdates = {
BID: parseFloat(item.getValue(MarketUpdateFields.BID)),
CHANGE: parseFloat(item.getValue(MarketUpdateFields.CHANGE)),
CHANGE_PCT: parseFloat(item.getValue(MarketUpdateFields.CHANGE_PCT)),
HIGH: parseFloat(item.getValue(MarketUpdateFields.HIGH)),
LOW: parseFloat(item.getValue(MarketUpdateFields.LOW)),
MARKET_DELAY: item.getValue(MarketUpdateFields.MARKET_DELAY) === 'true',
MARKET_STATE: item.getValue(MarketUpdateFields.MARKET_STATE),
MID_OPEN: parseFloat(item.getValue(MarketUpdateFields.MID_OPEN)),
OFFER: parseFloat(item.getValue(MarketUpdateFields.OFFER)),
UPDATE_TIME: item.getValue(MarketUpdateFields.UPDATE_TIME),
};
onMarketUpdate(epic, UpdateResponse);
},
});

lightstream.connect();
lightstream.subscribe(this.marketUpdateSubscription);
return lightstream;
}
}
13 changes: 13 additions & 0 deletions src/lightstreamer/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,16 @@ export enum WorkingOrderUpdate {
TIME_IN_FORCE = 'timeInForce',
TIMESTAMP = 'timestamp',
}

export enum MarketUpdateFields {
BID = 'BID',
CHANGE = 'CHANGE',
CHANGE_PCT = 'CHANGE_PCT',
HIGH = 'HIGH',
LOW = 'LOW',
MARKET_DELAY = 'MARKET_DELAY',
MARKET_STATE = 'MARKET_STATE',
MID_OPEN = 'MID_OPEN',
OFFER = 'OFFER',
UPDATE_TIME = 'UPDATE_TIME',
}
13 changes: 13 additions & 0 deletions src/market/MarketAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ export interface MarketSearch {
markets: Market[];
}

export interface MarketUpdates {
BID: number;
CHANGE: number;
CHANGE_PCT: number;
HIGH: number;
LOW: number;
MARKET_DELAY: boolean;
MARKET_STATE: string;
MID_OPEN: number;
OFFER: number;
UPDATE_TIME: string;
}

export class MarketAPI {
static readonly URL = {
MARKETNAVIGATION: `/marketnavigation`,
Expand Down

0 comments on commit 4fdb71e

Please sign in to comment.