Skip to content

Commit

Permalink
analytics refactor - work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bitrinjani committed Jan 19, 2018
1 parent 82b75b7 commit 2a47c17
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 168 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ config.json
.idea
.python-version
datastore/
reports/
reports/
sandbox/
5 changes: 5 additions & 0 deletions config_default.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
"ttl": 3000
}
},
"analytics": {
"enabled": false,
"fileName": "SampleHandler.js",
"initialHistory": { "minutes": 30 }
},
"brokers": [
{
"broker": "Coincheck",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
"**/*.{ts,tsx}",
"!src/index.ts",
"!src/transport/index.ts",
"!src/container.config.ts",
"!**/__tests__/**",
"!**/node_modules/**",
"!**/vendor/**",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { getLogger } = require('@bitr/logger');

const precision = 3;

class SpreadStatHandler {
class SimpleSpreadStatHandler {
// Constructor is called when initial snapshot of spread stat history has arrived.
constructor(history) {
this.log = getLogger(this.constructor.name);
Expand All @@ -14,8 +14,8 @@ class SpreadStatHandler {
this.profitPercentVariance = ss.sampleVariance(profitPercentHistory);
}

// The method is called each time new spread stat has arrived, by default every 3 seconds.
// Return value: part of ConfigRoot or undefined.
// The method is called each time new spread stat has arrived, by default every 3 seconds.
// Return value: part of ConfigRoot or undefined.
// If part of ConfigRoot is returned, the configuration will be merged. If undefined is returned, no update will be made.
async handle(spreadStat) {
const newData = spreadStat.bestCase.profitPercentAgainstNotional;
Expand All @@ -34,14 +34,19 @@ class SpreadStatHandler {
this.sampleSize++;

// set μ + σ to minTargetProfitPercent
const mean = this.profitPercentMean;
const standardDeviation = Math.sqrt(this.profitPercentVariance);
const n = this.sampleSize;
const mean = this.profitPercentMean;
const standardDeviation = Math.sqrt(this.profitPercentVariance * n/(n-1));
const minTargetProfitPercent = _.round(mean + standardDeviation, precision);
this.log.info(`μ: ${_.round(mean, precision)}, σ: ${_.round(standardDeviation, precision)}, n: ${n} => minTargetProfitPercent: ${minTargetProfitPercent}`);
this.log.info(
`μ: ${_.round(mean, precision)}, σ: ${_.round(
standardDeviation,
precision
)}, n: ${n} => minTargetProfitPercent: ${minTargetProfitPercent}`
);
const config = { minTargetProfitPercent };
return config;
}
}

module.exports = SpreadStatHandler;
module.exports = SimpleSpreadStatHandler;
3 changes: 1 addition & 2 deletions src/AppRoot.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { getLogger } from '@bitr/logger';
import t from './intl';
import 'reflect-metadata';
import container from './container';
import symbols from './symbols';
import { BrokerAdapter, ConfigStore } from './types';
import { Container } from 'inversify';
Expand All @@ -15,7 +14,7 @@ export default class AppRoot {
private readonly log = getLogger(this.constructor.name);
private services: { start: () => Promise<void>; stop: () => Promise<void> }[];

constructor(private readonly ioc: Container = container) {}
constructor(private readonly ioc: Container) {}

async start(): Promise<void> {
try {
Expand Down
82 changes: 0 additions & 82 deletions src/ComputeEngine/ComputeEngine.ts

This file was deleted.

4 changes: 0 additions & 4 deletions src/ComputeEngine/index.ts

This file was deleted.

64 changes: 39 additions & 25 deletions src/JsonConfigStore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { injectable } from 'inversify';
import { ConfigStore, ConfigRoot } from './types';
import { ConfigStore, ConfigRoot, ConfigRequest } from './types';
import { getConfigRoot, getConfigPath } from './configUtil';
import ConfigValidator from './ConfigValidator';
import { setTimeout } from 'timers';
Expand All @@ -15,35 +15,18 @@ const writeFile = promisify(fs.writeFile);

@injectable()
export default class JsonConfigStore implements ConfigStore {
timer: NodeJS.Timer;
private readonly log = getLogger(this.constructor.name);
private readonly pullSocket: Socket;
private TTL = 5 * 1000;
private timer: NodeJS.Timer;
private readonly server: Socket;
private readonly TTL = 5 * 1000;
private cache?: ConfigRoot;

constructor(private readonly configValidator: ConfigValidator) {
this.pullSocket = socket('pull');
this.pullSocket.connect(configStoreSocketUrl);
this.pullSocket.on('message', async message => {
try {
const newConfig = parseBuffer(message);
if (newConfig === undefined) {
this.log.debug(`Invalid message received. Message: ${message.toString()}`);
return;
}
await this.setConfig(_.merge({}, getConfigRoot(), newConfig));
this.log.debug(`Config updated with ${JSON.stringify(newConfig)}`);
} catch (ex) {
this.log.warn(`Failed to write config. Error: ${ex.message}`);
this.log.debug(ex.stack);
}
});
this.server = socket('rep');
this.server.on('message', message => this.messageHandler(message));
this.server.bindSync(configStoreSocketUrl);
}

close() {
this.pullSocket.close();
}

get config(): ConfigRoot {
if (this.cache) {
return this.cache;
Expand All @@ -54,12 +37,43 @@ export default class JsonConfigStore implements ConfigStore {
return config;
}

private async setConfig(config: ConfigRoot) {
async set(config: ConfigRoot) {
this.configValidator.validate(config);
await writeFile(getConfigPath(), JSON.stringify(config, undefined, 2));
this.updateCache(config);
}

close() {
this.server.close();
}

private async messageHandler(message: Buffer) {
const parsed = parseBuffer<ConfigRequest>(message);
if (parsed === undefined) {
this.log.debug(`Invalid message received. Message: ${message.toString()}`);
return;
}
switch (parsed.type) {
case 'set':
try {
const newConfig = parsed.data;
await this.set(_.merge({}, getConfigRoot(), newConfig));
this.server.send(JSON.stringify({ success: true }));
this.log.debug(`Config updated with ${JSON.stringify(newConfig)}`);
} catch (ex) {
this.log.warn(`Failed to update config. Error: ${ex.message}`);
this.log.debug(ex.stack);
}
break;
case 'get':
this.server.send(JSON.stringify(getConfigRoot()));
break;
default:
this.log.warn(`ConfigStore received an invalid message. Message: ${parsed}`);
break;
}
}

private updateCache(config: ConfigRoot) {
this.cache = config;
clearTimeout(this.timer);
Expand Down
35 changes: 16 additions & 19 deletions src/ReportService.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,38 @@
import SpreadAnalyzer from './SpreadAnalyzer';
import { injectable, inject } from 'inversify';
import symbols from './symbols';
import { SpreadStatTimeSeries, Quote, ConfigStore } from './types';
import { SpreadStatTimeSeries, Quote, ConfigStore, ZmqSocket } from './types';
import QuoteAggregator from './QuoteAggregator';
import { spreadStatToCsv, spreadStatCsvHeader } from './SpreadStatTimeSeries';
import * as fs from 'fs';
import * as mkdirp from 'mkdirp';
import { promisify } from 'util';
import { socket, Socket } from 'zeromq';
import { socket } from 'zeromq';
import { fork, ChildProcess } from 'child_process';
import { subDays } from 'date-fns';
import { reportServicePubUrl, reportServiceRepUrl } from './constants';
import { getLogger } from '@bitr/logger';

// patch for @types/zeromq
interface ZmqSocket extends Socket {
removeAllListeners: any;
}
import { cwd } from './util';
import { Duration, DateTime } from 'luxon';

const writeFile = promisify(fs.writeFile);

@injectable()
export default class ReportService {
private readonly log = getLogger(this.constructor.name);
private readonly computeEnginePath = `${__dirname}/ComputeEngine`;
private readonly nDaysForSnapshot = 3;
private readonly reportDir = `${process.cwd()}/reports`;
private readonly analyticsPath = `${__dirname}/analytics`;
private readonly reportDir = `${cwd()}/reports`;
private readonly spreadStatReport = `${this.reportDir}/spreadStat.csv`;
private readonly spreadStatWriteStream: fs.WriteStream;
private spreadStatWriteStream: fs.WriteStream;
private readonly streamPublisher: ZmqSocket;
private readonly snapshotResponder: ZmqSocket;
private computeEngineProcess: ChildProcess;
private analyticsProcess: ChildProcess;

constructor(
private readonly quoteAggregator: QuoteAggregator,
private readonly spreadAnalyzer: SpreadAnalyzer,
@inject(symbols.SpreadStatTimeSeries) private readonly spreadStatTimeSeries: SpreadStatTimeSeries,
@inject(symbols.ConfigStore) private readonly configStore: ConfigStore
) {
this.spreadStatWriteStream = fs.createWriteStream(this.spreadStatReport, { flags: 'a' });
this.snapshotResponder = socket('rep') as ZmqSocket;
this.streamPublisher = socket('pub') as ZmqSocket;
}
Expand All @@ -49,11 +43,14 @@ export default class ReportService {
if (!fs.existsSync(this.spreadStatReport)) {
await writeFile(this.spreadStatReport, spreadStatCsvHeader, { flag: 'a' });
}
this.spreadStatWriteStream = fs.createWriteStream(this.spreadStatReport, { flags: 'a' });
this.quoteAggregator.onQuoteUpdated.set(this.constructor.name, quotes => this.quoteUpdated(quotes));
const { analytics } = this.configStore.config;
if (analytics && analytics.enabled) {
const start = subDays(new Date(), analytics.days || this.nDaysForSnapshot);
const end = new Date();
const duration = Duration.fromObject(analytics.initialHistory);
const dt = DateTime.local();
const start = dt.minus(duration).toJSDate();
const end = dt.toJSDate();
const snapshot = await this.spreadStatTimeSeries.query({ start, end });
this.snapshotResponder.on('message', request => {
if (request.toString() === 'spreadStatSnapshot') {
Expand All @@ -62,7 +59,7 @@ export default class ReportService {
});
this.streamPublisher.bindSync(reportServicePubUrl);
this.snapshotResponder.bindSync(reportServiceRepUrl);
this.computeEngineProcess = fork(this.computeEnginePath, [], { stdio: [0, 1, 2, 'ipc'] });
this.analyticsProcess = fork(this.analyticsPath, [], { stdio: [0, 1, 2, 'ipc'] });
}
this.log.debug('Started.');
}
Expand All @@ -73,8 +70,8 @@ export default class ReportService {
this.spreadStatWriteStream.close();
const { analytics } = this.configStore.config;
if (analytics && analytics.enabled) {
await promisify(this.computeEngineProcess.send).bind(this.computeEngineProcess)('stop');
this.computeEngineProcess.kill();
await promisify(this.analyticsProcess.send).bind(this.analyticsProcess)('stop');
this.analyticsProcess.kill();
this.streamPublisher.unbindSync(reportServicePubUrl);
this.streamPublisher.removeAllListeners('message');
this.snapshotResponder.unbindSync(reportServiceRepUrl);
Expand Down

0 comments on commit 2a47c17

Please sign in to comment.