From 1469cb082c00d7f092d8740831e1d3a94f1843c1 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 2 Sep 2015 12:43:19 -0700 Subject: [PATCH 1/3] override clone for String. --- .../scala/org/apache/spark/sql/columnar/ColumnType.scala | 2 ++ .../spark/sql/columnar/InMemoryColumnarQuerySuite.scala | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 531a8244d55d1..ab482a3636121 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -339,6 +339,8 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { setField(to, toOrdinal, getField(from, fromOrdinal)) } + + override def clone(v: UTF8String): UTF8String = v.clone() } private[sql] object DATE extends NativeColumnType(DateType, 8, 4) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 952637c5f9cb8..6a463e5652d31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -191,4 +191,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { ctx.table("InMemoryCache_different_data_types").collect()) ctx.dropTempTable("InMemoryCache_different_data_types") } + + test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { + val df = + ctx.range(1, 30000).selectExpr("id % 500 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") + val cached = df.cache() + // count triggers the caching action. It should not throw. 
+ cached.count() + cached.unpersist() + } } From c9e22187c6c5b94c55b78e1df40a1a9131baa32b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 2 Sep 2015 17:53:14 -0700 Subject: [PATCH 2/3] Check result. --- .../spark/sql/columnar/InMemoryColumnarQuerySuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 6a463e5652d31..192d046b654f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -198,6 +198,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { val cached = df.cache() // count triggers the caching action. It should not throw. cached.count() + + checkAnswer( + cached, + ctx.range(1, 30000).selectExpr("id % 500 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") + ) + cached.unpersist() } } From 2905fd543b25251f9c50e8059668112c27a251d4 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 2 Sep 2015 17:58:33 -0700 Subject: [PATCH 3/3] Update test. 
--- .../spark/sql/columnar/InMemoryColumnarQuerySuite.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 192d046b654f8..83db9b6510b36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -194,16 +194,21 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { val df = - ctx.range(1, 30000).selectExpr("id % 500 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") + ctx.range(1, 100).selectExpr("id % 10 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") val cached = df.cache() // count triggers the caching action. It should not throw. cached.count() + // Make sure the DataFrame is indeed cached. + assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty) + + // Check result. checkAnswer( cached, - ctx.range(1, 30000).selectExpr("id % 500 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") + ctx.range(1, 100).selectExpr("id % 10 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i") ) + // Drop the cache. cached.unpersist() } }