Implement MapReducer.stream() #19

Merged
merged 13 commits into from
Oct 10, 2018

@tyrasd tyrasd commented Sep 18, 2018

Implements the idea from #17

Works in the JDBC backends (single-threaded and multi-threaded) as well as the "AffinityCall" Ignite backend; other backends fall back to `.collect().stream()`. For the other Ignite backends, streaming is not straightforward to implement. Maybe it can be implemented at some later stage, or the oshdb could (by default) fall back to another backend if it finds that the current one doesn't implement streaming?

@tyrasd tyrasd added the enhancement New feature or request label Sep 18, 2018
which is similar to performing something like `MapAggregator.collect().entrySet().stream()` with the difference that the latter would return all individual results for a particular index value as a list, while the stream returns them individually.

the current MapAggregator.collect() could be mimicked by doing `MapAggregator.stream().collect(Collectors.groupingBy(Entry::getKey, Collectors.mapping(Entry::getValue, Collectors.toList())))`
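
To make the grouping expression above concrete, here is a minimal sketch that uses only plain `java.util.stream`. The `entries()` method is a hypothetical stand-in for whatever `MapAggregator.stream()` yields; it is assumed to produce flat `Entry<index, value>` pairs, which is all the collector relies on.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingSketch {
  // Hypothetical helper: builds one (index, value) pair.
  static Entry<String, Integer> entry(String key, Integer value) {
    return new SimpleEntry<String, Integer>(key, value);
  }

  // Assumption: stands in for the flat entries a MapAggregator stream would return.
  static Stream<Entry<String, Integer>> entries() {
    return Stream.of(entry("a", 1), entry("b", 2), entry("a", 3));
  }

  // Regroups the flat entries into one list of values per index value.
  static Map<String, List<Integer>> grouped() {
    return entries().collect(Collectors.groupingBy(
        Entry::getKey,
        Collectors.mapping(Entry::getValue, Collectors.toList())));
  }

  public static void main(String[] args) {
    System.out.println(grouped());
  }
}
```

The stream keeps the two `"a"` results as separate entries; only the final `groupingBy` step turns them back into a single list per key.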
these are the methods that process the contents of the individual osh cells (cellIterator.*, grouping results by entity id, reducing)
return (oshEntityCell, cellIterator) -> {
AtomicReference<S> accInternal = new AtomicReference<>(identitySupplier.get());
// iterate over the history of all OSM objects in the current cell
List<OSMContribution> contributions = new ArrayList<>();

@tyrasd tyrasd Sep 19, 2018

btw: while testing, I found that here (and in a few places below) re-using this list for multiple consecutive entities can cause a problem when one directly uses these lists as output, e.g. the following two calls would produce different results:

….groupByEntity().collect() // <- produces wrong result
….groupByEntity().map(ArrayList::new).collect() // works fine

This is kind of a corner case, since (at first glance) it only happens when using groupByEntity and one does not specify any map function and one does not aggregate the results. But it could still be confusing to run into this and (I think) it doesn't matter too much from a performance point of view. I guess we should just not re-use the list here, or what do you think?
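
The aliasing effect described here can be reproduced without oshdb. The following is only a sketch under assumptions: `collectReused` and `collectCopied` are hypothetical names, and the `int[][]` input stands in for the per-entity contributions. It shows why handing out a re-used list gives wrong results, while copying it (the `map(ArrayList::new)` workaround) does not.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedListSketch {
  // Re-uses one buffer for all entities and hands it out directly as the result.
  static List<List<Integer>> collectReused(int[][] entities) {
    List<List<Integer>> results = new ArrayList<>();
    List<Integer> buffer = new ArrayList<>();
    for (int[] entity : entities) {
      buffer.clear();
      for (int value : entity) {
        buffer.add(value);
      }
      results.add(buffer); // every result refers to the same list object
    }
    return results;
  }

  // Same loop, but each result is a fresh copy of the buffer.
  static List<List<Integer>> collectCopied(int[][] entities) {
    List<List<Integer>> results = new ArrayList<>();
    List<Integer> buffer = new ArrayList<>();
    for (int[] entity : entities) {
      buffer.clear();
      for (int value : entity) {
        buffer.add(value);
      }
      results.add(new ArrayList<>(buffer)); // snapshot that later clear() calls cannot touch
    }
    return results;
  }

  public static void main(String[] args) {
    int[][] entities = {{1, 2}, {3}};
    System.out.println(collectReused(entities)); // [[3], [3]]
    System.out.println(collectCopied(entities)); // [[1, 2], [3]]
  }
}
```

Not re-using the list at all, as suggested above, has the same effect as the copy and costs one small allocation per entity.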

new generic mapreducer tests must also fail when there are missing tables/caches
streaming kernels now return collections of the individual cell results (instead of streams of individual cell results), because streams can't be serialized to be used in a remote "peer class loading" environment like ignite
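
The serialization constraint behind this change can be checked with plain Java serialization. This sketch is an illustration, not Ignite's actual transport; `canSerialize` is a hypothetical helper. A `Stream` is rejected by `ObjectOutputStream`, while the same elements collected into an `ArrayList` pass.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSerializationSketch {
  // Returns true if the object can be written with standard Java serialization.
  static boolean canSerialize(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (IOException e) { // NotSerializableException is a subclass of IOException
      return false;
    }
  }

  // Materializes a cell's results into a serializable collection.
  static List<Integer> cellResults() {
    return Stream.of(1, 2, 3).collect(Collectors.toCollection(ArrayList::new));
  }

  public static void main(String[] args) {
    System.out.println(canSerialize(Stream.of(1, 2, 3))); // false
    System.out.println(canSerialize(cellResults()));      // true
  }
}
```

On the receiving side the per-cell collections can be turned back into a single stream, for example with `flatMap(Collection::stream)`.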
makes the onClose actually work as expected/described in #30 (comment):
the old implementation didn't run the callback in the same job / broadcast runnable as the main map-reduce routine, thus triggering another serialization-deserialization of the remote object for the onClose callback, which would then see a different (new) object, which cannot be used to close a db connection, for example.
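
Why a second serialization round trip breaks the callback can be sketched with plain Java serialization. The `Resource` class and the `roundTrip` helper below are hypothetical stand-ins (for a db connection holder and for shipping an object to a remote job) and are not taken from the oshdb or Ignite code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripSketch {
  // Hypothetical stand-in for an object that holds something closable.
  static class Resource implements Serializable {
    private static final long serialVersionUID = 1L;
    boolean closed = false;

    void close() {
      closed = true;
    }
  }

  // Serializes and deserializes the resource, as happens when it is shipped to another job.
  static Resource roundTrip(Resource resource) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(resource);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Resource) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Closes a deserialized copy and reports whether the original noticed.
  static boolean originalClosedAfterClosingCopy() {
    Resource original = new Resource();
    Resource copy = roundTrip(original);
    copy.close();
    return original.closed;
  }

  public static void main(String[] args) {
    System.out.println(originalClosedAfterClosingCopy()); // false
  }
}
```

Closing the copy leaves the original untouched, which matches the behaviour described above: the callback only helps if it runs against the same object instance as the main routine.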
@tyrasd tyrasd merged commit 9b902ad into master Oct 10, 2018
@tyrasd tyrasd deleted the streams branch October 10, 2018 08:50
@tyrasd tyrasd added this to the release 0.5.0 milestone Dec 3, 2020