
Re-implement Stream.skip to have reasonable parallel behavior #4

Closed
almson opened this issue May 30, 2015 · 9 comments

@almson

almson commented May 30, 2015

Currently, something like StreamEx.ofLines(...).skip(1).parallel().... doesn't do anything reasonable or useful. Instead of skipping the file's header, it skips a random line in the file. I think it should be possible to change its behavior to actually skip the first item if the source stream is ordered.

@amaembo
Owner

amaembo commented May 30, 2015

As you can see, in this case StreamEx behaves the same way as the underlying Java Stream API. However, for your particular task there's a hackish alternative:

StreamEx.ofLines(...).pairMap((a, b) -> b).parallel()...

Try testing it.
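The hack works because pairMap applies a function to every pair of adjacent elements: n elements give n - 1 results, and keeping b from each pair (e0, e1), (e1, e2), ... yields e1, e2, ..., that is, everything except the first element. Below is a sequential, plain-JDK sketch of just these semantics. The `pairMap` helper over a `List` is hypothetical and is not StreamEx's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PairMapSketch {
    // Hypothetical helper: applies f to every adjacent pair (list[i], list[i + 1]).
    // An input of n elements produces n - 1 results, in encounter order.
    static <T, R> List<R> pairMap(List<T> list, BiFunction<? super T, ? super T, ? extends R> f) {
        return IntStream.range(0, Math.max(0, list.size() - 1))
                .mapToObj(i -> f.apply(list.get(i), list.get(i + 1)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("header", "row1", "row2");
        // Keeping the second element of each pair drops exactly the first element.
        System.out.println(pairMap(lines, (a, b) -> b)); // [row1, row2]
    }
}
```

Note that this trick can only ever drop a single leading element.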

@amaembo

amaembo commented May 30, 2015

By the way, I cannot reproduce your problem. I committed a test case that works correctly with both skip and the hackish pairMap: it actually skips the first line. The documentation clearly states that the first stream element is skipped unless the stream is unordered. Please provide a code example that reproduces this problem.
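A check of this kind can be written with plain JDK classes. In the minimal sketch below, `BufferedReader.lines()` stands in for `StreamEx.ofLines(...)` (an assumption made so the example has no library dependency), and an ordered collector (`Collectors.toList()`) is used as the terminal operation. The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderedSkipCheck {
    // Builds "header\n1\n2\n...\nn" so that the header is the first line.
    static String input(int n) {
        return "header\n" + IntStream.rangeClosed(1, n)
                .mapToObj(String::valueOf)
                .collect(Collectors.joining("\n"));
    }

    // Ordered source + ordered collector: skip(1) drops the first line,
    // even though the pipeline runs in parallel.
    static List<String> skipHeader(String text) {
        return new BufferedReader(new StringReader(text)).lines()
                .skip(1)
                .parallel()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rows = skipHeader(input(5000));
        System.out.println(rows.size() + " " + rows.get(0)); // 5000 1
    }
}
```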

@amaembo

amaembo commented May 30, 2015

Well, it's also possible that you are using an unordered collector as the terminal operation. For example, this test fails:

assertEquals(IntStreamEx.range(1, 5000).mapToObj(String::valueOf).toSet(),
        StreamEx.ofLines(new StringReader(input)).skip(1).parallel().toSet());

That's because Collectors.toSet(), which is used internally, is an unordered collector. It switches the stream to an unordered state, and in this state skip is allowed to skip any element. This is an interesting case that could be considered a Stream API bug. You can work around it by using an alternative, ordered collector like this:

assertEquals(IntStreamEx.range(1, 5000).mapToObj(String::valueOf).toSet(),
        StreamEx.ofLines(new StringReader(input)).skip(1).parallel().toCollection(HashSet::new));

Alternatively, you can define your own collector that removes the UNORDERED characteristic. By the way, the pairMap hack works well even with an unordered collector, though it can skip only a single line.
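Such a collector can be built with `Collector.of`: when no characteristics are passed, the resulting collector declares only IDENTITY_FINISH, so, unlike `Collectors.toSet()`, it does not report UNORDERED. The sketch below is one possible version; the names `orderedToSet` and `skipHeader` are hypothetical, and `BufferedReader.lines()` again stands in for `StreamEx.ofLines(...)`.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class OrderedSetCollector {
    // Hypothetical helper: collects into a HashSet like Collectors.toSet(),
    // but declares no UNORDERED characteristic, so it does not relax the
    // encounter-order constraints of the upstream pipeline.
    static <T> Collector<T, ?, Set<T>> orderedToSet() {
        return Collector.of(
                HashSet::new,           // supplier: one set per parallel chunk
                Set::add,               // accumulator
                (left, right) -> {      // combiner: merge chunk results
                    left.addAll(right);
                    return left;
                });                     // no Characteristics passed
    }

    static Set<String> skipHeader(String text) {
        return new BufferedReader(new StringReader(text)).lines()
                .skip(1)
                .parallel()
                .collect(orderedToSet());
    }

    public static void main(String[] args) {
        Collector<String, ?, Set<String>> standard = Collectors.toSet();
        System.out.println(standard.characteristics()
                .contains(Collector.Characteristics.UNORDERED)); // true
        System.out.println(OrderedSetCollector.<String>orderedToSet().characteristics()
                .contains(Collector.Characteristics.UNORDERED)); // false
    }
}
```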

@amaembo

amaembo commented May 31, 2015

I've created a separate branch and committed a new .recreate() method there:

StreamEx.ofLines(new StringReader(input)).parallel().skip(1).recreate().toSet();

This method starts a new pipeline at the given point while keeping the whole procedure lazy. The point is that the parallel and unordered characteristics are not shared between the pipelines, so even though toSet() is unordered, it makes only the last pipeline unordered, while the first pipeline remains ordered. This should work nicely with any unordered terminal operation, such as findAny or forEach, as well. I haven't yet checked how this affects performance, so feel free to check out this branch and try this method. If it proves useful, I will include it in a later release (probably 0.3.2). The name recreate() may not be the best one, so I'm considering alternatives as well.
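The idea can be approximated with plain JDK calls: take the spliterator of the existing pipeline and start a new pipeline on it with `StreamSupport.stream`, keeping the parallel flag. This is a minimal sketch under that assumption, not StreamEx's actual code; `recreate` here is a hypothetical static helper. Taking the spliterator does not evaluate the old pipeline, and the UNORDERED flag from the new pipeline's terminal operation is not passed back to the old one.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RecreateSketch {
    // Hypothetical helper: starts a new pipeline on top of the old one.
    // Only the old pipeline's spliterator is taken here; the old pipeline
    // is evaluated lazily when the new one is traversed.
    static <T> Stream<T> recreate(Stream<T> stream) {
        boolean parallel = stream.isParallel();
        return StreamSupport.stream(stream.spliterator(), parallel)
                .onClose(stream::close); // propagate close() to the original pipeline
    }

    static Set<String> skipHeaderToSet(String text) {
        Stream<String> skipped = new BufferedReader(new StringReader(text)).lines()
                .parallel()
                .skip(1);
        // toSet() is UNORDERED, but that only affects the new pipeline.
        return recreate(skipped).collect(Collectors.toSet());
    }
}
```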

Please note that I'm deliberately not going to "fix" the existing skip method. The idea of my library is to extend the Stream API, which means that wherever a standard Stream is already used, it can be replaced with StreamEx with no visible changes (see the Liskov substitution principle). So if the standard Stream API has weird behavior or even bugs, StreamEx should behave the same way.

@amaembo amaembo added this to the 0.3.2 milestone Jun 13, 2015
@amaembo amaembo self-assigned this Jun 13, 2015
@amaembo

amaembo commented Jun 13, 2015

The recreate() method appears to be useful and has a very small performance impact, so it's likely to be added to StreamEx 0.3.2.

@amaembo

amaembo commented Jun 14, 2015

After additional testing and rethinking, I decided to replace recreate() with skipOrdered(n), which combines the skip and recreate steps, turning the stream into sequential mode and back to parallel if necessary. It seems that skip is the only problematic operation and that problems appear only with non-sized sources, so skipOrdered would be clearer for users. I thought there was a similar problem with limit, but it seems that limit always works fine and keeps the first entries even if the terminal operation is unordered.
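Following that description, a skipOrdered-like helper can be sketched with JDK calls only: switch to sequential mode, skip, then start a new pipeline with the original parallel flag. This is a hypothetical static helper, not the StreamEx method. In this simple form the downstream part may not split well, so the restored parallel flag preserves the caller's mode rather than guaranteeing a speedup.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SkipOrderedSketch {
    // Hypothetical helper: skip the first n elements in encounter order,
    // regardless of what terminal operation is applied later.
    static <T> Stream<T> skipOrdered(Stream<T> stream, long n) {
        boolean parallel = stream.isParallel();
        // Sequential skip: the first n elements in encounter order are dropped.
        Stream<T> skipped = stream.sequential().skip(n);
        // New pipeline with the original parallel flag; a later unordered
        // terminal operation cannot relax the skip above.
        return StreamSupport.stream(skipped.spliterator(), parallel)
                .onClose(skipped::close);
    }

    public static void main(String[] args) {
        Set<String> rows = skipOrdered(Stream.of("header", "a", "b").parallel(), 1)
                .collect(Collectors.toSet());
        System.out.println(rows.contains("header")); // false
    }
}
```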

@amaembo

amaembo commented Jun 14, 2015

skipOrdered was added in version 0.3.2.

@amaembo amaembo closed this as completed Jun 14, 2015
@amaembo

amaembo commented Jun 18, 2015

Brian Goetz confirmed that this skip() behavior is a bug. They filed a bug report in the OpenJDK bug tracker.

@amaembo

amaembo commented Jul 1, 2015

It seems that the original Stream.skip has been fixed in JDK 9 and backported to JDK 8u60, so my skipOrdered will become unnecessary.
