
[Improvement]: The optimizer adds a cache of eq delete files to reduce repeated IO cost of eq delete files #2553

Open
3 tasks done
zhongqishang opened this issue Feb 18, 2024 · 6 comments · May be fixed by #2584

@zhongqishang
Contributor

zhongqishang commented Feb 18, 2024

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

For large tables written by Flink, each commit submits an EQ DELETE file that is associated with all previous data files. Most of the generated optimizing tasks will then read this EQ DELETE file again and again, causing duplicated IO cost.

How should we improve?

Each JVM (taskmanager / executor) in the Optimizer maintains a cache of EQ DELETE files.
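
To make the proposal concrete, here is a minimal sketch of a per-JVM cache shared by all optimizer threads. It assumes Caffeine is available on the classpath; the class name, default budget, and weight estimate are illustrative and not actual Amoro code.

```java
// Minimal sketch only: names, the default budget, and the weight estimate are
// illustrative assumptions, not the actual Amoro implementation.
import java.util.List;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public final class EqDeleteFileCache {

  // Hypothetical per-JVM budget; in practice this should come from configuration.
  private static final long MAX_TOTAL_BYTES = 128L * 1024 * 1024;

  // One static cache per JVM (taskmanager / executor), shared by all optimizer threads.
  private static final Cache<String, List<Object>> CACHE =
      Caffeine.newBuilder()
          .maximumWeight(MAX_TOTAL_BYTES)
          // Weigh entries by a rough estimate of their in-memory size so the limit is in bytes.
          .weigher((String path, List<Object> rows) -> rows.size() * 64)
          .build();

  private EqDeleteFileCache() {}

  // Each optimizing task asks the cache first; only the first task on this JVM
  // actually reads the EQ DELETE file from storage.
  public static List<Object> getOrLoad(String deleteFilePath, Function<String, List<Object>> loader) {
    return CACHE.get(deleteFilePath, loader);
  }
}
```

Each optimizing task would call something like `EqDeleteFileCache.getOrLoad(path, p -> readEqDeletes(p))`, where `readEqDeletes` stands in for the existing delete-file reading path, so only the first task on a JVM pays the IO cost.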

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Subtasks

No response

Code of Conduct

@zhoujinsong
Contributor

Hi, it is a good idea to cache the delete file data.
However, before proceeding with the actual implementation, we may need to clarify some details like:

  • Do we only cache data for equality delete files?
  • How to limit the size of the cache?
  • What kind of cache eviction strategy to use?
  • When allocating tasks in AMS, is it necessary to consider cache hits?

@zhoujinsong
Contributor

zhoujinsong commented Feb 19, 2024

IMO,

Do we only cache data for equality delete files?

Yes, the position delete files only match a limited set of insert files, so it seems unnecessary to cache them for now.

How to limit the size of the cache?

If we cannot fully cache a delete file, the cache will be ineffective. To better protect memory and cache as many files as possible, we may only want to cache delete files that are relatively small. Additionally, delete files that are too large are already filtered during reading using the bloom filter of the insert files, so they are not suitable for caching either.
Besides, we should add a configuration to control the maximum size of the cache, preferably in bytes.

What kind of cache eviction strategy to use?

LRU seems to be good enough.

When allocating tasks in AMS, is it necessary to consider cache hits?

Considering cache hits during scheduling can better utilize the cache. However, initially, we can overlook this and consider adding this optimization later.
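
To illustrate the answers above in one place, below is a rough, dependency-free sketch of a byte-bounded LRU cache that also skips delete files above a per-file threshold. All names and thresholds are made up for this discussion, not part of Amoro.

```java
// Rough sketch only: class, field, and threshold names are illustrative.
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedLruDeleteCache<V> {

  private static final class Entry<V> {
    final V value;
    final long bytes;
    Entry(V value, long bytes) { this.value = value; this.bytes = bytes; }
  }

  private final long maxTotalBytes;  // overall cache budget, configured in bytes
  private final long maxEntryBytes;  // files above this are skipped (bloom filters already help there)
  private long currentBytes = 0;

  // accessOrder=true: iteration order is least-recently-used first, giving LRU eviction.
  private final LinkedHashMap<String, Entry<V>> map = new LinkedHashMap<>(16, 0.75f, true);

  public BoundedLruDeleteCache(long maxTotalBytes, long maxEntryBytes) {
    this.maxTotalBytes = maxTotalBytes;
    this.maxEntryBytes = maxEntryBytes;
  }

  public synchronized V get(String deleteFilePath) {
    Entry<V> e = map.get(deleteFilePath);
    return e == null ? null : e.value;
  }

  public synchronized void put(String deleteFilePath, V rows, long sizeInBytes) {
    if (sizeInBytes > maxEntryBytes || sizeInBytes > maxTotalBytes) {
      return; // too large to be worth materializing in memory; read it from storage instead
    }
    Entry<V> previous = map.put(deleteFilePath, new Entry<>(rows, sizeInBytes));
    if (previous != null) {
      currentBytes -= previous.bytes;
    }
    currentBytes += sizeInBytes;

    // Evict least-recently-used entries until the total size is back under the budget.
    Iterator<Map.Entry<String, Entry<V>>> it = map.entrySet().iterator();
    while (currentBytes > maxTotalBytes && it.hasNext()) {
      Map.Entry<String, Entry<V>> eldest = it.next();
      if (eldest.getKey().equals(deleteFilePath)) {
        continue; // never evict the entry we just inserted
      }
      currentBytes -= eldest.getValue().bytes;
      it.remove();
    }
  }
}
```

A real implementation would more likely reuse a cache library such as Caffeine, but the sketch makes the two limits being discussed explicit: a total budget in bytes and a per-file cutoff.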

@zhongqishang
Contributor Author

Thanks for your reminder on the details.

The idea comes from the Iceberg community, which introduced a delete file cache for Spark executors in PR #8755.

IMO,

Do we only cache data for equality delete files?

Yes, the position delete files only match a limited set of insert files, so it seems unnecessary to cache them for now.

Yes.

How to limit the size of the cache?

If we cannot fully cache a delete file, the cache will be ineffective. To better protect memory and cache as many files as possible, we may only want to cache delete files that are relatively small. Additionally, delete files that are too large are already filtered during reading using the bloom filter of the insert files, so they are not suitable for caching either. Besides, we should add a configuration to control the maximum size of the cache, preferably in bytes.

The Iceberg community introduced spark.sql.iceberg.executor-cache.max-total-size to limit memory usage.
Since the cache lives mainly on the Optimizer side, we can add a parameter, like -msz, to set an upper limit on memory usage and keep the optimizer running stably.

For the part using the bloom filter, we might be able to skip caching.
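
For reference, the Spark-side knob mentioned above can be set as follows. This is only a usage sketch: the property name is the one quoted from Iceberg PR #8755, while the value format (plain bytes) and the 128 MB figure are assumptions to verify against the Iceberg docs.

```java
// Usage sketch only. The property name comes from the discussion above (Iceberg PR #8755);
// the byte-formatted value and the 128 MB figure are assumptions.
import org.apache.spark.SparkConf;

public class ExecutorCacheConfExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("iceberg-executor-cache-example")
        // Cap the total delete-file cache per executor (roughly 128 MB here).
        .set("spark.sql.iceberg.executor-cache.max-total-size", String.valueOf(128L * 1024 * 1024));
    // An Amoro optimizer-side equivalent would be a similar byte-valued option
    // (the "-msz"-style parameter discussed above) passed at optimizer startup.
  }
}
```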

What kind of cache eviction strategy to use?

LRU seems to be good enough.

How about LRU + active expiration (when the table being optimized changes)?

When allocating tasks in AMS, is it necessary to consider cache hits?

Considering cache hits during scheduling can better utilize the cache. However, initially, we can overlook this and consider adding this optimization later.

I also think configuring it on the optimizer side at first is enough.

@zhoujinsong
Contributor

zhoujinsong commented Feb 19, 2024

@zhongqishang Thank you for providing detailed information on specific improvements to the Iceberg project. I'm delighted to see that this improvement has been merged into the Iceberg Spark connector.

How about LRU + active expiration (when optimizer table changed)?

I understand your point that if the executor thread switches to a new process, it can evict the existing cache. This seems like a good strategy, but it requires AMS to always schedule tasks for a process together, which is currently the case. However, it seems difficult to determine the timing of switching to a new process in situations like multiple threads sharing a JVM environment.
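
To sketch what "LRU + active expiration" could look like, the example below keys the cache by table and optimizing process id so that entries from older processes can be dropped eagerly. All names are illustrative, it assumes tasks carry those ids, and it leaves open the hard part raised above: deciding when a process switch has actually happened on a shared JVM.

```java
// Illustrative sketch only (not Amoro code; Java 17 record used for brevity).
// Assumes each task knows the table id and the optimizing process id it belongs to.
import java.util.List;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class ExpiringEqDeleteCache {

  record CacheKey(String tableId, long processId, String deleteFilePath) {}

  private final Cache<CacheKey, List<Object>> cache =
      Caffeine.newBuilder()
          .maximumWeight(128L * 1024 * 1024)                            // total budget in bytes
          .weigher((CacheKey k, List<Object> rows) -> rows.size() * 64) // rough size estimate
          .build();

  public List<Object> getOrLoad(CacheKey key, Function<CacheKey, List<Object>> loader) {
    return cache.get(key, loader);
  }

  // Active expiration: when tasks of a newer optimizing process arrive for a table,
  // proactively drop entries cached for that table's older processes instead of
  // waiting for LRU eviction. The open question is where this hook gets triggered
  // when multiple optimizer threads share one JVM.
  public void onNewProcess(String tableId, long newProcessId) {
    cache.asMap().keySet().removeIf(
        key -> key.tableId().equals(tableId) && key.processId() < newProcessId);
  }
}
```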

@majin1102
Contributor

This feature would be quite beneficial for Spark, because the cache can be shared among tasks within a single executor (which was not possible before), and those tasks all belong to one task type.

This situation is not guaranteed in Amoro, so this feature may have some influence on the process and task schedulers.

@zhongqishang
Contributor Author

zhongqishang commented Mar 27, 2024

@majin1102 Thanks for your reply.

I just want to cover tables with many data files, where every newly added eq delete file is associated with all of them. The optimizer reads the same eq delete file once per task, and because optimizer concurrency is much smaller than the number of data files, each optimizer JVM executes many such tasks and reads the file repeatedly. If we cache it, we will speed up the merge and reduce the IO cost of the eq delete files.

There will be some adverse effects on tables that are not in this scenario, because caching involves materializing the eq delete data in memory. Like @zhoujinsong said, we can consider cache hits during scheduling to avoid unnecessary overhead.
