Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-19644][SQL]Clean up Scala reflection garbage after creating Encoder #19687

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object ScalaReflection extends ScalaReflection {
*/
def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])

private def dataTypeFor(tpe: `Type`): DataType = {
private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
tpe.dealias match {
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
Expand Down Expand Up @@ -93,7 +93,7 @@ object ScalaReflection extends ScalaReflection {
* Special handling is performed for primitive types to map them back to their raw
* JVM form instead of the Scala Array that handles auto boxing.
*/
private def arrayClassFor(tpe: `Type`): ObjectType = {
private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects {
val cls = tpe.dealias match {
case t if t <:< definitions.IntTpe => classOf[Array[Int]]
case t if t <:< definitions.LongTpe => classOf[Array[Long]]
Expand Down Expand Up @@ -140,7 +140,7 @@ object ScalaReflection extends ScalaReflection {
private def deserializerFor(
tpe: `Type`,
path: Option[Expression],
walkedTypePath: Seq[String]): Expression = {
walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {

/** Returns the current path with a sub-field extracted. */
def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = {
Expand Down Expand Up @@ -435,7 +435,7 @@ object ScalaReflection extends ScalaReflection {
inputObject: Expression,
tpe: `Type`,
walkedTypePath: Seq[String],
seenTypeSet: Set[`Type`] = Set.empty): Expression = {
seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects {

def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
dataTypeFor(elementType) match {
Expand Down Expand Up @@ -642,7 +642,7 @@ object ScalaReflection extends ScalaReflection {
* Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
* we also treat [[DefinedByConstructorParams]] as product type.
*/
def optionOfProductType(tpe: `Type`): Boolean = {
def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects {
tpe.dealias match {
case t if t <:< localTypeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
Expand Down Expand Up @@ -704,7 +704,7 @@ object ScalaReflection extends ScalaReflection {
def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])

/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): Schema = {
def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects {
tpe.dealias match {
case t if t.typeSymbol.annotations.exists(_.tree.tpe =:= typeOf[SQLUserDefinedType]) =>
val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
Expand Down Expand Up @@ -774,7 +774,7 @@ object ScalaReflection extends ScalaReflection {
/**
* Whether the fields of the given type are defined entirely by its constructor parameters.
*/
def definedByConstructorParams(tpe: Type): Boolean = {
def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects {
tpe.dealias <:< localTypeOf[Product] || tpe.dealias <:< localTypeOf[DefinedByConstructorParams]
}

Expand Down Expand Up @@ -803,6 +803,17 @@ trait ScalaReflection {
// Since the map values can be mutable, we explicitly import scala.collection.Map at here.
import scala.collection.Map

/**
 * Evaluates `func` inside `undoLog.undo` of the runtime `JavaUniverse` and returns its result.
 *
 * Any code that calls `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method so
 * that the Scala reflection garbage is cleaned up automatically. Without the wrapper, objects
 * are leaked into `scala.reflect.runtime.JavaUniverse.undoLog`.
 *
 * @param func the by-name block to evaluate
 * @tparam T the result type of `func`
 * @return whatever `undoLog.undo` returns for `func`
 * @see https://github.com/scala/bug/issues/8302
 */
def cleanUpReflectionObjects[T](func: => T): T = {
  // `undoLog` is only reachable through the concrete runtime universe, hence the cast.
  val runtimeUniverse = universe.asInstanceOf[scala.reflect.runtime.JavaUniverse]
  runtimeUniverse.undoLog.undo(func)
}

/**
* Return the Scala Type for `T` in the current classloader mirror.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ClosureCleaner

case class RepeatedStruct(s: Seq[PrimitiveData])

Expand Down Expand Up @@ -114,7 +115,9 @@ object ReferenceValueClass {
class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)

implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects {
ExpressionEncoder()
}

// test flat encoders
encodeDecodeTest(false, "primitive boolean")
Expand Down Expand Up @@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
private def encodeDecodeTest[T : ExpressionEncoder](
input: T,
testName: String): Unit = {
test(s"encode/decode for $testName: $input") {
testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
val encoder = implicitly[ExpressionEncoder[T]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory leak is already verified here, so there seems to be no need to create testAndVerifyNotLeakingReflectionObjects.

Copy link
Member Author

@zsxwing zsxwing Nov 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to verify that some special objects such as UnresolvedMapObjects don't leak. E.g., ScalaReflection creates a function here but the function runs after creating the encoder:

val mapFunction: Expression => Expression = element => {


// Make sure encoder is serializable.
ClosureCleaner.clean((s: String) => encoder.getClass.getName)

val row = encoder.toRow(input)
val schema = encoder.schema.toAttributes
val boundEncoder = encoder.resolveAndBind()
Expand Down Expand Up @@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
}
}
}

/**
 * Evaluates `func` and asserts that the size of
 * scala.reflect.runtime.JavaUniverse.undoLog is the same before and after the evaluation,
 * to ensure we don't leak Scala reflection garbage.
 *
 * @param func the by-name block to evaluate exactly once
 * @return the value produced by `func`
 * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
 */
private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
  // `undoLog` is only reachable through the concrete runtime universe, hence the cast.
  val runtimeUniverse =
    scala.reflect.runtime.universe.asInstanceOf[scala.reflect.runtime.JavaUniverse]

  // Re-read on every call so that the "before" and "after" snapshots are independent.
  def currentLogSize: Int = runtimeUniverse.undoLog.log.size

  val sizeBefore = currentLogSize
  val result = func
  val sizeAfter = currentLogSize
  assert(sizeBefore == sizeAfter)
  result
}

private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
test(testName) {
verifyNotLeakingReflectionObjects(testFun)
Copy link
Member

@kiszk kiszk Nov 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, do we have to call verifyNotLeakingReflectionObjects twice? It is called once through the implicit encoder and once again here.

}
}
}