From b6b1f874fd54bd8dc9c4713294d12173361f0336 Mon Sep 17 00:00:00 2001 From: Shivam Raj Date: Tue, 13 May 2025 19:34:26 +0530 Subject: [PATCH 1/2] disable compression if cf is enabled --- lib/DBSQLSession.ts | 8 ++-- tests/unit/DBSQLSession.test.ts | 85 +++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 8 deletions(-) diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 13146cdc..d88331a1 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -227,14 +227,14 @@ export default class DBSQLSession implements IDBSQLSession { request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters); } - if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion)) { - request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4); - } - if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) { request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch; } + if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion) && request.canDownloadResult !== true) { + request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4); + } + const operationPromise = driver.executeStatement(request); const response = await this.handleResponse(operationPromise); const operation = this.createOperation(response); diff --git a/tests/unit/DBSQLSession.test.ts b/tests/unit/DBSQLSession.test.ts index 055483ad..ddf843dc 100644 --- a/tests/unit/DBSQLSession.test.ts +++ b/tests/unit/DBSQLSession.test.ts @@ -86,7 +86,8 @@ describe('DBSQLSession', () => { }); it('should apply defaults for Arrow options', async () => { - case1: { + // case 1 + { const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub({ arrowEnabled: true }), @@ -95,7 +96,8 @@ describe('DBSQLSession', () => { expect(result).instanceOf(DBSQLOperation); } - case2: { + // case 2 + { const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub({ arrowEnabled: true, useArrowNativeTypes: false }), @@ -158,9 +160,14 @@ describe('DBSQLSession', () => { } if (version >= TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6) { - expect(req.canDecompressLZ4Result).to.be.true; + // Since cloud fetch is enabled, canDecompressLZ4Result should not be set + if (req.canDownloadResult === true) { + expect(req.canDecompressLZ4Result).to.not.be.true; + } else { + expect(req.canDecompressLZ4Result).to.be.true; + } } else { - expect(req.canDecompressLZ4Result).to.not.exist; + expect(req.canDecompressLZ4Result).to.not.be.true; } if (version >= TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5) { @@ -180,6 +187,76 @@ describe('DBSQLSession', () => { }); } }); + + describe('LZ4 compression with cloud fetch', () => { + it('should not set canDecompressLZ4Result when cloud fetch is enabled (canDownloadResult=true)', async () => { + const context = new ClientContextStub({ useLZ4Compression: true }); + const driver = sinon.spy(context.driver); + const statement = 'SELECT * FROM table'; + + // Use V6+ which supports arrow compression + const session = new DBSQLSession({ + handle: sessionHandleStub, + context, + serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, + }); + + // Execute with cloud fetch enabled + await session.executeStatement(statement, { useCloudFetch: true }); + + expect(driver.executeStatement.callCount).to.eq(1); + const req = driver.executeStatement.firstCall.args[0]; + + // canDownloadResult should be true and canDecompressLZ4Result should NOT be set + expect(req.canDownloadResult).to.be.true; + expect(req.canDecompressLZ4Result).to.not.be.true; + }); + + it('should set canDecompressLZ4Result when cloud fetch is disabled (canDownloadResult=false)', async () => { + const context = new ClientContextStub({ useLZ4Compression: true }); + const driver = sinon.spy(context.driver); + const statement = 'SELECT * FROM table'; + + // Use V6+ which supports arrow compression + const session = new DBSQLSession({ + handle: sessionHandleStub, + context, + serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, + }); + + // Execute with cloud fetch disabled + await session.executeStatement(statement, { useCloudFetch: false }); + + expect(driver.executeStatement.callCount).to.eq(1); + const req = driver.executeStatement.firstCall.args[0]; + + // canDownloadResult should be false and canDecompressLZ4Result should be set + expect(req.canDownloadResult).to.be.false; + expect(req.canDecompressLZ4Result).to.be.true; + }); + + it('should not set canDecompressLZ4Result when server protocol does not support Arrow compression', async () => { + const context = new ClientContextStub({ useLZ4Compression: true }); + const driver = sinon.spy(context.driver); + const statement = 'SELECT * FROM table'; + + // Use V5 which does not support arrow compression + const session = new DBSQLSession({ + handle: sessionHandleStub, + context, + serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5, + }); + + // Execute with cloud fetch disabled + await session.executeStatement(statement, { useCloudFetch: false }); + + expect(driver.executeStatement.callCount).to.eq(1); + const req = driver.executeStatement.firstCall.args[0]; + + // canDecompressLZ4Result should NOT be set regardless of cloud fetch setting + expect(req.canDecompressLZ4Result).to.not.be.true; + }); + }); }); describe('getTypeInfo', () => { From 38aa65f41060b95d14089cc49e618579d3eff443 Mon Sep 17 00:00:00 2001 From: Shivam Raj Date: Tue, 13 May 2025 19:55:15 +0530 Subject: [PATCH 2/2] updated test --- tests/e2e/arrow.test.ts | 5 ++++- tests/e2e/cloudfetch.test.ts | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/e2e/arrow.test.ts b/tests/e2e/arrow.test.ts index 12d87e27..9202ea9c 100644 --- a/tests/e2e/arrow.test.ts +++ b/tests/e2e/arrow.test.ts @@ -187,7 +187,10 @@ describe('Arrow support', () => { 'should handle LZ4 compressed data', createTest( async (session) => { - const operation = await session.executeStatement(`SELECT * FROM ${tableName}`); + const operation = await session.executeStatement( + `SELECT * FROM ${tableName}`, + { useCloudFetch: false }, // Explicitly disable cloud fetch to test LZ4 compression + ); const result = await operation.fetchAll(); expect(fixArrowResult(result)).to.deep.equal(expectedArrow); diff --git a/tests/e2e/cloudfetch.test.ts b/tests/e2e/cloudfetch.test.ts index 5ac46296..ae75e4be 100644 --- a/tests/e2e/cloudfetch.test.ts +++ b/tests/e2e/cloudfetch.test.ts @@ -97,11 +97,11 @@ describe('CloudFetch', () => { expect(fetchedRowCount).to.be.equal(queriedRowsCount); }); - it('should handle LZ4 compressed data', async () => { + it('should not use LZ4 compression with cloud fetch', async () => { const cloudFetchConcurrentDownloads = 5; const session = await openSession({ cloudFetchConcurrentDownloads, - useLZ4Compression: true, + useLZ4Compression: true, // This is ignored when cloud fetch is enabled }); const queriedRowsCount = 10000000; // result has to be quite big to enable CloudFetch @@ -126,7 +126,8 @@ describe('CloudFetch', () => { expect(resultHandler).to.be.instanceof(ResultSlicer); expect(resultHandler.source).to.be.instanceof(ArrowResultConverter); expect(resultHandler.source.source).to.be.instanceOf(CloudFetchResultHandler); - expect(resultHandler.source.source.isLZ4Compressed).to.be.true; + // LZ4 compression should not be enabled with cloud fetch + expect(resultHandler.source.source.isLZ4Compressed).to.be.false; const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true }); expect(chunk.length).to.be.gt(0);