Skip to content

Commit

Permalink
Merge 078aab7 into d814da1
Browse files Browse the repository at this point in the history
  • Loading branch information
kosecki123 committed Oct 19, 2018
2 parents d814da1 + 078aab7 commit 362a0c9
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 395 deletions.
48 changes: 25 additions & 23 deletions src/Buckets/BucketCalc.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { IBlock } from '../Types';
import { IBucketPair, IBuckets, BucketSize } from '.';
import { BucketSize } from '.';
import W3Util from '../Util';
import { Bucket } from './IBucketPair';

export interface IBucketCalc {
getBuckets(): Promise<IBuckets>;
getBuckets(): Promise<Bucket[]>;
}

export class BucketCalc {
Expand All @@ -15,39 +16,40 @@ export class BucketCalc {
this.requestFactory = requestFactory;
}

public async getBuckets(): Promise<IBuckets> {
public async getBuckets(): Promise<Bucket[]> {
const latest: IBlock = await this.util.getBlock('latest');
return {
currentBuckets: await this.getCurrentBuckets(latest),
nextBuckets: await this.getNextBuckets(latest),
afterNextBuckets: await this.getAfterNextBuckets(latest)
};

const currentBuckets = await this.getCurrentBuckets(latest);
const nextBuckets = await this.getNextBuckets(latest);
const afterNextBuckets = await this.getAfterNextBuckets(latest);

return currentBuckets.concat(nextBuckets).concat(afterNextBuckets);
}

private async getCurrentBuckets(latest: IBlock): Promise<IBucketPair> {
return {
blockBucket: (await this.requestFactory).calcBucket(latest.number, 1),
timestampBucket: (await this.requestFactory).calcBucket(latest.timestamp, 2)
};
private async getCurrentBuckets(latest: IBlock): Promise<Bucket[]> {
return [
(await this.requestFactory).calcBucket(latest.number, 1),
(await this.requestFactory).calcBucket(latest.timestamp, 2)
];
}

private async getNextBuckets(latest: IBlock): Promise<IBucketPair> {
private async getNextBuckets(latest: IBlock): Promise<Bucket[]> {
const nextBlockInterval = latest.number + BucketSize.block;
const nextTsInterval = latest.timestamp + BucketSize.timestamp;

return {
blockBucket: (await this.requestFactory).calcBucket(nextBlockInterval, 1),
timestampBucket: (await this.requestFactory).calcBucket(nextTsInterval, 2)
};
return [
(await this.requestFactory).calcBucket(nextBlockInterval, 1),
(await this.requestFactory).calcBucket(nextTsInterval, 2)
];
}

private async getAfterNextBuckets(latest: IBlock): Promise<IBucketPair> {
private async getAfterNextBuckets(latest: IBlock): Promise<Bucket[]> {
const nextBlockInterval = latest.number + 2 * BucketSize.block;
const nextTsInterval = latest.timestamp + 2 * BucketSize.timestamp;

return {
blockBucket: (await this.requestFactory).calcBucket(nextBlockInterval, 1),
timestampBucket: (await this.requestFactory).calcBucket(nextTsInterval, 2)
};
return [
(await this.requestFactory).calcBucket(nextBlockInterval, 1),
(await this.requestFactory).calcBucket(nextTsInterval, 2)
];
}
}
7 changes: 0 additions & 7 deletions src/Buckets/IBuckets.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/Buckets/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export { Bucket, IBucketPair } from './IBucketPair';
export { IBuckets } from './IBuckets';
export { BucketSize } from './consts';
export { IBucketCalc, BucketCalc } from './BucketCalc';
5 changes: 3 additions & 2 deletions src/EconomicStrategy/EconomicStrategyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ export class EconomicStrategyManager {

private async tooShortClaimWindow(txRequest: ITxRequest): Promise<boolean> {
const { minClaimWindowBlock, minClaimWindow } = this.strategy;
const { claimWindowEnd, now, temporalUnit } = txRequest;
const { claimWindowEnd, temporalUnit } = txRequest;
const now = await txRequest.now();

const minWindow = temporalUnit === 1 ? minClaimWindowBlock : minClaimWindow;

return claimWindowEnd.sub(await now()).lt(minWindow);
return claimWindowEnd.sub(now).lt(minWindow);
}

private tooShortReserved(txRequest: ITxRequest): boolean {
Expand Down
2 changes: 1 addition & 1 deletion src/Router/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export default class Router implements IRouter {
current = next;
}
} catch (err) {
this.logger.error(`Transition from ${current} failed: ${err}`);
this.logger.error(`Transition from ${TxStatus[current]} failed: ${err}`);
}

this.txRequestStates[txRequest.address] = current;
Expand Down
79 changes: 0 additions & 79 deletions src/Scanner/Buckets.ts

This file was deleted.

51 changes: 51 additions & 0 deletions src/Scanner/BucketsManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { WatchableBucket } from './WatchableBucket';
import { Bucket } from '../Buckets';
import { BucketWatchCallback } from './BucketWatchCallback';
import { WatchableBucketFactory } from './WatchableBucketFactory';
import { ILogger, DefaultLogger } from '../Logger';

export class BucketsManager {
private buckets: WatchableBucket[] = [];
private watchableBucketFactory: WatchableBucketFactory;
private logger: ILogger;

constructor(
watchableBucketFactory: WatchableBucketFactory,
logger: ILogger = new DefaultLogger()
) {
this.watchableBucketFactory = watchableBucketFactory;
this.logger = logger;
}

public async stop() {
await Promise.all(this.buckets.map(b => b.stop()));
this.buckets = [];
return;
}

public async update(buckets: Bucket[], callback: BucketWatchCallback) {
this.logger.debug(`Buckets: updating with ${buckets}`);

const toStart = await Promise.all(
buckets
.filter(b => !this.knownBucket(b))
.map(b => this.watchableBucketFactory.create(b, callback))
);
const toSkip = this.buckets.filter(b => buckets.indexOf(b.bucket) > -1);
const toStop = this.buckets.filter(b => buckets.indexOf(b.bucket) === -1);

const starting = toStart.map(b => b.watch());
const stopping = toStop.map(b => b.stop());

await Promise.all(starting);
await Promise.all(stopping);

this.buckets = toSkip.concat(toStart);

this.logger.debug(`Buckets: updated ${this.buckets.map(b => b.bucket)}`);
}

private knownBucket(bucket: Bucket): boolean {
return this.buckets.some(b => b.bucket === bucket);
}
}
10 changes: 5 additions & 5 deletions src/Scanner/ChainScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import CacheScanner from './CacheScanner';
import { BucketCalc, IBucketCalc } from '../Buckets';
import { ITxRequestRaw } from '../Types/ITxRequest';
import { TxStatus } from '../Enum';
import { Buckets } from './Buckets';
import { BucketsManager } from './BucketsManager';
import { WatchableBucketFactory } from './WatchableBucketFactory';

export default class ChainScanner extends CacheScanner {
Expand All @@ -15,13 +15,13 @@ export default class ChainScanner extends CacheScanner {
public eventWatchers: {} = {};
public requestFactory: Promise<any>;

private buckets: Buckets;
private bucketsManager: BucketsManager;

constructor(config: Config, router: IRouter) {
super(config, router);
this.requestFactory = config.eac.requestFactory();
this.bucketCalc = new BucketCalc(config.util, this.requestFactory);
this.buckets = new Buckets(
this.bucketsManager = new BucketsManager(
new WatchableBucketFactory(this.requestFactory, this.config.logger),
this.config.logger
);
Expand All @@ -31,11 +31,11 @@ export default class ChainScanner extends CacheScanner {

public async watchBlockchain(): Promise<void> {
const newBuckets = await this.bucketCalc.getBuckets();
await this.buckets.update(newBuckets, this.handleRequest);
return this.bucketsManager.update(newBuckets, this.handleRequest);
}

protected async stopAllWatchers(): Promise<void> {
return this.buckets.stop();
return this.bucketsManager.stop();
}

private handleRequest(request: ITxRequestRaw): void {
Expand Down
64 changes: 27 additions & 37 deletions src/Scanner/WatchableBucket.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,63 @@
import { ILogger, DefaultLogger } from '../Logger';
import { IBucketPair, Bucket } from '../Buckets';
import { Bucket } from '../Buckets';
import { BucketWatchCallback } from './BucketWatchCallback';
import { IBucketWatcher } from './IBucketWatcher';

export class WatchableBucket {
private bucket: IBucketPair;
private bucketNumber: Bucket;
private watcher: any;
private requestFactory: IBucketWatcher;
private logger: ILogger;
private callBack: BucketWatchCallback;
private eventWatchers: {} = {};

constructor(
bucket: IBucketPair,
bucket: Bucket,
requestFactory: IBucketWatcher,
callBack: BucketWatchCallback,
logger: ILogger = new DefaultLogger()
) {
this.bucket = bucket;
this.bucketNumber = bucket;
this.requestFactory = requestFactory;
this.callBack = callBack;
this.logger = logger;
}

public async watch() {
await this.stop();

await this.startWatcher(this.bucket.blockBucket);
await this.startWatcher(this.bucket.timestampBucket);
if (this.watcher) {
this.logger.debug(`WatchableBucket: Bucket ${this.bucketNumber} already watched.`);
return;
}
await this.start();
}

public async stop() {
await this.stopWatcher(this.bucket.blockBucket);
await this.stopWatcher(this.bucket.timestampBucket);
}

public equals(newBucket: IBucketPair): boolean {
this.logger.debug(
`Buckets: comparing ${JSON.stringify(this.bucket)} to ${JSON.stringify(newBucket)}`
);
return (
newBucket.blockBucket === this.bucket.blockBucket &&
newBucket.timestampBucket === this.bucket.timestampBucket
);
}

private async startWatcher(bucket: Bucket): Promise<number> {
try {
const watcher = await this.requestFactory.watchRequestsByBucket(bucket, this.callBack);
this.eventWatchers[bucket] = watcher;
if (this.watcher) {
await this.requestFactory.stopWatch(this.watcher);
this.watcher = null;

this.logger.debug(`Buckets: Watcher for bucket=${bucket} has been started`);
this.logger.debug(`Buckets: Watcher for bucket=${this.bucketNumber} has been stopped`);
}
} catch (err) {
this.logger.error(`Buckets: Starting bucket=${bucket} watching failed!`);
this.logger.error(`Buckets: Stopping bucket=${this.bucketNumber} watching failed!`);
}
}

return bucket;
public get bucket(): Bucket {
return this.bucketNumber;
}

private async stopWatcher(bucket: Bucket) {
private async start() {
try {
const watcher = this.eventWatchers[bucket];
if (watcher !== undefined) {
await this.requestFactory.stopWatch(watcher);
delete this.eventWatchers[bucket];
const watcher = await this.requestFactory.watchRequestsByBucket(
this.bucketNumber,
this.callBack
);
this.watcher = watcher;

this.logger.debug(`Buckets: Watcher for bucket=${bucket} has been stopped`);
}
this.logger.debug(`Buckets: Watcher for bucket=${this.bucketNumber} has been started`);
} catch (err) {
this.logger.error(`Buckets: Stopping bucket=${bucket} watching failed!`);
this.logger.error(`Buckets: Starting bucket=${this.bucketNumber} watching failed!`);
}
}
}

0 comments on commit 362a0c9

Please sign in to comment.