Skip to content

Commit

Permalink
[PECO-1532] Ignore the excess records in query results (#239)
Browse files Browse the repository at this point in the history
* [PECO-1532] Arrow and CloudFetch result handlers: return row count with raw batch data

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Update tests

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Refactor `ArrowResultConverter` - cleanup and make it skip empty batches

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* [PECO-1532] Ignore the excess records in arrow batches

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

* Add tests

Signed-off-by: Levko Kravets <levko.ne@gmail.com>

---------

Signed-off-by: Levko Kravets <levko.ne@gmail.com>
  • Loading branch information
kravets-levko committed Mar 27, 2024
1 parent 6673660 commit 1dc16ac
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 120 deletions.
115 changes: 78 additions & 37 deletions lib/result/ArrowResultConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';
import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;

Expand All @@ -26,15 +26,23 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<Array<Buffer>>;
private readonly source: IResultsProvider<ArrowBatch>;

private readonly schema: Array<TColumnDesc>;

private reader?: IterableIterator<RecordBatch<TypeMap>>;
private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;

private pendingRecordBatch?: RecordBatch<TypeMap>;
// Remaining rows in current Arrow batch (not the record batch!)
private remainingRows: number = 0;

constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, { schema }: TGetResultSetMetadataResp) {
// This is the next (!!) record batch to be read. It is unset only in two cases:
// - prior to the first call to `fetchNext`
// - when no more data available
// This field is primarily used by a `hasMore`, so it can tell if next `fetchNext` will
// actually return a non-empty result
private prefetchedRecordBatch?: RecordBatch<TypeMap>;

constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
Expand All @@ -44,7 +52,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
if (this.schema.length === 0) {
return false;
}
if (this.pendingRecordBatch) {
if (this.prefetchedRecordBatch) {
return true;
}
return this.source.hasMore();
Expand All @@ -55,47 +63,80 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
return [];
}

// eslint-disable-next-line no-constant-condition
while (true) {
// It's not possible to know if iterator has more items until trying
// to get the next item. But we need to know if iterator is empty right
// after getting the next item. Therefore, after creating the iterator,
// we get one item more and store it in `pendingRecordBatch`. Next time,
// we use that stored item, and prefetch the next one. Prefetched item
// is therefore the next item we are going to return, so it can be used
// to know if we actually can return anything next time
const recordBatch = this.pendingRecordBatch;
this.pendingRecordBatch = this.prefetch();

if (recordBatch) {
const table = new Table(recordBatch);
return this.getRows(table.schema, table.toArray());
// It's not possible to know if iterator has more items until trying to get the next item.
// So each time we read one batch ahead and store it, but process the batch prefetched on
// a previous `fetchNext` call. Because we actually already have the next item - it's easy
// to tell if the subsequent `fetchNext` will be able to read anything, and `hasMore` logic
// becomes trivial

// This prefetch handles a first call to `fetchNext`, when all the internal fields are not initialized yet.
// On subsequent calls to `fetchNext` it will do nothing
await this.prefetch(options);

if (this.prefetchedRecordBatch) {
// Consume a record batch fetched during previous call to `fetchNext`
const table = new Table(this.prefetchedRecordBatch);
this.prefetchedRecordBatch = undefined;
// Get table rows, but not more than remaining count
const arrowRows = table.toArray().slice(0, this.remainingRows);
const result = this.getRows(table.schema, arrowRows);

// Reduce remaining rows count by a count of rows we just processed.
// If the remaining count reached zero - we're done with current arrow
// batch, so discard the batch reader
this.remainingRows -= result.length;
if (this.remainingRows === 0) {
this.recordBatchReader = undefined;
}

// eslint-disable-next-line no-await-in-loop
const batches = await this.source.fetchNext(options);
if (batches.length === 0) {
this.reader = undefined;
break;
}
// Prefetch the next record batch
await this.prefetch(options);

const reader = RecordBatchReader.from<TypeMap>(batches);
this.reader = reader[Symbol.iterator]();
this.pendingRecordBatch = this.prefetch();
return result;
}

return [];
}

private prefetch(): RecordBatch<TypeMap> | undefined {
const item = this.reader?.next() ?? { done: true, value: undefined };
// This method tries to read one more record batch and store it in `prefetchedRecordBatch` field.
// If `prefetchedRecordBatch` is already non-empty - the method does nothing.
// This method pulls the next item from source if needed, initializes a record batch reader and
// gets the next item from it - until either reaches end of data or finds a non-empty record batch
private async prefetch(options: ResultsProviderFetchNextOptions) {
// This loop will be executed until a next non-empty record batch is retrieved
// Another implicit loop condition (end of data) is checked in the loop body
while (!this.prefetchedRecordBatch) {
// First, try to fetch next item from source and initialize record batch reader.
// If source has no more data - exit prematurely
if (!this.recordBatchReader) {
const sourceHasMore = await this.source.hasMore(); // eslint-disable-line no-await-in-loop
if (!sourceHasMore) {
return;
}

const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop
if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
const reader = RecordBatchReader.from<TypeMap>(arrowBatch.batches);
this.recordBatchReader = reader[Symbol.iterator]();
this.remainingRows = arrowBatch.rowCount;
}
}

if (item.done || item.value === undefined) {
this.reader = undefined;
return undefined;
// Try to get a next item from current record batch reader. The reader may be unavailable at this point -
// in this case we fall back to a "done" state, and the `while` loop will do one more iteration attempting
// to create a new reader. Eventually it will either succeed or reach end of source. This scenario also
// handles readers which are already empty
const item = this.recordBatchReader?.next() ?? { done: true, value: undefined };
if (item.done || item.value === undefined) {
this.recordBatchReader = undefined;
} else {
// Skip empty batches
// eslint-disable-next-line no-lonely-if
if (item.value.numRows > 0) {
this.prefetchedRecordBatch = item.value;
}
}
}

return item.value;
}

private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
Expand Down
23 changes: 17 additions & 6 deletions lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import LZ4 from 'lz4';
import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { hiveSchemaToArrowSchema } from './utils';
import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';

export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;
Expand Down Expand Up @@ -35,22 +35,33 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer

public async fetchNext(options: ResultsProviderFetchNextOptions) {
if (!this.arrowSchema) {
return [];
return {
batches: [],
rowCount: 0,
};
}

const rowSet = await this.source.fetchNext(options);

const batches: Array<Buffer> = [];
rowSet?.arrowBatches?.forEach(({ batch }) => {
let totalRowCount = 0;
rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => {
if (batch) {
batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
totalRowCount += rowCount.toNumber(true);
}
});

if (batches.length === 0) {
return [];
return {
batches: [],
rowCount: 0,
};
}

return [this.arrowSchema, ...batches];
return {
batches: [this.arrowSchema, ...batches],
rowCount: totalRowCount,
};
}
}
23 changes: 16 additions & 7 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { ArrowBatch } from './utils';

export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
export default class CloudFetchResultHandler implements IResultsProvider<ArrowBatch> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;
Expand All @@ -13,7 +14,7 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B

private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadTasks: Array<Promise<Buffer>> = [];
private downloadTasks: Array<Promise<ArrowBatch>> = [];

constructor(
context: IClientContext,
Expand Down Expand Up @@ -49,15 +50,20 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const batch = await this.downloadTasks.shift();
const batches = batch ? [batch] : [];
if (!batch) {
return {
batches: [],
rowCount: 0,
};
}

if (this.isLZ4Compressed) {
return batches.map((buffer) => LZ4.decode(buffer));
batch.batches = batch.batches.map((buffer) => LZ4.decode(buffer));
}
return batches;
return batch;
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
private async downloadLink(link: TSparkArrowResultLink): Promise<ArrowBatch> {
if (Date.now() >= link.expiryTime.toNumber()) {
throw new Error('CloudFetch link has expired');
}
Expand All @@ -68,7 +74,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
}

const result = await response.arrayBuffer();
return Buffer.from(result);
return {
batches: [Buffer.from(result)],
rowCount: link.rowCount.toNumber(true),
};
}

private async fetch(url: RequestInfo, init?: RequestInit) {
Expand Down
5 changes: 5 additions & 0 deletions lib/result/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import {
import { TTableSchema, TColumnDesc, TPrimitiveTypeEntry, TTypeId } from '../../thrift/TCLIService_types';
import HiveDriverError from '../errors/HiveDriverError';

/**
 * A unit of Arrow-encoded result data passed between result providers.
 *
 * `batches` holds raw Arrow IPC buffers (already LZ4-decompressed where
 * applicable); a schema buffer may be prepended by the producer.
 * `rowCount` is the total number of data rows represented by `batches` —
 * consumers use it to discard any excess records beyond this count.
 */
export interface ArrowBatch {
  // Raw Arrow IPC stream buffers, ready to be fed to a RecordBatchReader
  batches: Array<Buffer>;
  // Total row count across all buffers in `batches`
  rowCount: number;
}

export function getSchemaColumns(schema?: TTableSchema): Array<TColumnDesc> {
if (!schema) {
return [];
Expand Down

0 comments on commit 1dc16ac

Please sign in to comment.