Skip to content

Commit

Permalink
basic implementation for #55. work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bitrinjani committed Jan 18, 2018
1 parent fd4451b commit 82b75b7
Show file tree
Hide file tree
Showing 51 changed files with 360 additions and 113 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"dependencies": {
"@bitr/castable": "^1.0.1",
"@bitr/chronodb": "^1.0.2",
"@bitr/logger": "^1.0.0",
"chalk": "^2.3.0",
"date-fns": "^1.28.5",
"decimal.js": "^7.3.0",
Expand All @@ -32,8 +33,10 @@
"node-fetch": "^1.7.3",
"pino": "^4.10.2",
"reflect-metadata": "^0.1.10",
"simple-statistics": "^5.2.1",
"split2": "^2.2.0",
"uuid": "^3.1.0"
"uuid": "^3.1.0",
"zeromq": "^4.6.0"
},
"devDependencies": {
"@types/chalk": "^2.2.0",
Expand All @@ -49,6 +52,7 @@
"@types/pino": "^4.7.0",
"@types/split2": "^2.1.6",
"@types/uuid": "^3.4.2",
"@types/zeromq": "^4.5.3",
"coveralls": "^3.0.0",
"cpy-cli": "^1.0.1",
"jest": "^21.0.0",
Expand Down
47 changes: 47 additions & 0 deletions plugins/SpreadStatHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const _ = require('lodash');
const ss = require('simple-statistics');
const { getLogger } = require('@bitr/logger');

const precision = 3;

class SpreadStatHandler {
// Constructor is called when initial snapshot of spread stat history has arrived.
constructor(history) {
this.log = getLogger(this.constructor.name);
const profitPercentHistory = history.map(x => x.bestCase.profitPercentAgainstNotional);
this.sampleSize = profitPercentHistory.length;
this.profitPercentMean = ss.mean(profitPercentHistory);
this.profitPercentVariance = ss.sampleVariance(profitPercentHistory);
}

// The method is called each time new spread stat has arrived, by default every 3 seconds.
// Return value: part of ConfigRoot or undefined.
// If part of ConfigRoot is returned, the configuration will be merged. If undefined is returned, no update will be made.
async handle(spreadStat) {
const newData = spreadStat.bestCase.profitPercentAgainstNotional;
// add new data to mean
this.profitPercentMean = ss.addToMean(this.profitPercentMean, this.sampleSize, newData);
// add new data to variance
this.profitPercentVariance = ss.combineVariances(
this.profitPercentVariance,
this.profitPercentMean,
this.sampleSize,
0,
newData,
1
);

this.sampleSize++;

// set μ + σ to minTargetProfitPercent
const mean = this.profitPercentMean;
const standardDeviation = Math.sqrt(this.profitPercentVariance);
const n = this.sampleSize;
const minTargetProfitPercent = _.round(mean + standardDeviation, precision);
this.log.info(`μ: ${_.round(mean, precision)}, σ: ${_.round(standardDeviation, precision)}, n: ${n} => minTargetProfitPercent: ${minTargetProfitPercent}`);
const config = { minTargetProfitPercent };
return config;
}
}

module.exports = SpreadStatHandler;
2 changes: 1 addition & 1 deletion src/AppRoot.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import t from './intl';
import 'reflect-metadata';
import container from './container';
Expand Down
2 changes: 1 addition & 1 deletion src/Arbitrager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import { injectable, inject } from 'inversify';
import * as _ from 'lodash';
import { ConfigStore, Quote } from './types';
Expand Down
2 changes: 1 addition & 1 deletion src/Bitflyer/BrokerAdapterImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
Quote,
BrokerConfigType
} from '../types';
import { getLogger } from '../logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import BrokerApi from './BrokerApi';
import { ChildOrdersParam, SendChildOrderRequest, ChildOrder, BoardResponse } from './types';
Expand Down
2 changes: 1 addition & 1 deletion src/BrokerAdapterRouter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Broker, BrokerAdapter, BrokerMap, Order, Quote } from './types';
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import { injectable, multiInject } from 'inversify';
import symbols from './symbols';
Expand Down
2 changes: 1 addition & 1 deletion src/Coincheck/BrokerAdapterImpl.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getLogger } from '../logger';
import { getLogger } from '@bitr/logger';
import { addMinutes } from 'date-fns';
import * as _ from 'lodash';
import BrokerApi from './BrokerApi';
Expand Down
82 changes: 82 additions & 0 deletions src/ComputeEngine/ComputeEngine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { socket, Socket } from 'zeromq';
import { SpreadStat } from '../types';
import { getLogger } from '@bitr/logger';
import { reportServicePubUrl, reportServiceRepUrl, configStoreSocketUrl } from '../constants';
import { parseBuffer } from '../util';

export default class ComputeEngine {
private isHandling: boolean;
private readonly log = getLogger(this.constructor.name);
private readonly pluginPath = `${process.cwd()}/plugins/SpreadStatHandler`;
private readonly streamSubscriber: Socket;
private readonly snapshotRequester: Socket;
private readonly configUpdater: Socket;
private spreadStatHandler?: { handle: (spreadStat: SpreadStat) => any };

constructor() {
this.configUpdater = socket('push');
this.snapshotRequester = socket('req');
this.streamSubscriber = socket('sub');
}

start() {
this.log.debug('Starting ComputeEngine');
this.configUpdater.bindSync(configStoreSocketUrl);
this.snapshotRequester.connect(reportServiceRepUrl);
this.snapshotRequester.on('message', message => this.handleSnapshotMessage(message));
this.snapshotRequester.send('spreadStatSnapshot');
this.streamSubscriber.connect(reportServicePubUrl);
this.streamSubscriber.subscribe('spreadStat');
this.streamSubscriber.on('message', (topic, message) => this.handleStreamMessage(topic, message));
process.on('message', message => {
if (message === 'stop') {
this.stop();
}
});
this.log.debug('Started.');
}

stop() {
this.log.debug('Stopping ComputeEngine...');
this.streamSubscriber.close();
this.snapshotRequester.close();
this.configUpdater.unbindSync(configStoreSocketUrl);
this.log.debug('Stopped.');
}

private async handleSnapshotMessage(message: Buffer) {
const snapshot = parseBuffer<SpreadStat[]>(message);
if (snapshot === undefined) {
this.log.warn('Failed to parse the initial snapshot message.');
return;
}
try {
const SpreadStatHandler = await import(this.pluginPath);
this.spreadStatHandler = new SpreadStatHandler(snapshot);
} catch (ex) {
this.log.warn(`Failed to import SpreadStatHandler plugin. ${ex.message}`);
this.log.debug(ex.stack);
}
}

private async handleStreamMessage(topic: Buffer, message: Buffer) {
if (this.isHandling) {
return;
}
try {
this.isHandling = true;
if (topic.toString() === 'spreadStat') {
const spreadStat = parseBuffer<SpreadStat>(message);
if (spreadStat && this.spreadStatHandler) {
const config = await this.spreadStatHandler.handle(spreadStat);
this.configUpdater.send(JSON.stringify(config));
}
}
} catch (ex) {
this.log.warn(`${ex.message}`);
this.log.debug(ex.stack);
} finally {
this.isHandling = false;
}
}
}
4 changes: 4 additions & 0 deletions src/ComputeEngine/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import ComputeEngine from './ComputeEngine';

const computeEngine = new ComputeEngine();
computeEngine.start();
52 changes: 48 additions & 4 deletions src/JsonConfigStore.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,68 @@
import { injectable } from 'inversify';
import { ConfigStore, ConfigRoot } from './types';
import { getConfigRoot } from './configUtil';
import { getConfigRoot, getConfigPath } from './configUtil';
import ConfigValidator from './ConfigValidator';
import { setTimeout } from 'timers';
import { socket, Socket } from 'zeromq';
import { configStoreSocketUrl } from './constants';
import { parseBuffer } from './util';
import * as _ from 'lodash';
import * as fs from 'fs';
import { promisify } from 'util';
import { getLogger } from '@bitr/logger';

const writeFile = promisify(fs.writeFile);

@injectable()
export default class JsonConfigStore implements ConfigStore {
timer: NodeJS.Timer;
private readonly log = getLogger(this.constructor.name);
private readonly pullSocket: Socket;
private TTL = 5 * 1000;
private cache?: ConfigRoot;

constructor(private readonly configValidator: ConfigValidator) {}
constructor(private readonly configValidator: ConfigValidator) {
this.pullSocket = socket('pull');
this.pullSocket.connect(configStoreSocketUrl);
this.pullSocket.on('message', async message => {
try {
const newConfig = parseBuffer(message);
if (newConfig === undefined) {
this.log.debug(`Invalid message received. Message: ${message.toString()}`);
return;
}
await this.setConfig(_.merge({}, getConfigRoot(), newConfig));
this.log.debug(`Config updated with ${JSON.stringify(newConfig)}`);
} catch (ex) {
this.log.warn(`Failed to write config. Error: ${ex.message}`);
this.log.debug(ex.stack);
}
});
}

close() {
this.pullSocket.close();
}

get config(): ConfigRoot {
if (this.cache) {
return this.cache;
}
const config = getConfigRoot();
this.configValidator.validate(config);
this.cache = config;
setTimeout(() => (this.cache = undefined), this.TTL);
this.updateCache(config);
return config;
}

private async setConfig(config: ConfigRoot) {
this.configValidator.validate(config);
await writeFile(getConfigPath(), JSON.stringify(config, undefined, 2));
this.updateCache(config);
}

private updateCache(config: ConfigRoot) {
this.cache = config;
clearTimeout(this.timer);
this.timer = setTimeout(() => (this.cache = undefined), this.TTL);
}
} /* istanbul ignore next */
2 changes: 1 addition & 1 deletion src/MainLimitChecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
LimitChecker,
OrderPair
} from './types';
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import t from './intl';
import PositionService from './PositionService';
Expand Down
2 changes: 1 addition & 1 deletion src/OpportunitySearcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import { injectable, inject } from 'inversify';
import * as _ from 'lodash';
import { ConfigStore, SpreadAnalysisResult, ActivePairStore, Quote, OrderPair } from './types';
Expand Down
2 changes: 1 addition & 1 deletion src/PairTrader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import { injectable, inject } from 'inversify';
import * as _ from 'lodash';
import OrderImpl from './OrderImpl';
Expand Down
2 changes: 1 addition & 1 deletion src/PositionService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { injectable, inject } from 'inversify';
import { ConfigStore, BrokerConfig, BrokerMap, BrokerPosition } from './types';
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import Decimal from 'decimal.js';
import BrokerPositionImpl from './BrokerPositionImpl';
Expand Down
2 changes: 1 addition & 1 deletion src/Quoine/BrokerAdapterImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
BrokerConfigType
} from '../types';
import BrokerApi from './BrokerApi';
import { getLogger } from '../logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import { PriceLevelsResponse, SendOrderRequest, OrdersResponse, CashMarginTypeStrategy } from './types';
import { timestampToDate, toExecution, toQuote } from '../util';
Expand Down
2 changes: 1 addition & 1 deletion src/QuoteAggregator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { injectable, inject } from 'inversify';
import { ConfigStore, BrokerConfig, QuoteSide, Broker, Quote } from './types';
import { getLogger } from './logger';
import { getLogger } from '@bitr/logger';
import * as _ from 'lodash';
import symbols from './symbols';
import QuoteImpl from './QuoteImpl';
Expand Down
Loading

0 comments on commit 82b75b7

Please sign in to comment.