From 7df5d58acedec7e08c5f8964f3206b9e82ca8295 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 20 Sep 2016 09:32:29 -0700
Subject: [PATCH] Revert "[SPARK-17549][SQL] Only collect table size stat in
 driver for cached relation."

This reverts changes made to InMemoryRelation and InMemoryColumnarQuerySuite
in commit 39e2bad6a866d27c3ca594d15e574a1da3ee84cc. But, it keeps the change in
CodeGenerator.scala to make recordCompilationStats tolerant to errors thrown by
janino.
---
 .../execution/columnar/InMemoryRelation.scala      | 24 ++++++++++++++++++------
 .../columnar/InMemoryColumnarQuerySuite.scala      | 14 --------------
 2 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 56bd5c1891e8d..479934a7afc75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.JavaConverters._
+
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.CollectionAccumulator
 
 
 object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+    val batchStats: CollectionAccumulator[InternalRow] =
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -71,12 +74,21 @@ case class InMemoryRelation(
 
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value == 0L) {
+    if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      Statistics(sizeInBytes = batchStats.value.longValue)
+      // Underlying columnar RDD has been materialized, required information has also been
+      // collected via the `batchStats` accumulator.
+      val sizeOfRow: Expression =
+        BindReferences.bindReference(
+          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+          partitionStatistics.schema)
+
+      val sizeInBytes =
+        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+      Statistics(sizeInBytes = sizeInBytes)
     }
   }
@@ -127,10 +139,10 @@ case class InMemoryRelation(
           rowCount += 1
         }
 
-        batchStats.add(totalSize)
-
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
+
+        batchStats.add(stats)
         CachedBatch(rowCount, columnBuilders.map { builder =>
           JavaUtils.bufferToArray(builder.build())
         }, stats)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0daa29b666f62..937839644ad5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
-
-  test("SPARK-17549: cached table size should be correctly calculated") {
-    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
-    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
-    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
-
-    // Materialize the data.
-    val expectedAnswer = data.collect()
-    checkAnswer(cached, expectedAnswer)
-
-    // Check that the right size was calculated.
-    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
-  }
-
 }
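
Note on the restored logic: after this revert, `statistics` no longer reads a single
driver-side byte counter; it re-derives the table size by evaluating the bound
`sizeOfRow` expression (the sum of each column's `sizeInBytes` stat) over every stats
row the `CollectionAccumulator` gathered, one row per cached batch. The Scala sketch
below models that flow with plain types; `StatsRow`, `batchSize`, and
`estimateSizeInBytes` are illustrative names only, not Spark APIs, and the five-field
stats layout in the example is an assumption made for illustration.

object CachedSizeEstimationSketch extends App {
  // One stats row per cached batch, as gathered by the accumulator. The fields
  // model a flattened per-column stats layout; which indexes hold byte sizes
  // depends on the schema.
  final case class StatsRow(fields: Array[Long])

  // Stand-in for the bound `sizeOfRow` expression: sum the fields that carry a
  // column's sizeInBytes, giving the batch's total byte size.
  def batchSize(row: StatsRow, sizeFieldIndexes: Seq[Int]): Long =
    sizeFieldIndexes.map(i => row.fields(i)).sum

  // Driver-side estimate: an empty collection means the columnar RDD was never
  // materialized, so fall back to a default (Spark uses
  // spark.sql.defaultSizeInBytes); otherwise sum the per-batch sizes.
  def estimateSizeInBytes(
      collected: Seq[StatsRow],
      sizeFieldIndexes: Seq[Int],
      defaultSizeInBytes: Long): Long =
    if (collected.isEmpty) defaultSizeInBytes
    else collected.map(batchSize(_, sizeFieldIndexes)).sum

  // Example: a single INT column whose (assumed) stats layout is
  // (lowerBound, upperBound, nullCount, count, sizeInBytes), so index 4.
  val rows = Seq(
    StatsRow(Array(1L, 5L, 0L, 5L, 20L)),
    StatsRow(Array(6L, 10L, 0L, 5L, 20L)))
  assert(estimateSizeInBytes(rows, Seq(4), defaultSizeInBytes = 1L << 20) == 40L)
  println(s"estimated sizeInBytes = ${estimateSizeInBytes(rows, Seq(4), 1L << 20)}")
}

The tradeoff visible in the diff: the reverted change shipped one running Long to the
driver, while the restored code ships a stats row per batch, costing more driver
memory but keeping the per-column information that the size expression needs.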