Skip to content

Commit

Permalink
Merge a5f6300 into ce93403
Browse files Browse the repository at this point in the history
  • Loading branch information
adibas03 committed Aug 21, 2018
2 parents ce93403 + a5f6300 commit e50a419
Show file tree
Hide file tree
Showing 13 changed files with 489 additions and 41 deletions.
3 changes: 2 additions & 1 deletion src/Actions/Actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export default class Actions implements IActions {
if (!this.config.wallet.isAccountAbleToSendTx(nextAccount)) {
return ClaimStatus.ACCOUNT_BUSY;
}
if (await hasPending(this.config, txRequest, { type: 'claim' })) {
if (await hasPending(this.config, txRequest, { type: 'claim', checkGasPrice: true })) {
return ClaimStatus.PENDING;
}

Expand Down Expand Up @@ -144,6 +144,7 @@ export default class Actions implements IActions {
private async hasPendingExecuteTransaction(txRequest: ITxRequest): Promise<boolean> {
return hasPending(this.config, txRequest, {
type: 'execute',
checkGasPrice: true,
minPrice: txRequest.gasPrice
});
}
Expand Down
87 changes: 60 additions & 27 deletions src/Actions/Pending.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
import { FnSignatures } from '../Enum';
import TxPool, { ITxPoolTxDetails } from '../TxPool';
import Config from '../Config';
import BigNumber from 'bignumber.js';
import { FnSignatures } from '../Enum';
import { ITxRequestPending } from '../Types/ITxRequest';
import { ITxRequest } from '../Types';

interface PendingOpts {
type?: string;
checkGasPrice?: boolean;
checkGasPrice: boolean;
minPrice?: BigNumber;
}

/**
* Uses the Parity specific RPC request `parity_pendingTransactions` to search
* Uses the locally maintained TxPool to check
* for pending transactions in the transaction pool.
* @param {Config} conf Config object.
* @param {TransactionRequest} txRequest
* @param {string} type (optional) Type of pending request: claim,execute.
* @param {boolean} checkGasPrice (optional, default: true) Check if transaction's gasPrice is sufficient for Network.
* @param {number} minPrice (optional) Expected gasPrice.
* @returns {Promise<boolean>} True if a pending transaction to this address exists.
*/
const hasPendingParity = (
conf: any,
conf: Config,
txRequest: ITxRequestPending,
opts: PendingOpts
): Promise<boolean> => {
opts.checkGasPrice = opts.checkGasPrice === undefined ? true : opts.checkGasPrice;
const provider = conf.web3.currentProvider;

return new Promise(async (resolve, reject) => {
Expand All @@ -42,12 +45,13 @@ const hasPendingParity = (
return;
}

const currentGasPrice: BigNumber = await conf.util.networkGasPrice();
for (const count of Object.keys(res.result)) {
if (res.result[count].to === txRequest.address) {
const withValidGasPrice =
res.result[count] &&
(!opts.checkGasPrice ||
(await hasValidGasPrice(conf, res.result[count], opts.minPrice)));
(await hasValidGasPrice(currentGasPrice, res.result[count], opts.minPrice)));
if (
res.result[count] &&
isOfType(res.result[count], opts.type) &&
Expand All @@ -70,18 +74,18 @@ const hasPendingParity = (
/**
* Uses the Geth specific RPC request `txpool_content` to search
* for pending transactions in the transaction pool.
* @param {Config} conf Config object.
* @param {TransactionRequest} txRequest
* @param {string} type (optional) Type of pending request: claim,execute.
* @param {boolean} checkGasPrice (optional, default: true) Check if transaction's gasPrice is sufficient for Network.
* @param {number} minPrice (optional) Expected gasPrice.
* @returns {Promise<object>} Transaction, if a pending transaction to this address exists.
*/
const hasPendingGeth = (
conf: any,
conf: Config,
txRequest: ITxRequestPending,
opts: PendingOpts
): Promise<boolean> => {
opts.checkGasPrice = opts.checkGasPrice === undefined ? true : opts.checkGasPrice;
const provider = conf.web3.currentProvider;

return new Promise((resolve, reject) => {
Expand All @@ -101,14 +105,15 @@ const hasPendingGeth = (
return;
}

const currentGasPrice: BigNumber = await conf.util.networkGasPrice();
for (const account of Object.keys(res.result.pending)) {
for (const nonce in res.result.pending[account]) {
if (res.result.pending[account][nonce].to === txRequest.address) {
const withValidGasPrice =
res.result.pending[account][nonce] &&
(!opts.checkGasPrice ||
(await hasValidGasPrice(
conf,
currentGasPrice,
res.result.pending[account][nonce],
opts.minPrice
)));
Expand All @@ -133,30 +138,55 @@ const hasPendingGeth = (
};

/**
* Uses the Geth specific RPC request `txpool_content` to search
* Uses the locally maintained TxPool to check
* for pending transactions in the transaction pool.
* @param {Config} conf Config object.
* @param {TransactionRequest} txRequest
* @param {string} type (optional) Type of pending request: claim,execute.
* @param {boolean} checkGasPrice (optional, default: true) Check if transaction's gasPrice is sufficient for Network.
* @param {number} minPrice (optional) Expected gasPrice.
* @returns {Promise<object>} Transaction, if a pending transaction to this address exists.
*/
const hasPendingPool = async (
conf: Config,
txRequest: ITxRequestPending,
opts: PendingOpts
): Promise<boolean> => {
let validPending: (boolean | ITxPoolTxDetails)[] = [];

try{
const currentGasPrice: BigNumber = await conf.util.networkGasPrice();
validPending = await conf.txPool.pool.get(txRequest.address, 'to')
.filter( async(tx: ITxPoolTxDetails) => {
const withValidGasPrice =
(!opts.checkGasPrice ||
(hasValidGasPrice(
currentGasPrice,
tx,
opts.minPrice
)));
return isOfType(tx, opts.type) && withValidGasPrice
})
} catch (e) {
conf.logger.info(e);
}
return validPending.length > 0;

};

/**
* Checks that pending transactions in the transaction pool have valid gasPrices.
* @param {Config} conf Config object.
* @param {TransactionReceipt} transaction Ethereum transaction receipt
* @param {number} minPrice (optional) Expected gasPrice.
* @returns {Promise<boolean>} Transaction, if a pending transaction to this address exists.
*/
const hasValidGasPrice = async (conf: any, transaction: any, minPrice?: any) => {
const hasValidGasPrice = (networkPrice: BigNumber, transaction: ITxPoolTxDetails, minPrice?: BigNumber) => {
if (minPrice) {
return minPrice.valueOf() === transaction.gasPrice.valueOf();
}
const spread = 0.3;
let currentGasPrice: number;
await new Promise((resolve, reject) => {
conf.web3.eth.getGasPrice((err: Error, res: any) => {
if (err) {
conf.logger.error(err);
return;
}
currentGasPrice = res;
resolve(true);
});
});
return currentGasPrice && spread * currentGasPrice.valueOf() <= transaction.gasPrice.valueOf();
return networkPrice && networkPrice.times(spread).valueOf() <= transaction.gasPrice.valueOf();
};

/**
Expand All @@ -166,7 +196,7 @@ const hasValidGasPrice = async (conf: any, transaction: any, minPrice?: any) =>
* @param {string} type Type of pending request: claim,execute.
* @returns {Promise<boolean>} True if a pending transaction to this address exists.
*/
const isOfType = (transaction: any, type?: string) => {
const isOfType = (transaction: ITxPoolTxDetails, type?: string) => {
if (transaction && !type) {
return true;
}
Expand All @@ -181,15 +211,18 @@ const isOfType = (transaction: any, type?: string) => {
* @param {string} type (optional) Type of pending request: claim,execute.
* @param {boolean} checkGasPrice (optional, default: true) Check if transaction's gasPrice is sufficient for Network.
* @param {number} minPrice (optional) Expected gasPrice to compare.
* @returns {Promise<boolean>} True if a pending transaction to this address exists.
*/
const hasPending = async (
conf: any,
conf: Config,
txRequest: ITxRequestPending,
opts: PendingOpts
): Promise<boolean> => {
let result = false;
let result: boolean = false;
if (conf.txPool && conf.txPool.running()) {

if (conf.client === 'parity') {
result = await hasPendingPool(conf, txRequest, opts)
} else if (conf.client === 'parity') {
result = await hasPendingParity(conf, txRequest, opts);
} else if (conf.client === 'geth') {
result = await hasPendingGeth(conf, txRequest, opts);
Expand Down
1 change: 1 addition & 0 deletions src/Cache/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { default } from './Cache';
export { ICachedTxDetails } from './Cache';
3 changes: 3 additions & 0 deletions src/Config/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { IConfigParams } from './IConfigParams';
import { IEconomicStrategy } from '../EconomicStrategy';
import { ILogger, DefaultLogger } from '../Logger';
import { StatsDB } from '../Stats';
import TxPool from '../TxPool';
import W3Util from '../Util';
import { ICachedTxDetails } from '../Cache/Cache';
import { getWeb3FromProviderUrl } from './helpers';
Expand All @@ -31,6 +32,7 @@ export default class Config implements IConfigParams {
public providerUrl: string;
public scanSpread: any;
public statsDb: StatsDB;
public txPool: TxPool;
public util: W3Util;
public wallet: Wallet;
public web3: any;
Expand All @@ -57,6 +59,7 @@ export default class Config implements IConfigParams {
this.scanSpread = params.scanSpread || 50;
this.walletStoresAsPrivateKeys = params.walletStoresAsPrivateKeys || false;
this.logger = params.logger || new DefaultLogger();
this.txPool = new TxPool(this);

if (!params.disableDetection) {
this.getConnectedClient();
Expand Down
4 changes: 2 additions & 2 deletions src/Config/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import * as Web3 from 'web3';
import * as Web3WsProvider from 'web3-providers-ws';

const getWeb3FromProviderUrl = (providerUrl: string) => {
let provider;
let provider: any;

if (providerUrl.includes('http://') || providerUrl.includes('https://')) {
provider = new Web3.providers.HttpProvider(providerUrl);
} else if (providerUrl.includes('ws://') || providerUrl.includes('wss://')) {
provider = new Web3WsProvider(providerUrl);
provider.__proto__.sendAsync = provider.__proto__.send;
provider.__proto__.sendAsync = provider.__proto__.sendAsync || provider.__proto__.send;
}

return new Web3(provider);
Expand Down
6 changes: 4 additions & 2 deletions src/Router/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ export default interface IRouter {
export default class Router implements IRouter {
public actions: IActions;
public config: Config;
public util: W3Util;
public txRequestStates: object = {};

public transitions: object = {};

constructor(config: Config, actions: IActions) {
this.actions = actions;
this.config = config;
this.util = config.util;

this.transitions[TxStatus.BeforeClaimWindow] = this.beforeClaimWindow.bind(this);
this.transitions[TxStatus.ClaimWindow] = this.claimWindow.bind(this);
Expand All @@ -36,6 +34,10 @@ export default class Router implements IRouter {
};
}

public get util (): W3Util {
return this.config.util;
}

public async beforeClaimWindow(txRequest: ITxRequest): Promise<TxStatus> {
if (txRequest.isCancelled) {
// TODO Status.CleanUp?
Expand Down
8 changes: 7 additions & 1 deletion src/Scanner/Scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Bucket, IBucketPair, IBuckets, BucketCalc, BucketSize, IBucketCalc } fr
import W3Util from '../Util';
import { CacheStates } from '../Enum';
import { ITxRequestRaw } from '../Types/ITxRequest';
import TxPool from '../TxPool';
import IRouter from '../Router';

export default class {
Expand All @@ -17,6 +18,7 @@ export default class {
public scanning: boolean;
public router: IRouter;
public requestFactory: Promise<any>;
public txPool: TxPool;

// Child Scanners, tracked by the ID of their interval
public cacheScanner: IntervalId;
Expand Down Expand Up @@ -48,6 +50,7 @@ export default class {
this.util = config.util;
this.scanning = false;
this.router = router;
this.txPool = config.txPool;
this.requestFactory = config.eac.requestFactory();
this.bucketCalc = new BucketCalc(config, this.requestFactory);
}
Expand All @@ -73,7 +76,7 @@ export default class {
if (!(await this.util.isWatchingEnabled())) {
throw new Error('We are currently supporting nodes with filtering capabilities');
}

await this.txPool.start();
this.chainScanner = await this.runAndSetInterval(() => this.watchBlockchain(), 5 * 60 * 1000);
this.cacheScanner = await this.runAndSetInterval(() => this.scanCache(), this.config.ms);

Expand All @@ -89,6 +92,9 @@ export default class {
clearInterval(this.cacheScanner);
clearInterval(this.chainScanner);

this.txPool.stop();


// Mark that we've stopped.
this.config.logger.info('Scanner STOPPED');
this.scanning = false;
Expand Down
63 changes: 63 additions & 0 deletions src/TxPool/Pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { ILogger } from '../Logger';
import BigNumber from 'bignumber.js';

interface IPool {
[id: string]: boolean | ITxPoolTxDetails;
}

export interface ITxPoolTxDetails {
to: string;
from: string;
input: string;
gasPrice: BigNumber;
timestamp: number;
transactionHash: string;
}

export class Pool {
public pool: IPool = {};

public preSet(key: string): boolean {
if (this.pool[key]) {
return false;
}
return this.pool[key] = true;
}

public set(key: string, value: ITxPoolTxDetails) {
this.pool[key] = value;
}

public get(key: string, field: string): (boolean | ITxPoolTxDetails)[] {
return this.stored().filter((p: string) =>
(field === 'transactionHash' && p === key) || this.pool[p][field] === key)
.map((found: string) => this.pool[found])
}

public has(key: string, field: string): boolean {
return this.get(key, field).length > 0;
}

public del(key: string) {
delete this.pool[key];
}

public wipe() {
this.pool = {};
}

public length(): number {
return this.stored().length;
}

public stored(): string[] {
return Object.keys(this.pool);
}

public isEmpty() {
return this.length() === 0;
}
}

// Local copy of the transaction pool known to the node
// This is maintained while TimeNode is running

0 comments on commit e50a419

Please sign in to comment.