
[SPARK-47804] Add Dataframe cache debug log #45990

Closed · wants to merge 4 commits

Conversation

@anchovYu (Contributor) commented Apr 10, 2024

What changes were proposed in this pull request?

This PR adds a debug log for the Dataframe cache that is turned on through a SQL conf. It logs the necessary information on:

  • cache hits during cache application (cache application happens on basically every query)
  • cache misses
  • adding new cache entries
  • removing cache entries (including clearing all entries)

Because every query applies the cache, this log can be huge. It should only be turned on during a debugging session and must not be enabled by default in production.

Example:

spark.conf.set("spark.sql.dataframeCache.logLevel", "warn")
val df = spark.range(1, 10)


df.collect()
{"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"}
{"ts":"2024-04-10T16:41:10.010-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"}

df.cache()
{"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Dataframe cache miss for input plan:\nRange (1, 10, step=1, splits=Some(10))\n","logger":"org.apache.spark.sql.execution.CacheManager"}
{"ts":"2024-04-10T16:42:18.647-0700","level":"WARN","msg":"Last 20 Dataframe cache entry logical plans:\n[]","logger":"org.apache.spark.sql.execution.CacheManager"}
{"ts":"2024-04-10T16:42:18.662-0700","level":"WARN","msg":"Added Dataframe cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n   +- *(1) Range (1, 10, step=1, splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"}


df.count()
{"ts":"2024-04-10T16:43:36.033-0700","level":"WARN","msg":"Dataframe cache hit for input plan:\nRange (1, 10, step=1, splits=Some(10))\nmatched with cache entry:\nCachedData(\nlogicalPlan=Range (1, 10, step=1, splits=Some(10))\n\nInMemoryRelation=InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n   +- *(1) Range (1, 10, step=1, splits=10)\n)\n","logger":"org.apache.spark.sql.execution.CacheManager"}
{"ts":"2024-04-10T16:43:36.041-0700","level":"WARN","msg":"Dataframe cache hit plan change summary:\n Aggregate [count(1) AS count#13L]           Aggregate [count(1) AS count#13L]\n!+- Range (1, 10, step=1, splits=Some(10))   +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)\n!                                                  +- *(1) Range (1, 10, step=1, splits=10)","logger":"org.apache.spark.sql.execution.CacheManager"}


df.unpersist()
{"ts":"2024-04-10T16:44:15.965-0700","level":"WARN","msg":"Removed 1 Dataframe cache entries, with logical plans being \n[Range (1, 10, step=1, splits=Some(10))\n]","logger":"org.apache.spark.sql.execution.CacheManager"}

Why are the changes needed?

Easier debugging.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Ran a local Spark shell.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Apr 10, 2024
@@ -204,6 +215,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
}
plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
CacheManager.logCacheOperation(s"Removed ${plansToUncache.size} Dataframe cache " +
Member

I think we need to migrate to the structured logging API here.
Framework: #45729
Example migration PR: #45834
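A hedged sketch of what that migration could look like with the log interpolator and MDC keys from the #45729 framework; the key name QUERY_PLAN and the surrounding class are assumptions for illustration:

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.QUERY_PLAN

class CacheDebugLogging extends Logging {
  // The log"..." interpolator attaches the plan as a structured MDC field
  // rather than concatenating it into a plain string.
  def logCacheMiss(planString: String): Unit = {
    logWarning(log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, planString)}")
  }
}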

@@ -1609,6 +1609,19 @@ object SQLConf {
.checkValues(StorageLevelMapper.values.map(_.name()).toSet)
.createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())

val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
Member

Will we need to debug cache table as well? Shall we rename the config to
spark.sql.cache.logLevel?

Member

Also, let's make it an internal conf since it is for developers.
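For reference, a sketch of what the internal declaration could look like with the existing SQLConf builder API (assuming it sits inside object SQLConf, where buildConf is available); the doc text, version, and default are illustrative:

import java.util.Locale

val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
  .internal() // developer-facing only, hidden from user documentation
  .doc("Log level of Dataframe cache operations. The output is verbose, " +
    "so only enable it while debugging, never by default in production.")
  .version("4.0.0")
  .stringConf
  .transform(_.toUpperCase(Locale.ROOT)) // accept e.g. "warn" as "WARN"
  .checkValues(Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR"))
  .createWithDefault("TRACE")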

Contributor Author

I kept the Dataframe cache naming to differentiate it from the RDD cache.

Member

RDD is a Spark core concept. Anyway, I respect your choice here.

@gengliangwang (Member)

Thanks, merging to master
