Address comments, add tests

marmbrus committed Aug 20, 2014
1 parent b2e8ef3 commit c122cca
Showing 6 changed files with 78 additions and 37 deletions.
@@ -156,6 +156,12 @@ class JoinedRow extends Row {

/**
 * JIT HACK: Replace with macros
 * The `JoinedRow` class is used in many performance-critical situations. Unfortunately, since
 * there are multiple different types of `Rows` that could be stored as `row1` and `row2`, most of
 * the calls in the critical path are polymorphic. By creating special versions of this class that
 * are each used in only a single location in the code, we increase the chance that only a single
 * type of Row will be referenced, increasing the opportunity for the JIT to play tricks. This
 * sounds crazy, but in benchmarks it had noticeable effects.
 */
class JoinedRow2 extends Row {
private[this] var row1: Row = _
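Why this helps: a call site that only ever observes one concrete Row subtype stays monomorphic, which the JIT can devirtualize and inline. A standalone sketch of the effect (illustrative only, not part of this commit; `Row` here is a stand-in for the Catalyst trait):

object JitMonomorphismSketch {
  abstract class Row { def getInt(i: Int): Int }

  final class GenericRow(values: Array[Int]) extends Row {
    def getInt(i: Int): Int = values(i)
  }

  // If this loop only ever sees GenericRow, the JIT compiles getInt as a
  // direct, inlinable call; routing a second Row subtype through the same
  // call site makes it polymorphic and defeats the optimization.
  def sumFirstColumn(rows: Iterator[Row]): Long = {
    var sum = 0L
    while (rows.hasNext) sum += rows.next().getInt(0)
    sum
  }
}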
@@ -20,42 +20,43 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.types._

/**
 * A parent class for mutable container objects that are reused when the values are changed,
 * resulting in less garbage. These values are held by a [[SpecificMutableRow]].
 *
 * The following code was roughly used to generate these objects:
 * {{{
 * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
 * types.map {tpe =>
 *   s"""
 *   final class Mutable$tpe extends MutableValue {
 *     var value: $tpe = 0
 *     def boxed = if (isNull) null else value
 *     def update(v: Any) = value = {
 *       isNull = false
 *       v.asInstanceOf[$tpe]
 *     }
 *     def copy() = {
 *       val newCopy = new Mutable$tpe
 *       newCopy.isNull = isNull
 *       newCopy.value = value
 *       newCopy.asInstanceOf[this.type]
 *     }
 *   }"""
 * }.foreach(println)
 *
 * types.map { tpe =>
 *   s"""
 *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
 *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
 *     currentValue.isNull = false
 *     currentValue.value = value
 *   }
 *
 *   override def get$tpe(i: Int): $tpe = {
 *     values(i).asInstanceOf[Mutable$tpe].value
 *   }"""
 * }.foreach(println)
 * }}}
*/
abstract class MutableValue extends Serializable {
var isNull: Boolean = true
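For reference, substituting Int for $tpe in the template above expands to the following container class (derived mechanically from the generator code):

final class MutableInt extends MutableValue {
  var value: Int = 0
  def boxed = if (isNull) null else value
  def update(v: Any) = value = {
    isNull = false
    v.asInstanceOf[Int]
  }
  def copy() = {
    val newCopy = new MutableInt
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy.asInstanceOf[this.type]
  }
}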
@@ -184,7 +185,12 @@ final class MutableAny extends MutableValue {
}
}

class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
/**
 * A row type that holds an array of specialized container objects, of type [[MutableValue]],
 * chosen based on the dataTypes of each column. The intent is to decrease garbage when modifying
 * the values of primitive columns.
 */
final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {

def this(dataTypes: Seq[DataType]) =
this(
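A minimal usage sketch (hypothetical caller code, assuming the setInt/setDouble mutators inherited from MutableRow):

import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.types.{DoubleType, IntegerType}

// One MutableInt and one MutableDouble are allocated up front and then
// reused on every update, instead of boxing a fresh value per assignment.
val row = new SpecificMutableRow(Seq(IntegerType, DoubleType))
row.setInt(0, 42)
row.setDouble(1, 3.14)
assert(row.getInt(0) == 42 && row.getDouble(1) == 3.14)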
@@ -93,12 +93,25 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {

override def children = left :: right :: Nil

override def references = (left.flatMap(_.references) ++ right.flatMap(_.references)).toSet
override def references = left.references ++ right.references

override def dataType = left.dataType

override def eval(input: Row): Any = {
val leftEval = left.eval(input)
val rightEval = right.eval(input)
if (leftEval == null) {
rightEval
} else if (rightEval == null) {
leftEval
} else {
val numeric = left.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
if (numeric.compare(leftEval, rightEval) < 0) {
rightEval
} else {
leftEval
}
}
}

override def toString = s"MaxOf($left, $right)"
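The Numeric[Any] cast in eval can be exercised in isolation (standalone illustration, not from this commit): each NumericType carries a Numeric instance for its runtime representation, and widening it to Numeric[Any] allows a generic comparison of the boxed values.

// Compare two boxed Ints through a Numeric[Any] view, as MaxOf.eval does;
// a ClassCastException results if the values are not actually of that type.
val numeric = implicitly[Numeric[Int]].asInstanceOf[Numeric[Any]]
val leftEval: Any = 1
val rightEval: Any = 2
assert(numeric.compare(leftEval, rightEval) < 0)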
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

// These classes are here to avoid issues with serialization and integration with quasiquotes.
class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]

@@ -53,6 +54,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging
private val curId = new java.util.concurrent.atomic.AtomicInteger()
private val javaSeparator = "$"

/**
* Can be flipped on manually in the console to add (expensive) expression evaluation trace code.
*/
var debugLogging = false
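For example, in a spark-shell session the flag might be flipped on a concrete generator object before running a query (a sketch; the GenerateProjection object and its package path are assumptions, not shown in this diff):

// Enable (expensive) trace output in all subsequently generated classes.
org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection.debugLogging = true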

/**
* Generates a class for a given input expression. Called when there is no cached code
* already available.
@@ -496,7 +502,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging

// Only inject debugging code if debugging is turned on.
val debugCode =
if (false) {
if (debugLogging) {
val localLogger = log
val localLoggerTree = reify { localLogger }
q"""
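The reify/splice pattern used above can be tried standalone (a minimal sketch, assuming scala-reflect is on the classpath; names are illustrative):

import scala.reflect.runtime.universe._

// reify lifts a local value into a typed expression tree...
val localLogger = "expression trace"
val localLoggerTree = reify { localLogger }

// ...whose tree can then be spliced into quasiquoted generated code.
val debugCode = q"println(${localLoggerTree.tree})"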
@@ -136,6 +136,16 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation(In(Literal(1), Seq(Literal(1), Literal(2))) && In(Literal(2), Seq(Literal(1), Literal(2))), true)
}

test("MaxOf") {
checkEvaluation(MaxOf(1, 2), 2)
checkEvaluation(MaxOf(2, 1), 2)
checkEvaluation(MaxOf(1L, 2L), 2L)
checkEvaluation(MaxOf(2L, 1L), 2L)

checkEvaluation(MaxOf(Literal(null, IntegerType), 2), 2)
checkEvaluation(MaxOf(2, Literal(null, IntegerType)), 2)
}

test("LIKE literal Regular Expression") {
checkEvaluation(Literal(null, StringType).like("a"), null)
checkEvaluation(Literal("a", StringType).like(Literal(null, StringType)), null)
@@ -158,7 +158,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
}
assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
s"query should contain two relations, each of which has size smaller than autoConvertSize instead ${rdd.queryExecution}")
s"query should contain two relations, each of which has size smaller than autoConvertSize")

// Using `sparkPlan` because, for the relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
