diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
index 4eb27dac18..0151917999 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
@@ -131,6 +131,9 @@ class DeltaTaskStatisticsTracker(
 
   override def newPartition(partitionValues: InternalRow): Unit = { }
 
+  protected def initializeAggBuf(buffer: SpecificInternalRow): InternalRow =
+    initializeStats.target(buffer).apply(EmptyRow)
+
   override def newFile(newFilePath: String): Unit = {
     submittedFiles.getOrElseUpdate(newFilePath, {
       // `buffer` is a row that will start off by holding the initial values for the agg expressions
@@ -138,7 +141,7 @@ class DeltaTaskStatisticsTracker(
       // is processed (see updateStats: Projection), and will finally serve as an input for
       // computing the per-file result of statsColExpr (see getStats: Projection)
       val buffer = new SpecificInternalRow(aggBufferAttrs.map(_.dataType))
-      buffer
+      initializeAggBuf(buffer)
     })
   }
 