
Parallel streaming issue #302

Closed
saschaarthur opened this issue Oct 11, 2016 · 9 comments
saschaarthur commented Oct 11, 2016

Hello,

I'm having trouble with parallel processing with the latest version (1.0.0-Final):

public class Init {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(cores * 4));

        for (int k = 0; k < 10; k++) {

            com.aol.cyclops.data.async.Queue<Integer> queue = QueueFactories.<Integer>boundedQueue(5000).build();

            // Close the queue as soon as it opens.
            new Thread(() -> {
                while (!queue.isOpen());
                System.err.println(queue.close());
            }).start();

            Stream<Integer> stream = queue.jdkStream();

            stream = stream.parallel(); // hangs with this line; works without it
            stream.forEach(e -> System.out.println(e));
            System.out.println("done " + k);
        }
    }
}

It looks like the main thread / ForkJoinPool threads are waiting for the stream to close. If I comment out stream = stream.parallel(); the example above works brilliantly.

johnmcclean (Member) commented Oct 11, 2016

Your analysis looks correct to me. I would guess that the signal that the queue is closed is not being propagated to all of the connected threads for the parallel Stream.

Thanks for this, I'll add a failing unit test based on your example and fix it.

johnmcclean added this to the 1.0.1 milestone Oct 12, 2016
@johnmcclean (Member)

I suspect we need to register X connections when jdkStream() is called, where X = java.util.concurrent.ForkJoinPool.common.parallelism or Runtime#availableProcessors(), to be sure all Spliterators are notified when a Queue is closed.
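As a sanity check on what X would be, the value the JDK actually uses can be read directly from the common pool. A minimal JDK-only sketch (ParallelismCheck is a hypothetical name, not cyclops code):

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismCheck {
    // Returns the number of worker threads the common ForkJoinPool will use;
    // this is the X referred to above: one close-signal per potential consumer.
    public static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + commonPoolParallelism());
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
    }
}
```

Note the two values differ when java.util.concurrent.ForkJoinPool.common.parallelism is overridden, as in the repro above, so counting connections off availableProcessors() alone would under-register.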

@johnmcclean (Member)

Fixed in #315

saschaarthur (Author) commented Nov 2, 2016

Hello,

I'm still having issues (1.0.1) with a lambda that never completes (the code is almost the same as the sample above, also using jdkStream()).
Running the same code sequentially is not a problem.
Any ideas?
I'll try to break the issue down into example code.

johnmcclean (Member) commented Nov 2, 2016

Yes please do. We probably need a larger suite of tests against the Queue type for parallel Streams.

The last issue was down to not all consuming threads receiving the Queue-closed signal. A hacky workaround might be to make the number of queue-closed messages configurable.

I wonder if a sequential Stream could be connected to the Queue, with the parallel Stream connected to that (though that would probably require some locking to ensure data is read sequentially from the Queue, which is not ideal either).
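That idea can be sketched with plain JDK types (a hypothetical workaround using java.util.concurrent, not the cyclops implementation): drain the Queue sequentially into a buffer so only one thread ever has to observe the close signal, then hand the buffer to a parallel stream:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SequentialDrain {
    // Close marker; assumes Integer.MIN_VALUE never appears as a real value.
    static final Integer POISON = Integer.MIN_VALUE;

    // Drain sequentially until the poison pill, then process the buffered data in parallel.
    public static long drainThenParallelSum(BlockingQueue<Integer> queue) throws InterruptedException {
        List<Integer> buffer = new ArrayList<>();
        for (Integer next = queue.take(); !POISON.equals(next); next = queue.take()) {
            buffer.add(next);
        }
        // Only now go parallel: the close signal was consumed by a single reader,
        // so no ForkJoinPool worker can block waiting on the queue.
        return buffer.parallelStream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 100; i++) queue.put(i);
        queue.put(POISON); // "close" the queue
        System.out.println(drainThenParallelSum(queue)); // 5050
    }
}
```

The trade-off is the one noted above: the drain step serializes consumption, so parallelism only applies downstream of the buffer.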

johnmcclean (Member) commented Nov 2, 2016

While debugging the previous issue, multiple poison pills (which signify the queue is closed) had to be sent per thread. Two seemed to work. I'm wondering if this is due to how the parallel stream attempts to split up the data stream. The number of times the poison pill needs to be sent might be a function of the number of data forks created by the Stream API. I'll investigate that and see if there is a way of determining that number exactly.
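The poison-pill pattern under discussion can be illustrated with a plain BlockingQueue (a JDK-only sketch, not cyclops internals; all names are made up): the producer must enqueue one pill per consumer thread, otherwise some consumers block forever, which is exactly the hang reported in this issue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class PoisonPillDemo {
    // Sentinel meaning "queue closed"; assumes it never occurs as real data.
    static final Integer POISON = Integer.MIN_VALUE;

    public static long run(int consumers, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicLong sum = new AtomicLong();
        CountDownLatch done = new CountDownLatch(consumers);

        for (int c = 0; c < consumers; c++) {
            new Thread(() -> {
                try {
                    // Each consumer stops at the first pill it takes, so pills
                    // are never double-consumed by one thread.
                    for (Integer next = queue.take(); !POISON.equals(next); next = queue.take()) {
                        sum.addAndGet(next);
                    }
                } catch (InterruptedException ignored) {
                } finally {
                    done.countDown();
                }
            }).start();
        }

        for (int i = 1; i <= items; i++) queue.put(i);
        // Crucial: one pill per consumer. Send fewer, and the remaining
        // threads never see the close signal and block in take() forever.
        for (int c = 0; c < consumers; c++) queue.put(POISON);

        done.await();
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4, 100)); // 5050
    }
}
```

With a parallel Stream the number of consuming Spliterators is not known up front, which is why a fixed pill count is fragile here.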

@johnmcclean (Member)

It looks like the parallel stream can generate a lot of Spliterators (around 500 being common in my tests, pushing ~5000 entries through). Adding a common status across Spliterators as to whether or not the Queue has been closed seems to lead to streams being disconnected with much less overhead in terms of poison pills being sent. The default in the next version will be one per connected thread (with an option to configure this higher if needed).

@saschaarthur (Author)

Thanks a lot!

Sorry, I'm a bit underwater at the moment and haven't had time yet to break it down into an example.
I'll try my best over the next few days.

@johnmcclean (Member)

The last couple of releases offered a somewhat hacky way around this by making the number of poison pills sent when the Queue closes configurable (via jdkStream(scalingFactor) or disconnectStreams(pillsToSend)).

There is another solution, which is to set the timeout on the Queue. Both the numeric value and the timeout units can be set. Once they are set, any connected Threads will time out and attempt to reconnect (which, if the Queue is closed, results in disconnection).

Queue#withTimeout and Queue#withTimeUnit can be used; each creates a new Queue instance with the changed timeout settings.
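The timeout behaviour can be sketched with plain JDK types as well (a hypothetical analogue of what Queue#withTimeout enables, not cyclops code): consumers poll with a timeout and, on each timeout, re-check a shared closed flag instead of blocking forever:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class TimeoutDisconnect {
    public static long consumeUntilClosed(BlockingQueue<Integer> queue, AtomicBoolean closed)
            throws InterruptedException {
        AtomicLong sum = new AtomicLong();
        while (true) {
            // Wait at most 50 ms; a null result means the poll timed out...
            Integer next = queue.poll(50, TimeUnit.MILLISECONDS);
            if (next != null) {
                sum.addAndGet(next);
            } else if (closed.get()) {
                // ...and on timeout the shared closed flag is checked,
                // so every consumer eventually disconnects without any pills.
                return sum.get();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean closed = new AtomicBoolean();
        for (int i = 1; i <= 100; i++) queue.put(i);
        closed.set(true); // "close" the queue; no poison pills needed
        System.out.println(consumeUntilClosed(queue, closed)); // 5050
    }
}
```

The appeal over poison pills is that the number of consumers never needs to be known; the cost is up to one timeout interval of latency before disconnection.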
