diff --git a/package.json b/package.json index 537e16b..b5d6a6a 100644 --- a/package.json +++ b/package.json @@ -81,6 +81,7 @@ }, "dependencies": { "@elastic/transport": "^8.9.1", + "apache-arrow": "^18.0.0", "tslib": "^2.5.0" }, "tap": { diff --git a/src/helpers.ts b/src/helpers.ts index 739ddef..5e9c01f 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -25,6 +25,7 @@ import assert from 'node:assert' import * as timersPromises from 'node:timers/promises' import { Readable } from 'node:stream' import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport' +import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node' import Client from './client' import * as T from './api/types' @@ -154,6 +155,8 @@ export interface EsqlResponse { export interface EsqlHelper { toRecords: () => Promise> + toArrowTable: () => Promise> + toArrowReader: () => Promise } export interface EsqlToRecords { @@ -955,11 +958,6 @@ export default class Helpers { * @returns {object} EsqlHelper instance */ esql (params: T.EsqlQueryRequest, reqOptions: TransportRequestOptions = {}): EsqlHelper { - if (this[kMetaHeader] !== null) { - reqOptions.headers = reqOptions.headers ?? {} - reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader] as string},h=qo` - } - const client = this[kClient] function toRecords (response: EsqlResponse): TDocument[] { @@ -975,17 +973,50 @@ export default class Helpers { }) } + const metaHeader = this[kMetaHeader] + const helper: EsqlHelper = { /** * Pivots ES|QL query results into an array of row objects, rather than the default format where each row is an array of values. */ async toRecords(): Promise> { + if (metaHeader !== null) { + reqOptions.headers = reqOptions.headers ?? {} + reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qo` + } + params.format = 'json' + params.columnar = false // @ts-expect-error it's typed as ArrayBuffer but we know it will be JSON const response: EsqlResponse = await client.esql.query(params, reqOptions) const records: TDocument[] = toRecords(response) const { columns } = response return { records, columns } + }, + + async toArrowTable (): Promise> { + if (metaHeader !== null) { + reqOptions.headers = reqOptions.headers ?? {} + reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa` + } + + params.format = 'arrow' + + const response = await client.esql.query(params, reqOptions) + return tableFromIPC(response) + }, + + async toArrowReader (): Promise { + if (metaHeader !== null) { + reqOptions.headers = reqOptions.headers ?? {} + reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa` + reqOptions.asStream = true + } + + params.format = 'arrow' + + const response = await client.esql.query(params, reqOptions) + return RecordBatchStreamReader.from(response) } } diff --git a/test/unit/helpers/esql.test.ts b/test/unit/helpers/esql.test.ts index b029e13..c91e3cb 100644 --- a/test/unit/helpers/esql.test.ts +++ b/test/unit/helpers/esql.test.ts @@ -18,6 +18,7 @@ */ import { test } from 'tap' +import * as arrow from 'apache-arrow' import { connection } from '../../utils' import { Client } from '../../../' @@ -109,5 +110,184 @@ test('ES|QL helper', t => { t.end() }) + + test('toArrowTable', t => { + t.test('Parses a binary response into an Arrow table', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (_params) { + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable() + t.ok(result instanceof arrow.Table) + + const table = [...result] + t.same(table[0], [ + ["amount", 4.900000095367432], + ["date", 1729532586965], + ]) + t.end() + }) + + t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + const header = params.headers?.['x-elastic-client-meta'] ?? '' + t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`) + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable() + t.end() + }) + + t.end() + }) + + test('toArrowReader', t => { + t.test('Parses a binary response into an Arrow stream reader', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (_params) { + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.ok(result.isStream()) + + const recordBatch = result.next().value + t.same(recordBatch.get(0)?.toJSON(), { + amount: 4.900000095367432, + date: 1729532586965, + }) + t.end() + }) + + t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => { + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + const header = params.headers?.['x-elastic-client-meta'] ?? '' + t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`) + return { + body: Buffer.from(binaryContent, 'base64'), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.end() + }) + + t.test('multi-batch support', async t => { + const intType = new arrow.Uint32 + const floatType = new arrow.Float32 + const schema = new arrow.Schema([ + arrow.Field.new('id', intType), + arrow.Field.new('val', floatType) + ]) + + function getBatch(ids: number[], vals: number[]) { + const id = arrow.makeData({ type: intType, data: ids }) + const val = arrow.makeData({ type: floatType, data: vals }) + return new arrow.RecordBatch({ id, val }) + } + + const batch1 = getBatch([1, 2, 3], [0.1, 0.2, 0.3]) + const batch2 = getBatch([4, 5, 6], [0.4, 0.5, 0.6]) + const batch3 = getBatch([7, 8, 9], [0.7, 0.8, 0.9]) + + const table = new arrow.Table(schema, [ + new arrow.RecordBatch(schema, batch1.data), + new arrow.RecordBatch(schema, batch2.data), + new arrow.RecordBatch(schema, batch3.data), + ]) + + const MockConnection = connection.buildMockConnection({ + onRequest (_params) { + return { + body: Buffer.from(arrow.tableToIPC(table, "stream")), + statusCode: 200, + headers: { + 'content-type': 'application/vnd.elasticsearch+arrow+stream' + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() + t.ok(result.isStream()) + + let counter = 0 + for (const batch of result) { + for (const row of batch) { + counter++ + const { id, val } = row.toJSON() + t.equal(id, counter) + // floating points are hard in JS + t.equal((Math.round(val * 10) / 10).toFixed(1), (counter * 0.1).toFixed(1)) + } + } + t.end() + }) + + t.end() + }) t.end() }) diff --git a/tsconfig.json b/tsconfig.json index e93828b..a7d7a13 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "target": "es2019", + "target": "ES2019", "module": "commonjs", "moduleResolution": "node", "declaration": true, @@ -21,7 +21,8 @@ "importHelpers": true, "outDir": "lib", "lib": [ - "esnext" + "ES2019", + "dom" ] }, "formatCodeOptions": {