Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async indexing with concurrency limit (and progress callback) #11

Merged
merged 9 commits into from
Jan 15, 2020
55 changes: 39 additions & 16 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"typescript": "^3.0.3"
},
"dependencies": {
"p-queue": "6.2.0",
"pako": "^1.0.10",
"ts-interface-checker": "^0.1.9"
}
Expand Down
68 changes: 51 additions & 17 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ import { ValueError, PermissionError, KeyError } from '../errors';
import { Codec } from "../compression/types";
import { getCodec } from "../compression/creation";

import PQueue from 'p-queue';

// TODO: add similar optimizations for `Set`
interface GetOptions {
concurrencyLimit?: number;
manzt marked this conversation as resolved.
Show resolved Hide resolved
progressCallback?: (progressUpdate: {
progress: number;
queueSize: number;
}) => void;
}

export class ZarrArray {

public store: Store;
Expand Down Expand Up @@ -237,35 +248,34 @@ export class ZarrArray {
}
}

public get(selection?: undefined | Slice | ":" | "..." | null | (Slice | null | ":" | "...")[]): Promise<NestedArray<TypedArray>>;
public get(selection?: ArraySelection): Promise<NestedArray<TypedArray> | number>;
public get(selection: ArraySelection = null): Promise<NestedArray<TypedArray> | number> {
return this.getBasicSelection(selection);
public get(selection?: undefined | Slice | ":" | "..." | null | (Slice | null | ":" | "...")[], opts?: GetOptions): Promise<NestedArray<TypedArray>>;
public get(selection?: ArraySelection, opts?: GetOptions): Promise<NestedArray<TypedArray> | number>;
public get(selection: ArraySelection = null, opts: GetOptions = {}): Promise<NestedArray<TypedArray> | number> {
return this.getBasicSelection(selection, opts);
}

public async getBasicSelection(selection: Slice | ":" | "..." | null | (Slice | null | ":" | "...")[]): Promise<NestedArray<TypedArray>>;
public async getBasicSelection(selection: ArraySelection): Promise<NestedArray<TypedArray> | number>;
public async getBasicSelection(selection: ArraySelection): Promise<number | NestedArray<TypedArray>> {
public async getBasicSelection(selection: Slice | ":" | "..." | null | (Slice | null | ":" | "...")[], opts?: GetOptions): Promise<NestedArray<TypedArray>>;
public async getBasicSelection(selection: ArraySelection, opts?: GetOptions): Promise<NestedArray<TypedArray> | number>;
public async getBasicSelection(selection: ArraySelection, { concurrencyLimit = 10, progressCallback }: GetOptions = {}): Promise<number | NestedArray<TypedArray>> {
// Refresh metadata
if (!this.cacheMetadata) {
await this.reloadMetadata();
}

// Check fields (TODO?)

if (this.shape === []) {
throw new Error("Shape [] indexing is not supported yet");
} else {
return this.getBasicSelectionND(selection);
return this.getBasicSelectionND(selection, concurrencyLimit, progressCallback);
}
}

private getBasicSelectionND(selection: ArraySelection) {
private getBasicSelectionND(selection: ArraySelection, concurrencyLimit: number, progressCallback?: (progressUpdate: { progress: number; queueSize: number }) => void) {
const indexer = new BasicIndexer(selection, this);
return this.getSelection(indexer);
return this.getSelection(indexer, concurrencyLimit, progressCallback);
}

private async getSelection(indexer: BasicIndexer) {
private async getSelection(indexer: BasicIndexer, concurrencyLimit: number, progressCallback?: (progressUpdate: { progress: number; queueSize: number }) => void) {
// We iterate over all chunks which overlap the selection and thus contain data
// that needs to be extracted. Each chunk is processed in turn, extracting the
// necessary data and storing into the correct location in the output array.
Expand All @@ -284,10 +294,34 @@ export class ZarrArray {
return out;
}

for (const proj of indexer.iter()) {
await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes);
// create promise queue with concurrency control
const queue = new PQueue({ concurrency: concurrencyLimit });

if (progressCallback) {
manzt marked this conversation as resolved.
Show resolved Hide resolved

let queueSize = 0;
for (const _ of indexer.iter()) queueSize += 1;

let progress = 0;
for (const proj of indexer.iter()) {
(async () => {
await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes));
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
}

} else {

for (const proj of indexer.iter()) {
queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes));
}

}

// guarantees that all work on the queue has finished
await queue.onIdle();

// Return scalar instead of zero-dimensional array.
if (out.shape.length === 0) {
return out.data[0] as number;
Expand Down Expand Up @@ -330,7 +364,7 @@ export class ZarrArray {
if (dropAxes !== null) {
throw new Error("Drop axes is not supported yet");
}

out.set(outSelection, tmp as NestedArray<T>);

} else { // Chunk isn't there, use fill value
Expand Down Expand Up @@ -368,7 +402,7 @@ export class ZarrArray {
return this.compressor.decode(byteChunkData as any);
}

// TODO filtering etc
// TODO filtering etc
return byteChunkData.buffer;
}

Expand Down Expand Up @@ -505,7 +539,7 @@ export class ZarrArray {
}

private encodeChunk(chunk: TypedArray) {

if (this.compressor !== null) {
return this.compressor.encode(new Uint8Array(chunk.buffer));
}
Expand Down