[core] limit parallelly read file memory usage, extract some methods #1072

Merged

Conversation

leaves12138
Contributor

[core] limit parallelly read file memory usage, extract some methods(#1061)

Purpose

Tests

(List UT and IT cases to verify this change)

API and Format

(Does this change affect API or storage format)

Documentation

(Does this change introduce a new feature)

@leaves12138 leaves12138 changed the title [core] limit parallelly read file memory usage, extract some methods(#1061) [core] limit parallelly read file memory usage, extract some methods May 5, 2023
// however entry.bucket() was computed against the old numOfBuckets
// and thus the filtered manifest entries might be empty
// which renders the bucket check invalid
if (filterByBucket(file)
Contributor

'if' statement can be simplified

}

private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
Function<List<ManifestFileMeta>, List<T>> processor,
Contributor

readManifestFile?

doPlan(
// how to process entry files
entries ->
entries.parallelStream()
Contributor

Can we also put this logic into doPlan?
Only filterByStats is special; can we just check instanceof ManifestEntry inside filterByStats?


private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan(
Function<List<ManifestFileMeta>, List<T>> processor,
Function<T, Boolean> postFilterProcessor) {
Contributor

Maybe we don't need this one; postFilter can be applied to SimpleManifestEntry too.
If there is a level filter, throw an exception.

@leaves12138 leaves12138 force-pushed the improve_memory-limit-read-file branch from 2f01378 to eed1bc3 Compare May 5, 2023 06:02
@leaves12138
Contributor Author

fixed comment

if (entry instanceof ManifestEntry) {
return filterByStats((ManifestEntry) entry);
} else {
throw new RuntimeException("only complete manifest entry could be filter by stats");
Contributor

return true;

Contributor Author

done this

// reduce memory usage by batch iterable process, the cached result in memory will be 2 *
// queueSize
public static <T, U> Iterable<T> parallelismBatchIterable(
Function<List<U>, List<T>> processor, List<U> input, int queueSize) {
Contributor

The default value of queueSize can be the COMMON_IO_FORK_JOIN_POOL thread count * 2.

Contributor Author

Done. queueSize is now set to FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism() * 2.

if (index < activeList.size()) {
next = activeList.get(index++);
if (index == activeList.size()) {
activeList = null;
Contributor

This iterator is implemented incorrectly.
The tests should cover calling hasNext multiple times in a row.
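
For illustration, a minimal sketch of an iterator over batches whose hasNext can be called any number of times without consuming or dropping elements. FlatteningIterator and its constructor are hypothetical names for this example, not the class under review; only the field names activeList and index are borrowed from the snippet above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration class, not the class from this PR.
class FlatteningIterator<T> implements Iterator<T> {
    private final Iterator<List<T>> batches;
    private List<T> activeList; // batch currently being consumed, or null before the first one
    private int index;          // position of the next element inside activeList

    FlatteningIterator(Iterator<List<T>> batches) {
        this.batches = batches;
    }

    @Override
    public boolean hasNext() {
        // Moves to another batch only when the current one is exhausted,
        // so repeated calls never skip or drop an element.
        while ((activeList == null || index >= activeList.size()) && batches.hasNext()) {
            activeList = batches.next();
            index = 0;
        }
        return activeList != null && index < activeList.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return activeList.get(index++);
    }

    public static void main(String[] args) {
        Iterator<Integer> it = new FlatteningIterator<>(
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)).iterator());
        it.hasNext();
        it.hasNext(); // a second call must not change what next() returns
        System.out.println(it.next());
    }
}
```

The point of the design is that all state changes happen either in next() or in an advance step that is a no-op while the current batch still has elements; the active batch is never cleared as a side effect of reading its last element.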

Contributor Author

Done. Added tests that call hasNext multiple times.

activeList = batch.get();
if (stack.size() > 0) {
batch =
CompletableFuture.supplyAsync(
Contributor

We don't need to produce more elements while the consumer has not finished the current batch.

Contributor Author

done this

@leaves12138 leaves12138 force-pushed the improve_memory-limit-read-file branch 3 times, most recently from ece57b0 to adf8b8e Compare May 5, 2023 08:25
int size = input.size();
int num = size / queueSize;

for (int i = 0; i < num; i++) {
Contributor

Lists.partition?
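
The suggestion presumably refers to Guava's Lists.partition(list, size), which splits a list into consecutive sublists. As a dependency-free sketch of the same idea, the helper below uses only the JDK; PartitionSketch and partition are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSketch {
    // Splits input into consecutive sublists of at most batchSize elements.
    // The last sublist holds the remainder, which a loop bounded by
    // size / batchSize would miss unless the tail is handled separately.
    static <U> List<List<U>> partition(List<U> input, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<U>> batches = new ArrayList<>();
        for (int start = 0; start < input.size(); start += batchSize) {
            int end = Math.min(start + batchSize, input.size());
            batches.add(input.subList(start, end)); // a view, no copying
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```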

Contributor Author

done

if (entry instanceof ManifestEntry) {
return filterByStats((ManifestEntry) entry);
}
return true;
Contributor

Add a comment here: filterByStats is a best-effort action; skipping it has no impact on correctness.
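
A small sketch of what "best effort" means here, under assumed types: Entry, SimpleEntry, FullEntry and the min/max fields are hypothetical stand-ins for the entry classes discussed in this review, and the range check is only an example of a stats-based filter.

```java
class BestEffortFilterSketch {
    // Hypothetical stand-ins for the entry types discussed in the review.
    interface Entry {}

    // An entry without statistics.
    static final class SimpleEntry implements Entry {}

    // An entry that carries min/max statistics for some column.
    static final class FullEntry implements Entry {
        final long minValue;
        final long maxValue;

        FullEntry(long minValue, long maxValue) {
            this.minValue = minValue;
            this.maxValue = maxValue;
        }
    }

    // Returns false only when the stats prove the entry cannot overlap [lower, upper].
    static boolean filterByStats(Entry entry, long lower, long upper) {
        if (entry instanceof FullEntry) {
            FullEntry full = (FullEntry) entry;
            return full.maxValue >= lower && full.minValue <= upper;
        }
        // No stats available. Stats filtering is only an optimization, so keep
        // the entry; later stages still filter the actual rows.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(filterByStats(new SimpleEntry(), 0, 10));
        System.out.println(filterByStats(new FullEntry(20, 30), 0, 10));
    }
}
```

Returning true for entries without stats can only keep more data than strictly necessary; it never drops data that should be read, which is why it is safe to skip.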

Contributor Author

done

@leaves12138 leaves12138 force-pushed the improve_memory-limit-read-file branch from adf8b8e to 0faf3b7 Compare May 5, 2023 11:28
@leaves12138
Contributor Author

fixed comment

public static <T, U> Iterable<T> parallelismBatchIterable(
Function<List<U>, List<T>> processor, List<U> input) {
// default queueSize
int queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism() * 2;
Contributor

@wxplovecc wxplovecc May 5, 2023

Maybe the default queueSize should be smaller. After testing it in the outdoor environment, it runs out of memory (OOM); setting it to a value smaller than 80 is OK. How about not multiplying by 2?

Contributor

Introduce an option: SCAN_MANIFEST_PARALLELISM, default is none (will be COMMON_IO_FORK_JOIN_POOL.getParallelism()).
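
One way to read this suggestion, as a sketch only: an unset option falls back to the pool's parallelism. Here a nullable Integer models "default is none", and IO_POOL with a fixed parallelism of 4 is a hypothetical stand-in for COMMON_IO_FORK_JOIN_POOL; how the real option is declared and read is not shown in this thread.

```java
import java.util.concurrent.ForkJoinPool;

class ScanParallelismSketch {
    // Hypothetical stand-in for the shared IO pool named in the review.
    static final ForkJoinPool IO_POOL = new ForkJoinPool(4);

    // configured == null models "option not set".
    static int resolveBatchSize(Integer configured) {
        if (configured == null) {
            return IO_POOL.getParallelism(); // default: one batch slot per pool thread
        }
        if (configured <= 0) {
            throw new IllegalArgumentException("parallelism must be positive: " + configured);
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolveBatchSize(null));
        System.out.println(resolveBatchSize(16));
    }
}
```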

Contributor Author

done this

@leaves12138 leaves12138 force-pushed the improve_memory-limit-read-file branch 2 times, most recently from 352903e to bb6f277 Compare May 6, 2023 03:45
@leaves12138
Contributor Author

fixed comment

@leaves12138
Contributor Author

The tests passed in my local environment?

() -> processor.apply(stack.poll()),
FileUtils.COMMON_IO_FORK_JOIN_POOL)
.get();
} catch (InterruptedException e) {
Contributor

Just

} catch (Exception e) {
    throw new RuntimeException(e);
}

?

  • Swallowing InterruptedException is not good.
  • The exception message is confusing: it says "should never get here", but an IOException will end up here.
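
A sketch of one way to address both points when blocking on a future. It goes slightly beyond the single catch block suggested above by also restoring the interrupt flag, which is a common Java convention rather than something requested in this thread. AwaitSketch and runAndWait are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

class AwaitSketch {
    // Runs the task on the given pool and blocks until its result is available.
    static <T> T runAndWait(Supplier<T> task, ForkJoinPool pool) {
        try {
            return CompletableFuture.supplyAsync(task, pool).get();
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag, then propagate.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The task itself failed (for example with an unchecked IO failure);
            // surface the original cause instead of a generic message.
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println(runAndWait(() -> "ok", pool));
    }
}
```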

@leaves12138 leaves12138 force-pushed the improve_memory-limit-read-file branch from bb6f277 to f0c4135 Compare May 6, 2023 07:12
@github-actions github-actions bot added documentation Improvements or additions to documentation core labels May 6, 2023
@JingsongLi JingsongLi closed this May 6, 2023
@JingsongLi JingsongLi reopened this May 6, 2023
@@ -66,7 +67,7 @@ public T next() {
}

private void advanceIfNeeded() {
- if ((activeList == null || index >= activeList.size())
+ while ((activeList == null || index >= activeList.size())
Contributor

@JingsongLi JingsongLi May 8, 2023

Add a unit test for this?
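
A sketch of the case such a test could cover, assuming the motivation for switching from if to while is a batch that comes back empty (the thread does not say so explicitly). AdvanceDemo and drain are hypothetical; the skipEmpty flag switches between the two forms of the advance step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class AdvanceDemo {
    // Drains batches into one list. When skipEmpty is false, the cursor advances
    // at most one batch per step (the `if` form); when true, it keeps advancing
    // until it finds a non-empty batch (the `while` form).
    static <T> List<T> drain(List<List<T>> batches, boolean skipEmpty) {
        Iterator<List<T>> source = batches.iterator();
        List<T> out = new ArrayList<>();
        List<T> active = null;
        int index = 0;
        while (true) {
            if (skipEmpty) {
                while ((active == null || index >= active.size()) && source.hasNext()) {
                    active = source.next();
                    index = 0;
                }
            } else if ((active == null || index >= active.size()) && source.hasNext()) {
                active = source.next();
                index = 0;
            }
            if (active == null || index >= active.size()) {
                return out; // looks exhausted from the caller's point of view
            }
            out.add(active.get(index++));
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches =
                Arrays.asList(Arrays.asList(1), Collections.<Integer>emptyList(), Arrays.asList(2));
        System.out.println(drain(batches, false)); // stops early at the empty batch
        System.out.println(drain(batches, true));  // reaches the element after it
    }
}
```

With the single-step form, an empty batch in the middle makes the sequence look finished even though later batches still hold elements, so a test with an empty batch between two non-empty ones distinguishes the two versions.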

Contributor

@JingsongLi JingsongLi left a comment

+1

@JingsongLi JingsongLi merged commit 5d23c7d into apache:master May 8, 2023
leaves12138 pushed a commit to leaves12138/incubator-paimon that referenced this pull request May 10, 2023
leaves12138 pushed a commit to leaves12138/incubator-paimon that referenced this pull request May 10, 2023
…everted code, we don't need AbstractEntry, but we still need memory control) (apache#1072)
@leaves12138 leaves12138 deleted the improve_memory-limit-read-file branch May 11, 2023 09:50
Labels
core documentation Improvements or additions to documentation
3 participants