Skip to content

Commit

Permalink
Merge 94238f8 into 10ea071
Browse files Browse the repository at this point in the history
  • Loading branch information
josipbagaric committed Oct 16, 2018
2 parents 10ea071 + 94238f8 commit 6c33fc3
Show file tree
Hide file tree
Showing 13 changed files with 2,261 additions and 2,041 deletions.
3,838 changes: 1,919 additions & 1,919 deletions package-lock.json

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion src/Buckets/BucketCalc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ export class BucketCalc {
const latest: IBlock = await this.util.getBlock('latest');
return {
currentBuckets: await this.getCurrentBuckets(latest),
nextBuckets: await this.getNextBuckets(latest)
nextBuckets: await this.getNextBuckets(latest),
afterNextBuckets: await this.getAfterNextBuckets(latest)
};
}

Expand All @@ -39,4 +40,14 @@ export class BucketCalc {
timestampBucket: (await this.requestFactory).calcBucket(nextTsInterval, 2)
};
}

private async getAfterNextBuckets(latest: IBlock): Promise<IBucketPair> {
const nextBlockInterval = latest.number + 2 * BucketSize.block;
const nextTsInterval = latest.timestamp + 2 * BucketSize.timestamp;

return {
blockBucket: (await this.requestFactory).calcBucket(nextBlockInterval, 1),
timestampBucket: (await this.requestFactory).calcBucket(nextTsInterval, 2)
};
}
}
1 change: 1 addition & 0 deletions src/Buckets/IBuckets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import { IBucketPair } from './IBucketPair';
export interface IBuckets {
currentBuckets: IBucketPair;
nextBuckets: IBucketPair;
afterNextBuckets: IBucketPair;
}
3 changes: 3 additions & 0 deletions src/Scanner/BucketWatchCallback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { ITxRequestRaw } from '../Types/ITxRequest';

export type BucketWatchCallback = (request: ITxRequestRaw) => void;
59 changes: 59 additions & 0 deletions src/Scanner/Buckets.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { WatchableBucket } from './WatchableBucket';
import { IBuckets, IBucketPair } from '../Buckets';
import { BucketWatchCallback } from './BucketWatchCallback';
import { WatchableBucketFactory } from './WatchableBucketFactory';

export class Buckets {
private buckets: WatchableBucket[] = [];
private watchableBucketFactory: WatchableBucketFactory;

constructor(watchableBucketFactory: WatchableBucketFactory) {
this.watchableBucketFactory = watchableBucketFactory;
}

public async update(newBuckets: IBuckets, callback: BucketWatchCallback) {
if (this.stopped) {
await this.addBucket(newBuckets.currentBuckets, callback);
await this.addBucket(newBuckets.nextBuckets, callback);
await this.addBucket(newBuckets.afterNextBuckets, callback);
} else {
if (this.currentBucket.equals(newBuckets.currentBuckets)) {
return;
}

await this.shift();
await this.addBucket(newBuckets.afterNextBuckets, callback);
}
}

public async stop() {
while (!this.stopped) {
await this.shift();
}
}

private async push(bucket: WatchableBucket) {
await bucket.watch();
this.buckets.push(bucket);
}

private async shift() {
const bucket = this.buckets.shift();
if (bucket) {
await bucket.stop();
}
}

private async addBucket(bucketPair: IBucketPair, callback: BucketWatchCallback) {
const bucket = await this.watchableBucketFactory.create(bucketPair, callback);
await this.push(bucket);
}

private get stopped() {
return this.buckets.length === 0;
}

private get currentBucket(): WatchableBucket {
return this.buckets[0];
}
}
105 changes: 8 additions & 97 deletions src/Scanner/ChainScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import Config from '../Config';
import IRouter from '../Router';
import { IntervalId, Address } from '../Types';
import CacheScanner from './CacheScanner';
import { Bucket, IBuckets, BucketCalc, IBucketCalc } from '../Buckets';
import { BucketCalc, IBucketCalc } from '../Buckets';
import { ITxRequestRaw } from '../Types/ITxRequest';
import { TxStatus } from '../Enum';
import { Buckets } from './Buckets';
import { WatchableBucketFactory } from './WatchableBucketFactory';

export default class ChainScanner extends CacheScanner {
public bucketCalc: IBucketCalc;
Expand All @@ -13,115 +15,24 @@ export default class ChainScanner extends CacheScanner {
public eventWatchers: {} = {};
public requestFactory: Promise<any>;

private buckets: IBuckets = {
currentBuckets: {
blockBucket: -1,
timestampBucket: -1
},
nextBuckets: {
blockBucket: -1,
timestampBucket: -1
}
};
private buckets: Buckets;

constructor(config: Config, router: IRouter) {
super(config, router);
this.requestFactory = config.eac.requestFactory();
this.bucketCalc = new BucketCalc(config.util, this.requestFactory);
this.buckets = new Buckets(new WatchableBucketFactory(this.requestFactory, this.config.logger));

this.handleRequest = this.handleRequest.bind(this);
}

public async watchBlockchain(): Promise<void> {
const buckets = await this.bucketCalc.getBuckets();

if (this.buckets.nextBuckets.blockBucket === buckets.currentBuckets.blockBucket) {
await this.stopWatcher(this.buckets.currentBuckets.blockBucket);

// If we are only doing one bucket step up we only need to start one watcher.
this.buckets.currentBuckets.blockBucket = buckets.currentBuckets.blockBucket;
this.buckets.nextBuckets.blockBucket = await this.watchRequestsByBucket(
buckets.nextBuckets.blockBucket,
this.buckets.nextBuckets.blockBucket
);
} else {
// Start watching the current buckets right away.
this.buckets.currentBuckets.blockBucket = await this.watchRequestsByBucket(
buckets.currentBuckets.blockBucket,
this.buckets.currentBuckets.blockBucket
);
this.buckets.nextBuckets.blockBucket = await this.watchRequestsByBucket(
buckets.nextBuckets.blockBucket,
this.buckets.nextBuckets.blockBucket
);
}

if (this.buckets.nextBuckets.timestampBucket === buckets.currentBuckets.timestampBucket) {
await this.stopWatcher(this.buckets.currentBuckets.timestampBucket);

this.buckets.currentBuckets.timestampBucket = buckets.currentBuckets.timestampBucket;
this.buckets.nextBuckets.timestampBucket = await this.watchRequestsByBucket(
buckets.nextBuckets.timestampBucket,
this.buckets.nextBuckets.timestampBucket
);
} else {
this.buckets.currentBuckets.timestampBucket = await this.watchRequestsByBucket(
buckets.currentBuckets.timestampBucket,
this.buckets.currentBuckets.timestampBucket
);
this.buckets.nextBuckets.timestampBucket = await this.watchRequestsByBucket(
buckets.nextBuckets.timestampBucket,
this.buckets.nextBuckets.timestampBucket
);
}
}

public async watchRequestsByBucket(bucket: Bucket, previousBucket: Bucket): Promise<Bucket> {
if (bucket !== previousBucket) {
await this.stopWatcher(previousBucket);
return this.startWatcher(bucket);
}

return previousBucket;
const newBuckets = await this.bucketCalc.getBuckets();
await this.buckets.update(newBuckets, this.handleRequest);
}

protected async stopAllWatchers(): Promise<void> {
for (const type of Object.keys(this.buckets)) {
for (const key of Object.keys(this.buckets[type])) {
await this.stopWatcher(this.buckets[type][key]);
// Reset to default value when stopping TimeNode.
this.buckets[type][key] = -1;
}
}
}

protected async startWatcher(bucket: Bucket): Promise<Bucket> {
const reqFactory = await this.requestFactory;
try {
const watcher = await reqFactory.watchRequestsByBucket(bucket, this.handleRequest);
this.eventWatchers[bucket] = watcher;

this.config.logger.debug(`Buckets: Watcher for bucket=${bucket} has been started`);
} catch (err) {
this.config.logger.error(`Buckets: Starting bucket=${bucket} watching failed!`);
}

return bucket;
}

protected async stopWatcher(bucket: Bucket) {
try {
const watcher = this.eventWatchers[bucket];
if (watcher !== undefined) {
const reqFactory = await this.requestFactory;
await reqFactory.stopWatch(watcher);
delete this.eventWatchers[bucket];

this.config.logger.debug(`Buckets: Watcher for bucket=${bucket} has been stopped`);
}
} catch (err) {
this.config.logger.error(`Buckets: Stopping bucket=${bucket} watching failed!`);
}
return this.buckets.stop();
}

private handleRequest(request: ITxRequestRaw): void {
Expand Down
6 changes: 6 additions & 0 deletions src/Scanner/IBucketWatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { BucketWatchCallback } from './BucketWatchCallback';

export interface IBucketWatcher {
watchRequestsByBucket(bucket: number, callBack: BucketWatchCallback): Promise<any>;
stopWatch(watcher: any): Promise<void>;
}
70 changes: 70 additions & 0 deletions src/Scanner/WatchableBucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { ILogger, DefaultLogger } from '../Logger';
import { IBucketPair, Bucket } from '../Buckets';
import { BucketWatchCallback } from './BucketWatchCallback';
import { IBucketWatcher } from './IBucketWatcher';

export class WatchableBucket {
private bucket: IBucketPair;
private requestFactory: IBucketWatcher;
private logger: ILogger;
private callBack: BucketWatchCallback;
private eventWatchers: {} = {};

constructor(
bucket: IBucketPair,
requestFactory: IBucketWatcher,
callBack: BucketWatchCallback,
logger: ILogger = new DefaultLogger()
) {
this.bucket = bucket;
this.requestFactory = requestFactory;
this.callBack = callBack;
this.logger = logger;
}

public async watch() {
await this.stop();

await this.startWatcher(this.bucket.blockBucket);
await this.startWatcher(this.bucket.timestampBucket);
}

public async stop() {
await this.stopWatcher(this.bucket.blockBucket);
await this.stopWatcher(this.bucket.timestampBucket);
}

public equals(newBucket: IBucketPair): boolean {
return (
newBucket.blockBucket === this.bucket.blockBucket &&
newBucket.timestampBucket === this.bucket.timestampBucket
);
}

private async startWatcher(bucket: Bucket): Promise<number> {
try {
const watcher = await this.requestFactory.watchRequestsByBucket(bucket, this.callBack);
this.eventWatchers[bucket] = watcher;

this.logger.debug(`Buckets: Watcher for bucket=${bucket} has been started`);
} catch (err) {
this.logger.error(`Buckets: Starting bucket=${bucket} watching failed!`);
}

return bucket;
}

private async stopWatcher(bucket: Bucket) {
try {
const watcher = this.eventWatchers[bucket];
if (watcher !== undefined) {
await this.requestFactory.stopWatch(watcher);
delete this.eventWatchers[bucket];

this.logger.debug(`Buckets: Watcher for bucket=${bucket} has been stopped`);
}
} catch (err) {
this.logger.error(`Buckets: Stopping bucket=${bucket} watching failed!`);
}
}
}
22 changes: 22 additions & 0 deletions src/Scanner/WatchableBucketFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { WatchableBucket } from './WatchableBucket';
import { IBucketWatcher } from './IBucketWatcher';
import { IBucketPair } from '../Buckets';
import { BucketWatchCallback } from './BucketWatchCallback';
import { ILogger } from '../Logger';

export class WatchableBucketFactory {
private requestFactory: Promise<IBucketWatcher>;
private logger: ILogger;

constructor(requestFactory: Promise<IBucketWatcher>, logger: ILogger) {
this.requestFactory = requestFactory;
this.logger = logger;
}

public async create(
bucketPair: IBucketPair,
callback: BucketWatchCallback
): Promise<WatchableBucket> {
return new WatchableBucket(bucketPair, await this.requestFactory, callback, this.logger);
}
}
26 changes: 20 additions & 6 deletions test/unit/UnitTestBucketCalc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { mockConfig } from '../helpers';

describe('ButcketCalc', () => {
describe('getBuckets()', async () => {
it('returns current and next buckets', async () => {
it('returns current, next and after next buckets', async () => {
const defaultBlock: IBlock = { number: 10000, timestamp: 10000000000 };
const util = TypeMoq.Mock.ofType<W3Util>();
util.setup(u => u.getBlock('latest')).returns(async () => defaultBlock);
Expand All @@ -18,12 +18,14 @@ describe('ButcketCalc', () => {

const bucketCalc = new BucketCalc(util.object, requestFactory);

const { currentBuckets, nextBuckets } = await bucketCalc.getBuckets();
const { currentBuckets, nextBuckets, afterNextBuckets } = await bucketCalc.getBuckets();

expect(currentBuckets).to.haveOwnProperty('blockBucket');
expect(currentBuckets).to.haveOwnProperty('timestampBucket');
expect(nextBuckets).to.haveOwnProperty('blockBucket');
expect(nextBuckets).to.haveOwnProperty('timestampBucket');
expect(afterNextBuckets).to.haveOwnProperty('blockBucket');
expect(afterNextBuckets).to.haveOwnProperty('timestampBucket');

assert.equal(
currentBuckets.blockBucket,
Expand All @@ -34,15 +36,27 @@ describe('ButcketCalc', () => {
defaultBlock.timestamp - (defaultBlock.timestamp % BucketSize.timestamp)
);

const expectedBlockInterval = defaultBlock.number + BucketSize.block;
const expectedTimestampInterval = defaultBlock.timestamp + BucketSize.timestamp;
const expectedNextBlockInterval = defaultBlock.number + BucketSize.block;
const expectedNextTimestampInterval = defaultBlock.timestamp + BucketSize.timestamp;
assert.equal(
nextBuckets.blockBucket,
-1 * (expectedBlockInterval - (expectedBlockInterval % BucketSize.block))
-1 * (expectedNextBlockInterval - (expectedNextBlockInterval % BucketSize.block))
);
assert.equal(
nextBuckets.timestampBucket,
expectedTimestampInterval - (expectedTimestampInterval % BucketSize.timestamp)
expectedNextTimestampInterval - (expectedNextTimestampInterval % BucketSize.timestamp)
);

const expectedAfterNextBlockInterval = defaultBlock.number + 2 * BucketSize.block;
const expectedAfterNextTimestampInterval = defaultBlock.timestamp + 2 * BucketSize.timestamp;
assert.equal(
afterNextBuckets.blockBucket,
-1 * (expectedAfterNextBlockInterval - (expectedAfterNextBlockInterval % BucketSize.block))
);
assert.equal(
afterNextBuckets.timestampBucket,
expectedAfterNextTimestampInterval -
(expectedAfterNextTimestampInterval % BucketSize.timestamp)
);
});
});
Expand Down

0 comments on commit 6c33fc3

Please sign in to comment.