-
Notifications
You must be signed in to change notification settings - Fork 1
/
pipeline.ts
680 lines (637 loc) · 24.1 KB
/
pipeline.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
/**
* @copyright 2021–2023 Hong Minhee
* @license LGPL-3.0-only
*/
import {
concat,
filter,
map,
} from "https://deno.land/x/aitertools@0.3.1/mod.ts";
import { MediaType } from "./media_type.ts";
import { LanguageTag } from "./language_tag.ts";
import {
Content,
ContentCriterion,
ContentFields,
toContentPredicate,
} from "./content.ts";
import { Resource } from "./resource.ts";
/**
 * Strips `undefined` from the type `T`; every other member of `T` (including
 * `null`) is kept as is.
 */
export type NonUndefined<T> = Exclude<T, undefined>;
/**
* Represents functions that summarize the {@link Resource}s in a `pipeline`
* into new {@link Resource}s.
*
* The summary may be given as a single {@link Resource}, a promise of one,
* or a synchronous or asynchronous iterable of {@link Resource}s.
* @param pipeline A pipeline with {@link Resource}s to summarize.
* @returns Summary as {@link Resource}s.
*/
export type PipelineSummarizer = (pipeline: Pipeline) =>
| AsyncIterable<Resource>
| Promise<Resource>
| Iterable<Resource>
| Resource;
/**
* Represents synchronous functions that transform a {@link Resource} into
* another {@link Resource}.
* @param resource A resource to be transformed.
* @returns The transformed resource.
*/
export type ResourceTransformer = (resource: Resource) => Resource;
/**
* Represents functions that divide a {@link Resource} into multiple
* {@link Resource}s.
* @param resource A resource to be divided into multiple resources.
* @returns The divided resources, as either a synchronous or an asynchronous
* iterable.
*/
export type ResourceDivider = (resource: Resource) =>
| Iterable<Resource>
| AsyncIterable<Resource>;
/**
* Represents functions that decide whether a {@link Resource} should be kept
* or filtered out.
* @param resource A resource to test.
* @returns `true` to keep the given `resource`, or `false` to exclude it.
*/
export type ResourcePredicate = (resource: Resource) => boolean;
/**
* Represents functions that transform a {@link URL} into another {@link URL}.
* @param path A URL to be transformed.
* @returns The transformed URL.
*/
export type PathTransformer = (path: URL) => URL;
/**
* Represents synchronous functions that transform a {@link Content} into
* another {@link Content}.
* @param content A content to be transformed.
* @returns The transformed content.
*/
export type ContentTransformer = (content: Content) => Content;
/**
* An asynchronous stream of {@link Resource}s with chainable transformations.
* The whole build process is based on this.
*
* Note that, as a data structure, it behaves like a set rather than a list:
* iterating a pipeline never yields two {@link Resource}s with the same
* `path`. When several {@link Resource}s share a `path`, their representations
* are merged into a single {@link Resource} at iteration time.
*/
export class Pipeline implements AsyncIterable<Resource> {
/** Loads the raw (not yet merged by `path`) resources of this pipeline. */
readonly #getResources: () => AsyncIterable<Resource>;
/** Cache of loaded resources; `undefined` means they have to be (re)loaded. */
#buffer?: Resource[];
/** Whether `#buffer` already holds the resources merged by `path`. */
#merged: boolean;
/** Wrapped monitor which invalidates the cache above when it yields. */
#resourcesMonitor?: AsyncIterable<void>;
/**
* Creates a new pipeline with the given asynchronous iterable of
* {@link Resource}s.
* @param resources An iterable or asynchronous iterable of {@link Resource}s.
* As {@link Pipeline} objects in themselves are also
* asynchronous iterable, these can be passed here.
* If there are more {@link Resource}s than one with
* the duplicate `path`s, their contents are merged into
* a single {@link Resource} when the pipeline is iterated.
*/
constructor(resources: AsyncIterable<Resource> | Iterable<Resource>);
/**
* Creates a new pipeline with the function to get {@link Resource}s and
* a monitor to signal when resources are updated.
* @param resourcesGetter A function to load {@link Resource}s.
* If there are more {@link Resource}s than one with
* the duplicate `path`s, their contents are merged
* into a single {@link Resource} when the pipeline
* is iterated.
* @param resourcesMonitor A monitor to signal when resources are updated.
* When this asynchronous iterable yields,
* resources in the pipeline are reloaded using
* `resourcesGetter` function. Pass `null` for
* a pipeline without a monitor.
*/
constructor(
resourcesGetter: () => AsyncIterable<Resource>,
resourcesMonitor: AsyncIterable<void> | null,
);
constructor(
resourcesGetter:
| (() => AsyncIterable<Resource>)
| AsyncIterable<Resource>
| Iterable<Resource>,
resourcesMonitor?: AsyncIterable<void> | null,
) {
// A non-function argument is wrapped into a getter; synchronous iterables
// are adapted into asynchronous ones.
this.#getResources = typeof resourcesGetter == "function"
? resourcesGetter
: () =>
Symbol.asyncIterator in resourcesGetter
? resourcesGetter as AsyncIterable<Resource>
: toAsyncIterable(resourcesGetter as Iterable<Resource>);
// An array is already fully loaded, so it is used as the buffer directly
// (the very same array object, not a copy).
if (resourcesGetter instanceof Array) {
this.#buffer = resourcesGetter;
}
if (resourcesMonitor != null) {
// Wrap the monitor so that each of its signals also drops the cached
// resources; the next iteration then reloads them via `#getResources`.
this.#resourcesMonitor = map(() => {
this.#buffer = undefined;
this.#merged = false;
}, resourcesMonitor);
}
this.#merged = false;
}
/**
* Gets the last time any resource/content was modified.
*
* This inspects the resources as loaded, without merging the ones sharing
* the same `path` first.
* @returns The last time any resource/content in the pipeline was modified.
* It can be `null` if there are no resources at all.
*/
async getLastModified(): Promise<Date | null> {
let result: Date | null = null;
for await (const r of this.getResources()) {
const modified = r.lastModified;
if (result == null || modified > result) {
result = modified;
}
}
return result;
}
/**
* Yields the resources of this pipeline without merging duplicate `path`s
* (unless the buffer already holds merged resources).
*
* If nothing is buffered, resources are loaded through `#getResources` and
* yielded as they arrive; the buffer is stored only after the load has been
* consumed to its end. Otherwise the buffered resources are yielded.
*/
private async *getResources(): AsyncIterableIterator<Resource> {
if (this.#buffer == null) {
const buffer: Resource[] = [];
for await (const resource of this.#getResources()) {
buffer.push(resource);
yield resource;
}
this.#buffer = buffer;
} else {
yield* this.#buffer;
}
}
/**
* Iterates over the resources in the pipeline, with the ones sharing
* the same `path` merged into a single {@link Resource}.
*
* The first complete iteration stores the merged resources in the buffer, so
* that later iterations yield them directly.
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<Resource> {
if (this.#merged) {
yield* this.getResources();
return;
}
// Group all resources by the string form of their paths. Nothing is yielded
// until every resource has been loaded.
const resources = new Map<string, Resource[]>();
for await (const resource of this.getResources()) {
const path = resource.path.href;
const array = resources.get(path);
if (array == null) resources.set(path, [resource]);
else array.push(resource);
}
const buffer: Resource[] = [];
for (const array of resources.values()) {
let resource: Resource = array[0];
if (array.length > 1) {
// Several resources share this path: gather the contents of all of them,
// in the order the resources were loaded, into one new resource.
const contents: Content[] = [];
for (const r of array) {
for (const c of r) contents.push(c);
}
resource = new Resource(resource.path, contents);
}
yield resource;
buffer.push(resource);
}
this.#buffer = buffer;
this.#merged = true;
}
/**
* Merges all {@link Resource}s in two pipelines into a distinct pipeline.
*
* Only the monitor of this pipeline is carried over to the returned pipeline;
* the monitor of the given `pipeline` is not observed.
* @param pipeline A pipeline to merge with. Resources in it come after
* the resources of this pipeline. Resources having overlapped
* paths are merged into single {@link Resource}s when
* the returned pipeline is iterated.
* @returns A distinct pipeline having all {@link Resource}s in two pipelines.
*/
union(pipeline: Pipeline): Pipeline {
return new Pipeline(
() => concat(this.getResources(), pipeline.getResources()),
this.#resourcesMonitor ?? null,
);
}
/**
* Adds a `resource` to the pipeline.
* @param resource A resource to add. It is placed before existing resources.
* If its `path` is duplicate with an existing resource's path,
* the two are merged into a single {@link Resource} when
* the returned pipeline is iterated.
* @returns A distinct pipeline having the added `resource` besides existing
* ones.
*/
add(resource: Resource): Pipeline {
return new Pipeline(
() => concat([resource], this.getResources()),
this.#resourcesMonitor ?? null,
);
}
/**
* Summarize {@link Resource}s in the pipeline into new {@link Resource}s,
* and adds them into a distinct pipeline besides existing {@link Resource}s.
*
* Summaries are placed before existing resources. If a summary's `path` is
* duplicate with any `path` of existing resources, the two are merged into
* a single {@link Resource} when the returned pipeline is iterated.
* @param summarizer A function to summarize {@link Resource}s in the pipeline
* into new {@link Resource}s.
* @param predicate An optional predicate to filter {@link Resource}s in
* the pipeline to summarize. Only {@link Resource}s
* satisfying the predicate are summarized.
* Include all {@link Resource}s by default.
* @returns A distinct pipeline having new summary {@link Resource}s
* besides existing {@link Resource}s.
*/
addSummaries(
summarizer: PipelineSummarizer,
predicate?: ResourcePredicate,
): Pipeline {
return new Pipeline(() => {
const summary = summarizer(
predicate == null ? this : this.filter(predicate),
);
// Normalize each of the shapes a summarizer may return into something
// `concat()` can consume.
if (summary instanceof Promise) {
const iter = async function* (): AsyncIterable<Resource> {
yield await summary;
};
return concat(iter(), this.getResources());
} else if (summary instanceof Resource) {
return concat([summary], this.getResources());
}
return concat(summary, this.getResources());
}, this.#resourcesMonitor ?? null);
}
/**
* Transforms the {@link Resource}s in the pipeline, and returns a distinct
* pipeline. Note that this does not mutate the pipeline in-place.
*
* Transformers are applied, in the given order, to the resources already
* merged by `path`.
*
* To transform {@link Content}s or {@link Resource}s' paths instead,
* use {@link transform} or {@link move} higher-order functions.
* @param transformers Resource transformers.
* @returns A distinct pipeline with transformed {@link Resource}s.
*/
map(...transformers: ResourceTransformer[]): Pipeline {
return new Pipeline(
() => {
let resources: AsyncIterable<Resource> = this[Symbol.asyncIterator]();
for (const transformer of transformers) {
resources = map(transformer, resources);
}
return resources;
},
this.#resourcesMonitor ?? null,
);
}
/**
* Same as {@link Pipeline#map} with a single transformer, except that
* the transformer is applied to the resources as loaded, without merging
* the ones sharing the same `path` first.
* @param transformer A resource transformer.
* @returns A distinct pipeline with transformed {@link Resource}s.
*/
private mapWithoutMerge(transformer: ResourceTransformer): Pipeline {
return new Pipeline(
() => map(transformer, this.getResources()),
this.#resourcesMonitor ?? null,
);
}
/**
* Divides every single {@link Resource} in the pipeline into multiple
* {@link Resource}s (unless their paths overlap), and returns a distinct
* pipeline. Note that this does not mutate the pipeline in-place.
*
* If divided {@link Resource}s' paths overlap each other, their contents
* belong together in a single {@link Resource}.
* @param divider A function to divide a {@link Resource} into multiple
* {@link Resource}s.
* @param predicate An optional predicate to filter {@link Resource}s to
* divide. Only {@link Resource}s satisfying the predicate
* are divided; the others are passed through as they are.
* Include all {@link Resource}s by default.
* @returns A distinct pipeline with divided {@link Resource}s.
*/
divide(divider: ResourceDivider, predicate?: ResourcePredicate): Pipeline {
// Captured through an arrow function since `this` is not this pipeline
// inside the generator function below.
const getResources = () => this[Symbol.asyncIterator]();
return new Pipeline(
async function* (): AsyncIterable<Resource> {
for await (const r of getResources()) {
if (predicate == null || predicate(r)) {
for await (const divided of divider(r)) {
yield divided;
}
} else {
yield r;
}
}
},
this.#resourcesMonitor ?? null,
);
}
/**
* Keeps only {@link Resource}s satisfying all given `predicates` in
* the pipeline, and returns a distinct pipeline with only the remaining
* {@link Resource}s. Note that this does not mutate the pipeline in-place.
* @param predicates One or more predicate functions that should be all
* satisfied.
* @returns A distinct pipeline with only the remaining {@link Resource}s,
* which satisfy all given `predicates`.
*/
filter(...predicates: ResourcePredicate[]): Pipeline {
return new Pipeline(
() => {
let resources: AsyncIterable<Resource> = this[Symbol.asyncIterator]();
for (const pred of predicates) {
resources = filter(pred, resources);
}
return resources;
},
this.#resourcesMonitor ?? null,
);
}
/**
* Organizes resources in the pipeline into a map of groups, where keys are
* common values among resources in each group and values are resources
* having their common group values. Key is determined by a function passed
* through the parameter `grouper`.
*
* Note that keys cannot be `undefined`.
* @param grouper A function to extract the common values from resources
* to determine groups they belong to. To exclude a resource
* from any groups return `undefined`.
* @returns A map of groups, where keys are common values among resources
* in each group and values are resources having their common group
* values (which are determined by `grouper`).
*/
async groupBy<T>(
grouper: (resource: Resource) => T | undefined,
): Promise<Map<NonUndefined<T>, ResourceSet>> {
const groups = new Map<NonUndefined<T>, ResourceSet>();
await this.forEach((resource: Resource) => {
const groupKey: T | undefined = grouper(resource);
if (groupKey !== undefined) {
const key = groupKey as NonUndefined<T>;
const group = groups.get(key);
if (group === undefined) groups.set(key, new ResourceSet([resource]));
else group.add(resource);
}
});
return groups;
}
/**
* Executes a provided `callback` function once for each {@link Resource}.
* Unlike {@link map} method, this returns a promise resolving nothing.
*
* The `callback` is invoked as resources are iterated, but promises returned
* by it are not awaited one by one; they are awaited all together after
* every resource has been visited. Hence this guarantees nothing about
* the order in which asynchronous callbacks complete.
* @param callback A callback function to be invoked with each
* {@link Resource}, and its index.
*/
async forEach(
callback:
| ((resource: Resource, index: number) => Promise<void>)
| ((resource: Resource, index: number) => void)
| ((resource: Resource) => Promise<void>)
| ((resource: Resource) => void),
): Promise<void> {
let i = 0;
const promises: Promise<void>[] = [];
for await (const resource of this) {
const p = callback(resource, i++);
if (p instanceof Promise) {
promises.push(p);
}
}
await Promise.all(promises);
}
/**
* Transforms paths of all {@link Resource}s in the {@link Pipeline} into
* other paths, and returns a distinct {@link Pipeline} with paths.
* @param transformer A function transforming a path into other one.
* @returns A distinct pipeline with `path`-transformed {@link Resource}s.
*/
move(transformer: PathTransformer): Pipeline {
return this.mapWithoutMerge(move(transformer));
}
/**
* Transforms {@link Content}s of all {@link Resource}s in
* the {@link Pipeline} into other {@link Content}s, and returns a distinct
* {@link Pipeline} with transformed {@link Content}s.
*
* In order to add transformed {@link Content}s with maintaining existing
* {@link Content}s instead of replacing them, use {@link Pipeline#diversify}
* method instead.
* @param transformer A content transformer to apply to {@link Content}s in
* {@link Resource}s.
* @param criterion An optional criterion to transform {@link Content}s.
* {@link Content}s that do not satisfy the criterion will
* not be transformed, but will be as these are.
* @returns A distinct pipeline with transformed {@link Content}s.
*/
transform(
transformer: ContentTransformer,
criterion?: ContentCriterion,
): Pipeline {
return this.mapWithoutMerge(transform(transformer, criterion));
}
/**
* Similar to {@link Pipeline#transform}, except that it does not replace
* existing {@link Content}s in {@link Resource}s, but adds transformed
* {@link Content}s with maintaining existing {@link Content}s instead.
* @param transformer A content transformer to apply to {@link Content}s in
* {@link Resource}s.
* @param criterion An optional filter to transform {@link Content}s.
* {@link Content}s that do not satisfy the filter will not
* be transformed.
* @returns A distinct pipeline with transformed {@link Content}s.
*/
diversify(
transformer: ContentTransformer,
criterion?: ContentCriterion,
): Pipeline {
return this.mapWithoutMerge(diversify(transformer, criterion));
}
/**
* Similar to {@link Pipeline#forEach}, except that this monitors if resources
* are updated, and reloads them if they are.
* Unlike {@link Pipeline#forEach}, this usually does not terminate unless
* the monitor is terminated or the pipeline has no monitor.
* Its most common usage is to watch files in a directory for changes,
* and rebuild the pipeline when changes are detected.
* @param callback A callback function to be invoked with each
* {@link Resource}, and its index. It is invoked for every
* resource once at first, and then again for every resource
* each time the monitor signals an update.
* @param onReloaded A callback function to be invoked when resource update
* is detected and it is to be reloaded. If it returns
* a promise, the promise is awaited before resources are
* iterated again.
*/
async forEachWithReloading(
callback:
| ((resource: Resource, index: number) => Promise<void>)
| ((resource: Resource, index: number) => void)
| ((resource: Resource) => Promise<void>)
| ((resource: Resource) => void),
onReloaded?: (() => Promise<void>) | (() => void),
): Promise<void> {
await this.forEach(callback);
if (this.#resourcesMonitor != null) {
for await (const _ of this.#resourcesMonitor) {
if (onReloaded != null) {
const p = onReloaded();
if (p instanceof Promise) await p;
}
await this.forEach(callback);
}
}
}
}
/**
 * {@link Set} of {@link Resource}s, with an additional property
 * {@link ResourceSet#lastModified}.
 */
export class ResourceSet extends Set<Resource> {
  /**
   * The last modified time of the {@link Resource}s in this set, i.e.,
   * the latest `lastModified` among its members.
   *
   * An empty set reports the Unix epoch (`new Date(0)`). Members whose
   * `lastModified` is an invalid date are skipped.
   */
  get lastModified(): Date {
    // Start from "no time seen yet" rather than 0, so that sets consisting
    // only of resources modified before the Unix epoch (negative timestamps)
    // are not clamped to 1970-01-01.
    let latest: number | undefined;
    for (const resource of this) {
      const time = resource.lastModified.getTime();
      if (Number.isNaN(time)) continue;
      if (latest === undefined || time > latest) latest = time;
    }
    return new Date(latest ?? 0);
  }
}
/**
 * Adapts a synchronous iterable so that it can be consumed with
 * `for await`.
 * @param iterable A synchronous iterable to adapt.
 * @returns An asynchronous iterable yielding the same items in the same order.
 */
async function* toAsyncIterable<T>(iterable: Iterable<T>): AsyncIterable<T> {
  for (const item of iterable) {
    yield item;
  }
}
/**
 * Lifts a {@link PathTransformer} into a {@link ResourceTransformer}.
 *
 * Designed to work with {@link Pipeline#map} method.
 * @param transformer A path transformer which computes the new path of
 *                    a {@link Resource} from its current path.
 * @returns A resource transformer which moves a given {@link Resource} to
 *          the path computed by `transformer`.
 */
export function move(transformer: PathTransformer): ResourceTransformer {
  return function relocate(target: Resource): Resource {
    const destination = transformer(target.path);
    return target.move(destination);
  };
}
/**
 * Lifts a {@link ContentTransformer} into a {@link ResourceTransformer} which
 * replaces {@link Content}s with their transformed versions.
 *
 * In order to add transformed {@link Content}s with maintaining existing
 * {@link Content}s instead of replacing them, use {@link diversify} function
 * instead.
 *
 * Designed to work with {@link Pipeline#map} method.
 * @param transformer A content transformer to apply to {@link Content}s in
 *                    {@link Resource}s.
 * @param criterion An optional criterion to transform {@link Content}s.
 *                  {@link Content}s that do not satisfy the criterion are
 *                  kept as they are.
 * @returns A resource transformer turned from the given content transformer.
 */
export function transform(
  transformer: ContentTransformer,
  criterion?: ContentCriterion,
): ResourceTransformer {
  const isTarget = toContentPredicate(criterion);
  return (resource: Resource): Resource => {
    // A generator keeps the transformation lazy: contents are transformed
    // only as the new resource consumes them.
    function* contents() {
      for (const content of resource) {
        if (isTarget(content)) yield transformer(content);
        else yield content;
      }
    }
    return new Resource(resource.path, contents());
  };
}
/**
 * Similar to {@link transform}, except that existing {@link Content}s are
 * kept and the transformed {@link Content}s are added next to them.
 *
 * Designed to work with {@link Pipeline#map} method.
 * @param transformer A content transformer to apply to {@link Content}s in
 *                    {@link Resource}s.
 * @param criterion An optional criterion to transform {@link Content}s.
 *                  {@link Content}s that do not satisfy the criterion will not
 *                  be transformed.
 * @returns A resource transformer which diversifies {@link Content}s in
 *          the given {@link Resource}.
 */
export function diversify(
  transformer: ContentTransformer,
  criterion?: ContentCriterion,
): ResourceTransformer {
  const isTarget = toContentPredicate(criterion);
  return (resource: Resource): Resource => {
    // Each original content is emitted first, immediately followed by its
    // transformed counterpart when it satisfies the criterion.
    function* contents() {
      for (const original of resource) {
        yield original;
        if (!isTarget(original)) continue;
        yield transformer(original);
      }
    }
    return new Resource(resource.path, contents());
  };
}
/**
 * Makes a content transformer which sets some fields of a given
 * {@link Content} to the specified values.
 *
 * String values of `fields.type` and `fields.language` are parsed once, when
 * this function is called, rather than on every transformation.
 *
 * Designed to work with {@link Pipeline#transform} or
 * {@link Pipeline#diversify} methods.
 * @param fields The field values which will be filled to {@link Content}s'
 *               corresponding fields.  See also {@link Content#replace} method
 *               and {@link ContentFields} interface for how to use.
 * @returns A content transformer which updates a given {@link Content}'s
 *          fields to the specific values.
 * @throws {MediaTypeError} Thrown when the given `fields.type` is not a valid
 *         IANA media type.
 * @throws {LanguageTagError} Thrown when the given `fields.language` is not
 *         a valid RFC 5646 language tag.
 */
export function replace(fields: ContentFields): ContentTransformer {
  const { type, language } = fields;
  // The given object itself is used unless a string field has to be parsed.
  let normalized: ContentFields = fields;
  if (typeof type == "string") {
    normalized = { ...normalized, type: MediaType.fromString(type) };
  }
  if (typeof language == "string") {
    normalized = {
      ...normalized,
      language: LanguageTag.fromString(language),
    };
  }
  return (content: Content): Content => content.replace(normalized);
}
/**
 * Makes a {@link ResourcePredicate} which tells whether at least one
 * representation of a given {@link Resource} satisfies the given
 * `contentCriterion`.
 *
 * Designed to work with {@link Pipeline#filter} method.
 * @param contentCriterion A criterion to check resource's representations.
 *                         If it is `undefined`, every {@link Resource}
 *                         satisfies the returned predicate.
 * @returns A function which checks whether a given {@link Resource} has any
 *          representations satisfying the given `contentCriterion`.
 */
export function anyRepresentations(
  contentCriterion: ContentCriterion,
): ResourcePredicate {
  if (contentCriterion === undefined) {
    return (_resource: Resource): boolean => true;
  }
  // A function criterion is called as is; anything else is matched through
  // the representation's own `matches()` method.
  const accepts = typeof contentCriterion === "function"
    ? contentCriterion
    : (repr: Content): boolean => repr.matches(contentCriterion);
  return (resource: Resource): boolean => {
    for (const repr of resource) {
      if (accepts(repr)) return true;
    }
    return false;
  };
}
/**
 * Makes a {@link ResourcePredicate} which tells whether every representation
 * of a given {@link Resource} satisfies the given `contentCriterion`.
 * A {@link Resource} without any representations satisfies it.
 *
 * Designed to work with {@link Pipeline#filter} method.
 * @param contentCriterion A criterion to check resource's representations.
 *                         If it is `undefined`, every {@link Resource}
 *                         satisfies the returned predicate.
 * @returns A function which checks whether a given {@link Resource}'s all
 *          representations satisfy the given `contentCriterion`.
 */
export function allRepresentations(
  contentCriterion: ContentCriterion,
): ResourcePredicate {
  if (contentCriterion === undefined) {
    return (_resource: Resource): boolean => true;
  }
  // A function criterion is called as is; anything else is matched through
  // the representation's own `matches()` method.
  const accepts = typeof contentCriterion === "function"
    ? contentCriterion
    : (repr: Content): boolean => repr.matches(contentCriterion);
  return (resource: Resource): boolean => {
    for (const repr of resource) {
      if (!accepts(repr)) return false;
    }
    return true;
  };
}
export { Content, LanguageTag, MediaType, Resource };
export default Pipeline;