[SPARK-8635][SQL] improve performance of CatalystTypeConverters #7018
Conversation
The benchmark (ScalaUdf will convert from catalyst to scala and back again):

```scala
// Imports assumed for the internal Catalyst APIs this PR touches:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class Floor(child: Expression) extends UnaryExpression with Predicate {
  override def toString = s"Floor $child"

  override def eval(input: InternalRow): Any = {
    child.eval(input) match {
      case null => null
      // The element type is erased at runtime, so match on Seq[_] and cast.
      case s: Seq[_] => s.asInstanceOf[Seq[Int]].sum
    }
  }
}

object T {
  def benchmark(count: Int, expr: Expression): Unit = {
    var i = 0
    val row = new GenericRow(Array[Any]((1 to 10).toSeq))
    val s = System.currentTimeMillis()
    while (i < count) {
      expr.eval(row)
      i += 1
    }
    val e = System.currentTimeMillis()
    println(s"${expr.getClass.getSimpleName} -- ${e - s} ms")
  }

  def main(args: Array[String]): Unit = {
    def func(s: Seq[Int]) = s.sum
    val attr = BoundReference(0, ArrayType(IntegerType), true)
    val udf0 = ScalaUdf(func _, IntegerType, attr :: Nil)
    val udf1 = Floor(attr)
    benchmark(1000000, udf0)
    benchmark(1000000, udf0)
    benchmark(1000000, udf0)
    benchmark(1000000, udf1)
    benchmark(1000000, udf1)
    benchmark(1000000, udf1)
  }
}
```

before:

after:
```diff
@@ -258,16 +273,13 @@ object CatalystTypeConverters {
       toScala(row(column).asInstanceOf[InternalRow])
   }

-  private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
+  private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
```
The internal type of StringType should always be UTF8String.
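To illustrate why the tighter type parameter helps, here is a minimal self-contained sketch, assuming a simplified converter hierarchy (SketchConverter and the UTF8String stand-in below are illustrative, not Spark's actual classes): fixing the catalyst-side type to the concrete UTF8String encodes the invariant in the signature and removes casts from Any on the catalyst side.

```scala
// Simplified sketch, not the real CatalystTypeConverter: a converter is
// parameterized on [ScalaInput, ScalaOutput, CatalystType], so a concrete
// CatalystType replaces Any and the casts that come with it.
abstract class SketchConverter[ScalaInput, ScalaOutput, CatalystType] {
  def toCatalyst(scalaValue: ScalaInput): CatalystType
  def toScala(catalystValue: CatalystType): ScalaOutput
}

// Hypothetical stand-in for org.apache.spark.unsafe.types.UTF8String.
final case class UTF8String(bytes: Array[Byte]) {
  override def toString: String = new String(bytes, "UTF-8")
}

object StringConverterSketch extends SketchConverter[Any, String, UTF8String] {
  // The internal representation of StringType is always UTF8String.
  override def toCatalyst(scalaValue: Any): UTF8String =
    UTF8String(scalaValue.toString.getBytes("UTF-8"))
  override def toScala(catalystValue: UTF8String): String =
    catalystValue.toString
}
```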
```diff
@@ -90,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }

-    val freqItems = df.select(cols.map(Column(_)) : _*).rdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
```
When we calculate singlePassFreqItems, we don't need to convert catalyst types to Scala types before the calculation. DataFrame.rdd is RDD[Row], and we need RDD[InternalRow] here.
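As a hedged, self-contained sketch of the point (the types below are toy stand-ins for Spark's Row and InternalRow, not the real classes): when the aggregation only reads raw field values, materializing an external row for every internal row is pure overhead, so operating on the internal representation directly skips that per-row conversion.

```scala
// Toy stand-ins: "internal" rows hold raw values; "external" rows are the
// user-facing representation that DataFrame.rdd would hand back.
final case class InternalRowLike(values: Array[Any])
final case class RowLike(values: Seq[Any])

// Slow path: convert every internal row to an external row, then aggregate.
def sumFirstColumnViaExternal(rows: Seq[InternalRowLike]): Int =
  rows.map(r => RowLike(r.values.toSeq))   // per-row conversion we want to avoid
      .map(_.values.head.asInstanceOf[Int]).sum

// Fast path: aggregate over the internal representation directly.
def sumFirstColumnViaInternal(rows: Seq[InternalRowLike]): Int =
  rows.map(_.values(0).asInstanceOf[Int]).sum
```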
Test build #35787 has finished for PR 7018 at commit
Test build #35789 has finished for PR 7018 at commit
```scala
 * Typical use case would be converting a collection of rows that have the same schema. You will
 * call this function once to get a converter, and apply it to every row.
 */
private[sql] def createToScalaConverter(dataType: DataType): Any => Any = {
```
Could you update the StructConverter to use createToScalaConverter?
Never mind, the StructConverter also needs toCatalystConverter.
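A hedged sketch of the usage pattern the doc comment describes (makeToScalaConverter below is a hypothetical stand-in for the private[sql] createToScalaConverter, with only enough cases to show the shape): the DataType is inspected once to build a closure, and that closure is applied to every value, so the per-row work avoids re-dispatching on the type.

```scala
import org.apache.spark.sql.types._

// Hypothetical stand-in: dispatch on the DataType once, return a closure
// that does the per-value work.
def makeToScalaConverter(dataType: DataType): Any => Any = dataType match {
  case BooleanType | ByteType | ShortType | IntegerType |
       LongType | FloatType | DoubleType =>
    identity // primitives are already in their Scala representation
  case ArrayType(elementType, _) =>
    // Build the element converter once, outside the per-value closure.
    val convertElement = makeToScalaConverter(elementType)
    (v: Any) => if (v == null) null else v.asInstanceOf[Seq[Any]].map(convertElement)
  case _ =>
    identity // sketch only; the real converter handles all Catalyst types
}

// Usage: resolve the converter once per column, then apply it to every row.
val convert = makeToScalaConverter(ArrayType(IntegerType, containsNull = false))
val converted = Seq(Seq(1, 2, 3), null).map(convert)
```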
LGTM, only one minor comment.
Merged this into master, thanks!
In CatalystTypeConverters.createToCatalystConverter, we add special handling for primitive types. We can apply this strategy to more places to improve performance.
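For concreteness, here is a minimal sketch of what such a primitive fast path can look like on the toCatalyst side, under the same caveat that the function below is illustrative rather than Spark's actual code: a primitive value has one representation on both sides, so its converter degenerates to identity, while a type like DateType (externally a java.sql.Date, internally an Int of days since the epoch) pays for a real conversion closure.

```scala
import java.sql.Date
import org.apache.spark.sql.types._

// Illustrative sketch of the strategy, not the Spark implementation:
// primitives pass through unchanged; only types whose external and internal
// representations differ get a real conversion closure.
def makeToCatalystConverter(dataType: DataType): Any => Any = dataType match {
  case BooleanType | ByteType | ShortType | IntegerType |
       LongType | FloatType | DoubleType =>
    identity // fast path: nothing to convert, no converter object involved
  case DateType =>
    // Slow path example: days since epoch (rough sketch, time zones ignored).
    (v: Any) =>
      if (v == null) null
      else (v.asInstanceOf[Date].getTime / (24L * 60 * 60 * 1000)).toInt
  case _ =>
    identity // sketch only
}
```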