diff --git a/package-lock.json b/package-lock.json index 7666250..c7d61c5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1911,6 +1911,11 @@ "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=", "dev": true }, + "eventemitter3": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.0.tgz", + "integrity": "sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg==" + }, "exec-sh": { "version": "0.2.2", "resolved": "https://registry.npmjs.org/exec-sh/-/exec-sh-0.2.2.tgz", @@ -5924,8 +5929,7 @@ "p-finally": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz", - "integrity": "sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4=", - "dev": true + "integrity": "sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4=" }, "p-is-promise": { "version": "2.1.0", @@ -5933,15 +5937,6 @@ "integrity": "sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg==", "dev": true }, - "p-limit": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-1.3.0.tgz", - "integrity": "sha512-vvcXsLAJ9Dr5rQOPk7toZQZJApBl2K4J6dANSsEuh6QI41JYcsS/qhTGa9ErIUUgK3WNQoJYvylxvjqmiqEA9Q==", - "dev": true, - "requires": { - "p-try": "^1.0.0" - } - }, "p-locate": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/p-locate/-/p-locate-2.0.0.tgz", @@ -5949,13 +5944,41 @@ "dev": true, "requires": { "p-limit": "^1.1.0" + }, + "dependencies": { + "p-limit": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-1.3.0.tgz", + "integrity": "sha512-vvcXsLAJ9Dr5rQOPk7toZQZJApBl2K4J6dANSsEuh6QI41JYcsS/qhTGa9ErIUUgK3WNQoJYvylxvjqmiqEA9Q==", + "dev": true, + "requires": { + "p-try": "^1.0.0" + } + }, + "p-try": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-try/-/p-try-1.0.0.tgz", + "integrity": "sha1-y8ec26+P1CKOE/Yh8rGiN8GyB7M=", + "dev": true + } } }, - "p-try": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/p-try/-/p-try-1.0.0.tgz", - "integrity": "sha1-y8ec26+P1CKOE/Yh8rGiN8GyB7M=", - "dev": true + "p-queue": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.2.0.tgz", + "integrity": "sha512-B2LXNONcyn/G6uz2UBFsGjmSa0e/br3jznlzhEyCXg56c7VhEpiT2pZxGOfv32Q3FSyugAdys9KGpsv3kV+Sbg==", + "requires": { + "eventemitter3": "^4.0.0", + "p-timeout": "^3.1.0" + } + }, + "p-timeout": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", + "integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==", + "requires": { + "p-finally": "^1.0.0" + } }, "pako": { "version": "1.0.10", diff --git a/package.json b/package.json index ae07993..92d924c 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "typescript": "^3.0.3" }, "dependencies": { + "p-queue": "6.2.0", "pako": "^1.0.10", "ts-interface-checker": "^0.1.9" } diff --git a/src/core/index.ts b/src/core/index.ts index a0d645e..fe5aef4 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -14,6 +14,17 @@ import { ValueError, PermissionError, KeyError } from '../errors'; import { Codec } from "../compression/types"; import { getCodec } from "../compression/creation"; +import PQueue from 'p-queue'; + +// TODO: add similar optimizations for `Set` +interface GetOptions { + concurrencyLimit?: number; + progressCallback?: (progressUpdate: { + progress: number; + queueSize: number; + }) => void; +} + export class ZarrArray { public store: Store; @@ -237,35 +248,34 @@ export class ZarrArray { } } - public get(selection?: undefined | Slice | ":" | "..." | null | (Slice | null | ":" | "...")[]): Promise>; - public get(selection?: ArraySelection): Promise | number>; - public get(selection: ArraySelection = null): Promise | number> { - return this.getBasicSelection(selection); + public get(selection?: undefined | Slice | ":" | "..." | null | (Slice | null | ":" | "...")[], opts?: GetOptions): Promise>; + public get(selection?: ArraySelection, opts?: GetOptions): Promise | number>; + public get(selection: ArraySelection = null, opts: GetOptions = {}): Promise | number> { + return this.getBasicSelection(selection, opts); } - public async getBasicSelection(selection: Slice | ":" | "..." | null | (Slice | null | ":" | "...")[]): Promise>; - public async getBasicSelection(selection: ArraySelection): Promise | number>; - public async getBasicSelection(selection: ArraySelection): Promise> { + public async getBasicSelection(selection: Slice | ":" | "..." | null | (Slice | null | ":" | "...")[], opts?: GetOptions): Promise>; + public async getBasicSelection(selection: ArraySelection, opts?: GetOptions): Promise | number>; + public async getBasicSelection(selection: ArraySelection, { concurrencyLimit = 10, progressCallback }: GetOptions = {}): Promise> { // Refresh metadata if (!this.cacheMetadata) { await this.reloadMetadata(); } // Check fields (TODO?) - if (this.shape === []) { throw new Error("Shape [] indexing is not supported yet"); } else { - return this.getBasicSelectionND(selection); + return this.getBasicSelectionND(selection, concurrencyLimit, progressCallback); } } - private getBasicSelectionND(selection: ArraySelection) { + private getBasicSelectionND(selection: ArraySelection, concurrencyLimit: number, progressCallback?: (progressUpdate: { progress: number; queueSize: number }) => void) { const indexer = new BasicIndexer(selection, this); - return this.getSelection(indexer); + return this.getSelection(indexer, concurrencyLimit, progressCallback); } - private async getSelection(indexer: BasicIndexer) { + private async getSelection(indexer: BasicIndexer, concurrencyLimit: number, progressCallback?: (progressUpdate: { progress: number; queueSize: number }) => void) { // We iterate over all chunks which overlap the selection and thus contain data // that needs to be extracted. Each chunk is processed in turn, extracting the // necessary data and storing into the correct location in the output array. @@ -284,10 +294,34 @@ export class ZarrArray { return out; } - for (const proj of indexer.iter()) { - await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes); + // create promise queue with concurrency control + const queue = new PQueue({ concurrency: concurrencyLimit }); + + if (progressCallback) { + + let queueSize = 0; + for (const _ of indexer.iter()) queueSize += 1; + + let progress = 0; + for (const proj of indexer.iter()) { + (async () => { + await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes)); + progress += 1; + progressCallback({ progress: progress, queueSize: queueSize }); + })(); + } + + } else { + + for (const proj of indexer.iter()) { + queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes)); + } + } + // gaurantess that all work on queue has finished + await queue.onIdle(); + // Return scalar instead of zero-dimensional array. if (out.shape.length === 0) { return out.data[0] as number; @@ -330,7 +364,7 @@ export class ZarrArray { if (dropAxes !== null) { throw new Error("Drop axes is not supported yet"); } - + out.set(outSelection, tmp as NestedArray); } else { // Chunk isn't there, use fill value @@ -368,7 +402,7 @@ export class ZarrArray { return this.compressor.decode(byteChunkData as any); } - // TODO filtering etc + // TODO filtering etc return byteChunkData.buffer; } @@ -505,7 +539,7 @@ export class ZarrArray { } private encodeChunk(chunk: TypedArray) { - + if (this.compressor !== null) { return this.compressor.encode(new Uint8Array(chunk.buffer)); }