[SPARK-13898][SQL] Merge DatasetHolder and DataFrameHolder #11737

Closed
wants to merge 8 commits
Changes from 6 commits
14 changes: 7 additions & 7 deletions project/MimaExcludes.scala
@@ -299,13 +299,6 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
) ++ Seq(
// [SPARK-13244][SQL] Migrates DataFrame to Dataset
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.apply"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy$default$1"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.df$1"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.sql"),
@@ -316,6 +309,13 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.LegacyFunctions"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameHolder"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameHolder$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.localSeqToDataFrameHolder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.stringRddToDataFrameHolder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.longRddToDataFrameHolder"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.intRddToDataFrameHolder"),

ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),
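For context, each entry above suppresses one MiMa binary-compatibility warning caused by removing DataFrameHolder and its implicit conversions. A minimal sketch of the two rule shapes involved, assuming the sbt-mima-plugin ProblemFilters API that MimaExcludes.scala already uses (the object name here is illustrative only):

```scala
import com.typesafe.tools.mima.core._

object ExampleExcludes {
  // Illustrative only: a removed public class and a removed public method
  // each need their own exclusion so MiMa stops reporting them as breaks.
  val filters = Seq(
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrameHolder"),
    ProblemFilters.exclude[DirectMissingMethodProblem](
      "org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder")
  )
}
```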
37 changes: 0 additions & 37 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala

This file was deleted.

11 changes: 5 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1994,13 +1994,12 @@ class Dataset[T] private[sql](
def write: DataFrameWriter = new DataFrameWriter(toDF())

/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
* @since 1.3.0
* Returns the content of the [[Dataset]] as a Dataset of JSON strings.
* @since 2.0.0
*/
def toJSON: Dataset[String] = {
val rowSchema = this.schema
val rdd = queryExecution.toRdd.mapPartitions { iter =>
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
@@ -2022,8 +2021,8 @@ class Dataset[T] private[sql](
}
}
}
import sqlContext.implicits._
rdd.toDS
import sqlContext.implicits.newStringEncoder
sqlContext.createDataset(rdd)
}

/**
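The rewritten toJSON body follows the usual pattern of building an RDD[String] first and then wrapping it in a Dataset with only the string encoder in scope, instead of importing the full implicit bundle. A minimal sketch of that pattern, assuming a Spark 1.6/2.0-era SQLContext (the object and helper names are hypothetical):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SQLContext}

object ToJsonSketch {
  // Hypothetical helper mirroring the toJSON change above: wrap an existing
  // RDD[String] in a Dataset[String] using only the string encoder.
  def toStringDataset(sqlContext: SQLContext, rdd: RDD[String]): Dataset[String] = {
    import sqlContext.implicits.newStringEncoder
    sqlContext.createDataset(rdd)
  }
}
```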
sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql

/**
* A container for a [[Dataset]], used for implicit conversions.
* A container for a [[Dataset]], used for implicit conversions in Scala.
*
* To use this, import implicit conversions in SQL:
* {{{
@@ -32,4 +32,10 @@ case class DatasetHolder[T] private[sql](private val ds: Dataset[T]) {
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
def toDS(): Dataset[T] = ds

// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = ds.toDF()

def toDF(colNames: String*): DataFrame = ds.toDF(colNames : _*)
}
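With toDF() and toDF(colNames*) merged into DatasetHolder, a single import gives both Dataset and DataFrame conversions on local collections. A hedged usage sketch, assuming a SQLContext named sqlContext and an example case class Person that is not part of the patch:

```scala
import org.apache.spark.sql.SQLContext

// Example type for illustration only.
case class Person(name: String, age: Int)

object HolderSketch {
  def example(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    val people = Seq(Person("a", 1), Person("b", 2))

    // Both calls now go through DatasetHolder:
    val ds = people.toDS()               // Dataset[Person]
    val df = people.toDF("name", "age")  // DataFrame with explicit column names

    ds.show()
    df.show()
  }
}
```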
71 changes: 0 additions & 71 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -147,75 +147,4 @@ abstract class SQLImplicits {
*/
implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)

/**
* Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
* @since 1.3.0
*/
implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
DataFrameHolder(_sqlContext.createDataFrame(rdd))
}

/**
* Creates a DataFrame from a local Seq of Product.
* @since 1.3.0
*/
implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder =
{
DataFrameHolder(_sqlContext.createDataFrame(data))
}

// Do NOT add more implicit conversions for primitive types.
// They are likely to break source compatibility by making existing implicit conversions
// ambiguous. In particular, RDD[Double] is dangerous because of [[DoubleRDDFunctions]].

/**
* Creates a single column DataFrame from an RDD[Int].
* @since 1.3.0
*/
implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder = {
val dataType = IntegerType
val rows = data.mapPartitions { iter =>
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
row.setInt(0, v)
row: InternalRow
}
}
DataFrameHolder(
_sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}

/**
* Creates a single column DataFrame from an RDD[Long].
* @since 1.3.0
*/
implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder = {
val dataType = LongType
val rows = data.mapPartitions { iter =>
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
row.setLong(0, v)
row: InternalRow
}
}
DataFrameHolder(
_sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}

/**
* Creates a single column DataFrame from an RDD[String].
* @since 1.3.0
*/
implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder = {
val dataType = StringType
val rows = data.mapPartitions { iter =>
val row = new SpecificMutableRow(dataType :: Nil)
iter.map { v =>
row.update(0, UTF8String.fromString(v))
row: InternalRow
}
}
DataFrameHolder(
_sqlContext.internalCreateDataFrame(rows, StructType(StructField("_1", dataType) :: Nil)))
}
}
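With the primitive-specific DataFrameHolder conversions removed, an RDD of a primitive type still converts, but through rddToDatasetHolder plus the matching encoder and the merged holder's toDF. A hedged sketch assuming a SparkContext sc and a SQLContext sqlContext; note that the default column name becomes the encoder's "value" rather than the old "_1", which is what the DataFrameSuite change below adjusts for:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object PrimitiveRddSketch {
  def example(sc: SparkContext, sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    // RDD[Int] -> DatasetHolder[Int] via rddToDatasetHolder + newIntEncoder,
    // then toDF() from the merged holder; the single column is named "value".
    val df = sc.parallelize(1 to 3).toDF()
    df.printSchema()

    // Column names can still be set explicitly:
    sc.parallelize(1 to 3).toDF("n").show()
  }
}
```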
@@ -612,15 +612,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val longString = Array.fill(21)("1").mkString
val df = sparkContext.parallelize(Seq("1", longString)).toDF()
val expectedAnswerForFalse = """+---------------------+
||_1 |
||value |
|+---------------------+
||1 |
||111111111111111111111|
|+---------------------+
|""".stripMargin
assert(df.showString(10, false) === expectedAnswerForFalse)
val expectedAnswerForTrue = """+--------------------+
|| _1|
|| value|
|+--------------------+
|| 1|
||11111111111111111...|
@@ -1619,15 +1619,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-10215 Div of Decimal returns null") {
val d = Decimal(1.12321)
val d = Decimal(1.12321).toBigDecimal
val df = Seq((d, 1)).toDF("a", "b")

checkAnswer(
df.selectExpr("b * a / b"),
Seq(Row(d.toBigDecimal)))
Seq(Row(d)))
checkAnswer(
df.selectExpr("b * a / b / b"),
Seq(Row(d.toBigDecimal)))
Seq(Row(d)))
checkAnswer(
df.selectExpr("b * a + b"),
Seq(Row(BigDecimal(2.12321))))
@@ -1636,7 +1636,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row(BigDecimal(0.12321))))
checkAnswer(
df.selectExpr("b * a * b"),
Seq(Row(d.toBigDecimal)))
Seq(Row(d)))
}

test("precision smaller than scale") {
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
import org.apache.spark.sql.test.SharedSQLContext

class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.localSeqToDataFrameHolder
import testImplicits._

test("shuffling UnsafeRows in exchange") {
val input = (1 to 1000).map(Tuple1.apply)
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types._
* sorted by a reference implementation ([[ReferenceSort]]).
*/
class SortSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.localSeqToDataFrameHolder
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder

test("basic sorting using ExternalSort") {

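These test changes show the two import styles that make the Seq conversion compile under the Dataset path: either the whole implicit bundle (ExchangeSuite above) or just the product encoder plus the Seq-to-DatasetHolder conversion (SortSuite, and InnerJoinSuite below). A small sketch of both styles, using a plain SQLContext in place of the suites' testImplicits:

```scala
import org.apache.spark.sql.SQLContext

object ImportStylesSketch {
  def example(sqlContext: SQLContext): Unit = {
    // Style 1: pull in the whole bundle, as ExchangeSuite now does.
    {
      import sqlContext.implicits._
      Seq(Tuple1(1), Tuple1(2)).toDF("a").show()
    }
    // Style 2: import only the two implicits the conversion needs,
    // as SortSuite and InnerJoinSuite do.
    {
      import sqlContext.implicits.newProductEncoder
      import sqlContext.implicits.localSeqToDatasetHolder
      Seq(Tuple1(1), Tuple1(2)).toDF("a").show()
    }
  }
}
```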
@@ -126,7 +126,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("decimal type") {
// Casting is required here because ScalaReflection can't capture decimal precision information.
val df = (1 to 10)
.map(i => Tuple1(Decimal(i, 15, 10)))
.map(i => Tuple1(Decimal(i, 15, 10).toJavaBigDecimal))
.toDF("dec")
.select($"dec" cast DecimalType(15, 10))

@@ -29,7 +29,8 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.localSeqToDataFrameHolder
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder

private lazy val myUpperCaseData = sqlContext.createDataFrame(
sparkContext.parallelize(Seq(
@@ -956,9 +956,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(checkAddFileRDD.first())
}

case class LogEntry(filename: String, message: String)
case class LogFile(name: String)

createQueryTest("dynamic_partition",
"""
|DROP TABLE IF EXISTS dynamic_part_table;
@@ -1255,3 +1252,6 @@

// for SPARK-2180 test
case class HavingRow(key: Int, value: String, attr: Int)

case class LogEntry(filename: String, message: String)
case class LogFile(name: String)
@@ -729,7 +729,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}

test("SPARK-5203 union with different decimal precision") {
Seq.empty[(Decimal, Decimal)]
Seq.empty[(java.math.BigDecimal, java.math.BigDecimal)]
Contributor: Are all these decimal-related changes because merging DataFrameHolder and DatasetHolder breaks compilation, or just because Decimal is an internal type and shouldn't be exposed?

Contributor (author): Decimal is an internal type. (Although I think we should expose it in the future, we haven't done that yet.)

.toDF("d1", "d2")
.select($"d1".cast(DecimalType(10, 5)).as("d"))
.registerTempTable("dn")
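Following on from the thread above: since Decimal is treated as an internal type, the test data now uses java.math.BigDecimal, which the reflection-based encoder maps to a DecimalType column. A hedged sketch of the pattern used in this test, assuming a SQLContext (the object name is illustrative):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.DecimalType

object DecimalSketch {
  def example(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    // Public java.math.BigDecimal instead of the internal Decimal type.
    val df = Seq.empty[(java.math.BigDecimal, java.math.BigDecimal)]
      .toDF("d1", "d2")
      .select($"d1".cast(DecimalType(10, 5)).as("d"))

    df.printSchema()
  }
}
```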
@@ -79,7 +79,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}

test("Read/write all types with non-primitive type") {
val data = (0 to 255).map { i =>
val data: Seq[AllDataTypesWithNonPrimitiveType] = (0 to 255).map { i =>
AllDataTypesWithNonPrimitiveType(
s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 == 0,
0 until i,