-
Notifications
You must be signed in to change notification settings - Fork 0
/
promise_array_parallel.test.ts
150 lines (144 loc) · 4.78 KB
/
promise_array_parallel.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import {
assert,
assertEquals,
assertRejects,
} from "https://deno.land/std@0.160.0/testing/asserts.ts";
import { PromiseArray } from "./promise_array_parallel.ts";
import { SeedableRandom, sleep } from "./util.ts";
/**
 * Two chained `parallelWork` stages, the second one with `priority: "COME"`.
 *
 * Stage 1 tags every item with the order in which it finished (`coming++`).
 * Stage 2 asserts that the tags it receives are strictly sequential, i.e. that
 * it is started in the same order stage 1 completed. The test also checks the
 * peak number of concurrently running workers per stage against the configured
 * `parallelDegMax`, and that `all()` returns results in index order.
 */
Deno.test("First come, first served", async () => {
  // Live and peak count of stage-1 workers currently between start and finish.
  let parallelSizeX = 0;
  let parallelSizeXMax = 0;
  // Live and peak count of stage-2 workers currently between start and finish.
  let parallelSizeY = 0;
  let parallelSizeYMax = 0;
  const r = new SeedableRandom();
  // Next completion stamp handed out by stage 1.
  let coming = 0;
  // Number of stage-2 workers started so far; doubles as the expected stamp.
  let maxComing = 0;

  const t = await PromiseArray
    .from([...new Array(100)].map((_, i) => i))
    .parallelWork(async () => {
      parallelSizeX++;
      parallelSizeXMax = Math.max(parallelSizeXMax, parallelSizeX);
      await sleep(r.int(10, 200));
      parallelSizeX--;
      // The returned value records the order in which this worker finished.
      return coming++;
    }, { parallelDegMax: 20 })
    .parallelWork(async ({ idx, value }) => {
      // `value` is the stage-1 completion stamp; it must match the start order here.
      assertEquals(value, maxComing, "Executed in sequence");
      maxComing++;
      parallelSizeY++;
      parallelSizeYMax = Math.max(parallelSizeYMax, parallelSizeY);
      await sleep(r.int(10, 200));
      parallelSizeY--;
      return idx + 100;
    }, { parallelDegMax: 10, priority: "COME" })
    .all();

  // Every worker decrements its live counter before returning, so by now the
  // live counters say nothing about concurrency. The limit has to be checked
  // against the recorded peaks.
  assert(
    parallelSizeXMax <= 20,
    `stage 1 peak concurrency ${parallelSizeXMax} exceeds parallelDegMax 20`,
  );
  assert(
    parallelSizeYMax <= 10,
    `stage 2 peak concurrency ${parallelSizeYMax} exceeds parallelDegMax 10`,
  );
  assertEquals(t, [...new Array(100)].map((_, i) => 100 + i));
});
/**
 * Two chained `parallelWork` stages, the second one with `priority: "INDEX"`.
 *
 * Stage 1 finishes items after random delays. Stage 2 asserts that it is
 * started with `idx` values 0, 1, 2, ... in order, regardless of when stage 1
 * finished each item. The test also checks the peak number of concurrently
 * running workers per stage against the configured `parallelDegMax`, and that
 * `all()` returns results in index order.
 */
Deno.test("First index, first served", async () => {
  // Live and peak count of stage-1 workers currently between start and finish.
  let parallelSizeX = 0;
  let parallelSizeXMax = 0;
  // Live and peak count of stage-2 workers currently between start and finish.
  let parallelSizeY = 0;
  let parallelSizeYMax = 0;
  const r = new SeedableRandom();
  // Index the next stage-2 worker is expected to receive.
  let index = 0;

  const t = await PromiseArray
    .from([...new Array(100)].map((_, i) => i))
    .parallelWork(async () => {
      parallelSizeX++;
      parallelSizeXMax = Math.max(parallelSizeXMax, parallelSizeX);
      await sleep(r.int(10, 200));
      parallelSizeX--;
    }, { parallelDegMax: 20 })
    .parallelWork(async ({ idx }) => {
      assertEquals(idx, index, "Indexes come in order.");
      index++;
      parallelSizeY++;
      parallelSizeYMax = Math.max(parallelSizeYMax, parallelSizeY);
      await sleep(r.int(10, 200));
      parallelSizeY--;
      return idx + 100;
    }, { parallelDegMax: 10, priority: "INDEX" })
    .all();

  // Every worker decrements its live counter before returning, so by now the
  // live counters say nothing about concurrency. The limit has to be checked
  // against the recorded peaks.
  assert(
    parallelSizeXMax <= 20,
    `stage 1 peak concurrency ${parallelSizeXMax} exceeds parallelDegMax 20`,
  );
  assert(
    parallelSizeYMax <= 10,
    `stage 2 peak concurrency ${parallelSizeYMax} exceeds parallelDegMax 10`,
  );
  assertEquals(t, [...new Array(100)].map((_, i) => 100 + i));
});
/**
 * A worker that throws for one item must make `all()` reject.
 *
 * The first stage throws for index 50; a second stage is chained behind it.
 * The pipeline is only built here; `all()` is invoked inside `assertRejects`.
 */
Deno.test("Rejected, all() throw error", async () => {
  const rng = new SeedableRandom();
  const indices = Array.from({ length: 100 }, (_, i) => i);

  const pipeline = PromiseArray
    .from(indices)
    .parallelWork(async (item) => {
      // Fail before sleeping so that exactly this one item is rejected.
      if (item.idx === 50) {
        throw new Error("SOMETHING ERROR!");
      }
      await sleep(rng.int(30, 100));
    }, { parallelDegMax: 20 })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.idx + 100;
    }, { parallelDegMax: 10, priority: "INDEX" });

  await assertRejects(() => pipeline.all(), Error, "SOMETHING ERROR!");
});
/**
 * A worker that throws for one item must not make `allSettled()` reject.
 *
 * The first stage throws for index 50. The test expects `allSettled()` to
 * resolve with 100 entries: a rejected entry at index 50 and fulfilled entries
 * carrying `100 + i` everywhere else.
 */
Deno.test("Rejected, allSettled not throw error", async () => {
  const r = new SeedableRandom();
  const x = PromiseArray
    .from([...new Array(100)].map((_, i) => i))
    .parallelWork(async ({ idx }) => {
      if (idx === 50) throw new Error("SOMETHING ERROR!");
      await sleep(r.int(30, 100));
    }, { parallelDegMax: 20 })
    .parallelWork(async ({ idx }) => {
      await sleep(r.int(30, 100));
      return idx + 100;
    }, { parallelDegMax: 10, priority: "INDEX" });

  // Deliberately no `.catch()` fallback: if `allSettled()` rejects, that is the
  // failure this test exists to detect, and it should surface with its own
  // reason instead of being replaced by an empty array.
  const allSettled = await x.allSettled();

  // Check the failing slot explicitly before using it to build the expectation.
  const failed = allSettled[50];
  assert("reason" in failed, "item 50 must be reported as rejected");

  assertEquals(
    allSettled,
    [...new Array(100)]
      .map<PromiseSettledResult<number>>((_, i) => (
        i === 50
          // The reason is copied from the actual result; this comparison pins
          // the statuses and the fulfilled values, not the reason itself.
          ? { status: "rejected", reason: failed.reason }
          : { status: "fulfilled", value: 100 + i }
      )),
  );
});
/**
 * Results must stay aligned with their original index across many stages.
 *
 * Each of the six stages adds the item's `idx` to its running `value` after a
 * random delay, alternating between "COME" and "INDEX" priority and between
 * concurrency limits. Starting from `value === idx`, the final value for item
 * `i` is therefore `i * 7`.
 */
Deno.test("maintain sequence", async () => {
  const rng = new SeedableRandom();
  const seed = Array.from({ length: 100 }, (_, i) => i);

  const result = await PromiseArray
    .from(seed)
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 20, priority: "COME" })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 10, priority: "INDEX" })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 20, priority: "COME" })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 10, priority: "COME" })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 10, priority: "INDEX" })
    .parallelWork(async (item) => {
      await sleep(rng.int(30, 100));
      return item.value + item.idx;
    }, { parallelDegMax: 10, priority: "INDEX" })
    .all();

  const expected = Array.from({ length: 100 }, (_, i) => i * 7);
  assertEquals(result, expected);
});