/
cursor.ts
98 lines (85 loc) · 2.28 KB
/
cursor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import { Long } from "../../deps.ts";
import { Document } from "../types.ts";
import { parseNamespace } from "../utils/ns.ts";
import { WireProtocol } from "./protocol.ts";
/**
 * Shape of the value an executor resolves with when a `CommandCursor` runs
 * its initial command: the cursor identity plus the first batch of results.
 */
export interface CommandCursorOptions<T> {
/** Cursor id. `CommandCursor` normalizes it with `BigInt(...)` and treats `0n` as "nothing more to fetch". */
id: bigint | number | string;
/** Namespace string; `CommandCursor` splits it into database and collection names via `parseNamespace`. */
ns: string;
/** Items that become the cursor's initial in-memory batch. */
firstBatch: T[];
// NOTE(review): not read by CommandCursor in this file — presumably a time limit in milliseconds; confirm against callers.
maxTimeMS?: number;
// NOTE(review): not read by CommandCursor in this file — presumably a comment attached to the command; confirm against callers.
comment?: Document;
}
/**
 * Lazily executed cursor over the results of a command.
 *
 * The command itself is not run until the first item is requested. After
 * that, items are served from an in-memory batch and further batches are
 * fetched with `getMore` until the cursor id becomes `0n`.
 */
export class CommandCursor<T> {
  /** Cursor id; `undefined` until the executor has run, `0n` once there is nothing left to fetch. */
  #id?: bigint;
  #protocol: WireProtocol;
  /** Items already received but not yet handed to the consumer. */
  #batches: T[] = [];
  #db?: string;
  #collection?: string;
  #executor: () => Promise<CommandCursorOptions<T>>;
  #executed = false;

  /**
   * @param protocol Protocol used to send follow-up `getMore` commands.
   * @param executor Callback that runs the initial command and resolves with
   *   the cursor id, namespace and first batch. It is invoked on first use.
   */
  constructor(
    protocol: WireProtocol,
    executor: () => Promise<CommandCursorOptions<T>>,
  ) {
    this.#protocol = protocol;
    this.#executor = executor;
  }

  /**
   * Runs the executor and stores the cursor id, namespace and first batch.
   *
   * State is only committed once every step has succeeded. If anything
   * throws, the "executed" flag is cleared again so a later call re-runs the
   * executor instead of sending `getMore` with an undefined id and database.
   */
  private async execute(): Promise<void> {
    // Set the flag before awaiting, as the original did, so `next()` does not
    // start a second execution while this one is in flight.
    this.#executed = true;
    try {
      const options = await this.#executor();
      const id = BigInt(options.id);
      const { db, collection } = parseNamespace(options.ns);
      this.#batches = options.firstBatch;
      this.#id = id;
      this.#db = db;
      this.#collection = collection;
    } catch (error) {
      this.#executed = false;
      throw error;
    }
  }

  /**
   * Returns the next buffered item, executing the command or fetching another
   * batch when the buffer is empty.
   *
   * @returns The next item, or `undefined` when the cursor id is `0n` or the
   *   batch that was just received is empty. An empty batch with a non-zero
   *   cursor id does not mean the cursor is exhausted; the async iterator
   *   keeps polling in that case.
   */
  async next(): Promise<T | undefined> {
    if (this.#batches.length > 0) {
      return this.#batches.shift();
    }

    if (!this.#executed) {
      await this.execute();
      return this.#batches.shift();
    }

    if (this.#id === 0n) {
      return undefined;
    }

    // `#db` and `#id` are set by a successful `execute()`, which must have
    // completed for `#executed` to still be true here.
    const { cursor } = await this.#protocol.commandSingle(this.#db!, {
      getMore: Long.fromBigInt(this.#id!),
      collection: this.#collection,
    });
    this.#batches = cursor.nextBatch || [];
    this.#id = BigInt(cursor.id.toString());
    return this.#batches.shift();
  }

  /**
   * Iterates over every remaining item.
   *
   * The loop also runs while `#id` is still `undefined` (not yet executed),
   * so the first `next()` call triggers execution. `undefined` results
   * (empty batches) are skipped rather than yielded.
   */
  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (this.#batches.length > 0 || this.#id !== 0n) {
      const value = await this.next();
      if (value !== undefined) {
        yield value;
      }
    }
  }

  /**
   * Invokes `callback` for every remaining item, in order.
   *
   * Every item yielded by the iterator is delivered, including falsy values
   * such as `0`, `""`, `false` and `null`.
   *
   * @param callback Receives the item and its zero-based position.
   */
  async forEach(callback: (item: T, index: number) => void): Promise<void> {
    let index = 0;
    for await (const item of this) {
      callback(item, index++);
    }
  }

  /**
   * Transforms every remaining item with `callback` and collects the results.
   *
   * Falsy items are passed through to `callback` like any other item.
   *
   * @param callback Receives the item and its zero-based position.
   * @returns The mapped values in cursor order.
   */
  async map<M>(callback: (item: T, index: number) => M): Promise<M[]> {
    let index = 0;
    const result: M[] = [];
    for await (const item of this) {
      result.push(callback(item, index++));
    }
    return result;
  }

  /** Drains the cursor and returns all remaining items as an array. */
  toArray(): Promise<T[]> {
    return this.map((item) => item);
  }
}