Use waitUntilEmpty to fix not getting the correct numTotalCachedHit value
wangyum committed Nov 3, 2018
1 parent eca075a commit a799e3f
Showing 1 changed file with 5 additions and 11 deletions.
@@ -904,30 +904,24 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   }
 
   test("SPARK-24869 SaveIntoDataSourceCommand's input Dataset does not use cached data") {
+    var numTotalCachedHit = 0
     val listener = new QueryExecutionListener {
-
-      private var totalCachedHit = 0
-
       override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
        qe.withCachedData match {
          case c: SaveIntoDataSourceCommand if c.query.isInstanceOf[InMemoryRelation] =>
-            totalCachedHit += 1
+            numTotalCachedHit += 1
          case _ =>
        }
      }
-
-      def numTotalCachedHit: Int = synchronized {
-        totalCachedHit
-      }
    }
    spark.listenerManager.register(listener)
 
-    val udf1 = udf({ (x: Int, y: Int) => x + y })
-    val df = spark.range(0, 3).toDF("a").withColumn("b", udf1(col("a"), lit(10)))
+    val df = spark.range(2)
    df.cache()
    df.write.format("org.apache.spark.sql.test.DefaultSource").save()
-    assert(listener.numTotalCachedHit === 1, "expected to use cached data")
+    sparkContext.listenerBus.waitUntilEmpty(1000)
+    assert(numTotalCachedHit === 1, "expected to use cached data")
 
    spark.listenerManager.unregister(listener)
  }
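
The change matters because, in builds where ExecutionListenerManager delivers events through Spark's asynchronous listener bus, the onSuccess callback may fire after the write action has already returned, so the assertion could read the counter before it was incremented; waitUntilEmpty(1000) drains the bus so the update is visible before the check. Below is a minimal, self-contained sketch of the same wait-then-assert pattern, assuming a local SparkSession. The package org.apache.spark.sql.example and the object name ListenerBusDrainExample are hypothetical, chosen only because SparkContext.listenerBus and waitUntilEmpty are private[spark] and are therefore reachable only from code under an org.apache.spark package, as Spark's own test suites are.

// Hypothetical package: listenerBus and waitUntilEmpty are private[spark], so this
// sketch must live under org.apache.spark to compile, like Spark's own tests.
package org.apache.spark.sql.example

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object ListenerBusDrainExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("listener-drain").getOrCreate()

    // Counter updated from the listener callback, not the thread running the action.
    val successes = new AtomicInteger(0)
    val listener = new QueryExecutionListener {
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        successes.incrementAndGet() // may be delivered after the action has returned
      }
    }
    spark.listenerManager.register(listener)

    spark.range(2).count() // one action, so one onSuccess event is eventually posted

    // Without draining the bus, successes.get() can still be 0 at this point; this
    // mirrors the commit's waitUntilEmpty(1000) call before its assertion.
    spark.sparkContext.listenerBus.waitUntilEmpty(1000)
    assert(successes.get() == 1, "expected exactly one successful query execution")

    spark.listenerManager.unregister(listener)
    spark.stop()
  }
}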