Skip to content

Commit

Permalink
Do not replay block events when the provider event block is reset (#343
Browse files Browse the repository at this point in the history
…).
  • Loading branch information
ricmoo committed Nov 12, 2018
1 parent 09b698b commit 93152ef
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 29 deletions.
99 changes: 70 additions & 29 deletions src.ts/providers/base-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,21 @@ export class BaseProvider extends Provider {
private _network: Network;

private _events: Array<_Event>;
protected _emitted: any;

// To help mitigate the eventually conssitent nature of the blockchain
// we keep a mapping of events we emit. If we emit an event X, we expect
// that a user should be able to query for that event in the callback,
// if the node returns null, we stall the response until we get back a
// meaningful value, since we may be hitting a re-org, or a node that
// has not indexed the event yet.
// Events:
// - t:{hash} - Transaction hash
// - b:{hash} - BlockHash
// - block - The most recent emitted block
protected _emitted: { [ eventName: string ]: number | 'pending' };

private _pollingInterval: number;
private _poller: any; // @TODO: what does TypeScript thing setInterval returns?
private _poller: any; // @TODO: what does TypeScript think setInterval returns?

private _lastBlockNumber: number;

Expand Down Expand Up @@ -532,11 +543,7 @@ export class BaseProvider extends Provider {

this._pollingInterval = 4000;

// We use this to track recent emitted events; for example, if we emit a "block" of 100
// and we get a `getBlock(100)` request which would result in null, we should retry
// until we get a response. This provides devs with a consistent view. Similarly for
// transaction hashes.
this._emitted = { block: this._lastBlockNumber };
this._emitted = { block: -2 };

this._fastQueryDate = 0;
}
Expand All @@ -548,24 +555,43 @@ export class BaseProvider extends Provider {
// If the block hasn't changed, meh.
if (blockNumber === this._lastBlockNumber) { return; }

if (this._lastBlockNumber === -2) { this._lastBlockNumber = blockNumber - 1; }
// First polling cycle, trigger a "block" events
if (this._emitted.block === -2) {
this._emitted.block = blockNumber - 1;
}

// Notify all listener for each block that has passed
for (let i = this._lastBlockNumber + 1; i <= blockNumber; i++) {
if (this._emitted.block < i) {
this._emitted.block = i;
for (let i = (<number>this._emitted.block) + 1; i <= blockNumber; i++) {
this.emit('block', i);
}

// The emitted block was updated, check for obsolete events
if ((<number>this._emitted.block) !== blockNumber) {
this._emitted.block = blockNumber;

Object.keys(this._emitted).forEach((key) => {
// The block event does not expire
if (key === 'block') { return; }

// The block we were at when we emitted this event
let eventBlockNumber = this._emitted[key];

// We cannot garbage collect pending transactions or blocks here
// They should be garbage collected by the Provider when setting
// "pending" events
if (eventBlockNumber === 'pending') { return; }

// Evict any transaction hashes or block hashes over 12 blocks
// old, since they should not return null anyways
Object.keys(this._emitted).forEach((key) => {
if (key === 'block') { return; }
if (blockNumber - eventBlockNumber > 12) {
delete this._emitted[key];
}
});
}

if (this._emitted[key] > i + 12) {
delete this._emitted[key];
}
});
}
this.emit('block', i);
// First polling cycle
if (this._lastBlockNumber === -2) {
this._lastBlockNumber = blockNumber - 1;
}

// Sweep balances and remove addresses we no longer have events for
Expand All @@ -583,6 +609,7 @@ export class BaseProvider extends Provider {
this.emit(hash, receipt);
return null;
}).catch((error: Error) => { this.emit('error', error); });

break;
}

Expand All @@ -591,13 +618,15 @@ export class BaseProvider extends Provider {
if (this._balances[address]) {
newBalances[address] = this._balances[address];
}
this.getBalance(address, 'latest').then(function(balance) {

this.getBalance(address, 'latest').then((balance) => {
let lastBalance = this._balances[address];
if (lastBalance && balance.eq(lastBalance)) { return; }
this._balances[address] = balance;
this.emit(address, balance);
return null;
}).catch((error: Error) => { this.emit('error', error); });

break;
}

Expand Down Expand Up @@ -630,12 +659,13 @@ export class BaseProvider extends Provider {

return null;
}).catch((error: Error) => { });

this.doPoll();
}

resetEventsBlock(blockNumber: number): void {
this._lastBlockNumber = blockNumber;
this._doPoll();
this._lastBlockNumber = blockNumber - 1;
if (this.polling) { this._doPoll(); }
}

get network(): Network {
Expand All @@ -647,8 +677,7 @@ export class BaseProvider extends Provider {
}

get blockNumber(): number {
if (this._lastBlockNumber < 0) { return null; }
return this._lastBlockNumber;
return this._fastBlockNumber;
}

get polling(): boolean {
Expand Down Expand Up @@ -719,10 +748,12 @@ export class BaseProvider extends Provider {
// this will be used once we move to the WebSocket or other alternatives to polling

waitForTransaction(transactionHash: string, confirmations?: number): Promise<TransactionReceipt> {
if (!confirmations) { confirmations = 1; }
if (confirmations == null) { confirmations = 1; }
return poll(() => {
return this.getTransactionReceipt(transactionHash).then((receipt) => {
if (receipt == null || receipt.confirmations < confirmations) { return undefined; }
if (receipt == null && confirmations !== 0) {
if (receipt.confirmations < confirmations) { return undefined; }
}
return receipt;
});
}, { onceBlock: this });
Expand Down Expand Up @@ -832,11 +863,21 @@ export class BaseProvider extends Provider {
errors.throwError('Transaction hash mismatch from Provider.sendTransaction.', errors.UNKNOWN_ERROR, { expectedHash: tx.hash, returnedHash: hash });
}

this._emitted['t:' + tx.hash] = 'pending';

// @TODO: (confirmations? number, timeout? number)
result.wait = (confirmations?: number) => {

// We know this transaction *must* exist (whether it gets mined is
// another story), so setting an emitted value forces us to
// wait even if the node returns null for the receipt
if (confirmations !== 0) {
this._emitted['t:' + tx.hash] = 'pending';
}

return this.waitForTransaction(tx.hash, confirmations).then((receipt) => {

// No longer pending, allow the polling loop to garbage collect this
this._emitted['t:' + tx.hash] = receipt.blockNumber;

if (receipt.status === 0) {
errors.throwError('transaction failed', errors.CALL_EXCEPTION, {
transactionHash: tx.hash,
Expand Down Expand Up @@ -918,7 +959,7 @@ export class BaseProvider extends Provider {
return poll(() => {
return this.perform('getBlock', { blockTag: blockTag, includeTransactions: !!includeTransactions }).then((block) => {
if (block == null) {
if (blockNumber > this._emitted.block) {
if (blockNumber <= this._emitted.block) {
return undefined;
}
return null;
Expand Down
1 change: 1 addition & 0 deletions src.ts/providers/json-rpc-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export class JsonRpcProvider extends BaseProvider {

var seq = Promise.resolve();
hashes.forEach(function(hash) {
// @TODO: This should be garbage collected at some point... How? When?
self._emitted['t:' + hash.toLowerCase()] = 'pending';
seq = seq.then(function() {
return self.getTransaction(hash).then(function(tx) {
Expand Down

0 comments on commit 93152ef

Please sign in to comment.