
[SPARK-46992] Fix cache consistency #45181

Closed
doki23 wants to merge 12 commits into apache:master from doki23:cache_return_new_ds

Conversation

@doki23

@doki23 doki23 commented Feb 20, 2024

What changes were proposed in this pull request?

This pr fixes SPARK-46992.
Whenever any child plan of the root plan is cached or uncached, the root plan will always get the newest, consistent executedPlan.

Why are the changes needed?

See comments of SPARK-46992.

df.collect()
df.cache()
df.collect() // won't read cached data because the variable `executedPlan` of df's `QueryExecution` is already initialized.
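The mechanics of this staleness can be sketched in plain Python (this is a hypothetical model, not Spark's actual classes): a physical plan that is memoized lazily on first use consults the cache only once, so a later cache() call never affects it.

```python
# Minimal sketch (not Spark code): a query-execution object that memoizes
# its physical plan on first access, like Spark's lazy val executedPlan.
class QueryExecution:
    def __init__(self, logical_plan, cached_tables):
        self.logical_plan = logical_plan
        self.cached_tables = cached_tables
        self._executed_plan = None  # memoized lazily

    @property
    def executed_plan(self):
        if self._executed_plan is None:
            # Planning consults the cache *at this moment* and freezes the result.
            use_cache = self.logical_plan in self.cached_tables
            self._executed_plan = (
                "ScanCache" if use_cache else "ScanSource",
                self.logical_plan,
            )
        return self._executed_plan


cached_tables = set()
qe = QueryExecution("my_plan", cached_tables)

first = qe.executed_plan        # collect() before cache(): plans a source scan
cached_tables.add("my_plan")    # df.cache()
second = qe.executed_plan       # collect() after cache(): still the stale plan

print(first)   # ('ScanSource', 'my_plan')
print(second)  # ('ScanSource', 'my_plan') -- the cache is never consulted again
```

The fix proposed in this PR amounts to invalidating that memoized plan whenever the cache state of any fragment changes.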

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The test case checks the consistency of cached.count() and cached.collect(), and makes sure that the size of the collect result is the same as the count.

Was this patch authored or co-authored using generative AI tooling?

No.

@dongjoon-hyun
Member

Could you enable GitHub Action on your repository, @doki23 ? Apache Spark community uses the contributor's GitHub Action resources.

[Screenshot 2024-02-20 at 12:22:49]

Member

BTW, the original code came from #4686 (at least) and seems to be the default Apache Spark behavior for a long time.

Member

@dongjoon-hyun dongjoon-hyun Feb 20, 2024

IIUC, Apache Spark's persisted data (the underlying RDD) can always be recomputed, can't it? It's only a best-effort approach to reduce re-computation. Do we guarantee the cached data's immutability?

Author

Hmm... sorry, I don't understand your concern. This does not change the immutability of cached data.

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 20, 2024

cc @cloud-fan and @HyukjinKwon because this seems to be claimed as a long-standing correctness issue.

@cloud-fan
Contributor

It's not ideal behavior but should be easy to work around (df.select("*").collect()). IIUC this PR is also like a workaround, as the original df can't pick up the cache anyway because its physical plan is already materialized.

@doki23
Author

doki23 commented Feb 21, 2024

Could you enable GitHub Action on your repository, @doki23 ? Apache Spark community uses the contributor's GitHub Action resources.


Ok, I've enabled it. @dongjoon-hyun

@github-actions github-actions bot added the INFRA label Feb 21, 2024
@doki23 doki23 marked this pull request as ready for review February 21, 2024 03:24
@doki23 doki23 force-pushed the cache_return_new_ds branch from b1870a7 to c2ea02a on February 21, 2024 03:43
Contributor

@nchammas nchammas left a comment

In its current state, I don't think this PR fixes the issue described on Jira.

Here is the Python test I am running, which is simplified from the original reproduction that Denis posted:

data = spark.createDataFrame([(0,), (1,)], "val int")
data = data.orderBy("val").sample(fraction=0.5, seed=-1)
data.collect()
data.cache()
assert len(data.collect()) == data.count(), f"{data.collect()}, {data.count()}"

The assertion fails as follows:

AssertionError: [Row(val=0), Row(val=1)], 1

Changing data.cache() to data = data.cache() doesn't change the result.

Comment on lines 97 to 99
Contributor

Are you sure this is a valid test? Because this particular check passes for me on master.

Author

It makes sure that the cached data of the new Dataset instance is as expected. I'll also add one more case that proves the results of cached.count() and cached.collect() are consistent.

@nchammas
Contributor

nchammas commented Feb 21, 2024

It's not an ideal behavior but should be easy to work around

Just to be clear, do you not consider it a correctness issue? To me, it's a correctness issue since the existing behavior on master violates what should be a basic invariant: df.count() and df.collect().size should always agree.

But this is not always true, as the repro documented in SPARK-46992 shows. I also posted my own repro just above.

@doki23 doki23 changed the title [SPARK-46992]Fix "Inconsistent results with 'sort', 'cache', and AQE." [SPARK-46992]Fix cache consistence Mar 7, 2024
@dtarima
Contributor

dtarima commented Mar 7, 2024

All children have to be considered for changes of their persistence state. Currently it only checks the first child found.
For clarity there is a test which fails: doki23#1

@doki23
Author

doki23 commented Mar 8, 2024

All children have to be considered for changes of their persistence state. Currently it only checks the first child found. For clarity there is a test which fails: doki23#1

So, we need a cache state signature for queryExecution.
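The cache-state-signature idea can be sketched in Python (hypothetical Plan type and names, not Spark's API): a pre-order traversal records, for each node, whether it is currently cached; if the signature differs from the one the memoized plan was built against, the plan is rebuilt.

```python
# Sketch of a cache-state signature over a plan tree (hypothetical types).
class Plan:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)


def cache_state_signature(plan, cached_names):
    """Pre-order traversal: one boolean per node, True if that node is cached."""
    sig = [plan.name in cached_names]
    for child in plan.children:
        sig.extend(cache_state_signature(child, cached_names))
    return sig


# root -> [a, b], b -> [c]; pre-order is root, a, b, c.
tree = Plan("root", [Plan("a"), Plan("b", [Plan("c")])])

before = cache_state_signature(tree, cached_names=set())
after = cache_state_signature(tree, cached_names={"c"})

print(before)  # [False, False, False, False]
print(after)   # [False, False, False, True]
```

A changed signature (before != after) is the trigger for discarding the memoized executedPlan and planning again.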

@doki23 doki23 marked this pull request as draft March 8, 2024 00:44
def queryExecution: QueryExecution = {
val cacheStatesSign = queryUnpersisted.computeCacheStateSignature()
// If all children aren't cached, directly return the queryUnpersisted
if (cacheStatesSign.forall(b => !b)) {
Contributor

  1. It doesn't look like it's necessary to distinguish between persisted and unpersisted anymore. If we wanted we could have a cache Map[State, QueryExecution] for different states, but I think it'd add unjustified complexity.
  2. We cannot use var - it's not thread-safe.
class Dataset[T] private[sql](
    @Unstable @transient val queryExecutionRef: AtomicReference[(Array[Boolean], QueryExecution)],
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  @DeveloperApi
  def queryExecution: QueryExecution = {
    val (state, queryExecution) = queryExecutionRef.get()
    val newState = queryExecution.computeCacheStateSignature()

    if (state.sameElements(newState)) queryExecution
    else {
      val newQueryExecution = new QueryExecution(queryExecution)
      queryExecutionRef.set((newState, newQueryExecution))
      newQueryExecution
    }
  }

  ...

Author

@doki23 doki23 Mar 12, 2024

I don't think wrapping queryExecution in an AtomicReference makes it thread-safe. For example, if we unpersist one of its children in another thread while we call ds.count(), the cache consistency may still be incorrect.

Using two queryExecution variables may help reduce the number of analysis passes.

Contributor

I don't think wrapping queryExecution in an AtomicReference makes it thread-safe. For example, if we unpersist one of its children in another thread while we call ds.count(), the cache consistency may still be incorrect.

Yes, consistency of results cannot be guaranteed when the persistence state changes concurrently in different threads, but that is not what I was pointing to. Thread safety is a basic concept, not related to business logic: when we change a var in one thread, other threads might not see the updated reference. To avoid this, the reference needs to be marked @volatile. In the example above I used AtomicReference's set for simplicity, but it might make sense to use compareAndSet to get additional guarantees.

Using two queryExecution variables may help reduce the number of analysis passes.

I doubt that the additional complexity is worth it. It's not a big deal... Let's see what reviewers think.
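The compareAndSet variant mentioned above can be sketched in Python (Python has no AtomicReference, so a lock-backed compare-and-set cell stands in for java.util.concurrent.atomic.AtomicReference; all names are hypothetical): the (signature, plan) pair is replaced only if nobody else swapped it since it was read.

```python
import threading


class AtomicReference:
    """Tiny compare-and-set cell, a stand-in for Java's AtomicReference."""

    def __init__(self, value):
        self._value = value
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._value

    def compare_and_set(self, expected, new):
        # Succeeds only if the stored object is still the one we read.
        with self._lock:
            if self._value is expected:
                self._value = new
                return True
            return False


def query_execution(ref, compute_signature, replan):
    """Return a plan consistent with the current cache-state signature."""
    pair = ref.get()
    state, plan = pair
    new_state = compute_signature()
    if state == new_state:
        return plan  # memoized plan is still valid
    new_pair = (new_state, replan())
    # Publish only if the pair was not replaced concurrently; a losing
    # thread simply uses its freshly built plan without publishing it.
    ref.compare_and_set(pair, new_pair)
    return new_pair[1]


sig = [False]  # mutable stand-in for the tree's cache state
ref = AtomicReference(([False], "plan-v1"))

p1 = query_execution(ref, lambda: list(sig), lambda: "plan-v2")
print(p1)      # plan-v1: signature unchanged, memoized plan returned

sig[0] = True  # a child was cached in the meantime
p2 = query_execution(ref, lambda: list(sig), lambda: "plan-v2")
print(p2)      # plan-v2: signature changed, plan rebuilt and published
```

This only makes the reference swap safe; as noted above, it does not (and cannot) make results consistent under concurrent persist/unpersist.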

@doki23 doki23 marked this pull request as ready for review March 13, 2024 03:17
* This method performs a pre-order traversal and returns a boolean Array
* representing whether each node of the logical tree is persisted.
*/
def computeCacheStateSignature(): Array[Boolean] = {
Contributor

How about using BitSet for persistence state representation?
It'll be easier to work with and it's more efficient.

Author

BitSet requires a numBits parameter, and I cannot know the number of children in advance. Although the current implementation is less efficient, it's still acceptable.

Contributor

It's not necessary to know the number of bits during construction.
It's up to you, but just FYI, here is how it'd look:

    val builder = BitSet.newBuilder
    var index = 0
    normalized.foreach { fragment =>
      val cached = fragment match {
        case _: IgnoreCachedData => false
        case _ => cacheManager.lookupCachedData(fragment).isDefined
      }
      if (cached) builder += index
      index += 1
    }
    builder.result()

Author

@doki23 doki23 Mar 22, 2024

It seems that Scala's BitSet doesn't record the highest index, so I can't determine the number of fragments from it.
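The concern above can be illustrated in Python (a hypothetical sketch, not the PR's code): a bare set of cached-node indices cannot distinguish trees of different sizes whose cached positions happen to coincide, whereas pairing the bits with an explicit fragment count can.

```python
# Sketch: a bare bitset loses information about trailing uncached nodes.
def as_bits(cached_indices):
    # A frozenset of set-bit positions models a BitSet's observable state.
    return frozenset(cached_indices)


# Tree A has 3 fragments with fragment 1 cached;
# Tree B has 5 fragments with fragment 1 cached.
sig_a = as_bits([1])
sig_b = as_bits([1])
print(sig_a == sig_b)  # True -- signatures collide despite different sizes

# Pairing the bits with the number of fragments removes the ambiguity.
sig_a2 = (3, as_bits([1]))
sig_b2 = (5, as_bits([1]))
print(sig_a2 == sig_b2)  # False
```

By contrast, an Array[Boolean] carries its length implicitly, which is why sameElements on it already accounts for the fragment count.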

@doki23 doki23 marked this pull request as draft March 16, 2024 00:33
@doki23 doki23 changed the title [SPARK-46992]Fix cache consistence [SPARK-46992]Fix cache consistency Mar 17, 2024
@github-actions github-actions bot added the INFRA label Mar 20, 2024
@doki23 doki23 marked this pull request as ready for review March 22, 2024 23:00
@doki23
Copy link
Author

doki23 commented Mar 22, 2024

This PR should be ready now. All tests pass.

@doki23
Copy link
Author

doki23 commented Mar 25, 2024

@dongjoon-hyun @cloud-fan @nchammas Hi, would you please take a look?

@github-actions
Copy link

github-actions bot commented Jul 4, 2024

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 4, 2024
@github-actions github-actions bot closed this Jul 5, 2024