Skip to content

Commit

Permalink
[SPARK-19736][SQL] refreshByPath should clear all cached plans with t…
Browse files Browse the repository at this point in the history
…he specified path

## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.

However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17064 from viirya/fix-refreshByPath.
  • Loading branch information
viirya authored and cloud-fan committed Mar 1, 2017
1 parent 4913c92 commit 38e7835
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,16 @@ class CacheManager extends Logging {
(fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}

cachedData.foreach {
case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
if (dataIndex >= 0) {
data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
cachedData.remove(dataIndex)
}
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
case _ => // Do Nothing
cachedData.filter {
case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true
case _ => false
}.foreach { data =>
val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
if (dataIndex >= 0) {
data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
cachedData.remove(dataIndex)
}
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
assert(getNumInMemoryRelations(cachedPlan2) == 4)
}
}

test("refreshByPath should refresh all cached plans with the specified path") {
withTempDir { dir =>
val path = dir.getCanonicalPath()

spark.range(10).write.mode("overwrite").parquet(path)
spark.read.parquet(path).cache()
spark.read.parquet(path).filter($"id" > 4).cache()
assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)

spark.range(20).write.mode("overwrite").parquet(path)
spark.catalog.refreshByPath(path)
assert(spark.read.parquet(path).count() == 20)
assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
}
}
}

0 comments on commit 38e7835

Please sign in to comment.