Need help in understanding how to do parallel processing with reactive streams #100

Closed
atinsood opened this issue Jun 8, 2016 · 12 comments

@atinsood

atinsood commented Jun 8, 2016

I wasn't sure if there is a dedicated Stack Overflow tag for this, or a mailing list where I could put up this question, so apologies in advance if I have posted this in the wrong place.

I am trying to wrap my head around how to consume a reactive stream in parallel, because from my understanding a stream is consumed by its subscriber sequentially.

The only example I could find was https://github.com/ReactiveX/RxPY/blob/master/examples/parallel/timer.py. But I believe schedulers are supposed to do something along those lines as well; I am not really able to understand what the difference between the two is.

I am trying with a sample snippet:

results = <an array of say 500 records>

rx.Observable.from_(results).flat_map(
    lambda item: executor.submit(<do something here that returns 1 tuple/record back>)
).buffer_with_count(5).subscribe(custom_print)

def custom_print(items):
    print("length of items {}".format(len(items)))

The result of this code is:

length of items 5
length of items 5

and then the code moves on to the next statement. Am I missing something fundamental here? My understanding is that we should be getting a stream of 500 futures, and the "length of items" statement should repeat 100 times.

The same logic works with:

Observable.from_(results).map(<do something here that returns 1 tuple/record back>).buffer_with_count(5).subscribe(custom_print)
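For reference, a minimal runnable sketch of that sequential variant (not from the original post; return_item is a placeholder standing in for the real per-record work):

from rx import Observable

def return_item(item):
    return item  # placeholder for the real per-record work

def custom_print(items):
    print("length of items {}".format(len(items)))

Observable.from_(range(500)) \
    .map(return_item) \
    .buffer_with_count(5) \
    .subscribe(custom_print)  # prints "length of items 5" 100 times, in order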

Thanks for the help.

@dbrattli
Copy link
Collaborator

dbrattli commented Jun 8, 2016

The key things to remember are: 1) a single subscription always runs sequentially; 2) multiple subscriptions may run concurrently. Whether things run in parallel depends on the GIL, the third-party libraries used, or, as in the example, the use of a process pool.

flat_map()/select_many() basically splits the stream (one subscription) into a stream of streams (multiple subscriptions). Thus the inner streams returned from flat_map() may run concurrently. I just tried the parallel/timer example and found no problems with it, so I'm not sure what problem you are experiencing.

So yes, you are getting a stream of 500 futures, but the output is generated when they resolve, and they may resolve in parallel. Thus the order of the merged stream may be very different from the original order. Could you perhaps provide a running example and explain exactly what output you get, and what you expected, if you think the output was wrong?

Buffering with buffer_with_count() has nothing to do with it; it just buffers the output for you. The processing in front of it may or may not run concurrently depending on scheduling, so you may just as well take it out of the equation.
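To illustrate the completion-order point, here is a minimal sketch (not from the thread) in the same RxPY v1 style: each future returned to flat_map() becomes its own inner subscription, so the merged output arrives in completion order rather than source order.

import concurrent.futures
import time
from rx import Observable

def slow_identity(item):
    # Earlier items sleep longer, so later items tend to finish first.
    time.sleep((4 - item) * 0.05)
    return item

with concurrent.futures.ThreadPoolExecutor(5) as executor:
    Observable.from_(range(5)) \
        .flat_map(lambda item: executor.submit(slow_identity, item)) \
        .subscribe(print)  # typically prints 4, 3, 2, 1, 0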

@atinsood
Author

atinsood commented Jun 8, 2016

@dbrattli thanks for the reply and the explanation. I will try to post a much more complete snippet by the end of the day.

But here's something that I was trying today. Notice that I am using ThreadPoolExecutor rather than ProcessPoolExecutor; things seem to be working fine with ProcessPoolExecutor.

I was under the assumption that the choice should not make a difference, because we get a list of futures that will be processed by the subscriber, and only when all of these have been processed will the code move to the next line:

import concurrent.futures
from rx import Observable

def custom_print_buffer(items):
    if len(items) > 0:
        print(len(items))

def custom_print(item):
    print(item)

def return_item(item):
    return item

x = Observable.from_(range(500))
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    x.flat_map(lambda item: executor.submit(return_item, item)) \
        .buffer_with_count(100) \
        .subscribe(custom_print_buffer)
print("Done executing")

and I see the output varying between runs:

$ python examples.py
100
100
100
100
100
Done executing
$ python examples.py
100
100
45
100
100
Done executing
$ python examples.py
100
100
100
100
exception calling callback for <Future at 0x105641710 state=finished returned int>
Traceback (most recent call last):
  File "<python env>/.pyenv/versions/3.5.1/lib/python3.5/concurrent/futures/_base.py", line 297, in _invoke_callbacks
    callback(self)
  File "<python instal location>/lib/python3.5/site-packages/rx/linq/observable/fromfuture.py", line 26, in done
    observer.on_next(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/observerbase.py", line 19, in on_next
    self._on_next_core(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/autodetachobserver.py", line 16, in _on_next_core
    self.observer.on_next(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/observerbase.py", line 19, in on_next
    self._on_next_core(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/anonymousobserver.py", line 14, in _on_next_core
    self._next(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/linq/observable/merge.py", line 128, in on_next
    observer.on_next(x)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/observerbase.py", line 19, in on_next
    self._on_next_core(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/autodetachobserver.py", line 16, in _on_next_core
    self.observer.on_next(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/observerbase.py", line 19, in on_next
    self._on_next_core(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/core/anonymousobserver.py", line 14, in _on_next_core
    self._next(value)
  File "<python instal location>/lib/python3.5/site-packages/rx/linq/observable/windowwithcount.py", line 57, in on_next
    s = q.pop(0);
IndexError: pop from empty list
Done executing
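A side note on the "Done executing" line in the runs above: it prints only after all the work is finished because ThreadPoolExecutor's context manager calls shutdown(wait=True) on exit, which blocks until every submitted future has completed. A minimal sketch of just that behaviour, independent of Rx:

import concurrent.futures
import time

with concurrent.futures.ThreadPoolExecutor(2) as executor:
    for _ in range(4):
        executor.submit(time.sleep, 0.1)
    # __exit__ calls shutdown(wait=True): blocks until all four sleeps finish

print("Done executing")  # always printed after all futures have completed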

@atinsood
Author

atinsood commented Jun 8, 2016

Also, where does a scheduler fit in this picture? I was under the assumption that a scheduler is ideally how you should be executing code on different threads.

@dbrattli dbrattli added the bug label Jun 8, 2016
@dbrattli
Collaborator

dbrattli commented Jun 8, 2016

Looks like we have a race condition, and one or more operators are not thread-safe. I need to investigate, so I am flagging this as a bug.

About schedulers. You can write the same thing using schedulers:

scheduler = ThreadPoolScheduler()
xs = Observable.range(1, 5).flat_map(
    lambda x: Observable.just(x, scheduler=scheduler))

The ThreadPoolScheduler is in the develop branch, and we don't have a ProcessPoolScheduler yet.
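For comparison, a sketch of the earlier executor example rewritten with a scheduler (assuming ThreadPoolScheduler can be imported from rx.concurrency, where it lives in later v1 releases):

from rx import Observable
from rx.concurrency import ThreadPoolScheduler

scheduler = ThreadPoolScheduler(5)  # pool of 5 worker threads

Observable.from_(range(500)) \
    .flat_map(lambda item: Observable.just(item, scheduler=scheduler)) \
    .buffer_with_count(100) \
    .subscribe(lambda items: print(len(items)))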

@atinsood
Author

atinsood commented Jun 8, 2016

Thanks, appreciate it. I will try to contribute the schedulers example mentioned above. Is there any reason one would prefer using a scheduler over language-specific multithreading? Just trying to understand if one approach is preferred over the other, and the rationale behind it.

@wolfch

wolfch commented Mar 7, 2018

I can still recreate the race/thread issue - is there any further progress on this?

This code sometimes completes normally, but sometimes it blows up with IndexError: pop from empty list:

import concurrent.futures
from rx import Observable

def custom_print_buffer(items):
    if len(items) > 0:
        print(len(items))

def custom_print(item):
    print(item)

def return_item(item):
    return item

x = Observable.from_(range(500))
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    x.flat_map(lambda item: executor.submit(return_item, item)) \
        .buffer_with_count(100) \
        .subscribe(custom_print_buffer)
print("Done executing")

@wolfch

wolfch commented Mar 7, 2018

In addition to sometimes raising that IndexError, the code sometimes completes without error but seems to drop some of the items!

1520448287.059355 - 100
1520448287.070648 - 86
1520448287.083058 - 100
1520448287.093794 - 24
1520448287.104453 - 100
Done executing

@dbrattli
Collaborator

I cannot reproduce this with RxPY v3, so I am closing this issue. Below is the equivalent v3 code; please re-open this issue if you can make it fail:

import concurrent.futures
import rx
from rx import operators as ops


def custom_print_buffer(items):
    if items:
        print(len(items))


def custom_print(item):
    print(item)


def return_item(item):
    return item


x = rx.from_(range(500))
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    x.pipe(
        ops.flat_map(lambda item: executor.submit(return_item, item)),
        ops.buffer_with_count(100)
    ).subscribe(custom_print_buffer)

print("Done executing")

@wolfch

wolfch commented Jun 3, 2019

Isn't RxPY v3 beta?

@wolfch

wolfch commented Jun 3, 2019

I concur that it appears to work with Python 3.6.4 using RxPY 3.0.0b4; but when will it be promoted from beta to GA? Thanks for all the great work!

@dbrattli
Collaborator

dbrattli commented Jun 3, 2019

Yes, still beta, but hopefully not for long (days).

@lock

lock bot commented Jun 24, 2020

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 24, 2020