Skip to content

Commit

Permalink
Merge cdffe96 into 8d82cca
Browse files Browse the repository at this point in the history
  • Loading branch information
lsaether committed Sep 25, 2018
2 parents 8d82cca + cdffe96 commit 6b80ef6
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 170 deletions.
230 changes: 102 additions & 128 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions src/Config/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ export default class Config implements IConfigParams {
public eac: any;
public economicStrategy?: IEconomicStrategy;
public logger?: ILogger;
public maxRetries?: number;
public ms: any;
public providerUrl: string;
public providerUrls: string[];
public scanSpread: any;
public statsDb: StatsDB;
public statsDbLoaded: Promise<boolean>;
Expand All @@ -52,13 +53,13 @@ export default class Config implements IConfigParams {

// tslint:disable-next-line:cognitive-complexity
constructor(params: IConfigParams) {
if (params.providerUrl) {
this.web3 = W3Util.getWeb3FromProviderUrl(params.providerUrl);
if (params.providerUrls.length) {
this.web3 = W3Util.getWeb3FromProviderUrl(params.providerUrls[0]);
this.util = new W3Util(this.web3);
this.eac = EAC(this.web3);
this.providerUrl = params.providerUrl;
this.providerUrls = params.providerUrls;
} else {
throw new Error('Please set the providerUrl in the config object.');
throw new Error('Must pass at least 1 providerUrl to the config object.');
}

this.economicStrategy = params.economicStrategy || {
Expand All @@ -70,6 +71,7 @@ export default class Config implements IConfigParams {

this.autostart = params.autostart !== undefined ? params.autostart : true;
this.claiming = params.claiming || false;
this.maxRetries = params.maxRetries || 30;
this.ms = params.ms || 4000;
this.scanSpread = params.scanSpread || 50;
this.walletStoresAsPrivateKeys = params.walletStoresAsPrivateKeys || false;
Expand Down
3 changes: 2 additions & 1 deletion src/Config/IConfigParams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ export interface IConfigParams {
claiming?: boolean;
economicStrategy?: IEconomicStrategy;
logger?: ILogger | null;
maxRetries?: number;
ms?: any;
password?: any;
providerUrl: string;
providerUrls: string[];
scanSpread?: number | null;
statsDb?: any;
walletStores?: any;
Expand Down
8 changes: 8 additions & 0 deletions src/Enum/ReconnectMsg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export enum ReconnectMsg {
NULL = '',
ALREADY_RECONNECTED = 'Recent reconnection. Not attempting for a few seconds.',
RECONNECTED = 'Reconnected!',
MAX_ATTEMPTS = 'Max attempts reached. Stopped TimeNode.',
RECONNECTING = 'Reconnecting in progress.',
FAIL = 'Reconnection failed! Trying again...'
}
11 changes: 6 additions & 5 deletions src/Enum/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export { Networks } from './Networks';
export { TxStatus } from './TxStatus';
export { FnSignatures } from './FnSignatures';
export { ExecuteStatus } from './ExecuteStatus';
export { AbortReason } from './AbortReason';
export { ClaimStatus } from './ClaimStatus';
export { EconomicStrategyStatus } from './EconomicStrategyStatus';
export { AbortReason } from './AbortReason';
export { ExecuteStatus } from './ExecuteStatus';
export { FnSignatures } from './FnSignatures';
export { Networks } from './Networks';
export { ReconnectMsg } from './ReconnectMsg';
export { TxStatus } from './TxStatus';
8 changes: 6 additions & 2 deletions src/Scanner/TimeNodeScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,18 @@ export default class TimeNodeScanner extends ChainScanner implements ITimeNodeSc

await this.txPool.start();

this.scanning = true;
this.cacheInterval = await this.runAndSetInterval(() => this.scanCache(), this.config.ms);
this.chainInterval = await this.runAndSetInterval(() => this.watchBlockchain(), 5 * 60 * 1000);

// Mark that we've started.
this.config.logger.info('Scanner STARTED');
this.scanning = true;
return this.scanning;
}

public stop(): boolean {
if (this.scanning) {
this.scanning = false;
// Clear scanning intervals.
clearInterval(this.cacheInterval);
clearInterval(this.chainInterval);
Expand All @@ -53,7 +54,6 @@ export default class TimeNodeScanner extends ChainScanner implements ITimeNodeSc

// Mark that we've stopped.
this.config.logger.info('Scanner STOPPED');
this.scanning = false;
}

Object.keys(this.buckets).forEach((bucketType: string) => {
Expand All @@ -68,6 +68,10 @@ export default class TimeNodeScanner extends ChainScanner implements ITimeNodeSc
}

private async runAndSetInterval(fn: () => Promise<void>, interval: number): Promise<IntervalId> {
if (!this.scanning) {
this.config.logger.debug('Not starting intervals when TimeNode is intentionally stopped.');
return null;
}
const wrapped = async (): Promise<void> => {
try {
await fn();
Expand Down
9 changes: 8 additions & 1 deletion src/TimeNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { Networks } from './Enum';
import Scanner from './Scanner';
import Router from './Router';
import Version from './Version';

import W3Util from './Util';
import WsReconnect from './WsReconnect';
export default class TimeNode {
public actions: Actions;
public config: Config;
Expand Down Expand Up @@ -34,6 +35,12 @@ export default class TimeNode {
this.config = config;
this.scanner = new Scanner(this.config, this.router);

const { logger, providerUrls } = this.config;
if (W3Util.isWSConnection(providerUrls[0])) {
logger.debug('WebSockets provider detected! Setting up reconnect events...');
new WsReconnect(this).setup();
}

this.startupMessage();
}

Expand Down
14 changes: 6 additions & 8 deletions src/TxPool/Txpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,16 @@ export default class TxPool {

public async stop(): Promise<boolean> {
if (this.subs.pending) {
try {
await this.util.stopFilter(this.subs.pending);
} catch (e) {
await this.util.stopFilter(this.subs.pending)
.catch ((e) => {
this.logger.error(e);
}
});
}
if (this.subs.latest) {
try {
await this.util.stopFilter(this.subs.latest);
} catch (e) {
await this.util.stopFilter(this.subs.latest)
.catch ((e) => {
this.logger.error(e);
}
})
}
if (this.subs.mined) {
clearInterval(this.subs.mined);
Expand Down
13 changes: 11 additions & 2 deletions src/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@ import * as Web3WsProvider from 'web3-providers-ws';
import { IBlock, ITxRequest } from './Types';

export default class W3Util {

public static isHTTPConnection(url: string) : boolean {
return url.includes('http://') || url.includes('https://');
}

public static isWSConnection(url: string) : boolean {
return url.includes('ws://') || url.includes('wss://');
}

public static getWeb3FromProviderUrl(providerUrl: string) {
let provider: any;

if (providerUrl.includes('http://') || providerUrl.includes('https://')) {
if (this.isHTTPConnection(providerUrl)) {
provider = new Web3.providers.HttpProvider(providerUrl);
} else if (providerUrl.includes('ws://') || providerUrl.includes('wss://')) {
} else if (this.isWSConnection(providerUrl)) {
provider = new Web3WsProvider(providerUrl);
provider.__proto__.sendAsync = provider.__proto__.sendAsync || provider.__proto__.send;
}
Expand Down
97 changes: 97 additions & 0 deletions src/WsReconnect/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import TimeNode from '../TimeNode';
import { ReconnectMsg } from '../Enum';
import W3Util from '../Util';

declare const setTimeout: any;

export default class WsReconnect {
private timenode: TimeNode;
private reconnectTries: number = 0;
private reconnecting: boolean = false;
private reconnected: boolean = false;

constructor(timenode: TimeNode) {
this.timenode = timenode;
}

public setup(): void {
const {
logger,
web3: { currentProvider }
} = this.timenode.config;

currentProvider.on('error', (err: any) => {
logger.debug(`[WS ERROR] ${err}`);
setTimeout(async () => {
const msg: ReconnectMsg = await this.handleWsDisconnect();
logger.debug(`[WS RECONNECT] ${msg}`);
}, this.reconnectTries * 1000);
});

currentProvider.on('end', (err: any) => {
logger.debug(`[WS END] Type= ${err.type} Reason= ${err.reason}`);
setTimeout(async () => {
const msg = await this.handleWsDisconnect();
logger.debug(`[WS RECONNECT] ${msg}`);
}, this.reconnectTries * 1000);
});
}

private async handleWsDisconnect(): Promise<ReconnectMsg> {
if (this.reconnected) {
return ReconnectMsg.ALREADY_RECONNECTED;
}
if (this.reconnectTries >= this.timenode.config.maxRetries) {
this.timenode.stopScanning();
return ReconnectMsg.MAX_ATTEMPTS;
}
if (this.reconnecting) {
return ReconnectMsg.RECONNECTING;
}

// Try to reconnect.
this.reconnecting = true;
if (await this.wsReconnect()) {
await this.timenode.startScanning();
this.reconnectTries = 0;
this.setup();
this.reconnected = true;
this.reconnecting = false;
setTimeout(() => {
this.reconnected = false;
}, 10000);
return ReconnectMsg.RECONNECTED;
}

this.reconnecting = false;
this.reconnectTries++;
setTimeout(() => {
this.handleWsDisconnect();
}, this.reconnectTries * 1000);

return ReconnectMsg.FAIL;
}

private async wsReconnect(): Promise<boolean> {
const {
config: { logger, providerUrls }
} = this.timenode;
logger.debug('Attempting WS Reconnect.');
try {
const providerUrl = providerUrls[this.reconnectTries % providerUrls.length];
this.timenode.config.web3 = W3Util.getWeb3FromProviderUrl(providerUrl);

this.timenode.config.util = new W3Util(this.timenode.config.web3);
this.timenode.scanner.util = this.timenode.config.util;
if (await this.timenode.config.util.isWatchingEnabled()) {
return true;
} else {
throw new Error('Invalid providerUrl! eth_getFilterLogs not enabled.');
}
} catch (err) {
logger.error(err.message);
logger.info(`Reconnect tries: ${this.reconnectTries}`);
return false;
}
}
}
2 changes: 1 addition & 1 deletion test/helpers/mockConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const mockConfig = async () => {
logger: new DefaultLogger(),
ms: 4000,
password,
providerUrl,
providerUrls: [providerUrl],
scanSpread: 0,
statsDb: new loki('stats.db'),
walletStores: wallet,
Expand Down
12 changes: 6 additions & 6 deletions test/unit/UnitTestConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const WALLET_KEYSTORE = `{"version":3,"id":"90ff6d22-668b-492a-bc56-8b560fece46d
describe('Config unit tests', () => {
describe('constructor()', () => {
it('throws an error when initiating without required params', () => {
expect(() => new Config({ providerUrl: null })).to.throw();
expect(() => new Config({ providerUrls: null })).to.throw();
});

it('check all default values are set when empty config', () => {
const config = new Config({ providerUrl });
const config = new Config({ providerUrls: [providerUrl] });

assert.isTrue(config.autostart);
assert.isFalse(config.claiming);
Expand Down Expand Up @@ -46,7 +46,7 @@ describe('Config unit tests', () => {
};

const config = new Config({
providerUrl,
providerUrls: [providerUrl],
autostart: false,
claiming: true,
economicStrategy,
Expand All @@ -69,7 +69,7 @@ describe('Config unit tests', () => {

it('wallet decrypted when using a keystore string', () => {
const config = new Config({
providerUrl,
providerUrls: [providerUrl],
walletStores: [WALLET_KEYSTORE],
password: WALLET_PASSWD
});
Expand All @@ -79,7 +79,7 @@ describe('Config unit tests', () => {

it('wallet decrypted when using a keystore object', () => {
const config = new Config({
providerUrl,
providerUrls: [providerUrl],
walletStores: [JSON.parse(WALLET_KEYSTORE)],
password: WALLET_PASSWD
});
Expand All @@ -91,7 +91,7 @@ describe('Config unit tests', () => {
expect(
() =>
new Config({
providerUrl,
providerUrls: [providerUrl],
walletStores: [WALLET_KEYSTORE]
})
).to.throw();
Expand Down

0 comments on commit 6b80ef6

Please sign in to comment.