-
Notifications
You must be signed in to change notification settings - Fork 19
feat: add wrapper for reading table data using Storage API #431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
485b33d
39d3370
1ef1c9b
4bf1511
45a4afa
aa57c03
72afe00
7a5847a
6b98711
77fff01
98546f3
bd67c85
aaeb3bd
9fa976a
7382c34
63805b3
182d323
ac0a018
26d5b95
456f2d4
a02c634
4b41b3e
d266408
0d1af64
88449ea
f811ead
898ae4b
6a86580
872f0f3
052713b
94886cc
6b9f68c
67baeca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import {ResourceStream} from '@google-cloud/paginator'; | ||
import {RecordBatch} from 'apache-arrow'; | ||
|
||
import * as protos from '../../protos/protos'; | ||
import {TableReference, ReadClient} from './read_client'; | ||
import {logger} from '../util/logger'; | ||
import { | ||
ArrowRawTransform, | ||
ArrowRecordBatchTransform, | ||
ArrowRecordReaderTransform, | ||
} from './arrow_transform'; | ||
import {ReadSession, GetStreamOptions} from './read_session'; | ||
import {ArrowFormat} from './data_format'; | ||
|
||
type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; | ||
|
||
/** | ||
* A BigQuery Storage API Reader that can be used to read data | ||
* from BigQuery Tables using the Storage API in Arrow format. | ||
* | ||
* @class | ||
* @memberof reader | ||
*/ | ||
export class ArrowTableReader { | ||
private _tableRef: TableReference; | ||
private _session: ReadSession; | ||
|
||
/** | ||
* Creates a new ArrowTableReader instance. Usually created via | ||
* ReadClient.createArrowTableReader(). | ||
* | ||
* @param {ReadClient} readClient - Storage Read Client. | ||
* @param {TableReference} table - target table to read data from. | ||
*/ | ||
constructor(readClient: ReadClient, tableRef: TableReference) { | ||
this._tableRef = tableRef; | ||
this._session = new ReadSession(readClient, tableRef, ArrowFormat); | ||
} | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as others about the any |
||
private trace(msg: string, ...otherArgs: any[]) { | ||
logger( | ||
'arrow_table_reader', | ||
`[table: ${this._tableRef.tableId}]`, | ||
msg, | ||
...otherArgs | ||
); | ||
} | ||
|
||
getSessionInfo(): ReadSessionInfo | undefined | null { | ||
return this._session.getSessionInfo(); | ||
} | ||
|
||
/** | ||
* Get a byte stream of Arrow Record Batch. | ||
* | ||
* @param {GetStreamOptions} options | ||
*/ | ||
async getStream( | ||
options?: GetStreamOptions | ||
): Promise<ResourceStream<Uint8Array>> { | ||
this.trace('getStream', options); | ||
const stream = await this._session.getStream(options); | ||
return stream.pipe(new ArrowRawTransform()) as ResourceStream<Uint8Array>; | ||
leahecole marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/** | ||
* Get a stream of Arrow RecordBatch objects. | ||
* | ||
* @param {GetStreamOptions} options | ||
*/ | ||
async getRecordBatchStream( | ||
options?: GetStreamOptions | ||
): Promise<ResourceStream<RecordBatch>> { | ||
this.trace('getRecordBatchStream', options); | ||
const stream = await this._session.getStream(options); | ||
const info = this._session.getSessionInfo(); | ||
return stream | ||
.pipe(new ArrowRawTransform()) | ||
.pipe(new ArrowRecordReaderTransform(info!)) | ||
.pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>; | ||
Comment on lines
+92
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this many streams piped, it is probably better to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll check how to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tried to use The error that I got:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am envisioning something like
or
then you'd return that outputStream - are you thinking of something different? I won't die on this hill, but I really got burned by pipes recently and want to save you from that experience |
||
} | ||
|
||
close() { | ||
this._session.close(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import {Transform, TransformCallback} from 'stream'; | ||
import { | ||
RecordBatchReader, | ||
RecordBatch, | ||
RecordBatchStreamReader, | ||
Vector, | ||
} from 'apache-arrow'; | ||
import * as protos from '../../protos/protos'; | ||
|
||
type ReadRowsResponse = | ||
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; | ||
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession; | ||
|
||
interface TableCell { | ||
v?: any; | ||
} | ||
interface TableRow { | ||
f?: Array<TableCell>; | ||
alvarowolfx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/** | ||
* ArrowRawTransform implements a node stream Transform that reads | ||
* ReadRowsResponse from BigQuery Storage Read API and convert | ||
* a raw Arrow Record Batch. | ||
*/ | ||
export class ArrowRawTransform extends Transform { | ||
constructor() { | ||
super({ | ||
readableObjectMode: false, | ||
writableObjectMode: true, | ||
}); | ||
} | ||
|
||
_transform( | ||
response: ReadRowsResponse, | ||
_: BufferEncoding, | ||
callback: TransformCallback | ||
): void { | ||
if ( | ||
!( | ||
response.arrowRecordBatch && | ||
response.arrowRecordBatch.serializedRecordBatch | ||
) | ||
) { | ||
callback(null); | ||
return; | ||
} | ||
callback(null, response.arrowRecordBatch?.serializedRecordBatch); | ||
alvarowolfx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/** | ||
* ArrowRecordReaderTransform implements a node stream Transform that reads | ||
* a byte stream of raw Arrow Record Batch and convert to a stream of Arrow | ||
* RecordBatchStreamReader. | ||
*/ | ||
export class ArrowRecordReaderTransform extends Transform { | ||
private session: ReadSession; | ||
|
||
constructor(session: ReadSession) { | ||
super({ | ||
objectMode: true, | ||
}); | ||
this.session = session; | ||
} | ||
|
||
_transform( | ||
serializedRecordBatch: Uint8Array, | ||
_: BufferEncoding, | ||
callback: TransformCallback | ||
): void { | ||
const buf = Buffer.concat([ | ||
this.session.arrowSchema?.serializedSchema as Uint8Array, | ||
serializedRecordBatch, | ||
]); | ||
const reader = RecordBatchReader.from(buf); | ||
callback(null, reader); | ||
} | ||
} | ||
|
||
/** | ||
* ArrowRecordBatchTransform implements a node stream Transform that reads | ||
* a RecordBatchStreamReader and convert a stream of Arrow RecordBatch. | ||
*/ | ||
export class ArrowRecordBatchTransform extends Transform { | ||
constructor() { | ||
super({ | ||
objectMode: true, | ||
}); | ||
} | ||
|
||
_transform( | ||
reader: RecordBatchStreamReader, | ||
_: BufferEncoding, | ||
callback: TransformCallback | ||
): void { | ||
const batches = reader.readAll(); | ||
for (const row of batches) { | ||
this.push(row); | ||
} | ||
callback(null); | ||
} | ||
} | ||
|
||
/** | ||
* ArrowRecordBatchTableRowTransform implements a node stream Transform that reads | ||
* an Arrow RecordBatch and convert a stream of BigQuery TableRow. | ||
*/ | ||
export class ArrowRecordBatchTableRowTransform extends Transform { | ||
constructor() { | ||
super({ | ||
objectMode: true, | ||
}); | ||
} | ||
|
||
_transform( | ||
batch: RecordBatch, | ||
_: BufferEncoding, | ||
callback: TransformCallback | ||
): void { | ||
const rows = new Array(batch.numRows); | ||
for (let i = 0; i < batch.numRows; i++) { | ||
rows[i] = { | ||
f: new Array(batch.numCols), | ||
}; | ||
} | ||
for (let j = 0; j < batch.numCols; j++) { | ||
const column = batch.selectAt([j]); | ||
const columnName = column.schema.fields[0].name; | ||
for (let i = 0; i < batch.numRows; i++) { | ||
const fieldData = column.get(i); | ||
const fieldValue = fieldData?.toJSON()[columnName]; | ||
rows[i].f[j] = { | ||
v: convertArrowValue(fieldValue), | ||
}; | ||
} | ||
} | ||
for (let i = 0; i < batch.numRows; i++) { | ||
this.push(rows[i]); | ||
} | ||
callback(null); | ||
} | ||
} | ||
|
||
function convertArrowValue(fieldValue: any): any { | ||
Check warning on line 159 in src/reader/arrow_transform.ts
|
||
alvarowolfx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (typeof fieldValue === 'object') { | ||
if (fieldValue instanceof Vector) { | ||
const arr = fieldValue.toJSON(); | ||
return arr.map((v: any) => { | ||
return {v: convertArrowValue(v)}; | ||
}); | ||
} | ||
const tableRow: TableRow = {f: []}; | ||
Object.keys(fieldValue).forEach(key => { | ||
tableRow.f?.push({ | ||
v: convertArrowValue(fieldValue[key]), | ||
}); | ||
}); | ||
return tableRow; | ||
} | ||
return fieldValue; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
// Copyright 2024 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
import * as protos from '../../protos/protos'; | ||
|
||
/**
 * DataFormat is the serialization format used when reading table data
 * through the Storage Read API, derived from the ReadSession proto's
 * `dataFormat` field.
 */
export type DataFormat =
  protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat'];
// Value-side counterpart merged with the type alias above, exposing the
// proto-generated DataFormat enum values at runtime.
const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;

/**
 * Return data in Apache Arrow format.
 *
 * @memberof reader
 */
export const ArrowFormat: DataFormat = 'ARROW';

/**
 * Return data in Apache Avro format.
 *
 * @memberof reader
 */
export const AvroFormat: DataFormat = 'AVRO';
Uh oh!
There was an error while loading. Please reload this page.