
Spark 3.5: Support executor cache locality #9563

Merged. 1 commit merged into apache:main from executor-cache-locality on Feb 5, 2024.

Conversation

@aokolnychyi (Contributor) commented on Jan 26, 2024:

This PR adds the ability to enable executor cache locality. The new SQL property is off by default, as it does not make sense in all cases and may introduce unnecessary task waits. When enabled and there are deletes, this logic will try to co-locate tasks for one partition on the same executors to increase the probability of cache reuse.
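
A minimal sketch of opting in for a session, assuming the SQL property name this PR adds to SparkSQLProperties (spark.sql.iceberg.executor-cache.locality.enabled; check your Iceberg version for the exact name):

import org.apache.spark.sql.SparkSession;

public class EnableCacheLocality {
  public static void main(String[] args) {
    // The property is off by default; opt in per session.
    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.conf().set("spark.sql.iceberg.executor-cache.locality.enabled", "true");
  }
}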

@github-actions github-actions bot added the spark label Jan 26, 2024
@aokolnychyi aokolnychyi added this to the Iceberg 1.5.0 milestone Jan 26, 2024
@aokolnychyi aokolnychyi force-pushed the executor-cache-locality branch 2 times, most recently from f19a39b to e118b88, on January 31, 2024 at 03:28
}

private static boolean isPartitioned(PartitionScanTask task) {
return task.partition() != null && task.partition().size() > 0;

advancedxy (Contributor):
How about checking task.spec().isPartitioned() instead? Otherwise, a VoidTransform in a V1 table is not handled by the code above.

aokolnychyi (Author):
That's what I started with, but task.spec() may be quite expensive.

@Override
public PartitionSpec spec() {
  if (spec == null) {
    synchronized (this) {
      if (spec == null) {
        this.spec = PartitionSpecParser.fromJson(schema(), specString);
      }
    }
  }
  return spec;
}

Even though there is a cache in PartitionSpecParser, we would still need to do a lookup by the spec JSON. I don't think void transforms will be an issue, as we check for the presence of deletes and V1 tables can't have them.

advancedxy (Contributor):
> That's what I started with, but task.spec() may be quite expensive.

I think we already call specs() in SparkPartitioningAwareScan to preserve data grouping. If task.spec() is quite expensive to call, it would be better to reduce that overhead.

The first thing that came to mind: how about adding an int specId() method to the PartitionScanTask interface? The spec id should be easier to store and retrieve. On the driver side, we can leverage table.specs() to retrieve the actual spec.
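
A rough sketch of that suggestion (hypothetical API shape, not what PartitionScanTask exposes in the version under review):

import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

// Hypothetical: carry only the spec id on each task; an int is cheap to
// store and serialize, unlike the spec JSON string.
interface TaskWithSpecId {
  int specId();
}

class SpecResolver {
  // Driver side: Table.specs() returns Map<Integer, PartitionSpec>, so the
  // full spec is a plain map lookup instead of PartitionSpecParser.fromJson.
  static PartitionSpec resolve(Table table, TaskWithSpecId task) {
    Map<Integer, PartitionSpec> specs = table.specs();
    return specs.get(task.specId());
  }
}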

> I don't think void transforms will be an issue, as we check for the presence of deletes and V1 tables can't have them.

Maybe in some extreme case, such as: a v1 table with identity(c1) --drop partition field--> a v1 table with a void transform --upgrade to v2--> a v2 table with a void transform, which then receives deletes.
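
Spelled out as a hypothetical repro (db.t and c1 are illustrative, and Iceberg's Spark SQL extensions must be enabled for the DDL):

import org.apache.spark.sql.SparkSession;

class VoidTransformRepro {
  static void repro(SparkSession spark) {
    // Dropping a partition field on a v1 table replaces it with a void
    // transform instead of removing it, to keep field ordering stable.
    spark.sql("ALTER TABLE db.t DROP PARTITION FIELD c1");
    // Upgrading to v2 keeps that spec, and the table can now take deletes.
    spark.sql("ALTER TABLE db.t SET TBLPROPERTIES ('format-version' = '2')");
    spark.sql("DELETE FROM db.t WHERE id = 1");
    // task.partition() still has one (null) slot for the void field, so a
    // size() > 0 check treats an effectively unpartitioned table as partitioned.
  }
}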

aokolnychyi (Author):
I think you are right. Let me profile that part and see how expensive it is.

aokolnychyi (Author):
It looks like the schema and spec caches are doing their jobs; I don't see much time spent there, so I'll switch to using specs. Good call, @advancedxy!

aokolnychyi (Author):
I switched to proper hashes; I'll add another JMH benchmark later, but it seems to perform all right.
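
The follow-up benchmark might look roughly like this JMH skeleton (hypothetical: the class name, the stand-in struct, and the hash body are illustrative, not the code merged here):

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.StructLike;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class PartitionHashBenchmark {

  private StructLike partition;

  @Setup
  public void setup() {
    // A two-field partition tuple; a real benchmark would vary width and types.
    Object[] values = {42, "2024-01-26"};
    partition = new StructLike() {
      @Override
      public int size() {
        return values.length;
      }

      @Override
      public <T> T get(int pos, Class<T> javaClass) {
        return javaClass.cast(values[pos]);
      }

      @Override
      public <T> void set(int pos, T value) {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Benchmark
  public int hashPartition() {
    // Positional hashing over the tuple: no spec lookup required.
    int result = 17;
    for (int pos = 0; pos < partition.size(); pos++) {
      Object value = partition.get(pos, Object.class);
      result = 31 * result + (value == null ? 0 : value.hashCode());
    }
    return result;
  }
}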

return task.partition() != null && task.partition().size() > 0;
}

private static int hash(StructLike struct) {

advancedxy (Contributor):
Is it possible to reuse org.apache.iceberg.types.JavaHash(es) here?

aokolnychyi (Author):
It would require having access to task.spec() to know the struct type, which I think is expensive to get.

aokolnychyi (Author):
This logic could cause issues if the underlying StructLike instances have different value representations for the same column. For instance, one struct holds a String and the other some other form of CharSequence. I am not sure how realistic that is. Even if it happens, there would be no correctness issue; such tasks would simply be assigned to different slots.

aokolnychyi (Author):
If there is a cheap way to leverage JavaHashes, I am all for it.
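
For reference, a positional hash along these lines needs only the tuple itself, never task.spec(), at the cost of the representation sensitivity described above (a sketch, not necessarily the merged code):

import org.apache.iceberg.StructLike;

class PartitionHashes {
  // Hash the partition tuple by position, without knowing its struct type.
  // Two representations of the same logical value (String vs. another
  // CharSequence) hash differently, which only costs locality, never
  // correctness.
  static int hash(StructLike struct) {
    int result = 17;
    for (int pos = 0; pos < struct.size(); pos++) {
      Object value = struct.get(pos, Object.class);
      result = 31 * result + (value == null ? 0 : value.hashCode());
    }
    return result;
  }
}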

.parse();
}

private boolean executorCacheLocalityEnabledInternal() {

advancedxy (Contributor):
I'm a little concerned that this doesn't play well with Spark's dynamic allocation, which most production systems have enabled.

Did you test how this would work with dynamic allocation enabled?

aokolnychyi (Author):
My original approach was to enable executor cache locality by default only if dynamic allocation is disabled. After thinking more about it, I decided to simply disable it by default, no matter whether static or dynamic allocation is used. As of right now, folks have to opt in explicitly to enable executor cache locality. That way, we ensure no extra waits are added on our end, since we can't guarantee the locality would be beneficial.

advancedxy (Contributor):
> My original approach was to enable executor cache locality by default only if dynamic allocation is disabled.

That was my first thought too. Then I realized: what if users want to enable this anyway? It should be up to the users to decide.

What about logging a warning when both dynamic allocation and executor cache locality are enabled?

aokolnychyi (Author):
My worry is that we don't really know if enabling this with dynamic allocation is going to hurt. For instance, it still may make sense if the min number of executors is big enough or if the cluster is hot. Given that we would also have to add logic to parse the dynamic allocation config, I'd probably not log it and trust the person setting this for now.
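
For instance, a user who has profiled their workload might still opt in alongside dynamic allocation (a sketch; the dynamicAllocation keys are standard Spark configs, and the Iceberg property name is the one this PR adds):

import org.apache.spark.sql.SparkSession;

class OptInWithDynamicAllocation {
  static SparkSession create() {
    // Keeping minExecutors reasonably high makes executor churn, and thus
    // stale preferred locations, less likely while dynamic allocation is on.
    return SparkSession.builder()
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "20")
        .config("spark.sql.iceberg.executor-cache.locality.enabled", "true")
        .getOrCreate();
  }
}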

sql("DELETE FROM %s WHERE id = 1", commitTarget());
sql("DELETE FROM %s WHERE id = 3", commitTarget());

assertEquals(

advancedxy (Contributor):
It seems this only checks record equality but doesn't verify the executor cache locality?

I think we could check the Spark RDD's getPreferredLocations instead?

aokolnychyi (Author):
I am afraid we can't really test this, as SparkUtil$executorLocations would return an empty list in our local testing env. This test simply ensures nothing breaks when run on the driver.

advancedxy (Contributor):
Ah, I see. Iceberg's Spark tests only support local mode. There's a local-cluster mode, but it requires extra setup. I think it's fine to leave it as is.
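
If the tests ever run against a local-cluster master, a check could be built on public Spark API along these lines (a sketch; the SQL and test plumbing are assumed, and in local mode each list is simply empty):

import org.apache.spark.Partition;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;

class LocalityProbe {
  // Print the preferred locations reported for each input partition; with
  // locality enabled and deletes present, entries use Spark's
  // executor_<host>_<id> task-location format.
  static void probe(SparkSession spark, String tableName) {
    RDD<InternalRow> rdd =
        spark.sql("SELECT * FROM " + tableName).queryExecution().toRdd();
    for (Partition partition : rdd.partitions()) {
      System.out.println(rdd.preferredLocations(partition));
    }
  }
}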

@rdblue (Contributor) left a comment:
This looks reasonable to me, and low risk since it is disabled by default.

@advancedxy (Contributor) left a comment:
LGTM.

@aokolnychyi (Author):
Thanks, @advancedxy @rdblue! I am going to test this with our RC on a cluster. I can't cover everything locally. I tested the initial prototype on a cluster and it worked well.

@aokolnychyi merged commit c745ac3 into apache:main on Feb 5, 2024. 41 checks passed.

devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request on Apr 22, 2024.