Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./connection";
export * from "./processor";
export * from "./collator";
export * from "./storage";
11 changes: 11 additions & 0 deletions packages/core-kernel/src/contracts/transaction-pool/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Interfaces } from "@arkecosystem/crypto";

export interface Storage {
boot(): void;
dispose(): void;
add(transaction: Interfaces.ITransaction): void;
delete(id: string): void;
has(id: string): boolean;
clear(): void;
all(): Interfaces.ITransaction[];
}
1 change: 0 additions & 1 deletion packages/core-kernel/src/ioc/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ export const Identifiers = {
TransactionPoolCleaner: Symbol.for("TransactionPool<Cleaner>"),
TransactionPoolMemory: Symbol.for("TransactionPool<Memory>"),
TransactionPoolStorage: Symbol.for("TransactionPool<Storage>"),
TransactionPoolSynchronizer: Symbol.for("TransactionPool<Synchronizer>"),
TransactionPoolCollator: Symbol.for("TransactionPool<Collator>"),

// Transactions - @todo: better names that won't clash
Expand Down
24 changes: 5 additions & 19 deletions packages/core-transaction-pool/src/cleaner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { Interfaces } from "@arkecosystem/crypto";

import { Memory } from "./memory";
import { PoolWalletRepository } from "./pool-wallet-repository";
import { Storage } from "./storage";
import { Synchronizer } from "./synchronizer";

/**
* @export
Expand Down Expand Up @@ -48,19 +46,11 @@ export class Cleaner {

/**
* @private
* @type {Storage}
* @type {Contracts.TransactionPool.Storage}
* @memberof Cleaner
*/
@Container.inject(Container.Identifiers.TransactionPoolStorage)
private readonly storage!: Storage;

/**
* @private
* @type {Synchronizer}
* @memberof Cleaner
*/
@Container.inject(Container.Identifiers.TransactionPoolSynchronizer)
private readonly synchronizer!: Synchronizer;
private readonly storage!: Contracts.TransactionPool.Storage;

/**
* @private
Expand All @@ -85,8 +75,7 @@ export class Cleaner {
*/
public flush(): void {
this.memory.flush();

this.storage.deleteAll();
this.storage.clear();
}

/**
Expand All @@ -106,9 +95,7 @@ export class Cleaner {
*/
public removeTransactionById(id: string, senderPublicKey?: string): void {
this.memory.forget(id, senderPublicKey);

this.synchronizer.syncToPersistentStorageIfNecessary();

this.storage.delete(id);
this.emitter.dispatch(AppEnums.TransactionEvent.RemovedFromPool, id);
}

Expand Down Expand Up @@ -197,8 +184,7 @@ export class Cleaner {
AppUtils.assert.defined<Interfaces.ITransaction>(transaction.id);

this.memory.forget(transaction.id, transaction.data.senderPublicKey);

this.synchronizer.syncToPersistentStorageIfNecessary();
this.storage.delete(transaction.id);
};

const lowestNonceBySender = {};
Expand Down
45 changes: 15 additions & 30 deletions packages/core-transaction-pool/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import { TransactionsProcessed } from "./interfaces";
import { PurgeInvalidTransactions } from "./listeners";
import { Memory } from "./memory";
import { PoolWalletRepository } from "./pool-wallet-repository";
import { Storage } from "./storage";
import { Synchronizer } from "./synchronizer";

// todo: review implementation and reduce the complexity of all methods as it is quite high
/**
Expand Down Expand Up @@ -82,11 +80,11 @@ export class Connection implements Contracts.TransactionPool.Connection {

/**
* @private
* @type {Storage}
* @type {Contracts.TransactionPool.Storage}
* @memberof Connection
*/
@Container.inject(Container.Identifiers.TransactionPoolStorage)
private readonly storage!: Storage;
private readonly storage!: Contracts.TransactionPool.Storage;

/**
* @private
Expand All @@ -96,14 +94,6 @@ export class Connection implements Contracts.TransactionPool.Connection {
@Container.inject(Container.Identifiers.TransactionPoolCleaner)
private readonly cleaner!: Cleaner;

/**
* @private
* @type {Synchronizer}
* @memberof Connection
*/
@Container.inject(Container.Identifiers.TransactionPoolSynchronizer)
private readonly synchronizer!: Synchronizer;

/**
* @private
* @type {Contracts.State.TransactionValidatorFactory}
Expand All @@ -120,22 +110,20 @@ export class Connection implements Contracts.TransactionPool.Connection {
*/
public async boot(): Promise<this> {
this.memory.flush();
this.storage.connect();

const transactionsFromDisk: Interfaces.ITransaction[] = this.storage.loadAll();
for (const transaction of transactionsFromDisk) {
this.memory.remember(transaction, true);
if (process.env.CORE_RESET_DATABASE) {
this.storage.clear();
}

// Remove from the pool invalid entries found in `transactionsFromDisk`.
if (process.env.CORE_RESET_DATABASE) {
this.memory.flush();
} else {
await this.validateTransactions(transactionsFromDisk);
await this.cleaner.purgeExpired();
const transactionsFromDisk: Interfaces.ITransaction[] = this.storage.all();
for (const transaction of transactionsFromDisk) {
// ! isn't applied on top of pool wallet repository
this.memory.remember(transaction);
}

this.synchronizer.syncToPersistentStorage();
// Remove from the pool invalid entries found in `transactionsFromDisk`.
await this.validateTransactions(transactionsFromDisk);
await this.cleaner.purgeExpired();

this.emitter.listen(AppEnums.CryptoEvent.MilestoneChanged, this.app.resolve(PurgeInvalidTransactions));

Expand Down Expand Up @@ -222,9 +210,7 @@ export class Connection implements Contracts.TransactionPool.Connection {
*/
public removeTransactionById(id: string, senderPublicKey?: string): void {
this.memory.forget(id, senderPublicKey);

this.synchronizer.syncToPersistentStorageIfNecessary();

this.storage.delete(id);
this.emitter.dispatch(AppEnums.TransactionEvent.RemovedFromPool, id);
}

Expand Down Expand Up @@ -501,6 +487,7 @@ export class Connection implements Contracts.TransactionPool.Connection {
const transactionHandler = await this.blockchainHandlerRegistry.getActivatedHandlerForData(lowest.data);
await transactionHandler.revertForSender(lowest, this.poolWalletRepository);
this.memory.forget(lowest.id, lowest.data.senderPublicKey);
this.storage.delete(lowest.id);
} else {
return {
transaction,
Expand All @@ -514,6 +501,7 @@ export class Connection implements Contracts.TransactionPool.Connection {
}

this.memory.remember(transaction);
this.storage.add(transaction);

try {
AppUtils.assert.defined<string>(transaction.data.senderPublicKey);
Expand All @@ -529,14 +517,11 @@ export class Connection implements Contracts.TransactionPool.Connection {
AppUtils.assert.defined<string>(transaction.id);

this.memory.forget(transaction.id);

console.log(error);
this.storage.delete(transaction.id);

return { transaction, type: "ERR_APPLY", message: error.message };
}

this.synchronizer.syncToPersistentStorageIfNecessary();

return {};
}

Expand Down
1 change: 0 additions & 1 deletion packages/core-transaction-pool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ export * from "./memory";
export * from "./processor";
export * from "./service-provider";
export * from "./storage";
export * from "./synchronizer";
export * from "./collator";
58 changes: 1 addition & 57 deletions packages/core-transaction-pool/src/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ export class Memory {
*/
private byExpirationIsSorted = true;

/**
* @private
* @type {{ added: Set<string>; removed: Set<string> }}
* @memberof Memory
*/
private readonly dirty: { added: Set<string>; removed: Set<string> } = {
added: new Set(),
removed: new Set(),
};

/**
* @returns {Interfaces.ITransaction[]}
* @memberof Memory
Expand Down Expand Up @@ -224,10 +214,9 @@ export class Memory {

/**
* @param {Interfaces.ITransaction} transaction
* @param {boolean} [databaseReady]
* @memberof Memory
*/
public remember(transaction: Interfaces.ITransaction, databaseReady?: boolean): void {
public remember(transaction: Interfaces.ITransaction): void {
AppUtils.assert.defined<string>(transaction.id);

const id: string = transaction.id;
Expand Down Expand Up @@ -286,17 +275,6 @@ export class Memory {
this.byExpiration.push(transaction);
this.byExpirationIsSorted = false;
}

if (!databaseReady) {
if (this.dirty.removed.has(id)) {
// If the transaction has been already in the pool and has been removed
// and the removal has not propagated to disk yet, just wipe it from the
// list of removed transactions, so that the old copy stays on disk.
this.dirty.removed.delete(id);
} else {
this.dirty.added.add(id);
}
}
}

/**
Expand Down Expand Up @@ -352,15 +330,6 @@ export class Memory {
assert.notStrictEqual(i, -1);
this.all.splice(i, 1);
this.allIsSorted = false;

if (this.dirty.added.has(id)) {
// This transaction has been added and deleted without data being synced
// to disk in between, so it will never touch the disk, just remove it
// from the added list.
this.dirty.added.delete(id);
} else {
this.dirty.removed.add(id);
}
}

public has(id: string): boolean {
Expand All @@ -375,37 +344,12 @@ export class Memory {
this.byType.clear();
this.byExpiration = [];
this.byExpirationIsSorted = true;
this.dirty.added.clear();
this.dirty.removed.clear();
}

public count(): number {
return this.all.length;
}

public countDirty(): number {
return this.dirty.added.size + this.dirty.removed.size;
}

public pullDirtyAdded(): Interfaces.ITransaction[] {
const added: Interfaces.ITransaction[] = [];

for (const id of this.dirty.added) {
added.push(this.byId[id]);
}

this.dirty.added.clear();

return added;
}

public pullDirtyRemoved(): string[] {
const removed: string[] = Array.from(this.dirty.removed);
this.dirty.removed.clear();

return removed;
}

/**
* Sort `this.all` by fee (highest fee first) with the exception that transactions
* from the same sender must be ordered lowest `nonce` first.
Expand Down
11 changes: 2 additions & 9 deletions packages/core-transaction-pool/src/service-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { Connection } from "./connection";
import { Memory } from "./memory";
import { PoolWalletRepository } from "./pool-wallet-repository";
import { Storage } from "./storage";
import { Synchronizer } from "./synchronizer";

/**
* @export
Expand Down Expand Up @@ -35,11 +34,6 @@ export class ServiceProvider extends Providers.ServiceProvider {
.to(Storage)
.inSingletonScope();

this.app
.bind(Container.Identifiers.TransactionPoolSynchronizer)
.to(Synchronizer)
.inSingletonScope();

this.app
.bind(Container.Identifiers.TransactionPoolCleaner)
.to(Cleaner)
Expand All @@ -58,6 +52,7 @@ export class ServiceProvider extends Providers.ServiceProvider {
* @memberof ServiceProvider
*/
public async boot(): Promise<void> {
await this.app.get<Storage>(Container.Identifiers.TransactionPoolStorage).boot();
await this.app.get<Connection>(Container.Identifiers.TransactionPoolService).boot();
}

Expand All @@ -66,9 +61,7 @@ export class ServiceProvider extends Providers.ServiceProvider {
* @memberof ServiceProvider
*/
public async dispose(): Promise<void> {
this.app.get<Synchronizer>(Container.Identifiers.TransactionPoolSynchronizer).syncToPersistentStorage();

this.app.get<Storage>(Container.Identifiers.TransactionPoolStorage).disconnect();
this.app.get<Storage>(Container.Identifiers.TransactionPoolStorage).dispose();
}

/**
Expand Down
Loading