/
BlobQuickQueryStream.ts
151 lines (140 loc) · 4.94 KB
/
BlobQuickQueryStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { Readable } from "stream";
import { AbortSignalLike } from "@azure/abort-controller";
import { TransferProgressEvent } from "@azure/core-http";
import { AvroReadableFromStream, AvroReader } from "../../../storage-internal-avro/src";
import { BlobQueryError } from "../Clients";
export interface BlobQuickQueryStreamOptions {
/**
* An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation.
* For example, use the @azure/abort-controller to create an `AbortSignal`.
*
* @type {AbortSignalLike}
* @memberof BlobQuickQueryStreamOptions
*/
abortSignal?: AbortSignalLike;
/**
* Read progress event handler
*
* @memberof BlobQuickQueryStreamOptions
*/
onProgress?: (progress: TransferProgressEvent) => void;
/**
* Callback to receive error events during the query operaiton.
*
* @memberof BlockBlobQueryOptions
*/
onError?: (error: BlobQueryError) => void;
}
/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
* A Node.js BlobQuickQueryStream will internally parse avro data stream for blob query.
*
* @class BlobQuickQueryStream
* @extends {Readable}
*/
export class BlobQuickQueryStream extends Readable {
private source: NodeJS.ReadableStream;
private avroReader: AvroReader;
private avroIter: AsyncIterableIterator<Object | null>;
private avroPaused: boolean = true;
private onProgress?: (progress: TransferProgressEvent) => void;
private onError?: (error: BlobQueryError) => void;
/**
* Creates an instance of BlobQuickQueryStream.
*
* @param {NodeJS.ReadableStream} source The current ReadableStream returned from getter
* @param {BlobQuickQueryStreamOptions} [options={}]
* @memberof BlobQuickQueryStream
*/
public constructor(source: NodeJS.ReadableStream, options: BlobQuickQueryStreamOptions = {}) {
super();
this.source = source;
this.onProgress = options.onProgress;
this.onError = options.onError;
this.avroReader = new AvroReader(new AvroReadableFromStream(this.source));
this.avroIter = this.avroReader.parseObjects({ abortSignal: options.abortSignal });
}
public _read() {
if (this.avroPaused) {
this.readInternal().catch((err) => {
this.emit("error", err);
});
}
}
private async readInternal() {
this.avroPaused = false;
let avroNext;
do {
avroNext = await this.avroIter.next();
if (avroNext.done) {
break;
}
const obj = avroNext.value;
const schema = (obj as any).$schema;
if (typeof schema !== "string") {
throw Error("Missing schema in avro record.");
}
switch (schema) {
case "com.microsoft.azure.storage.queryBlobContents.resultData":
const data = (obj as any).data;
if (data instanceof Uint8Array === false) {
throw Error("Invalid data in avro result record.");
}
if (!this.push(Buffer.from(data))) {
this.avroPaused = true;
}
break;
case "com.microsoft.azure.storage.queryBlobContents.progress":
const bytesScanned = (obj as any).bytesScanned;
if (typeof bytesScanned !== "number") {
throw Error("Invalid bytesScanned in avro progress record.");
}
if (this.onProgress) {
this.onProgress({ loadedBytes: bytesScanned });
}
break;
case "com.microsoft.azure.storage.queryBlobContents.end":
if (this.onProgress) {
const totalBytes = (obj as any).totalBytes;
if (typeof totalBytes !== "number") {
throw Error("Invalid totalBytes in avro end record.");
}
this.onProgress({ loadedBytes: totalBytes });
}
this.push(null);
break;
case "com.microsoft.azure.storage.queryBlobContents.error":
if (this.onError) {
const fatal = (obj as any).fatal;
if (typeof fatal !== "boolean") {
throw Error("Invalid fatal in avro error record.");
}
const name = (obj as any).name;
if (typeof name !== "string") {
throw Error("Invalid name in avro error record.");
}
const description = (obj as any).description;
if (typeof description !== "string") {
throw Error("Invalid description in avro error record.");
}
const position = (obj as any).position;
if (typeof position !== "number") {
throw Error("Invalid position in avro error record.");
}
this.onError({
position,
name,
isFatal: fatal,
description
});
}
break;
default:
throw Error(`Unknown schema ${schema} in avro progress record.`);
}
} while (!avroNext.done && !this.avroPaused);
}
}