[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder #19687
Conversation
cc @cloud-fan
Test build #83571 has finished for PR 19687 at commit
There is something wrong in my program:
java.io.NotSerializableException: scala.reflect.api.TypeTags$PredefTypeCreator
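Stripped of the reflection specifics, the reported exception is the classic failure of a closure capturing a non-serializable object. A minimal sketch, where `NotSerCreator` is a hypothetical stand-in for `TypeTags$PredefTypeCreator` (nothing here is Spark code):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for scala.reflect.api.TypeTags$PredefTypeCreator:
// any object that does not implement java.io.Serializable.
class NotSerCreator

object ClosureCaptureSketch {
  def trySerialize(): String = {
    val creator = new NotSerCreator
    // The closure references `creator`, so serializing the closure tries
    // to serialize the captured object too -- and fails.
    val f: Int => Int = x => { val _ = creator; x + 1 }
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(f)
      "serialized"
    } catch {
      case _: NotSerializableException => "NotSerializableException"
    }
  }

  def main(args: Array[String]): Unit = println(trySerialize())
}
```

An encoder that accidentally holds on to a `TypeCreator` fails the same way the moment Spark ships it to executors.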
LGTM, is it targeted for branch 2.2 too?
*/
private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
  def undoLogSize: Int = {
    import scala.reflect.runtime.{JavaUniverse, universe}
Just out of curiosity: why import here, for a single usage in the line below?
No special reason. I changed to use the fully qualified class name now.
private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
  test(testName) {
    verifyNotLeakingReflectionObjects(testFun)
Just out of curiosity, do we have to call verifyNotLeakingReflectionObjects twice? One call is via the implicit, and the other is here.
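For reference, the truncated helper under discussion plausibly fills out to something like the following (a reconstruction: the `undoLog.log` access into the runtime `JavaUniverse` is an assumption about scala-reflect internals, not a verbatim copy of the patch):

```scala
import scala.reflect.runtime.JavaUniverse

object ReflectionLeakCheck {
  // The runtime universe keeps an undo log of type-constraint entries;
  // scala/bug#8302 means calls like `<:<` can leave entries behind.
  // Comparing the log size before and after `func` detects the leak.
  def verifyNotLeakingReflectionObjects[T](func: => T): T = {
    def undoLogSize: Int =
      scala.reflect.runtime.universe
        .asInstanceOf[JavaUniverse].undoLog.log.size

    val previousSize = undoLogSize
    val result = func
    assert(undoLogSize == previousSize,
      s"Leaked ${undoLogSize - previousSize} reflection objects")
    result
  }
}
```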
@@ -370,7 +372,7 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
   private def encodeDecodeTest[T : ExpressionEncoder](
       input: T,
       testName: String): Unit = {
-    test(s"encode/decode for $testName: $input") {
+    testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
     val encoder = implicitly[ExpressionEncoder[T]]
Here we already verify the memory leak, so there seems to be no need to create testAndVerifyNotLeakingReflectionObjects.
This is to verify that some special objects such as UnresolvedMapObjects don't leak. E.g., ScalaReflection creates a function here, but the function runs only after the encoder is created:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
Line 280 in 6327ea5

val mapFunction: Expression => Expression = element => {
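The concern above can be sketched with a toy "undo log" (all names hypothetical, not Spark internals): a function built during encoder creation runs only later, so any reflection work it performs must be cleaned up at its own call site, not only at creation time:

```scala
import scala.collection.mutable

object DeferredReflectionSketch {
  // Toy stand-in for the runtime universe's undo log.
  val undoLog = mutable.ArrayBuffer.empty[String]

  // Runs `body`, then rolls the log back to its previous size -- the
  // sketch equivalent of ScalaReflection.cleanUpReflectionObjects.
  def cleanUp[T](body: => T): T = {
    val before = undoLog.size
    try body finally undoLog.remove(before, undoLog.size - before)
  }

  // "Encoder creation": builds a function that does reflection lazily.
  def buildMapFunction(): Int => Int = cleanUp {
    undoLog += "resolve container type"   // cleaned up at creation time
    elem => {
      undoLog += "resolve element type"   // runs AFTER creation!
      elem + 1
    }
  }

  def main(args: Array[String]): Unit = {
    val f = buildMapFunction()
    println(undoLog.size)   // creation-time entries were cleaned
    f(1)
    println(undoLog.size)   // the deferred call still left an entry
    // Fix: wrap the deferred body in cleanUp as well.
  }
}
```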
@ManchesterUnited16 This change may capture some unnecessary objects. Could you provide your code to help me investigate, or the full stack trace output by Spark?
@ManchesterUnited16 Anyway, I added some defensive code in the test to make sure encoders are serializable.
Yeah, I will backport this to 2.2 once this PR gets merged.
Test build #83608 has finished for PR 19687 at commit
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
// $example off$
import org.apache.spark.sql.SparkSession
object ALSExample {
// $example on$
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
// $example off$
def main(args: Array[String]) {
val spark = SparkSession
.builder
.master("local")
.appName("ALSExample")
.getOrCreate()
import spark.implicits._
// $example on$
val ratings: DataFrame = spark.read.textFile("D:\\xcar\\Spark_MLib\\ml_2.11.1\\src\\data\\mllib\\als\\sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model: ALSModel = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions: DataFrame = model.transform(test)
val evaluator: RegressionEvaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs: DataFrame = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs: DataFrame = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
// val users = ratings.select(als.getUserCol).distinct().limit(3)
// val userSubsetRecs = model.recommendForUserSubset(users, 10)
// // Generate top 10 user recommendations for a specified set of movies
// val movies = ratings.select(als.getItemCol).distinct().limit(3)
// val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
// $example off$
userRecs.show()
//movieRecs.show()
// userSubsetRecs.show()
// movieSubSetRecs.show()
spark.stop()
}
}
This is my code, and when I run the line `userRecs.show()` I get the error `java.io.NotSerializableException: scala.reflect.api.TypeTags$PredefTypeCreator`.
At 2017-11-09 05:29:55, "Shixiong Zhu" <notifications@github.com> wrote:
@zsxwing commented on this pull request.
In sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala:
@@ -441,4 +443,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
}
}
}
+
+ /**
+ * Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to
+ * ensure we don't leak Scala reflection garbage.
+ *
+ * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
+ */
+ private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
+ def undoLogSize: Int = {
+ import scala.reflect.runtime.{JavaUniverse, universe}
No special reason. I changed to use the fully qualified class name now.
@ManchesterUnited16 I ran your codes and didn't see NotSerializableException. How did you patch Spark with my PR?
Can you show me your Maven dependencies from when you ran the program? Thank you very much!
At 2017-11-09 13:37:46, "Shixiong Zhu" <notifications@github.com> wrote:
@ManchesterUnited16 I ran your codes and didn't see NotSerializableException. How did you patch Spark with my PR?
Maven dependencies are only used when compiling the code. I used the official Spark 2.2.0 to compile the code, built Spark based on this PR (commit hash: f88fb50), and ran the program using that build. Could you show how you compiled the code, how you built Spark, and how you ran the program?
@ManchesterUnited16 Since I tested your code and it works for me, I'm going to merge this PR and backport it to 2.2. If it's a real issue, feel free to open a ticket with steps to reproduce it.
…ncoder

Because of the memory leak issue in `scala.reflect.api.Types.TypeApi.<:<` (scala/bug#8302), creating an encoder may leak memory. This PR adds `cleanUpReflectionObjects` to clean up these leaking objects for methods calling `scala.reflect.api.Types.TypeApi.<:<`. The updated unit tests.

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes apache#19687 from zsxwing/SPARK-19644.
…ncoder (branch-2.2)

## What changes were proposed in this pull request?

Backport #19687 to branch-2.2. The major difference is that `cleanUpReflectionObjects` is protected by `ScalaReflectionLock.synchronized` in this PR for Scala 2.10.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19718 from zsxwing/SPARK-19644-2.2.
I have the same issue, @ManchesterUnited16. Did you find a solution?
What changes were proposed in this pull request?

Because of the memory leak issue in `scala.reflect.api.Types.TypeApi.<:<` (scala/bug#8302), creating an encoder may leak memory. This PR adds `cleanUpReflectionObjects` to clean up these leaking objects for methods calling `scala.reflect.api.Types.TypeApi.<:<`.

How was this patch tested?

The updated unit tests.
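Per the description, the core of the fix is small: wrap reflection-heavy methods so the universe's undo log is rolled back on exit. A sketch follows; the `undoLog.undo` call is my reading of scala-reflect's `SymbolTable` API, not a verbatim copy of the patch:

```scala
import scala.reflect.runtime.{JavaUniverse, universe}

object CleanUpSketch {
  // Run `func` inside the runtime universe's undoLog.undo, which rolls
  // back any TypeConstraint entries recorded by calls such as
  // Types.TypeApi.<:< (the leak described in scala/bug#8302).
  def cleanUpReflectionObjects[T](func: => T): T =
    universe.asInstanceOf[JavaUniverse].undoLog.undo(func)
}
```

Because `func` is a by-name parameter, the rollback happens as the wrapped block returns, so anything `<:<` recorded inside it does not outlive the call.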