
add support to run streams in a subprocess #126

Closed
oliver-zehentleitner opened this issue Oct 28, 2020 · 12 comments

@oliver-zehentleitner
Member

Is your feature request related to a problem? Please describe.
This lib uses multi-threading, which is not really parallel in Python - because of the GIL, all threads are executed sequentially, one after another in a cycle.

Describe the solution you'd like
Wrap streams into processes instead of threads to bypass the GIL.

@oliver-zehentleitner oliver-zehentleitner added the enhancement New feature or request label Oct 28, 2020
@oliver-zehentleitner oliver-zehentleitner self-assigned this Oct 28, 2020
@oliver-zehentleitner oliver-zehentleitner added this to To do in Todo Oct 28, 2020
@kimchirichie

If the streams become subprocesses, would that not hinder the main process's ability to terminate the stream? How would you kill the stream?

@oliver-zehentleitner
Member Author

I need to test it out - I have no experience with that - but I have read some articles: when creating a new process, I think you get the PID as a result, and it should be easy to kill/stop a process. The processes can also have a pipe to each other with multiprocessing.Queue().

https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/
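
A minimal sketch of that idea, assuming a hypothetical stream_worker function (not this library's real API) that pushes received payloads into a multiprocessing.Queue and can be stopped via terminate():

from multiprocessing import Process, Queue
import os
import time

def stream_worker(queue):
    # hypothetical worker: a real one would push received websocket payloads
    while True:
        queue.put(f"payload from pid {os.getpid()}")
        time.sleep(1)

if __name__ == "__main__":
    queue = Queue()
    worker = Process(target=stream_worker, args=(queue,))
    worker.start()
    print(f"worker PID: {worker.pid}")  # the PID is exposed on the Process object
    print(queue.get())                  # payloads arrive through the queue (a pipe between processes)
    worker.terminate()                  # stops/kills the child process
    worker.join()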

@kimchirichie

Interesting. I wonder if the benefits of multiprocessing are significant enough. I think parallel processing is mostly useful for computational scripts; in our case, it's network I/O.

@oliver-zehentleitner
Member Author

If you don't do it in parallel with multiprocessing, then you are limited to one CPU core! Even with 8 cores, your script can use only 12.5% of the CPU power of the system.
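
A minimal sketch (an assumed example, not from this lib) illustrating the point: the same CPU-bound function run in 4 threads is still serialized by the GIL, while 4 processes can actually use 4 cores.

import time
from threading import Thread
from multiprocessing import Process

def burn_cpu(n=10_000_000):
    # purely CPU-bound work, no I/O
    total = 0
    for i in range(n):
        total += i
    return total

def run(worker_cls, label):
    start = time.time()
    jobs = [worker_cls(target=burn_cpu) for _ in range(4)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    print(f"{label}: {time.time() - start:.2f}s")

if __name__ == "__main__":
    run(Thread, "4 threads (GIL-bound, ~1 core)")
    run(Process, "4 processes (can use up to 4 cores)")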

@kimchirichie

You're 100% right. But does it need more than 1 core? It's network I/O and most of the time the process is not working; threading may be sufficient (unless there are hundreds of connections).

@oliver-zehentleitner
Member Author

Try this and it's just throwing away the received data... not saving it to a database or anything else...

I think asyncio is very CPU intensive.

@oliver-zehentleitner
Member Author

But you are right, it's more a theoretical problem; 99.99% of all users will not be affected. It's very uncommon to stream everything - this does not make sense except for testing this lib :) - but if I find the time, I think I will try it.

@kosmodromov

Actually it makes sense, because in periods of high volatility network traffic increases significantly, along with the CPU that the app logic also needs - when streaming multiple timeframes on Binance futures, for instance.

@Indiana3714

Indiana3714 commented Oct 22, 2021

As far as I know, due to the GIL there's actually only one thread running at a time in Python, even when you're multithreaded (https://hackernoon.com/concurrent-programming-in-python-is-not-what-you-think-it-is-b6439c3f3e6a and https://tenthousandmeters.com/blog/python-behind-the-scenes-13-the-gil-and-its-effects-on-python-multithreading/), unlike other languages that have true multithreading, unfortunately. Sadly, right now it's pretty hard to use the library to subscribe to "depth5" order book updates every 500ms, for example: with around 30 symbol subscriptions, the data received from the library lags by a minimum of 10 seconds or so (and this figure keeps growing). I'm still thinking about how to handle it in my project; maybe I'll spawn one subprocess and one manager per channel and pool the messages using PyZMQ (since that uses Cython internally, it should be fast and beat the built-in multiprocessing pipe and queue).
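
A rough sketch of that layout (an assumed example, not tested against this library): one worker process per channel pushes messages over a PyZMQ PUSH socket, and the main process pools everything through a single PULL socket.

import zmq
from multiprocessing import Process

ENDPOINT = "tcp://127.0.0.1:5556"  # assumed local endpoint

def channel_worker(channel):
    # one subprocess per channel; a real worker would run its own manager here
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.connect(ENDPOINT)
    for seq in range(3):
        push.send_json({"channel": channel, "seq": seq})
    push.close()
    ctx.term()

if __name__ == "__main__":
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.bind(ENDPOINT)

    workers = [Process(target=channel_worker, args=(c,))
               for c in ("btcusdt@depth5", "ethusdt@depth5")]
    for w in workers:
        w.start()

    for _ in range(6):            # 2 workers x 3 messages each
        print(pull.recv_json())   # all channels pooled in one place

    for w in workers:
        w.join()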

@CharlyEmpereurmot

CharlyEmpereurmot commented Feb 27, 2022

Hi!

Indeed, when starting any kind of stream it would be absolutely great to be able to choose between threads and processes.
In the meantime, to work around this and use processes, maybe something in this spirit would work?

For some reason, right now stream_payload and stream_signal are always False, while the same code without multiprocessing worked fine. Do you know what the origin of this problem is? Does the test code below make sense to you, or is there a fundamental flaw I'm not seeing?

Is it even OK to try to pass the BinanceWebSocketApiManager objects as arguments like this?

from multiprocessing import Process, Manager
import unicorn_binance_websocket_api
from datetime import datetime
import pytz, time, json


def handle_stream(bwsam, data, streams_uuids):

    while True:

        if bwsam.is_manager_stopping():
            exit(0)
        stream_payload = bwsam.pop_stream_data_from_stream_buffer()
        stream_signal = bwsam.pop_stream_signal_from_stream_signal_buffer()

        # process signal from websocket
        if stream_signal is not False:
            pair, period = streams_uuids[stream_signal['stream_id']]

            if stream_signal['type'] == 'CONNECT':
                if not data[pair]['stream_connected']:
                    data[pair]['stream_connected'] = True
                    print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} -- WARNING: Stream connected")

            elif stream_signal['type'] == 'DISCONNECT':
                if data[pair]['stream_connected']:
                    data[pair]['stream_connected'] = False
                    print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} -- WARNING: Stream disconnected")

        # process payload from websocket
        if stream_payload is False:
            time.sleep(0.1)
        else:
            stream_payload = json.loads(stream_payload)
            if 'stream' not in stream_payload:
                continue
            pair, datatype = stream_payload['stream'].split('@')
            period = datatype.split('_')[-1]

            # stream data
            open_ts = stream_payload['data']['k']['t']
            close_price = stream_payload['data']['k']['c']

            print(f"{pair.upper()}_{period} -- {datetime.now(tz=pytz.utc).timestamp()} @ {open_ts} -- Price: {close_price}")


if __name__ == "__main__":

    mp_manager = Manager()
    data = mp_manager.dict()
    streams_uuids = mp_manager.dict()

    bwsam_1 = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com", enable_stream_signal_buffer=True)
    stream_uuid_1 = bwsam_1.create_stream(channels=['kline_1m'], markets=['btcusdt'])
    data['btcusdt'] = {
        'period': '1m',
        'stream_connected': False,
        'stream_uuid': stream_uuid_1
    }
    streams_uuids[stream_uuid_1] = 'btcusdt', '1m'

    bwsam_2 = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com", enable_stream_signal_buffer=True)
    stream_uuid_2 = bwsam_2.create_stream(channels=['kline_5m'], markets=['ethusdt'])
    data['ethusdt'] = {
        'period': '5m',
        'stream_connected': False,
        'stream_uuid': stream_uuid_2
    }
    streams_uuids[stream_uuid_2] = 'ethusdt', '5m'

    p1 = Process(target=handle_stream, args=(bwsam_1, data, streams_uuids))
    p2 = Process(target=handle_stream, args=(bwsam_2, data, streams_uuids))
    p1.start()
    p2.start()
    print('Processes start')

@oliver-zehentleitner
Member Author

The problem with subprocesses (and in your script) is that memory objects cannot be shared.

Example:
you start a script (process 1) and in this process you instantiate an object.
Then you create a new process (process 2) and pass the instantiated object in - the object is copied, it is not a reference. Processes 1 and 2 use different objects!
I wanted to back this up with a source and found this: https://docs.python.org/3/library/multiprocessing.shared_memory.html
Apparently this is new since Python 3.8: https://mingze-gao.com/posts/python-shared-memory-in-multiprocessing/

I'm in a bit of a hurry right now and I'm not quite sure if this is really a solution, but I think this is the direction it should go.
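
A minimal sketch (an assumed example) that demonstrates the copy behaviour: the child process mutates the object it received, but the parent's object stays unchanged, because the argument is pickled and copied when the process starts.

from multiprocessing import Process

class Counter:
    def __init__(self):
        self.value = 0

def worker(counter):
    # mutates the child's private copy, not the parent's object
    counter.value += 1
    print(f"child sees value = {counter.value}")          # 1

if __name__ == "__main__":
    counter = Counter()
    p = Process(target=worker, args=(counter,))
    p.start()
    p.join()
    print(f"parent still sees value = {counter.value}")   # 0 - the object was copied

For real sharing across processes, something like multiprocessing.shared_memory (Python 3.8+) or a multiprocessing.Manager proxy would be needed, as the linked docs describe.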

@oliver-zehentleitner
Member Author

This no longer fits directly into the concept - we work with AsyncIO and Kubernetes and no longer need subprocesses.

If someone really urgently needs it, please contact us: https://www.lucit.tech/get-support.html
