
Websockets and multiprocessing #752

Closed
slavestys opened this issue Apr 27, 2020 · 7 comments

@slavestys

Can I use websockets with multiprocessing?
For example, the main process accepts new connections and passes them to other processes; further work then takes place in those processes.

@slavestys slavestys changed the title Websockets and multiprocessing [proposed question] Websockets and multiprocessing Apr 27, 2020
@slavestys slavestys changed the title [proposed question] Websockets and multiprocessing Websockets and multiprocessing Apr 27, 2020
@aaugustin
Member

websockets is based on the asyncio module from the standard library. If it's doable with asyncio, then it's probably doable with websockets.

Can you show how you would do this with a basic TCP server (e.g. an echo server) managed by asyncio? Then we can figure out how to adapt for websockets.
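For reference, the kind of plain-asyncio baseline being asked about is only a few lines. A minimal sketch of a TCP echo server using asyncio streams (the handler name and port are illustrative):

```python
#!/usr/bin/env python
# Minimal asyncio TCP echo server (illustrative sketch).
import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)  # read up to 100 bytes from the client
    writer.write(data)             # echo them back unchanged
    await writer.drain()
    writer.close()

async def main():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888)
    async with server:
        await server.serve_forever()

# To run: asyncio.run(main())  # serves until interrupted
```

Adapting this to multiple processes is where the question gets interesting, since asyncio event loops and open sockets do not transfer trivially between processes.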

@aaraney

aaraney commented Jun 27, 2020

@slavestys I had the same question, and the answer is yes. Below is a server/client example I used to prove it to myself.

@aaugustin I don't know if this is an overly niche example for usage documentation, but personally I would have found it really helpful. Would you consider adding an even simpler example of using concurrent.futures with websockets?

Concurrent futures ProcessPoolExecutor example:

server.py

#!/usr/bin/env python
import asyncio
import json
import logging
import time
import os
import websockets
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

logging.basicConfig(level=logging.INFO,
                    format='PID %(process)-10d %(asctime)s %(message)s'
                    )

task_executer = ProcessPoolExecutor(max_workers=3)

def data_processor(raw_message):
    pid = os.getpid()

    message = json.loads(raw_message)
    print(f'PID {pid:<10} ID {message["id"]} message deserialized')

    message_data = message['data']
    result = reduce(lambda a,b: int(a)+int(b), message_data)

    # Sleeping just for visualizing things in the logs sake
    time.sleep(1.0)

    print(f'PID {pid:<10} Result {result}')

    result_message = {
        'id': message['id'],
        'result': result
    }

    return result_message


async def producer(websocket, path, message):
    log = logging.getLogger('producer')
    log.info('Received processed message')
    log.info(f'Sending {message["id"]}')

    serialized_message = json.dumps(message)

    await websocket.send(serialized_message)


async def listener(websocket, path):
    tasks =  []

    log = logging.getLogger('listener')
    loop = asyncio.get_running_loop()
    async for json_message in websocket:

        tasks.append(
            loop.run_in_executor(task_executer, data_processor, json_message)
        )
        for task in asyncio.as_completed(tasks):
            _message = await task
            loop.create_task(producer(websocket, path, _message))

try:
    start_server = websockets.serve(listener, "localhost", 8765)

    loop = asyncio.get_event_loop()

    loop.run_until_complete(start_server)
    loop.run_forever()
except Exception as e:
    print(f'Caught exception {e}')
    pass
finally:
    loop.close()

client.py

#!/usr/bin/env python

import asyncio
import websockets
import json
from random import randint


async def send_json(json_message: dict) -> None:
    uri = "ws://localhost:8765"

    # Make string representation
    json_message['data'] = [randint(0, 50) for _ in range(randint(25,100))]
    json_message = json.dumps(json_message)

    async with websockets.connect(uri) as websocket:

        await websocket.send(json_message)
        print(f"> message sent")

        response = await websocket.recv()
        print(f"< {response}")

if __name__ == "__main__":
    import sys

    _id = sys.argv[1]
    
    json_data = {
        'id': _id,
        'data': None
    }
    asyncio.get_event_loop().run_until_complete(send_json(json_data))

Open a shell session and start server.py; then, in another shell, run something like for i in $(seq 1 10); do python client.py $i & done. This runs ten client instances in the background, setting the id of each sent message to the counter of the bash for loop.

@aaugustin
Member

Thanks for the suggestion, but this is longer than the kind of examples that I'm willing to maintain in the documentation. Hopefully Google will take other users encountering the same issue to this page!

@aaraney

aaraney commented Jul 27, 2020

Thanks for the response @aaugustin, that makes complete sense. Having read my suggestion again, I am now more in line with your opinion. I hope Google does the trick myself as well!

@ngoel17

ngoel17 commented Aug 2, 2022

Thanks for the example @aaraney. I converted your example into an echo bot by commenting out the JSON message parsing and the corresponding operations and returning the incoming message itself. I removed server-side logging, as I was testing under heavy concurrency.
Regardless of the concurrency level I use, the server prints messages like the one below. The messages appear well after the whole echo-bot test is complete and latency (generally decent) has been reported, but they consume screen space and look scary. Is there a resolution? I am using bench.py from here as a client.

PID 275455 2022-08-02 17:56:41,241 Task exception was never retrieved
future: <Task finished name='Task-506815' coro=<producer() done, defined at server.py:38> exception=ConnectionClosed('WebSocket connection is closed: code = 1000 (OK), no reason')>
Traceback (most recent call last):
  File "server.py", line 45, in producer
    await websocket.send(message)
  File "/home/ngoel/anaconda3/lib/python3.8/site-packages/websockets/protocol.py", line 462, in send
    yield from self.ensure_open()
  File "/home/ngoel/anaconda3/lib/python3.8/site-packages/websockets/protocol.py", line 644, in ensure_open
    raise ConnectionClosed(
websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1000 (OK), no reason
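A likely explanation, as a hedged aside: producer is launched with loop.create_task and never awaited, so when the client disconnects before the echo is sent, the ConnectionClosed raised inside it is never retrieved, and asyncio logs the warning at cleanup time. Wrapping websocket.send in try/except websockets.exceptions.ConnectionClosed inside producer, or awaiting the task, should silence it. The mechanism can be shown with the standard library alone (flaky_send is a stand-in for a send to a disconnected client):

```python
import asyncio

async def flaky_send():
    # Stand-in for producer(): websocket.send() raises ConnectionClosed
    # once the peer has disconnected.
    raise RuntimeError("connection closed")

async def handled():
    task = asyncio.create_task(flaky_send())
    try:
        await task           # retrieving the exception avoids the warning
    except RuntimeError:     # in the server: websockets.exceptions.ConnectionClosed
        pass
    return "handled"
```

Creating the same task without ever awaiting it reproduces the "Task exception was never retrieved" log line when the loop shuts down.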

@ngoel17

ngoel17 commented Aug 3, 2022

@aaugustin would you have any suggestions? The client is already given above and here is the modified server:

import asyncio
import time
import os
import websockets
from concurrent.futures import ProcessPoolExecutor
from functools import reduce

task_executer = ProcessPoolExecutor(max_workers=3)

def data_processor(raw_message):
    return raw_message


async def producer(websocket, path, message):
    await websocket.send(message)


async def listener(websocket, path):
    tasks =  []
    loop = asyncio.get_running_loop()
    async for message in websocket:
        tasks.append(
            loop.run_in_executor(task_executer, data_processor, message)
        )
        for task in asyncio.as_completed(tasks):
            _message = await task
            loop.create_task(producer(websocket, path, _message))
try:
    start_server = websockets.serve(listener, "localhost", 3000)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(start_server)
    loop.run_forever()
except Exception as e:
    print(f'Caught exception {e}')
    pass
finally:
    loop.close()

@ngoel17

ngoel17 commented Aug 4, 2022

It turns out there is one more issue with the code. The code works fine as long as there is only one message on a particular connection. If a client sends a second message on the same connection, the loop "for task in asyncio.as_completed(tasks):" runs again over the whole accumulated task list, which causes 3 responses to 2 incoming messages, and in general 1 + 2 + ... + n = n(n+1)/2 responses to n messages.
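The multiplication can be reproduced with plain asyncio (a sketch where work() stands in for the run_in_executor call); the fix is to await each message's task on its own instead of re-draining the accumulated list:

```python
import asyncio

async def work(i):
    # Stand-in for loop.run_in_executor(task_executer, data_processor, ...)
    return i

async def buggy(n):
    # Same shape as listener(): each new message re-drains the whole
    # accumulated task list, re-sending every earlier result.
    sent = []
    tasks = []
    for i in range(n):
        tasks.append(asyncio.ensure_future(work(i)))
        for task in asyncio.as_completed(tasks):
            sent.append(await task)
    return len(sent)  # 1 + 2 + ... + n responses for n messages

async def fixed(n):
    # One response per message: await each message's task individually.
    sent = []
    for i in range(n):
        sent.append(await asyncio.ensure_future(work(i)))
    return len(sent)
```

In the server, the corresponding change would be to drop the tasks list inside listener and, per received message, await the executor future directly before creating the producer task.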
