-
Notifications
You must be signed in to change notification settings - Fork 29
/
transactionWorker.ts
333 lines (304 loc) · 12.9 KB
/
transactionWorker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
/* eslint-disable no-await-in-loop */
import EventEmitter from "eventemitter3";
import { AptosConfig } from "../../api/aptosConfig";
import { Account } from "../../core";
import { waitForTransaction } from "../../internal/transaction";
import { generateTransaction, signAndSubmitTransaction } from "../../internal/transactionSubmission";
import { PendingTransactionResponse, TransactionResponse } from "../../types";
import { InputGenerateTransactionOptions, InputGenerateTransactionPayloadData } from "../types";
import { AccountSequenceNumber } from "./accountSequenceNumber";
import { AsyncQueue, AsyncQueueCancelledError } from "./asyncQueue";
import { SimpleTransaction } from "../instances/simpleTransaction";
// `status` value carried by a fulfilled entry of a `Promise.allSettled` result.
// The worker compares every settled submission/execution result against it to
// tell successes from failures.
export const promiseFulfilledStatus = "fulfilled";
/**
* Names of the events a TransactionWorker emits while it runs.
* A dapp can subscribe to any of them on the worker instance.
*/
export enum TransactionWorkerEventsEnum {
// fired once the sign-and-submit promise of a transaction has fulfilled,
// i.e. the transaction was sent to the chain
TransactionSent = "transactionSent",
// fired when the sign-and-submit promise of a transaction has rejected
TransactionSendFailed = "transactionSendFailed",
// fired when waiting for a sent transaction has fulfilled,
// i.e. a single transaction has executed successfully
TransactionExecuted = "transactionExecuted",
// fired when waiting for a sent transaction has rejected,
// i.e. a single transaction failed in execution
TransactionExecutionFailed = "transactionExecutionFailed",
// fired after every batch drained from the outstanding transactions queue
// has been fully processed
ExecutionFinish = "executionFinish",
}
/**
* Maps each worker event name to the signature of its listener.
* It is the type argument given to `EventEmitter` by `TransactionWorker`, so
* `on`/`emit` calls are checked against these payload types.
*/
export interface TransactionWorkerEvents {
transactionSent: (data: SuccessEventData) => void;
transactionSendFailed: (data: FailureEventData) => void;
transactionExecuted: (data: SuccessEventData) => void;
transactionExecutionFailed: (data: FailureEventData) => void;
executionFinish: (data: ExecutionFinishEventData) => void;
}
/**
* Payload of the `executionFinish` event.
* `message` is a human-readable summary of the batch that has just been processed.
*/
export type ExecutionFinishEventData = {
message: string;
};
/**
* Payload of the success events (`transactionSent`, `transactionExecuted`).
* `message` is a human-readable description and `transactionHash` is the hash
* of the transaction the event refers to.
*/
export type SuccessEventData = {
message: string;
transactionHash: string;
};
/**
* Payload of the failure events (`transactionSendFailed`, `transactionExecutionFailed`).
* `message` is a human-readable description and `error` is the failure cause.
*
* NOTE(review): the worker fills `error` with the raw rejection reason of the
* underlying promise, so at runtime it may not be a string - confirm before
* relying on string methods.
*/
export type FailureEventData = {
message: string;
error: string;
};
/**
 * TransactionWorker provides a simple framework for receiving payloads to be processed.
 *
 * Once `start()` has been called, the worker takes each pushed payload, acquires
 * the account's next sequence number (by using the AccountSequenceNumber class),
 * generates the transaction and pushes an async sign-and-submit promise into the
 * `outstandingTransactions` queue.
 * The worker then processes transactions by reading the `outstandingTransactions` queue and, for each entry,
 * 1) waits for resolution of the submission process or gets a pre-execution validation error
 * and 2) waits for the resolution of the execution process or gets an execution error.
 * The worker fires events for any submission and/or execution success and/or failure.
 *
 * NOTE(review): `run()` awaits the queued tasks one after another, so the submission
 * task returns (when `transactionsQueue` is empty) before the processing task begins.
 * Payloads pushed after the submission task has returned are not picked up by the
 * same `start()` call - confirm this is the intended usage (push first, then start).
 */
export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
  readonly aptosConfig: AptosConfig;

  readonly account: Account;

  // current account sequence number
  // (the misspelled property name is public and therefore kept for backward compatibility)
  readonly accountSequnceNumber: AccountSequenceNumber;

  // the long-running tasks (`submitNextTransaction`, `processTransactions`) that `run()` executes in order
  readonly taskQueue: AsyncQueue<() => Promise<void>> = new AsyncQueue<() => Promise<void>>();

  // process has started
  started: boolean;

  /**
   * transactions payloads waiting to be generated and signed
   *
   * TODO support entry function payload from ABI builder
   */
  transactionsQueue = new AsyncQueue<
    [InputGenerateTransactionPayloadData, InputGenerateTransactionOptions | undefined]
  >();

  /**
   * in-flight sign-and-submit promises, each paired with the sequence number it was generated with
   */
  outstandingTransactions = new AsyncQueue<[Promise<PendingTransactionResponse>, bigint]>();

  /**
   * transactions that have been submitted to chain, as `[hash, sequenceNumber, null]`;
   * a failed submission is recorded as `["rejected", sequenceNumber, reason]`
   */
  sentTransactions: Array<[string, bigint, any]> = [];

  /**
   * transactions that have been committed to chain, as `[hash, sequenceNumber, null]`;
   * a failed execution is recorded as `["rejected", sequenceNumber, reason]`
   */
  executedTransactions: Array<[string, bigint, any]> = [];

  /**
   * Provides a simple framework for receiving payloads to be processed.
   *
   * @param aptosConfig - a config object
   * @param account - the sender, as an Account
   * @param maxWaitTime - the max wait time to wait before resyncing the sequence number
   * to the current on-chain state, default to 30
   * @param maximumInFlight - submit up to `maximumInFlight` transactions per account.
   * Mempool limits the number of transactions per account to 100, hence why we default to 100.
   * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating, default to 10
   */
  constructor(
    aptosConfig: AptosConfig,
    account: Account,
    maxWaitTime: number = 30,
    maximumInFlight: number = 100,
    sleepTime: number = 10,
  ) {
    super();
    this.aptosConfig = aptosConfig;
    this.account = account;
    this.started = false;
    this.accountSequnceNumber = new AccountSequenceNumber(
      aptosConfig,
      account,
      maxWaitTime,
      maximumInFlight,
      sleepTime,
    );
  }

  /**
   * Gets the current account sequence number,
   * generates the transaction with the account sequence number,
   * starts signing and submitting it, and adds the in-flight submission
   * to the outstanding transaction queue to be processed later.
   *
   * Loops until no sequence number is available or the payload queue is empty.
   * A cancelled queue ends the loop silently.
   *
   * @throws Error wrapping any other failure, prefixed with the account address
   */
  async submitNextTransaction(): Promise<void> {
    try {
      /* eslint-disable no-constant-condition */
      while (true) {
        // NOTE(review): the sequence number is acquired before checking whether a payload
        // is queued, so the final iteration may obtain a number that is never used - confirm
        // that AccountSequenceNumber tolerates this.
        const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber();
        if (sequenceNumber === null) return;
        const transaction = await this.generateNextTransaction(this.account, sequenceNumber);
        if (!transaction) return;
        // deliberately not awaited: the promise itself is queued so that submissions overlap
        const pendingTransaction = signAndSubmitTransaction({
          aptosConfig: this.aptosConfig,
          transaction,
          signer: this.account,
        });
        await this.outstandingTransactions.enqueue([pendingTransaction, sequenceNumber]);
      }
    } catch (error: unknown) {
      if (error instanceof AsyncQueueCancelledError) {
        return;
      }
      throw new Error(`Submit transaction failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Reads the outstanding transaction queue and waits for the queued submissions to settle.
   *
   * If a submission has fulfilled, it records the transaction in `sentTransactions`,
   * fires a `transactionSent` event and then checks the transaction's execution.
   *
   * If a submission has failed, it records the failure reason in `sentTransactions`
   * and fires a `transactionSendFailed` event.
   *
   * After every drained batch an `executionFinish` event is fired.
   * A cancelled queue ends the loop silently.
   *
   * @throws Error wrapping any other failure, prefixed with the account address
   */
  async processTransactions(): Promise<void> {
    try {
      /* eslint-disable no-constant-condition */
      while (true) {
        const awaitingTransactions: Array<Promise<PendingTransactionResponse>> = [];
        const sequenceNumbers: Array<bigint> = [];
        // block until at least one submission is available, then drain whatever else is already queued
        do {
          const [pendingTransaction, queuedSequenceNumber] = await this.outstandingTransactions.dequeue();
          awaitingTransactions.push(pendingTransaction);
          sequenceNumbers.push(queuedSequenceNumber);
        } while (!this.outstandingTransactions.isEmpty());

        // wait for every submission of the batch to settle
        const sentTransactions = await Promise.allSettled(awaitingTransactions);
        for (let i = 0; i < sentTransactions.length && i < sequenceNumbers.length; i += 1) {
          // check sent transaction status
          const sentTransaction = sentTransactions[i];
          const sequenceNumber = sequenceNumbers[i];
          if (sentTransaction.status === promiseFulfilledStatus) {
            // transaction sent to chain
            this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]);
            // NOTE(review): the message says "committed" although at this point the transaction
            // has only been submitted; kept unchanged because listeners may depend on the text.
            this.emit(TransactionWorkerEventsEnum.TransactionSent, {
              message: `transaction hash ${sentTransaction.value.hash} has been committed to chain`,
              transactionHash: sentTransaction.value.hash,
            });
            // check sent transaction execution (awaited, so executions are checked one at a time)
            await this.checkTransaction(sentTransaction, sequenceNumber);
          } else {
            // send transaction failed
            this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]);
            this.emit(TransactionWorkerEventsEnum.TransactionSendFailed, {
              message: `failed to commit transaction ${this.sentTransactions.length} with error ${sentTransaction.reason}`,
              error: sentTransaction.reason,
            });
          }
        }
        this.emit(TransactionWorkerEventsEnum.ExecutionFinish, {
          message: `execute ${sentTransactions.length} transactions finished`,
        });
      }
    } catch (error: unknown) {
      if (error instanceof AsyncQueueCancelledError) {
        return;
      }
      throw new Error(`Process execution failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Once transaction has been sent to chain, we check for its execution status.
   *
   * On success the transaction is recorded in `executedTransactions` and a
   * `transactionExecuted` event is fired; on failure the reason is recorded and a
   * `transactionExecutionFailed` event is fired.
   *
   * @param sentTransaction the fulfilled submission result of the transaction to wait for
   * @param sequenceNumber the account's sequence number that was sent with the transaction
   * @throws Error wrapping anything thrown while recording or emitting, prefixed with the account address
   */
  async checkTransaction(
    sentTransaction: PromiseFulfilledResult<PendingTransactionResponse>,
    sequenceNumber: bigint,
  ): Promise<void> {
    try {
      // allSettled turns a rejection into a result object instead of throwing
      const [executedTransaction] = await Promise.allSettled([
        waitForTransaction({ aptosConfig: this.aptosConfig, transactionHash: sentTransaction.value.hash }),
      ]);
      if (executedTransaction.status === promiseFulfilledStatus) {
        // transaction executed to chain
        this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]);
        this.emit(TransactionWorkerEventsEnum.TransactionExecuted, {
          message: `transaction hash ${executedTransaction.value.hash} has been executed on chain`,
          transactionHash: sentTransaction.value.hash,
        });
      } else {
        // transaction execution failed
        this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]);
        this.emit(TransactionWorkerEventsEnum.TransactionExecutionFailed, {
          message: `failed to execute transaction ${this.executedTransactions.length} with error ${executedTransaction.reason}`,
          error: executedTransaction.reason,
        });
      }
    } catch (error: unknown) {
      throw new Error(`Check transaction failed for ${this.account.accountAddress.toString()} with error ${error}`);
    }
  }

  /**
   * Push transaction to the transactions queue
   *
   * @param transactionData Transaction payload
   * @param transactionData.abi For all entry function payloads, the ABI to skip remote ABI lookups
   * @param options.maxGasAmount Maximum gas amount for the transaction
   * @param options.gasUnitPrice Gas unit price for the transaction
   * @param options.expireTimestamp expiration timestamp on the transaction
   * @param options.accountSequenceNumber the sequence number for the transaction;
   * the worker overrides it with the sequence number it acquires itself
   */
  async push(
    transactionData: InputGenerateTransactionPayloadData,
    options?: InputGenerateTransactionOptions,
  ): Promise<void> {
    this.transactionsQueue.enqueue([transactionData, options]);
  }

  /**
   * Builds the next transaction from the oldest queued payload; signing is left to
   * `signAndSubmitTransaction` in `submitNextTransaction`.
   *
   * @param account an Aptos account, used as the transaction sender
   * @param sequenceNumber a sequence number the transaction will be generated with
   * @returns the generated transaction, or `undefined` when no payload is queued
   */
  async generateNextTransaction(account: Account, sequenceNumber: bigint): Promise<SimpleTransaction | undefined> {
    if (this.transactionsQueue.isEmpty()) return undefined;
    const [transactionData, options] = await this.transactionsQueue.dequeue();
    return generateTransaction({
      aptosConfig: this.aptosConfig,
      sender: account.accountAddress,
      data: transactionData,
      // the worker-managed sequence number always wins over a caller-supplied one
      options: { ...options, accountSequenceNumber: sequenceNumber },
    });
  }

  /**
   * Executes the queued tasks one after another until the task queue is cancelled.
   *
   * @throws Error wrapping any failure other than the task queue being cancelled
   */
  async run(): Promise<void> {
    try {
      while (!this.taskQueue.isCancelled()) {
        const task = await this.taskQueue.dequeue();
        await task();
      }
    } catch (error: unknown) {
      // Cancelling the task queue is how `stop()` ends this loop. Treat it as a regular
      // shutdown, like the other worker loops do, instead of surfacing it as an
      // unhandled rejection from the un-awaited `run()` call in `start()`.
      if (error instanceof AsyncQueueCancelledError) {
        return;
      }
      throw new Error(`Unable to start transaction batching: ${error}`);
    }
  }

  /**
   * Starts the transaction management process: queues the submission task and the
   * processing task and kicks off `run()` in the background.
   *
   * @throws Error if the worker has already been started
   */
  start() {
    if (this.started) {
      throw new Error("worker has already started");
    }
    this.started = true;
    this.taskQueue.enqueue(() => this.submitNextTransaction());
    this.taskQueue.enqueue(() => this.processTransactions());
    // intentionally not awaited: the worker keeps running in the background
    this.run();
  }

  /**
   * Stops the transaction management process by cancelling the task queue.
   *
   * NOTE(review): only `taskQueue` is cancelled here; `transactionsQueue` and
   * `outstandingTransactions` are left untouched, so a `processTransactions` call
   * that is waiting on `outstandingTransactions` keeps waiting - confirm this is intended.
   *
   * @throws Error if the task queue has already been cancelled
   */
  stop() {
    if (this.taskQueue.isCancelled()) {
      throw new Error("worker has already stopped");
    }
    this.started = false;
    this.taskQueue.cancel();
  }
}