/
Batch.ts
192 lines (172 loc) · 4.04 KB
/
Batch.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// In browser, during webpack or browserify bundling, this module will be replaced by 'events'
// https://github.com/Gozala/events
import { EventEmitter } from "events";
/**
 * A parameterless async function that Batch schedules and runs.
 * The value its promise resolves to is not constrained by this type.
 */
export type Operation = () => Promise<any>;
/**
 * The states a Batch can be in.
 *
 * @enum {number}
 */
enum BatchStates {
  /** Initial state. */
  Good = 0,
  /** Failure state. */
  Error = 1,
}
/**
 * Batch provides basic parallel execution with a concurrency limit.
 * Once any operation fails, Batch stops starting the operations that have
 * not begun yet. Batch cannot cancel operations that are already running;
 * callers have to cancel those themselves.
 *
 * @export
 * @class Batch
 */
export class Batch {
  /**
   * Maximum number of operations running at the same time. Must be at least 1.
   *
   * @private
   * @memberof Batch
   */
  private readonly concurrency: number;

  /**
   * Number of operations that have been started and have not settled yet.
   *
   * @private
   * @memberof Batch
   */
  private actives: number = 0;

  /**
   * Number of operations that have completed successfully.
   *
   * @private
   * @memberof Batch
   */
  private completed: number = 0;

  /**
   * Index of the next operation to be started.
   *
   * @private
   * @memberof Batch
   */
  private offset: number = 0;

  /**
   * Queue of wrapped operations, in the order they were added.
   *
   * @private
   * @memberof Batch
   */
  private readonly operations: Operation[] = [];

  /**
   * State of the Batch. Turns into Error when an operation fails, after
   * which no further operations are started.
   *
   * @private
   * @memberof Batch
   */
  private state: BatchStates = BatchStates.Good;

  /**
   * Private emitter used to signal "finish" and "error" inside this class.
   *
   * @private
   * @memberof Batch
   */
  private readonly emitter: EventEmitter;

  /**
   * Creates an instance of Batch.
   *
   * @param concurrency Maximum number of operations run in parallel. Defaults to 5.
   * @throws {RangeError} When concurrency is smaller than 1 or is NaN.
   * @memberof Batch
   */
  public constructor(concurrency: number = 5) {
    // Written as a negated ">=" so that NaN is rejected as well; a NaN limit
    // would otherwise make the scheduling loop never start anything.
    if (!(concurrency >= 1)) {
      throw new RangeError("concurrency must be larger than 0");
    }
    this.concurrency = concurrency;
    this.emitter = new EventEmitter();
  }

  /**
   * Adds an operation to the queue. The operation is not started until do()
   * is called.
   *
   * @param operation Async function to be executed by this Batch.
   * @memberof Batch
   */
  public addOperation(operation: Operation): void {
    this.operations.push(async () => {
      this.actives++;
      try {
        await operation();
      } catch (error) {
        this.actives--;
        // Flip the state before notifying, so that a scheduling pass that is
        // still in progress stops starting further operations.
        this.state = BatchStates.Error;
        this.emitter.emit("error", error);
        return;
      }
      this.actives--;
      this.completed++;
      // A slot was freed: start the next queued operation or signal "finish".
      this.parallelExecute();
    });
  }

  /**
   * Starts executing the queued operations.
   *
   * @returns Promise that resolves once every queued operation has completed,
   * or rejects with the error of the first operation that failed. Resolves
   * immediately when the queue is empty.
   * @memberof Batch
   */
  public async do(): Promise<void> {
    if (this.operations.length === 0) {
      return;
    }

    return new Promise<void>((resolve, reject) => {
      this.emitter.on("finish", resolve);
      this.emitter.on("error", (error) => {
        this.state = BatchStates.Error;
        reject(error);
      });

      // Start only after both listeners are attached: an operation that
      // throws synchronously, or a queue that is already complete, emits its
      // event during this call.
      this.parallelExecute();
    });
  }

  /**
   * Gets the next operation to be executed and advances the offset.
   *
   * @private
   * @returns The next operation, or null when the end of the queue is reached.
   * @memberof Batch
   */
  private nextOperation(): Operation | null {
    if (this.offset < this.operations.length) {
      return this.operations[this.offset++];
    }
    return null;
  }

  /**
   * Starts queued operations until the concurrency limit is reached, the
   * queue is exhausted, or an operation has failed. Emits "finish" when all
   * operations have completed. Unlike do(), this method is synchronous and
   * returns nothing to wait on.
   *
   * @private
   * @memberof Batch
   */
  private parallelExecute(): void {
    if (this.state === BatchStates.Error) {
      return;
    }

    if (this.completed >= this.operations.length) {
      this.emitter.emit("finish");
      return;
    }

    // The state is re-checked on every iteration because an operation that
    // throws synchronously fails while this loop is still running.
    while (this.state === BatchStates.Good && this.actives < this.concurrency) {
      const operation = this.nextOperation();
      if (!operation) {
        return;
      }
      // The wrapper built in addOperation handles its own failures.
      void operation();
    }
  }
}