[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder

## What changes were proposed in this pull request?

Because of the memory leak issue in `scala.reflect.api.Types.TypeApi.<:<` (scala/bug#8302), creating an encoder may leak memory.

This PR adds a `cleanUpReflectionObjects` helper and wraps the bodies of the methods calling `scala.reflect.api.Types.TypeApi.<:<` with it, so that the leaked reflection objects are cleaned up automatically.
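
To make the fix concrete, below is a minimal, self-contained sketch of the pattern (the `cleanUpReflectionObjects` body mirrors the helper added in the diff further down; the object name and the `isOptionType` caller are illustrative only, not part of the patch):

```scala
import scala.reflect.runtime.{universe => ru}

// Sketch only: ReflectionCleanupSketch and isOptionType are illustrative names.
object ReflectionCleanupSketch {

  // Runs `func` and then rolls the runtime universe's undo log back to its
  // previous state, discarding the entries that calls such as `<:<` push onto
  // scala.reflect.runtime.JavaUniverse.undoLog (scala/bug#8302).
  def cleanUpReflectionObjects[T](func: => T): T = {
    ru.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
  }

  // Example caller: any method that uses `<:<` gets wrapped this way, just
  // like the ScalaReflection methods touched by this patch.
  def isOptionType(tpe: ru.Type): Boolean = cleanUpReflectionObjects {
    tpe <:< ru.typeOf[Option[_]]
  }
}
```

Because `undoLog.undo` restores the log after the block finishes, wrapping the `<:<` call sites is enough to keep the log from growing.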

## How was this patch tested?

The updated unit tests in `ExpressionEncoderSuite`, which now verify that the size of `scala.reflect.runtime.JavaUniverse.undoLog` is unchanged after creating and exercising each encoder.
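
A stripped-down sketch of that check is below; the logic mirrors the `verifyNotLeakingReflectionObjects` helper added to the suite in the diff, while the wrapper object and the assertion message are illustrative additions:

```scala
import scala.reflect.runtime.{universe => ru}

// Sketch only: UndoLogLeakCheck is an illustrative wrapper; the suite defines
// the helper as a private method and runs it inside ScalaTest tests.
object UndoLogLeakCheck {

  // Current number of entries in the runtime universe's undo log.
  private def undoLogSize: Int =
    ru.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size

  // Fails if `func` leaves extra entries behind, i.e. leaks reflection objects.
  def verifyNotLeakingReflectionObjects[T](func: => T): T = {
    val before = undoLogSize
    val result = func
    assert(before == undoLogSize, s"undoLog grew from $before to $undoLogSize")
    result
  }
}
```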

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19687 from zsxwing/SPARK-19644.
zsxwing committed Nov 10, 2017
1 parent 5ebdcd1 commit 24ea781
Showing 2 changed files with 51 additions and 9 deletions.

ScalaReflection.scala
@@ -61,7 +61,7 @@ object ScalaReflection extends ScalaReflection {
*/
def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])

- private def dataTypeFor(tpe: `Type`): DataType = {
+ private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
tpe.dealias match {
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
@@ -93,7 +93,7 @@ object ScalaReflection extends ScalaReflection {
* Special handling is performed for primitive types to map them back to their raw
* JVM form instead of the Scala Array that handles auto boxing.
*/
- private def arrayClassFor(tpe: `Type`): ObjectType = {
+ private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects {
val cls = tpe.dealias match {
case t if t <:< definitions.IntTpe => classOf[Array[Int]]
case t if t <:< definitions.LongTpe => classOf[Array[Long]]
@@ -146,7 +146,7 @@ object ScalaReflection extends ScalaReflection {
private def deserializerFor(
tpe: `Type`,
path: Option[Expression],
- walkedTypePath: Seq[String]): Expression = {
+ walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {

/** Returns the current path with a sub-field extracted. */
def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = {
@@ -441,7 +441,7 @@ object ScalaReflection extends ScalaReflection {
inputObject: Expression,
tpe: `Type`,
walkedTypePath: Seq[String],
- seenTypeSet: Set[`Type`] = Set.empty): Expression = {
+ seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects {

def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
dataTypeFor(elementType) match {
@@ -648,7 +648,7 @@ object ScalaReflection extends ScalaReflection {
* Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
* we also treat [[DefinedByConstructorParams]] as product type.
*/
- def optionOfProductType(tpe: `Type`): Boolean = {
+ def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects {
tpe.dealias match {
case t if t <:< localTypeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
@@ -710,7 +710,7 @@ object ScalaReflection extends ScalaReflection {
def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])

/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
- def schemaFor(tpe: `Type`): Schema = {
+ def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects {
tpe.dealias match {
case t if t.typeSymbol.annotations.exists(_.tree.tpe =:= typeOf[SQLUserDefinedType]) =>
val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
@@ -780,7 +780,7 @@ object ScalaReflection extends ScalaReflection {
/**
* Whether the fields of the given type is defined entirely by its constructor parameters.
*/
- def definedByConstructorParams(tpe: Type): Boolean = {
+ def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects {
tpe.dealias <:< localTypeOf[Product] || tpe.dealias <:< localTypeOf[DefinedByConstructorParams]
}

@@ -809,6 +809,17 @@ trait ScalaReflection {
// Since the map values can be mutable, we explicitly import scala.collection.Map at here.
import scala.collection.Map

+ /**
+  * Any codes calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
+  * clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
+  * `scala.reflect.runtime.JavaUniverse.undoLog`.
+  *
+  * @see https://github.com/scala/bug/issues/8302
+  */
+ def cleanUpReflectionObjects[T](func: => T): T = {
+   universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
+ }
+
/**
* Return the Scala Type for `T` in the current classloader mirror.
*

ExpressionEncoderSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+ import org.apache.spark.util.ClosureCleaner

case class RepeatedStruct(s: Seq[PrimitiveData])

@@ -114,7 +115,9 @@ object ReferenceValueClass {
class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)

- implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
+ implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects {
+   ExpressionEncoder()
+ }

// test flat encoders
encodeDecodeTest(false, "primitive boolean")
@@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
private def encodeDecodeTest[T : ExpressionEncoder](
input: T,
testName: String): Unit = {
test(s"encode/decode for $testName: $input") {
testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
val encoder = implicitly[ExpressionEncoder[T]]
+
+ // Make sure encoder is serializable.
+ ClosureCleaner.clean((s: String) => encoder.getClass.getName)
+
val row = encoder.toRow(input)
val schema = encoder.schema.toAttributes
val boundEncoder = encoder.resolveAndBind()
@@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
}
}
}
+
+ /**
+  * Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to
+  * ensure we don't leak Scala reflection garbage.
+  *
+  * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
+  */
+ private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
+   def undoLogSize: Int = {
+     scala.reflect.runtime.universe
+       .asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size
+   }
+
+   val previousUndoLogSize = undoLogSize
+   val r = func
+   assert(previousUndoLogSize == undoLogSize)
+   r
+ }
+
+ private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
+   test(testName) {
+     verifyNotLeakingReflectionObjects(testFun)
+   }
+ }
}
