diff --git a/packages/core-api/src/controllers/transactions.ts b/packages/core-api/src/controllers/transactions.ts index 09661bd152..1578ccab2a 100644 --- a/packages/core-api/src/controllers/transactions.ts +++ b/packages/core-api/src/controllers/transactions.ts @@ -70,18 +70,12 @@ export class TransactionsController extends Controller { public async unconfirmed(request: Hapi.Request, h: Hapi.ResponseToolkit) { const pagination = super.paginate(request); - - const data = (await this.transactionPool.getTransactions(pagination.offset, pagination.limit)).map( - transaction => ({ - serialized: transaction.toString("hex"), - }), - ); + const transactions = await this.transactionPool.getTransactions(pagination.offset, pagination.limit); + const poolSize = await this.transactionPool.getPoolSize(); + const data = transactions.map(t => ({ serialized: t.serialized.toString("hex") })); return super.toPagination( - { - count: await this.transactionPool.getPoolSize(), - rows: data, - }, + { count: poolSize, rows: data }, TransactionResource, (request.query.transform as unknown) as boolean, ); diff --git a/packages/core-forger/src/forger-service.ts b/packages/core-forger/src/forger-service.ts index 38bf4bd4b7..803797b13d 100644 --- a/packages/core-forger/src/forger-service.ts +++ b/packages/core-forger/src/forger-service.ts @@ -251,24 +251,18 @@ export class ForgerService { * @memberof ForgerService */ public async getTransactionsForForging(): Promise { - const response: Contracts.P2P.ForgingTransactions = await this.client.getTransactions(); - + const response = await this.client.getTransactions(); if (AppUtils.isEmpty(response)) { this.logger.error("Could not get unconfirmed transactions from transaction pool."); - return []; } - - const transactions: Interfaces.ITransactionData[] = response.transactions.map( - (hex: string) => Transactions.TransactionFactory.fromBytesUnsafe(Buffer.from(hex, "hex")).data, + const transactions = response.transactions.map( + hex => Transactions.TransactionFactory.fromBytesUnsafe(Buffer.from(hex, "hex")).data, ); - this.logger.debug( - `Received ${AppUtils.pluralize("transaction", transactions.length, true)} from the pool containing ${ - response.poolSize - }`, + `Received ${AppUtils.pluralize("transaction", transactions.length, true)} ` + + `from the pool containing ${response.poolSize}`, ); - return transactions; } diff --git a/packages/core-kernel/src/contracts/state/index.ts b/packages/core-kernel/src/contracts/state/index.ts index ebed42ac17..4ce2664013 100644 --- a/packages/core-kernel/src/contracts/state/index.ts +++ b/packages/core-kernel/src/contracts/state/index.ts @@ -2,3 +2,4 @@ export * from "./state-store"; export * from "./wallets"; export * from "./blocks"; export * from "./dpos"; +export * from "./transaction-validator"; diff --git a/packages/core-kernel/src/contracts/state/transaction-validator.ts b/packages/core-kernel/src/contracts/state/transaction-validator.ts new file mode 100644 index 0000000000..b37493ba5c --- /dev/null +++ b/packages/core-kernel/src/contracts/state/transaction-validator.ts @@ -0,0 +1,7 @@ +import { Interfaces } from "@arkecosystem/crypto"; + +export interface TransactionValidator { + validate(transaction: Interfaces.ITransaction): Promise; +} + +export type TransactionValidatorFactory = () => TransactionValidator; diff --git a/packages/core-kernel/src/contracts/transaction-pool/collator.ts b/packages/core-kernel/src/contracts/transaction-pool/collator.ts new file mode 100644 index 0000000000..34183a8fc6 --- /dev/null +++ b/packages/core-kernel/src/contracts/transaction-pool/collator.ts @@ -0,0 +1,5 @@ +import { Interfaces } from "@arkecosystem/crypto"; + +export interface Collator { + getBlockCandidateTransactions(): Promise; +} diff --git a/packages/core-kernel/src/contracts/transaction-pool/connection.ts b/packages/core-kernel/src/contracts/transaction-pool/connection.ts index 1252bef818..a00a47196c 100644 --- a/packages/core-kernel/src/contracts/transaction-pool/connection.ts +++ b/packages/core-kernel/src/contracts/transaction-pool/connection.ts @@ -22,10 +22,8 @@ export interface Connection { buildWallets(): Promise; replay(transactions: Interfaces.ITransaction[]): Promise; getTransaction(id: string): Promise; - getTransactionIdsForForging(start: number, size: number): Promise; - getTransactions(start: number, size: number, maxBytes?: number): Promise; + getTransactions(start: number, size: number): Promise; getTransactionsByType(type: number, typeGroup?: number): Promise>; - getTransactionsForForging(blockSize: number): Promise; has(transactionId: string): Promise; hasExceededMaxTransactions(senderPublicKey: string): Promise; senderHasTransactionsOfType(senderPublicKey: string, type: number, typeGroup?: number): Promise; diff --git a/packages/core-kernel/src/contracts/transaction-pool/index.ts b/packages/core-kernel/src/contracts/transaction-pool/index.ts index 5af15bb77d..47bc7afc80 100644 --- a/packages/core-kernel/src/contracts/transaction-pool/index.ts +++ b/packages/core-kernel/src/contracts/transaction-pool/index.ts @@ -1,2 +1,3 @@ export * from "./connection"; export * from "./processor"; +export * from "./collator"; diff --git a/packages/core-kernel/src/ioc/identifiers.ts b/packages/core-kernel/src/ioc/identifiers.ts index 99a2d3f0c2..5d27d2d3e4 100644 --- a/packages/core-kernel/src/ioc/identifiers.ts +++ b/packages/core-kernel/src/ioc/identifiers.ts @@ -62,6 +62,8 @@ export const Identifiers = { StateStore: Symbol.for("State"), StateTransactionStore: Symbol.for("State"), WalletFactory: Symbol.for("State"), + TransactionValidator: Symbol("State"), + TransactionValidatorFactory: Symbol("State"), // Derived states DposState: Symbol.for("State"), @@ -78,6 +80,8 @@ export const Identifiers = { TransactionPoolMemory: Symbol.for("TransactionPool"), TransactionPoolStorage: Symbol.for("TransactionPool"), TransactionPoolSynchronizer: Symbol.for("TransactionPool"), + TransactionPoolCollator: Symbol.for("TransactionPool"), + // Transactions - @todo: better names that won't clash WalletAttributes: Symbol.for("Wallet"), // TransactionHandler diff --git a/packages/core-p2p/src/socket-server/versions/internal.ts b/packages/core-p2p/src/socket-server/versions/internal.ts index e95dab9783..559b54990a 100644 --- a/packages/core-p2p/src/socket-server/versions/internal.ts +++ b/packages/core-p2p/src/socket-server/versions/internal.ts @@ -54,17 +54,14 @@ export const getUnconfirmedTransactions = async ({ }: { app: Contracts.Kernel.Application; }): Promise => { - const blockchain = app.get(Container.Identifiers.BlockchainService); - const { maxTransactions } = Managers.configManager.getMilestone(blockchain.getLastBlock().data.height).block; - + const collator = app.get(Container.Identifiers.TransactionPoolCollator); const transactionPool: Contracts.TransactionPool.Connection = app.get( Container.Identifiers.TransactionPoolService, ); + const transactions = await collator.getBlockCandidateTransactions(); + const poolSize = await transactionPool.getPoolSize(); - return { - transactions: await transactionPool.getTransactionsForForging(maxTransactions), - poolSize: await transactionPool.getPoolSize(), - }; + return { poolSize, transactions: transactions.map(t => t.serialized.toString("hex")) }; }; export const getCurrentRound = async ({ diff --git a/packages/core-state/src/service-provider.ts b/packages/core-state/src/service-provider.ts index e3af6a2ea6..e4ac1f3c9c 100644 --- a/packages/core-state/src/service-provider.ts +++ b/packages/core-state/src/service-provider.ts @@ -7,6 +7,7 @@ import { StateBuilder } from "./state-builder"; import { BlockStore } from "./stores/blocks"; import { StateStore } from "./stores/state"; import { TransactionStore } from "./stores/transactions"; +import { TransactionValidator } from "./transaction-validator"; import { TempWalletRepository, Wallet, WalletRepository } from "./wallets"; const dposPreviousRoundStateProvider = (context: Container.interfaces.Context) => { @@ -50,6 +51,12 @@ export class ServiceProvider extends Providers.ServiceProvider { this.app .bind(Container.Identifiers.DposPreviousRoundStateProvider) .toProvider(dposPreviousRoundStateProvider); + + this.app.bind(Container.Identifiers.TransactionValidator).to(TransactionValidator); + + this.app + .bind(Container.Identifiers.TransactionValidatorFactory) + .toAutoFactory(Container.Identifiers.TransactionValidator); } public async boot(): Promise { diff --git a/packages/core-state/src/transaction-validator.ts b/packages/core-state/src/transaction-validator.ts new file mode 100644 index 0000000000..2a4f9cb6ed --- /dev/null +++ b/packages/core-state/src/transaction-validator.ts @@ -0,0 +1,18 @@ +import { Container, Contracts } from "@arkecosystem/core-kernel"; +import { Handlers } from "@arkecosystem/core-transactions"; +import { Interfaces, Transactions } from "@arkecosystem/crypto"; +import { strictEqual } from "assert"; + +@Container.injectable() +export class TransactionValidator implements Contracts.State.TransactionValidator { + @Container.inject(Container.Identifiers.TransactionHandlerRegistry) + @Container.tagged("state", "temp") + private readonly handlerRegistry!: Handlers.Registry; + + public async validate(transaction: Interfaces.ITransaction): Promise { + const deserialized: Interfaces.ITransaction = Transactions.TransactionFactory.fromBytes(transaction.serialized); + strictEqual(transaction.id, deserialized.id); + const handler = await this.handlerRegistry.getActivatedHandlerForData(transaction.data); + await handler.apply(transaction); + } +} diff --git a/packages/core-transaction-pool/src/collator.ts b/packages/core-transaction-pool/src/collator.ts new file mode 100644 index 0000000000..8abc88f1ec --- /dev/null +++ b/packages/core-transaction-pool/src/collator.ts @@ -0,0 +1,56 @@ +import { Container, Contracts } from "@arkecosystem/core-kernel"; +import { Interfaces, Managers } from "@arkecosystem/crypto"; + +import { Connection } from "./connection"; +import { Memory } from "./memory"; + +@Container.injectable() +export class Collator implements Contracts.TransactionPool.Collator { + @Container.inject(Container.Identifiers.TransactionValidatorFactory) + private readonly createTransactionValidator!: Contracts.State.TransactionValidatorFactory; + + @Container.inject(Container.Identifiers.BlockchainService) + private readonly blockchain!: Contracts.Blockchain.Blockchain; + + @Container.inject(Container.Identifiers.TransactionPoolService) + private readonly pool!: Connection; + + @Container.inject(Container.Identifiers.TransactionPoolMemory) + private readonly memory!: Memory; + + @Container.inject(Container.Identifiers.LogService) + private readonly logger!: Contracts.Kernel.Logger; + + public async getBlockCandidateTransactions(): Promise { + let bytesLeft = this.pool.options.maxTransactionBytes ? this.pool.options.maxTransactionBytes : null; + + const height = this.blockchain.getLastBlock().data.height; + const milestone = Managers.configManager.getMilestone(height); + const transactions: Interfaces.ITransaction[] = []; + const validator = this.createTransactionValidator(); + + for (const transaction of this.memory.allSortedByFee().slice()) { + if (transactions.length === milestone.block.maxTransactions) { + break; + } + + try { + await validator.validate(transaction); + if (bytesLeft !== null) { + bytesLeft -= JSON.stringify(transaction.data).length; + if (bytesLeft < 0) { + break; + } + } + transactions.push(transaction); + } catch (error) { + this.pool.removeTransactionById(transaction.id!); + this.logger.error( + `[Pool] Removed ${transaction.id} before forging because it is no longer valid: ${error.message}`, + ); + } + } + + return transactions; + } +} diff --git a/packages/core-transaction-pool/src/connection.ts b/packages/core-transaction-pool/src/connection.ts index 60faf2bc4a..53b3d2e1aa 100644 --- a/packages/core-transaction-pool/src/connection.ts +++ b/packages/core-transaction-pool/src/connection.ts @@ -94,6 +94,14 @@ export class Connection implements Contracts.TransactionPool.Connection { @Container.inject(Container.Identifiers.TransactionPoolSynchronizer) private readonly synchronizer!: Synchronizer; + /** + * @private + * @type {Contracts.State.TransactionValidatorFactory} + * @memberof Connection + */ + @Container.inject(Container.Identifiers.TransactionValidatorFactory) + private readonly createTransactionValidator!: Contracts.State.TransactionValidatorFactory; + private readonly loggedAllowedSenders: string[] = []; /** @@ -240,7 +248,6 @@ export class Connection implements Contracts.TransactionPool.Connection { */ public async getTransaction(id: string): Promise { await this.cleaner.purgeExpired(); - return this.memory.getById(id); } @@ -251,41 +258,9 @@ export class Connection implements Contracts.TransactionPool.Connection { * @returns {Promise} * @memberof Connection */ - public async getTransactions(start: number, size: number, maxBytes?: number): Promise { - return (await this.getValidatedTransactions(start, size, maxBytes)).map( - (transaction: Interfaces.ITransaction) => transaction.serialized, - ); - } - - /** - * @param {number} blockSize - * @returns {Promise} - * @memberof Connection - */ - public async getTransactionsForForging(blockSize: number): Promise { - return (await this.getValidatedTransactions(0, blockSize, this.options.maxTransactionBytes)).map(transaction => - transaction.serialized.toString("hex"), - ); - } - - /** - * @param {number} start - * @param {number} size - * @returns {Promise} - * @memberof Connection - */ - public async getTransactionIdsForForging(start: number, size: number): Promise { - const transactions: Interfaces.ITransaction[] = await this.getValidatedTransactions( - start, - size, - this.options.maxTransactionBytes, - ); - - return transactions.map((transaction: Interfaces.ITransaction) => { - AppUtils.assert.defined(transaction.id); - - return transaction.id; - }); + public async getTransactions(start: number, size: number): Promise { + await this.cleaner.purgeExpired(); + return this.memory.allSortedByFee().slice(start, size); } /** @@ -407,41 +382,37 @@ export class Connection implements Contracts.TransactionPool.Connection { public async buildWallets(): Promise { this.poolWalletRepository.reset(); - - const transactionIds: string[] = await this.getTransactionIdsForForging(0, await this.getPoolSize()); - - this.app.get(Container.Identifiers.StateStore).clearCachedTransactionIds(); - - for (const transactionId of transactionIds) { - const transaction: Interfaces.ITransaction | undefined = await this.getTransaction(transactionId); - - if (!transaction || !transaction.data.senderPublicKey) { - return; + const validator = this.createTransactionValidator(); + const handlerRegistry = this.app.getTagged( + Container.Identifiers.TransactionHandlerRegistry, + "state", + "blockchain", + ); + for (const transaction of this.memory.allSortedByFee().slice()) { + try { + await validator.validate(transaction); + } catch (error) { + this.removeTransactionById(transaction.id!); + this.logger.error( + `[Pool] Removed ${transaction.id} before forging because it is no longer valid: ${error.message}`, + ); + continue; } - const senderWallet: Contracts.State.Wallet = this.poolWalletRepository.findByPublicKey( - transaction.data.senderPublicKey, - ); + if (!transaction.data.senderPublicKey) { + continue; + } - // TODO: rework error handling try { - const handlerRegistry = this.app.getTagged( - Container.Identifiers.TransactionHandlerRegistry, - "state", - "blockchain", - ); + const senderWallet = this.poolWalletRepository.findByPublicKey(transaction.data.senderPublicKey); const transactionHandler = await handlerRegistry.getActivatedHandlerForData(transaction.data); - await transactionHandler.throwIfCannotBeApplied(transaction, senderWallet, this.walletRepository); - await transactionHandler.applyToSender(transaction, this.poolWalletRepository); } catch (error) { this.logger.error(`BuildWallets from pool: ${error.message}`); - this.cleaner.purgeByPublicKey(transaction.data.senderPublicKey); } } - this.logger.info("Transaction Pool Manager build wallets complete"); } @@ -502,68 +473,6 @@ export class Connection implements Contracts.TransactionPool.Connection { } } - /** - * @private - * @param {number} start - * @param {number} size - * @param {number} [maxBytes=0] - * @returns {Promise} - * @memberof Connection - */ - private async getValidatedTransactions( - start: number, - size: number, - maxBytes = 0, - ): Promise { - await this.cleaner.purgeExpired(); - - const data: Interfaces.ITransaction[] = []; - - let transactionBytes = 0; - - const tempWalletRepository = this.app.getTagged( - Container.Identifiers.WalletRepository, - "state", - "temp", - ); - - let i = 0; - // Copy the returned array because validateTransactions() in the loop body we may remove entries. - const allTransactions: Interfaces.ITransaction[] = [...this.memory.allSortedByFee()]; - for (const transaction of allTransactions) { - if (data.length === size) { - return data; - } - - const valid: Interfaces.ITransaction[] = await this.validateTransactions( - [transaction], - tempWalletRepository, - ); - - if (valid.length === 0) { - continue; - } - - if (i++ < start) { - continue; - } - - if (maxBytes > 0) { - const transactionSize: number = JSON.stringify(transaction.data).length; - - if (transactionBytes + transactionSize > maxBytes) { - return data; - } - - transactionBytes += transactionSize; - } - - data.push(transaction); - } - - return data; - } - /** * @private * @param {Interfaces.ITransaction} transaction diff --git a/packages/core-transaction-pool/src/index.ts b/packages/core-transaction-pool/src/index.ts index 63fb48d69a..851befb618 100644 --- a/packages/core-transaction-pool/src/index.ts +++ b/packages/core-transaction-pool/src/index.ts @@ -5,3 +5,4 @@ export * from "./processor"; export * from "./service-provider"; export * from "./storage"; export * from "./synchronizer"; +export * from "./collator"; diff --git a/packages/core-transaction-pool/src/service-provider.ts b/packages/core-transaction-pool/src/service-provider.ts index 5ae6d8d8e3..b8f656a0f9 100644 --- a/packages/core-transaction-pool/src/service-provider.ts +++ b/packages/core-transaction-pool/src/service-provider.ts @@ -1,6 +1,7 @@ import { Container, Providers } from "@arkecosystem/core-kernel"; import { Cleaner } from "./cleaner"; +import { Collator } from "./collator"; import { Connection } from "./connection"; import { Memory } from "./memory"; import { PoolWalletRepository } from "./pool-wallet-repository"; @@ -49,6 +50,8 @@ export class ServiceProvider extends Providers.ServiceProvider { .to(Connection) .inSingletonScope(); + this.app.bind(Container.Identifiers.TransactionPoolCollator).to(Collator); + this.initializeComponents(); }