diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index e13653b01cb..75618e3124b 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -274,19 +274,28 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
     var estimatedBatchSize = 0L
     Utils.tryWithSafeFinally {
 
+      def isBatchSizeLimitExceeded: Boolean = {
+        // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+        maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+      }
+      def isRecordLimitExceeded: Boolean = {
+        // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+        maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+      }
+      def isGlobalLimitNotReached: Boolean = {
+        // If the limit is negative, it means no restriction
+        // or the current number of rows has not reached the limit.
+        rowCount < limit || limit < 0
+      }
+
       // Always write the first row.
-      while (rowIter.hasNext && (
-        // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
+      while (rowIter.hasNext && isGlobalLimitNotReached && (
         // If the size in bytes is positive (set properly), always write the first row.
-        rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
-          // If the size in bytes of rows are 0 or negative, unlimit it.
-          estimatedBatchSize <= 0 ||
-          estimatedBatchSize < maxEstimatedBatchSize ||
-          // If the size of rows are 0 or negative, unlimit it.
-          maxRecordsPerBatch <= 0 ||
-          rowCountInLastBatch < maxRecordsPerBatch ||
-          rowCount < limit ||
-          limit < 0)) {
+        rowCountInLastBatch == 0 ||
+          // If either limit is hit, create a batch. This implies that the limit that is hit
+          // first triggers the creation of a batch even if the other limit is not yet hit
+          // hence preferring the more restrictive limit.
+          (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
         val row = rowIter.next()
         arrowWriter.write(row)
         estimatedBatchSize += (row match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 73e7f779934..7d6e5f8fdb7 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -164,8 +164,9 @@ object SparkDatasetHelper extends Logging {
     KyuubiSparkUtil.globalSparkContext
       .getConf
       .getOption("spark.connect.grpc.arrow.maxBatchSize")
-      .orElse(Option("4m"))
-      .map(JavaUtils.byteStringAs(_, ByteUnit.MiB))
+      // 4m
+      .orElse(Option("4194304b"))
+      .map(JavaUtils.byteStringAs(_, ByteUnit.BYTE))
       .get
   }
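
Why the predicate refactor in the first hunk matters: the old OR-chain kept the loop running whenever *any* disjunct held, so a batch that had already hit the record cap could keep growing as long as it was still under the byte cap (and vice versa). The rewrite stops at whichever cap is hit first, while still always writing at least one row per batch and honoring the global `limit`. The snippet below is a minimal standalone sketch of that predicate, not Kyuubi's actual API; `sliceBatch`, its parameter names, and the fixed 8-byte row size are invented for illustration.

```scala
object BatchSliceSketch extends App {
  // Every limit value <= 0 means "unlimited"; the first row is always taken.
  def sliceBatch(
      rows: Iterator[Int],
      rowSizeInBytes: Int => Long,
      maxBatchBytes: Long,
      maxRecords: Long,
      limit: Long,
      rowsTakenSoFar: Long): Vector[Int] = {
    var rowCount = rowsTakenSoFar
    var rowCountInLastBatch = 0L
    var estimatedBatchSize = 0L
    var batch = Vector.empty[Int]

    def isBatchSizeLimitExceeded: Boolean =
      maxBatchBytes > 0 && estimatedBatchSize >= maxBatchBytes
    def isRecordLimitExceeded: Boolean =
      maxRecords > 0 && rowCountInLastBatch >= maxRecords
    def isGlobalLimitNotReached: Boolean =
      rowCount < limit || limit < 0

    while (rows.hasNext && isGlobalLimitNotReached && (
        // Always write the first row, then stop at whichever cap is hit first.
        rowCountInLastBatch == 0 ||
          (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
      val row = rows.next()
      batch :+= row
      estimatedBatchSize += rowSizeInBytes(row)
      rowCountInLastBatch += 1
      rowCount += 1
    }
    batch
  }

  // The record cap (2) is hit before the byte cap (1024): the batch has 2 rows.
  println(sliceBatch(Iterator(1, 2, 3), _ => 8L, 1024L, 2L, -1L, 0L)) // Vector(1, 2)
  // The byte cap (8) is hit first: one 8-byte row already meets it.
  println(sliceBatch(Iterator(1, 2, 3), _ => 8L, 8L, 10L, -1L, 0L))   // Vector(1)
}
```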
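
For the `SparkDatasetHelper` hunk: `JavaUtils.byteStringAs(str, unit)` converts a suffixed byte string to the target unit, and treats an unsuffixed number as already being in that unit. The old call therefore returned `4` (the "4m" default expressed in MiB), which downstream code presumably interpreted as a byte count; the new call yields the intended `4194304` bytes. A small sketch of those semantics (assuming `spark-network-common` on the classpath):

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

object ByteStringUnits extends App {
  // Old call: "4m" expressed in MiB is just 4 -- far too small if the
  // result is later used as a byte count.
  println(JavaUtils.byteStringAs("4m", ByteUnit.MiB))        // 4

  // New call: the explicit default "4194304b" in bytes is 4 MiB.
  println(JavaUtils.byteStringAs("4194304b", ByteUnit.BYTE)) // 4194304

  // A user-configured suffixed value still converts correctly to bytes.
  println(JavaUtils.byteStringAs("4m", ByteUnit.BYTE))       // 4194304
}
```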