From 479d7e40bb99147045add38865e5872e21bb840b Mon Sep 17 00:00:00 2001
From: echo567
Date: Sun, 16 Nov 2025 19:01:30 +0800
Subject: [PATCH 1/2] fix(spark): fix arrow batch converter error

---
 .../execution/arrow/KyuubiArrowConverters.scala | 15 ++++++---------
 .../spark/sql/kyuubi/SparkDatasetHelper.scala   |  2 +-
 2 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index e13653b01cb..29eb53cd74e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -275,18 +275,15 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
 
     Utils.tryWithSafeFinally {
       // Always write the first row.
-      while (rowIter.hasNext && (
+      while (rowIter.hasNext && (rowCount < limit || limit < 0) && (
           // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
           // If the size in bytes is positive (set properly), always write the first row.
-          rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
+          rowCountInLastBatch == 0 ||
           // If the size in bytes of rows are 0 or negative, unlimit it.
-          estimatedBatchSize <= 0 ||
-          estimatedBatchSize < maxEstimatedBatchSize ||
-          // If the size of rows are 0 or negative, unlimit it.
-          maxRecordsPerBatch <= 0 ||
-          rowCountInLastBatch < maxRecordsPerBatch ||
-          rowCount < limit ||
-          limit < 0)) {
+          ((estimatedBatchSize <= 0 || estimatedBatchSize < maxEstimatedBatchSize) &&
+          // If the size of rows are 0 or negative, unlimit it.
+          (maxRecordsPerBatch <= 0 || rowCountInLastBatch < maxRecordsPerBatch))
+          )) {
         val row = rowIter.next()
         arrowWriter.write(row)
         estimatedBatchSize += (row match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 73e7f779934..7e9088c7df5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -165,7 +165,7 @@ object SparkDatasetHelper extends Logging {
       .getConf
       .getOption("spark.connect.grpc.arrow.maxBatchSize")
       .orElse(Option("4m"))
-      .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+      .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
       .get
   }
 

From c9d0d186f5135a2ee63416ccaca0943c3dc8048e Mon Sep 17 00:00:00 2001
From: echo567
Date: Wed, 26 Nov 2025 20:14:26 +0800
Subject: [PATCH 2/2] fix(arrow): rework Arrow batch limit checks following Spark

---
 .../arrow/KyuubiArrowConverters.scala         | 26 ++++++++++++++----
 .../spark/sql/kyuubi/SparkDatasetHelper.scala |  3 ++-
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 29eb53cd74e..75618e3124b 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -274,16 +274,28 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
     var estimatedBatchSize = 0L
 
     Utils.tryWithSafeFinally {
+      def isBatchSizeLimitExceeded: Boolean = {
+        // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+        maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+      }
+      def isRecordLimitExceeded: Boolean = {
+        // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+        maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+      }
+      def isGlobalLimitNotReached: Boolean = {
+        // If the limit is negative, it means no restriction
+        // or the current number of rows has not reached the limit.
+        rowCount < limit || limit < 0
+      }
+
       // Always write the first row.
-      while (rowIter.hasNext && (rowCount < limit || limit < 0) && (
-          // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
+      while (rowIter.hasNext && isGlobalLimitNotReached && (
           // If the size in bytes is positive (set properly), always write the first row.
           rowCountInLastBatch == 0 ||
-          // If the size in bytes of rows are 0 or negative, unlimit it.
-          ((estimatedBatchSize <= 0 || estimatedBatchSize < maxEstimatedBatchSize) &&
-          // If the size of rows are 0 or negative, unlimit it.
-          (maxRecordsPerBatch <= 0 || rowCountInLastBatch < maxRecordsPerBatch))
-          )) {
+          // If either limit is hit, create a batch. This implies that the limit that is hit
+          // first triggers the creation of a batch even if the other limit is not yet hit
+          // hence preferring the more restrictive limit.
+          (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
         val row = rowIter.next()
         arrowWriter.write(row)
         estimatedBatchSize += (row match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 7e9088c7df5..7d6e5f8fdb7 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -164,7 +164,8 @@ object SparkDatasetHelper extends Logging {
     KyuubiSparkUtil.globalSparkContext
       .getConf
       .getOption("spark.connect.grpc.arrow.maxBatchSize")
-      .orElse(Option("4m"))
+      // 4m
+      .orElse(Option("4194304b"))
       .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
       .get
   }
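
Note: PATCH 2/2 replaces the inlined boolean expression with named predicates, mirroring how Spark's ArrowConverters phrases the batch cut-off. The sketch below is illustrative only and is not part of either patch: toy limits and a plain Iterator stand in for the real row iterator and arrowWriter, and the object name is made up, but the while-condition has the same shape as the patched loop.

    object BatchConditionSketch extends App {
      val maxRecordsPerBatch = 3      // toy record cap; <= 0 would mean unlimited
      val maxEstimatedBatchSize = 64L // toy byte cap; <= 0 would mean unlimited
      val limit = 10L                 // global row limit; < 0 would mean unlimited
      val rowSize = 40L               // pretend every row serializes to 40 bytes

      var rowCount = 0L
      var rowCountInLastBatch = 0L
      var estimatedBatchSize = 0L
      val rowIter = Iterator.range(0, 100)

      def isBatchSizeLimitExceeded: Boolean =
        maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
      def isRecordLimitExceeded: Boolean =
        maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
      def isGlobalLimitNotReached: Boolean =
        rowCount < limit || limit < 0

      // Build one "batch": same condition shape as the patched while loop.
      while (rowIter.hasNext && isGlobalLimitNotReached && (
          rowCountInLastBatch == 0 || // always admit the first row
          (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
        rowIter.next()
        estimatedBatchSize += rowSize
        rowCount += 1
        rowCountInLastBatch += 1
      }

      // With 40-byte rows and a 64-byte cap, the size limit trips first: the
      // batch closes after 2 rows even though maxRecordsPerBatch allows 3,
      // i.e. the more restrictive limit wins.
      println(s"rows in batch = $rowCountInLastBatch, estimated bytes = $estimatedBatchSize")
      assert(rowCountInLastBatch == 2 && estimatedBatchSize == 80L)
    }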
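
Note: the SparkDatasetHelper change turns on what JavaUtils.byteStringAs does with its unit argument: the ByteUnit is the unit of the returned value, so parsing the "4m" default with ByteUnit.MiB yields 4, which downstream code then treated as a 4-byte cap. A minimal sketch of the before/after behavior (assuming Spark's spark-network-common utilities on the classpath; the object name is made up):

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object MaxBatchSizeParsingSketch extends App {
      // Before PATCH 1/2: "4m" parses as 4 MiB but is converted *to MiB*,
      // so the call returns 4; treating that as a byte count collapses the
      // intended 4 MiB batch cap to 4 bytes.
      val before = JavaUtils.byteStringAs("4m", ByteUnit.MiB)

      // After: converting to bytes yields the intended 4 MiB byte count.
      // PATCH 2/2 additionally spells the default as "4194304b".
      val after = JavaUtils.byteStringAs("4m", ByteUnit.BYTE)
      val default2 = JavaUtils.byteStringAs("4194304b", ByteUnit.BYTE)

      println(s"before = $before, after = $after") // before = 4, after = 4194304
      assert(before == 4L && after == 4L * 1024 * 1024 && default2 == after)
    }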