[SPARK-14838] [SQL] Set default size for ObjectType to avoid failure when estimating sizeInBytes in ObjectProducer

## What changes were proposed in this pull request?

We have logical plans that produce domain objects, whose type is `ObjectType`. Since we can't estimate the size of an `ObjectType`, trying to do so throws an `UnsupportedOperationException`. We should set a default size for `ObjectType` to avoid this failure.
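For context, here is a minimal, self-contained sketch of the failure mode (the names below are illustrative stand-ins, not Spark's real classes): plan statistics such as `sizeInBytes` are derived by summing a per-type `defaultSize` over the schema, so a single type that throws instead of answering poisons the estimate for the whole plan.

```scala
// Illustrative sketch only -- SketchType, ThrowingObjectType, and
// PatchedObjectType are hypothetical stand-ins, not Spark classes.
sealed trait SketchType { def defaultSize: Int }
case object SketchInt extends SketchType { def defaultSize: Int = 4 }

// Before this patch: the object type had no answer and threw.
case class ThrowingObjectType(cls: Class[_]) extends SketchType {
  def defaultSize: Int =
    throw new UnsupportedOperationException("No size estimation available for objects.")
}

// After this patch: fall back to a fixed 4096-byte guess per object.
case class PatchedObjectType(cls: Class[_]) extends SketchType {
  def defaultSize: Int = 4096
}

object SizeEstimateSketch extends App {
  // Hypothetical estimator: rows * sum of per-column default sizes.
  def sizeInBytes(schema: Seq[SketchType], rowCount: Long): Long =
    rowCount * schema.map(_.defaultSize.toLong).sum

  val schema = Seq(SketchInt, PatchedObjectType(classOf[Array[Int]]))
  println(sizeInBytes(schema, 100L)) // 100 * (4 + 4096) = 410000
}
```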

## How was this patch tested?

`DatasetSuite`.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #12599 from viirya/skip-broadcast-objectproducer.
viirya authored and davies committed Apr 24, 2016
1 parent 1b7eab7 commit ba5e0b8
Showing 2 changed files with 24 additions and 2 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala (1 addition, 2 deletions)

```diff
@@ -37,8 +37,7 @@ private[sql] object ObjectType extends AbstractDataType {
  * outside of the execution engine.
  */
 private[sql] case class ObjectType(cls: Class[_]) extends DataType {
-  override def defaultSize: Int =
-    throw new UnsupportedOperationException("No size estimation available for objects.")
+  override def defaultSize: Int = 4096
 
   def asNullable: DataType = this
```
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (23 additions, 0 deletions)

```diff
@@ -630,6 +630,29 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     // Make sure the generated code for this plan can compile and execute.
     checkDataset(wideDF.map(_.getLong(0)), 0L until 10 : _*)
   }
+
+  test("SPARK-14838: estimating sizeInBytes in operators with ObjectProducer shouldn't fail") {
+    val dataset = Seq(
+      (0, 3, 54f),
+      (0, 4, 44f),
+      (0, 5, 42f),
+      (1, 3, 39f),
+      (1, 5, 33f),
+      (1, 4, 26f),
+      (2, 3, 51f),
+      (2, 5, 45f),
+      (2, 4, 30f)
+    ).toDF("user", "item", "rating")
+
+    val actual = dataset
+      .select("user", "item")
+      .as[(Int, Int)]
+      .groupByKey(_._1)
+      .mapGroups { case (src, ids) => (src, ids.map(_._2).toArray) }
+      .toDF("id", "actual")
+
+    dataset.join(actual, dataset("user") === actual("id")).collect()
+  }
 }
 
 case class OtherTuple(_1: String, _2: Int)
```
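The join in the new test is what exercises the fix: when planning it, Spark estimates `sizeInBytes` for the `mapGroups` side (an `ObjectProducer`) to decide whether it is small enough to broadcast, and before this patch that estimate threw. A quick way to observe the estimate directly, continuing from the test's `dataset` and `actual` (a sketch; treat the exact accessor chain as an assumption for this era of the codebase):

```scala
// Sketch: inspect the planner's size estimate for the joined plan.
// Reuses `dataset` and `actual` from the test above; `optimizedPlan.statistics`
// is assumed to be the LogicalPlan statistics accessor of this era.
val joined = dataset.join(actual, dataset("user") === actual("id"))
println(joined.queryExecution.optimizedPlan.statistics.sizeInBytes)
// Before the patch: UnsupportedOperationException("No size estimation available
// for objects."); after: a finite estimate that charges 4096 bytes per object.
```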
