diff --git a/packages/core-kernel/src/contracts/transaction-pool/index.ts b/packages/core-kernel/src/contracts/transaction-pool/index.ts index 47bc7afc80..a1961660f6 100644 --- a/packages/core-kernel/src/contracts/transaction-pool/index.ts +++ b/packages/core-kernel/src/contracts/transaction-pool/index.ts @@ -1,3 +1,4 @@ export * from "./connection"; export * from "./processor"; export * from "./collator"; +export * from "./storage"; diff --git a/packages/core-kernel/src/contracts/transaction-pool/storage.ts b/packages/core-kernel/src/contracts/transaction-pool/storage.ts new file mode 100644 index 0000000000..f9e20a949f --- /dev/null +++ b/packages/core-kernel/src/contracts/transaction-pool/storage.ts @@ -0,0 +1,11 @@ +import { Interfaces } from "@arkecosystem/crypto"; + +export interface Storage { + boot(): void; + dispose(): void; + add(transaction: Interfaces.ITransaction): void; + delete(id: string): void; + has(id: string): boolean; + clear(): void; + all(): Interfaces.ITransaction[]; +} diff --git a/packages/core-kernel/src/ioc/identifiers.ts b/packages/core-kernel/src/ioc/identifiers.ts index 0bd2fbd99a..6828f0d863 100644 --- a/packages/core-kernel/src/ioc/identifiers.ts +++ b/packages/core-kernel/src/ioc/identifiers.ts @@ -82,7 +82,6 @@ export const Identifiers = { TransactionPoolCleaner: Symbol.for("TransactionPool"), TransactionPoolMemory: Symbol.for("TransactionPool"), TransactionPoolStorage: Symbol.for("TransactionPool"), - TransactionPoolSynchronizer: Symbol.for("TransactionPool"), TransactionPoolCollator: Symbol.for("TransactionPool"), // Transactions - @todo: better names that won't clash diff --git a/packages/core-transaction-pool/src/cleaner.ts b/packages/core-transaction-pool/src/cleaner.ts index 19deeff32e..0020c6d543 100644 --- a/packages/core-transaction-pool/src/cleaner.ts +++ b/packages/core-transaction-pool/src/cleaner.ts @@ -5,8 +5,6 @@ import { Interfaces } from "@arkecosystem/crypto"; import { Memory } from "./memory"; import { PoolWalletRepository } from "./pool-wallet-repository"; -import { Storage } from "./storage"; -import { Synchronizer } from "./synchronizer"; /** * @export @@ -48,19 +46,11 @@ export class Cleaner { /** * @private - * @type {Storage} + * @type {Contracts.TransactionPool.Storage} * @memberof Cleaner */ @Container.inject(Container.Identifiers.TransactionPoolStorage) - private readonly storage!: Storage; - - /** - * @private - * @type {Synchronizer} - * @memberof Cleaner - */ - @Container.inject(Container.Identifiers.TransactionPoolSynchronizer) - private readonly synchronizer!: Synchronizer; + private readonly storage!: Contracts.TransactionPool.Storage; /** * @private @@ -85,8 +75,7 @@ export class Cleaner { */ public flush(): void { this.memory.flush(); - - this.storage.deleteAll(); + this.storage.clear(); } /** @@ -106,9 +95,7 @@ export class Cleaner { */ public removeTransactionById(id: string, senderPublicKey?: string): void { this.memory.forget(id, senderPublicKey); - - this.synchronizer.syncToPersistentStorageIfNecessary(); - + this.storage.delete(id); this.emitter.dispatch(AppEnums.TransactionEvent.RemovedFromPool, id); } @@ -197,8 +184,7 @@ export class Cleaner { AppUtils.assert.defined(transaction.id); this.memory.forget(transaction.id, transaction.data.senderPublicKey); - - this.synchronizer.syncToPersistentStorageIfNecessary(); + this.storage.delete(transaction.id); }; const lowestNonceBySender = {}; diff --git a/packages/core-transaction-pool/src/connection.ts b/packages/core-transaction-pool/src/connection.ts index 375be28927..65d3aa2abb 100644 --- a/packages/core-transaction-pool/src/connection.ts +++ b/packages/core-transaction-pool/src/connection.ts @@ -10,8 +10,6 @@ import { TransactionsProcessed } from "./interfaces"; import { PurgeInvalidTransactions } from "./listeners"; import { Memory } from "./memory"; import { PoolWalletRepository } from "./pool-wallet-repository"; -import { Storage } from "./storage"; -import { Synchronizer } from "./synchronizer"; // todo: review implementation and reduce the complexity of all methods as it is quite high /** @@ -82,11 +80,11 @@ export class Connection implements Contracts.TransactionPool.Connection { /** * @private - * @type {Storage} + * @type {Contracts.TransactionPool.Storage} * @memberof Connection */ @Container.inject(Container.Identifiers.TransactionPoolStorage) - private readonly storage!: Storage; + private readonly storage!: Contracts.TransactionPool.Storage; /** * @private @@ -96,14 +94,6 @@ export class Connection implements Contracts.TransactionPool.Connection { @Container.inject(Container.Identifiers.TransactionPoolCleaner) private readonly cleaner!: Cleaner; - /** - * @private - * @type {Synchronizer} - * @memberof Connection - */ - @Container.inject(Container.Identifiers.TransactionPoolSynchronizer) - private readonly synchronizer!: Synchronizer; - /** * @private * @type {Contracts.State.TransactionValidatorFactory} @@ -120,22 +110,20 @@ export class Connection implements Contracts.TransactionPool.Connection { */ public async boot(): Promise { this.memory.flush(); - this.storage.connect(); - const transactionsFromDisk: Interfaces.ITransaction[] = this.storage.loadAll(); - for (const transaction of transactionsFromDisk) { - this.memory.remember(transaction, true); + if (process.env.CORE_RESET_DATABASE) { + this.storage.clear(); } - // Remove from the pool invalid entries found in `transactionsFromDisk`. - if (process.env.CORE_RESET_DATABASE) { - this.memory.flush(); - } else { - await this.validateTransactions(transactionsFromDisk); - await this.cleaner.purgeExpired(); + const transactionsFromDisk: Interfaces.ITransaction[] = this.storage.all(); + for (const transaction of transactionsFromDisk) { + // ! isn't applied on top of pool wallet repository + this.memory.remember(transaction); } - this.synchronizer.syncToPersistentStorage(); + // Remove from the pool invalid entries found in `transactionsFromDisk`. + await this.validateTransactions(transactionsFromDisk); + await this.cleaner.purgeExpired(); this.emitter.listen(AppEnums.CryptoEvent.MilestoneChanged, this.app.resolve(PurgeInvalidTransactions)); @@ -222,9 +210,7 @@ export class Connection implements Contracts.TransactionPool.Connection { */ public removeTransactionById(id: string, senderPublicKey?: string): void { this.memory.forget(id, senderPublicKey); - - this.synchronizer.syncToPersistentStorageIfNecessary(); - + this.storage.delete(id); this.emitter.dispatch(AppEnums.TransactionEvent.RemovedFromPool, id); } @@ -501,6 +487,7 @@ export class Connection implements Contracts.TransactionPool.Connection { const transactionHandler = await this.blockchainHandlerRegistry.getActivatedHandlerForData(lowest.data); await transactionHandler.revertForSender(lowest, this.poolWalletRepository); this.memory.forget(lowest.id, lowest.data.senderPublicKey); + this.storage.delete(lowest.id); } else { return { transaction, @@ -514,6 +501,7 @@ export class Connection implements Contracts.TransactionPool.Connection { } this.memory.remember(transaction); + this.storage.add(transaction); try { AppUtils.assert.defined(transaction.data.senderPublicKey); @@ -529,14 +517,11 @@ export class Connection implements Contracts.TransactionPool.Connection { AppUtils.assert.defined(transaction.id); this.memory.forget(transaction.id); - - console.log(error); + this.storage.delete(transaction.id); return { transaction, type: "ERR_APPLY", message: error.message }; } - this.synchronizer.syncToPersistentStorageIfNecessary(); - return {}; } diff --git a/packages/core-transaction-pool/src/index.ts b/packages/core-transaction-pool/src/index.ts index 851befb618..fd4c2ddefa 100644 --- a/packages/core-transaction-pool/src/index.ts +++ b/packages/core-transaction-pool/src/index.ts @@ -4,5 +4,4 @@ export * from "./memory"; export * from "./processor"; export * from "./service-provider"; export * from "./storage"; -export * from "./synchronizer"; export * from "./collator"; diff --git a/packages/core-transaction-pool/src/memory.ts b/packages/core-transaction-pool/src/memory.ts index a3c2b966a2..0c36c3b13a 100644 --- a/packages/core-transaction-pool/src/memory.ts +++ b/packages/core-transaction-pool/src/memory.ts @@ -81,16 +81,6 @@ export class Memory { */ private byExpirationIsSorted = true; - /** - * @private - * @type {{ added: Set; removed: Set }} - * @memberof Memory - */ - private readonly dirty: { added: Set; removed: Set } = { - added: new Set(), - removed: new Set(), - }; - /** * @returns {Interfaces.ITransaction[]} * @memberof Memory @@ -224,10 +214,9 @@ export class Memory { /** * @param {Interfaces.ITransaction} transaction - * @param {boolean} [databaseReady] * @memberof Memory */ - public remember(transaction: Interfaces.ITransaction, databaseReady?: boolean): void { + public remember(transaction: Interfaces.ITransaction): void { AppUtils.assert.defined(transaction.id); const id: string = transaction.id; @@ -286,17 +275,6 @@ export class Memory { this.byExpiration.push(transaction); this.byExpirationIsSorted = false; } - - if (!databaseReady) { - if (this.dirty.removed.has(id)) { - // If the transaction has been already in the pool and has been removed - // and the removal has not propagated to disk yet, just wipe it from the - // list of removed transactions, so that the old copy stays on disk. - this.dirty.removed.delete(id); - } else { - this.dirty.added.add(id); - } - } } /** @@ -352,15 +330,6 @@ export class Memory { assert.notStrictEqual(i, -1); this.all.splice(i, 1); this.allIsSorted = false; - - if (this.dirty.added.has(id)) { - // This transaction has been added and deleted without data being synced - // to disk in between, so it will never touch the disk, just remove it - // from the added list. - this.dirty.added.delete(id); - } else { - this.dirty.removed.add(id); - } } public has(id: string): boolean { @@ -375,37 +344,12 @@ export class Memory { this.byType.clear(); this.byExpiration = []; this.byExpirationIsSorted = true; - this.dirty.added.clear(); - this.dirty.removed.clear(); } public count(): number { return this.all.length; } - public countDirty(): number { - return this.dirty.added.size + this.dirty.removed.size; - } - - public pullDirtyAdded(): Interfaces.ITransaction[] { - const added: Interfaces.ITransaction[] = []; - - for (const id of this.dirty.added) { - added.push(this.byId[id]); - } - - this.dirty.added.clear(); - - return added; - } - - public pullDirtyRemoved(): string[] { - const removed: string[] = Array.from(this.dirty.removed); - this.dirty.removed.clear(); - - return removed; - } - /** * Sort `this.all` by fee (highest fee first) with the exception that transactions * from the same sender must be ordered lowest `nonce` first. diff --git a/packages/core-transaction-pool/src/service-provider.ts b/packages/core-transaction-pool/src/service-provider.ts index 26fd78f518..691e7a46f3 100644 --- a/packages/core-transaction-pool/src/service-provider.ts +++ b/packages/core-transaction-pool/src/service-provider.ts @@ -6,7 +6,6 @@ import { Connection } from "./connection"; import { Memory } from "./memory"; import { PoolWalletRepository } from "./pool-wallet-repository"; import { Storage } from "./storage"; -import { Synchronizer } from "./synchronizer"; /** * @export @@ -35,11 +34,6 @@ export class ServiceProvider extends Providers.ServiceProvider { .to(Storage) .inSingletonScope(); - this.app - .bind(Container.Identifiers.TransactionPoolSynchronizer) - .to(Synchronizer) - .inSingletonScope(); - this.app .bind(Container.Identifiers.TransactionPoolCleaner) .to(Cleaner) @@ -58,6 +52,7 @@ export class ServiceProvider extends Providers.ServiceProvider { * @memberof ServiceProvider */ public async boot(): Promise { + await this.app.get(Container.Identifiers.TransactionPoolStorage).boot(); await this.app.get(Container.Identifiers.TransactionPoolService).boot(); } @@ -66,9 +61,7 @@ export class ServiceProvider extends Providers.ServiceProvider { * @memberof ServiceProvider */ public async dispose(): Promise { - this.app.get(Container.Identifiers.TransactionPoolSynchronizer).syncToPersistentStorage(); - - this.app.get(Container.Identifiers.TransactionPoolStorage).disconnect(); + this.app.get(Container.Identifiers.TransactionPoolStorage).dispose(); } /** diff --git a/packages/core-transaction-pool/src/storage.ts b/packages/core-transaction-pool/src/storage.ts index 16def5fc59..c4b97a2e4f 100644 --- a/packages/core-transaction-pool/src/storage.ts +++ b/packages/core-transaction-pool/src/storage.ts @@ -1,147 +1,58 @@ -import { Container, Providers } from "@arkecosystem/core-kernel"; +import { Container, Contracts, Providers } from "@arkecosystem/core-kernel"; import { Interfaces, Transactions } from "@arkecosystem/crypto"; -import { strictEqual } from "assert"; import BetterSqlite3 from "better-sqlite3"; import { ensureFileSync } from "fs-extra"; -/** - * @export - * @class Storage - */ @Container.injectable() -export class Storage { - /** - * @private - * @type {Providers.PluginConfiguration} - * @memberof Storage - */ +export class Storage implements Contracts.TransactionPool.Storage { @Container.inject(Container.Identifiers.PluginConfiguration) @Container.tagged("plugin", "@arkecosystem/core-transaction-pool") private readonly configuration!: Providers.PluginConfiguration; - /** - * @private - * @type {string} - * @memberof Storage - */ - private readonly table: string = "pool"; - - /** - * @private - * @type {BetterSqlite3.Database} - * @memberof Storage - */ private database!: BetterSqlite3.Database; - /** - * @memberof Storage - */ - public connect() { - ensureFileSync(this.configuration.getRequired("storage")); - - this.database = new BetterSqlite3(this.configuration.getRequired("storage")); + public boot(): void { + const filename = this.configuration.getRequired("storage"); + ensureFileSync(filename); + this.database = new BetterSqlite3(filename); this.database.exec(` - PRAGMA journal_mode=WAL; - CREATE TABLE IF NOT EXISTS ${this.table} ( - "id" VARCHAR(64) PRIMARY KEY, - "serialized" BLOB NOT NULL - ); - `); + PRAGMA journal_mode = WAL; + CREATE TABLE IF NOT EXISTS pool (id VARCHAR(64) PRIMARY KEY, serialized BLOB NOT NULL); + `); } - /** - * @memberof Storage - */ - public disconnect(): void { + public dispose(): void { this.database.close(); } - /** - * @param {Interfaces.ITransaction[]} data - * @returns {void} - * @memberof Storage - */ - public bulkAdd(data: Interfaces.ITransaction[]): void { - if (data.length === 0) { - return; - } - - const insertStatement: BetterSqlite3.Statement = this.database.prepare( - `INSERT INTO ${this.table} ` + "(id, serialized) VALUES " + "(:id, :serialized);", - ); - - try { - this.database.prepare("BEGIN;").run(); - - for (const transaction of data) { - insertStatement.run({ id: transaction.id, serialized: transaction.serialized }); - } - - this.database.prepare("COMMIT;").run(); - } finally { - if (this.database.inTransaction) { - this.database.prepare("ROLLBACK;").run(); - } - } + public add(transaction: Interfaces.ITransaction): void { + this.database.prepare("INSERT INTO pool (id, serialized) VALUES (:id, :serialized)").run({ + id: transaction.id, + serialized: transaction.serialized, + }); } - /** - * @param {string[]} ids - * @returns {void} - * @memberof Storage - */ - public bulkRemoveById(ids: string[]): void { - if (ids.length === 0) { - return; - } - - const deleteStatement: BetterSqlite3.Statement = this.database.prepare( - `DELETE FROM ${this.table} WHERE id = :id;`, - ); - - this.database.prepare("BEGIN;").run(); - - for (const id of ids) { - deleteStatement.run({ id }); - } - - this.database.prepare("COMMIT;").run(); + public delete(id: string): void { + this.database.prepare("DELETE FROM pool WHERE id = ?").run(id); } - /** - * @returns {Interfaces.ITransaction[]} - * @memberof Storage - */ - public loadAll(): Interfaces.ITransaction[] { - const rows: Array<{ id: string; serialized: string }> = this.database - .prepare(`SELECT id, LOWER(HEX(serialized)) AS serialized FROM ${this.table};`) - .all(); - - const transactions: Interfaces.ITransaction[] = []; - - const invalidIds: string[] = []; - for (const row of rows) { - try { - const transaction: Interfaces.ITransaction = Transactions.TransactionFactory.fromHex(row.serialized); - - strictEqual(row.id, transaction.id); - - transaction.isVerified ? transactions.push(transaction) : invalidIds.push(row.id); - } catch { - invalidIds.push(row.id); - } - } - - this.bulkRemoveById(invalidIds); + public has(id: string): boolean { + return this.database + .prepare("SELECT COUNT(*) FROM pool WHERE id = ?") + .pluck(true) + .get(id) as boolean; + } - return transactions; + public clear(): void { + this.database.prepare("DELETE FROM pool").run(); } - /** - * @memberof Storage - */ - public deleteAll(): void { - this.database.exec(`DELETE FROM ${this.table};`); + public all(): Interfaces.ITransaction[] { + return this.database + .prepare("SELECT LOWER(HEX(serialized)) FROM pool") + .pluck(true) + .all() + .map(Transactions.TransactionFactory.fromHex); } } diff --git a/packages/core-transaction-pool/src/synchronizer.ts b/packages/core-transaction-pool/src/synchronizer.ts deleted file mode 100644 index f33aa3a1b3..0000000000 --- a/packages/core-transaction-pool/src/synchronizer.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { Container, Providers } from "@arkecosystem/core-kernel"; - -import { Memory } from "./memory"; -import { Storage } from "./storage"; - -/** - * @export - * @class Synchronizer - */ -@Container.injectable() -export class Synchronizer { - /** - * @private - * @type {Providers.PluginConfiguration} - * @memberof Synchronizer - */ - @Container.inject(Container.Identifiers.PluginConfiguration) - @Container.tagged("plugin", "@arkecosystem/core-transaction-pool") - private readonly configuration!: Providers.PluginConfiguration; - - /** - * @private - * @type {Memory} - * @memberof Synchronizer - */ - @Container.inject(Container.Identifiers.TransactionPoolMemory) - private readonly memory!: Memory; - - /** - * @private - * @type {Storage} - * @memberof Synchronizer - */ - @Container.inject(Container.Identifiers.TransactionPoolStorage) - private readonly storage!: Storage; - - /** - * @memberof Synchronizer - */ - public syncToPersistentStorage(): void { - this.storage.bulkAdd(this.memory.pullDirtyAdded()); - this.storage.bulkRemoveById(this.memory.pullDirtyRemoved()); - } - - /** - * @memberof Synchronizer - */ - public syncToPersistentStorageIfNecessary(): void { - if (this.configuration.getRequired("syncInterval") <= this.memory.countDirty()) { - this.syncToPersistentStorage(); - } - } -}