loop.run_forever terminated by exception #8183

Closed
1 task done
xldistance opened this issue Feb 24, 2024 · 4 comments

@xldistance

xldistance commented Feb 24, 2024

Describe the bug

I could run aiohttp fine on Ubuntu 18.04 with Python 3.7, but after updating to Ubuntu 22.04 with Python 3.11 and Python 3.12, running aiohttp gives me an error. Here is the code I ran:

import gzip
import json
import sys
import traceback
from asyncio import (
    AbstractEventLoop,
    TimeoutError,
    get_running_loop,
    new_event_loop,
    run_coroutine_threadsafe,
    set_event_loop,
    sleep,
)
from datetime import datetime
from threading import Thread
from types import coroutine
from typing import Dict, List, Union

from aiohttp import ClientSession, ClientTimeout, ClientWebSocketResponse, TCPConnector
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.utility import save_connection_status

class WebsocketClient:
    """
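    * Asynchronous client for various WebSocket APIs
    * Override unpack_data to implement the data-unpacking logic
    * Override on_connected to handle the connection-established callback
    * Override on_disconnected to handle the disconnection callback
    * Override on_packet to handle pushed data
    * Override on_error to handle caught exceptions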
    """
    # ----------------------------------------------------------------------------------------------------
    def __init__(self):
        self._active = False
        self._host = ""
        self._session = None
        self._ws = None
        self._loop = None
        self._thread = None  # newly added thread attribute
        self._proxy = ""
        self._header = {}
        self._last_sent_text = ""
        self._last_received_text = ""
        self.event_engine = EventEngine()
        self.main_engine = MainEngine(self.event_engine)
        self.log_info = self.main_engine.info
        self.log_exception = self.main_engine.log_exception
        self.ping_interval = 20  # heartbeat interval
        self.receive_timeout = 60  # timeout for receiving data packets
    # ----------------------------------------------------------------------------------------------------
    def init(
        self, host: str, proxy_host: str = "", proxy_port: int = 0, ping_interval: int = 20, header: dict = None, gateway_name: str = ""
    ):
        """
        Initialize the client
        """
        self._host = host
        self.ping_interval = ping_interval
        self.gateway_name = gateway_name
        if header:
            self._header = header
        if proxy_host and proxy_port:
            self._proxy = f"http://{proxy_host}:{proxy_port}"
        assert self.gateway_name, "Add the gateway_name argument to the self.init call in the connect function of the gateway's WEBSOCKET API"
    # ----------------------------------------------------------------------------------------------------
    def start(self):
        """
        Start the client.
        on_connected is called automatically after a successful connection; wait until on_connected has been called before sending packets.
        """
        self._active = True
        try:
            self._loop = get_running_loop()
        except RuntimeError:
            self._loop = new_event_loop()
        self._thread = Thread(target=self._run_event_loop, args=(self._loop,))
        self._thread.start()
        run_coroutine_threadsafe(self._run(), self._loop)
    # ----------------------------------------------------------------------------------------------------
    def stop(self):
        """
        Stop the client
        """
        self._active = False
        if self._ws:
            run_coroutine_threadsafe(self._ws.close(), self._loop)
        if self._loop and self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
        if self._thread:
            self._thread.join()
    # ----------------------------------------------------------------------------------------------------
    def send_packet(self, packet: Union[dict, str]):
        """
        Send a packet dict to the server.
        Override this method if you need to send non-JSON data.
        """
        if self._ws:
            if packet in ["Pong", "ping"]:
                text = packet
            else:
                text: str = json.dumps(packet)
            self._record_last_sent_text(text)
            coro: coroutine = self._ws.send_str(text)
            run_coroutine_threadsafe(coro, self._loop)
    # ----------------------------------------------------------------------------------------------------
    def unpack_data(self, data: str) -> Dict:
        """
        Unpack string data as JSON.
        Override this method if you need an unpacking format other than JSON.
        """
        data = json.loads(data)
        return data
    # ----------------------------------------------------------------------------------------------------
    def on_connected(self):
        """
        Callback on successful connection
        """
        pass
    # ----------------------------------------------------------------------------------------------------
    def on_disconnected(self):
        """
        Callback on disconnection
        """
        pass
    # ----------------------------------------------------------------------------------------------------
    def on_packet(self, packet: Union[dict, str]):
        """
        Callback on received data
        """
        pass
    # ----------------------------------------------------------------------------------------------------
    def on_error(self, exception_type: type, exception_value: Exception, tracebacks):
        """
        Callback on exception
        """
        self.log_exception(self.exception_detail(exception_type, exception_value, tracebacks))
        save_connection_status(self.gateway_name, False)
    # ----------------------------------------------------------------------------------------------------
    def exception_detail(self, exception_type: type, exception_value: Exception, tracebacks):
        """
        Format exception information
        """
        text = "[{}]: Unhandled WebSocket Error:{}\n".format(datetime.now().isoformat(), exception_type)
        text += "LastSentText:\n{}\n".format(self._last_sent_text)
        text += "LastReceivedText:\n{}\n".format(self._last_received_text)
        text += "Exception trace: \n"
        text += "".join(traceback.format_exception(exception_type, exception_value, tracebacks))
        return text
    # ----------------------------------------------------------------------------------------------------
    async def _run(self):
        """
        Main coroutine that runs inside the event loop
        """
        self._session = ClientSession(trust_env=True)
        while self._active:
            try:
                self._ws = await self._session.ws_connect(
                    self._host,
                    proxy=self._proxy,
                    heartbeat=self.ping_interval,
                    receive_timeout=self.receive_timeout,
                    ssl=False,
                )
                self.on_connected()

                async for msg in self._ws:
                    text: Union[str, bytes] = msg.data
                    # Decompress gzip data
                    if isinstance(text, bytes):
                        text = gzip.decompress(text).decode("UTF-8")
                    self._record_last_received_text(text)
                    # ping/pong messages do not need unpacking
                    if text in ["Ping", "pong"]:
                        data = text
                    else:
                        data = self.unpack_data(text)
                    self.on_packet(data)
            except (TimeoutError, ConnectionResetError):
                # Reconnect on receive timeout
                # Close the current websocket connection
                coro = self._ws.close()
                run_coroutine_threadsafe(coro, self._loop)
            except Exception:
                self.on_error(*sys.exc_info())
            finally:
                self._ws = None
                self.on_disconnected()
    # ----------------------------------------------------------------------------------------------------
    def _record_last_sent_text(self, text: str):
        """
        Record the most recently sent text
        """
        self._last_sent_text = text[:1000]
    # ----------------------------------------------------------------------------------------------------
    def _record_last_received_text(self, text: str):
        """
        Record the most recently received text
        """
        self._last_received_text = text[:1000]
    # ----------------------------------------------------------------------------------------------------
    def _run_event_loop(self, loop: AbstractEventLoop):
        if not loop.is_running():
            set_event_loop(loop)
            loop.run_forever()

To Reproduce

REST and WebSocket connections written with aiohttp cause loop.run_forever to be terminated by an exception after a few hours of operation.

Expected behavior

loop.run_forever can be run all the time

Logs/tracebacks

Traceback (most recent call last):

  File "/usr/local/lib/python3.12/threading.py", line 1030, in _bootstrap
    self._bootstrap_inner()
    │    └ <function Thread._bootstrap_inner at 0x7f5264158ae0>
    └ <Thread(Thread-16 (_run_event_loop), started 139991738984000)>
  File "/usr/local/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
    │    └ <function Thread.run at 0x7f52641587c0>
    └ <Thread(Thread-16 (_run_event_loop), started 139991738984000)>
  File "/usr/local/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <Thread(Thread-16 (_run_event_loop), started 139991738984000)>
    │    │        │    └ (<_UnixSelectorEventLoop running=True closed=False debug=False>,)
    │    │        └ <Thread(Thread-16 (_run_event_loop), started 139991738984000)>
    │    └ <bound method WebsocketClient._run_event_loop of <vnpy.gateway.huobisusdt.huobisusdt_gateway.HuobisTradeWebsocketApi object a...
    └ <Thread(Thread-16 (_run_event_loop), started 139991738984000)>
  File "/usr/local/lib/python3.12/site-packages/vnpy/api/websocket/websocket_client.py", line 209, in _run_event_loop
    loop.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x7f52292edda0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 639, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x7f52292efba0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1985, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x7f5229447e20>
    └ <Handle Task.task_wakeup(<Future finis...er shutdown')>)>
  File "/usr/local/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle Task.task_wakeup(<Future finis...er shutdown')>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle Task.task_wakeup(<Future finis...er shutdown')>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle Task.task_wakeup(<Future finis...er shutdown')>)>
> File "/usr/local/lib/python3.12/site-packages/vnpy/api/websocket/websocket_client.py", line 158, in _run
    self._ws = await self._session.ws_connect(
    │    │           │    │        └ <function ClientSession.ws_connect at 0x7f52052a7100>
    │    │           │    └ <aiohttp.client.ClientSession object at 0x7f521fe348c0>
    │    │           └ <vnpy.gateway.huobisusdt.huobisusdt_gateway.HuobisTradeWebsocketApi object at 0x7f52055d5d90>
    │    └ None
    └ <vnpy.gateway.huobisusdt.huobisusdt_gateway.HuobisTradeWebsocketApi object at 0x7f52055d5d90>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/client.py", line 832, in _ws_connect
    resp = await self.request(
                 │    └ <function ClientSession.request at 0x7f52052a6e80>
                 └ <aiohttp.client.ClientSession object at 0x7f521fe348c0>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/client.py", line 578, in _request
    conn = await self._connector.connect(
                 │    │          └ <function BaseConnector.connect at 0x7f5205555f80>
                 │    └ <aiohttp.connector.TCPConnector object at 0x7f51fe9683e0>
                 └ <aiohttp.client.ClientSession object at 0x7f521fe348c0>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/connector.py", line 544, in connect
    proto = await self._create_connection(req, traces, timeout)
                  │    │                  │    │       └ ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5)
                  │    │                  │    └ []
                  │    │                  └ <aiohttp.client_reqrep.ClientRequest object at 0x7f51fd05d940>
                  │    └ <function TCPConnector._create_connection at 0x7f5205556b60>
                  └ <aiohttp.connector.TCPConnector object at 0x7f51fe9683e0>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/connector.py", line 911, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
                     │    │                         │    │       └ ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5)
                     │    │                         │    └ []
                     │    │                         └ <aiohttp.client_reqrep.ClientRequest object at 0x7f51fd05d940>
                     │    └ <function TCPConnector._create_direct_connection at 0x7f5205557240>
                     └ <aiohttp.connector.TCPConnector object at 0x7f51fe9683e0>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/connector.py", line 1173, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
                  │       │      └ <Task finished name='Task-37367' coro=<TCPConnector._resolve_host() done, defined at /usr/local/lib/python3.12/site-packages/...
                  │       └ <function shield at 0x7f52292d2ac0>
                  └ <module 'asyncio' from '/usr/local/lib/python3.12/asyncio/__init__.py'>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/connector.py", line 884, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
                  │    │         │       │     │            │    └ 0
                  │    │         │       │     │            └ <aiohttp.connector.TCPConnector object at 0x7f51fe9683e0>
                  │    │         │       │     └ 443
                  │    │         │       └ 'api.hbdm.com'
                  │    │         └ <function ThreadedResolver.resolve at 0x7f52055547c0>
                  │    └ <aiohttp.resolver.ThreadedResolver object at 0x7f51fe96a360>
                  └ <aiohttp.connector.TCPConnector object at 0x7f51fe9683e0>
  File "/usr/local/lib/python3.12/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
                  │    │     └ <function BaseEventLoop.getaddrinfo at 0x7f52292ee8e0>
                  │    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
                  └ <aiohttp.resolver.ThreadedResolver object at 0x7f51fe96a360>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 899, in getaddrinfo
    return await self.run_in_executor(
                 │    └ <function BaseEventLoop.run_in_executor at 0x7f52292ee700>
                 └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 861, in run_in_executor
    executor.submit(func, *args), loop=self)
    │        │      │      │           └ <_UnixSelectorEventLoop running=True closed=False debug=False>
    │        │      │      └ ('api.hbdm.com', 443, 0, <SocketKind.SOCK_STREAM: 1>, 0, <AddressInfo.AI_ADDRCONFIG: 32>)
    │        │      └ <function getaddrinfo at 0x7f5264028860>
    │        └ <function ThreadPoolExecutor.submit at 0x7f521ee04540>
    └ <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f51fe8e46e0>
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 170, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')

RuntimeError: cannot schedule new futures after shutdown
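The last frame is concurrent.futures' ThreadPoolExecutor.submit() refusing new work. Per the traceback, aiohttp's ThreadedResolver calls loop.getaddrinfo(), which hands getaddrinfo to loop.run_in_executor() on the loop's default executor, and that executor has already been shut down. As an illustration under that assumption only (it does not show what shut the executor down in the program above, and the helper name `submit_after_shutdown_error` is hypothetical), this stdlib sketch produces the same RuntimeError by installing an executor with loop.set_default_executor() and shutting it down before a lookup:

```python
import asyncio
import concurrent.futures


def submit_after_shutdown_error() -> str:
    """Hypothetical helper: return the message raised when getaddrinfo()
    runs on a loop whose default executor was already shut down."""
    loop = asyncio.new_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    loop.set_default_executor(executor)
    executor.shutdown()  # assumed trigger: executor shut down while still installed
    try:
        # getaddrinfo() delegates to run_in_executor(None, ...), which calls
        # executor.submit(); submit() raises before any DNS lookup happens.
        loop.run_until_complete(loop.getaddrinfo("localhost", 443))
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop.close()
    return "no error"


print(submit_after_shutdown_error())  # prints: cannot schedule new futures after shutdown
```

In this sketch the shut-down executor stays installed as the loop's default, so every later getaddrinfo() on the same loop would fail the same way until the loop is closed.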

Python Version

python 3.12.2

aiohttp Version

3.9.3

multidict Version

6.0.5

yarl Version

1.9.4

OS

Ubuntu 22.04

Related component

Server, Client

Additional context

No response

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
@xldistance xldistance added the bug label Feb 24, 2024
@Dreamsorcerer
Member

Dreamsorcerer commented Feb 24, 2024

"loop.run_forever can be run all the time"

This seems to me like you're messing with low-level APIs from asyncio without understanding them. You're also running them in a thread, which seems like a bad idea. I'd suggest rewriting your application to work with the high-level API, so that the entry point of your program is asyncio.run().
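A minimal sketch of that structure, under stated assumptions: it is not vnpy's or aiohttp's API, and `fake_connect`, `run_client` and the six-message stop condition are hypothetical, chosen only so the example runs without a network. It uses a single asyncio.run() entry point, a reconnect loop inside one task, and shutdown by cancelling that task, with no Thread, run_forever(), call_soon_threadsafe() or run_coroutine_threadsafe(). `fake_connect` stands in for iterating a websocket returned by aiohttp's `session.ws_connect(...)`.

```python
import asyncio
from typing import AsyncIterator


async def fake_connect(attempt: int) -> AsyncIterator[str]:
    """Hypothetical stand-in for a WebSocket connection: yields three
    messages, then drops the connection with ConnectionResetError."""
    for i in range(3):
        await asyncio.sleep(0)  # give control back to the loop, like real I/O
        yield f"conn{attempt}-msg{i}"
    raise ConnectionResetError("simulated drop")


async def run_client(received: list[str], enough: asyncio.Event) -> None:
    """Reconnect loop that lives entirely inside one task on one loop."""
    attempt = 0
    while True:  # runs until the task is cancelled
        attempt += 1
        try:
            async for text in fake_connect(attempt):
                received.append(text)  # where on_packet() would be called
                if len(received) >= 6:
                    enough.set()
        except (ConnectionResetError, asyncio.TimeoutError):
            # Where on_disconnected() would be called; back off, then reconnect.
            await asyncio.sleep(0.01)


async def main() -> list[str]:
    received: list[str] = []
    enough = asyncio.Event()
    client = asyncio.create_task(run_client(received, enough))
    await enough.wait()
    # Stop by cancelling the task instead of calling loop.stop() from
    # another thread; CancelledError unwinds run_client inside the loop.
    client.cancel()
    try:
        await client
    except asyncio.CancelledError:
        pass
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real client the reconnect loop would sit inside `async with aiohttp.ClientSession() as session:` so the session is closed when the task is cancelled, and `fake_connect(attempt)` would be replaced by `async with session.ws_connect(url) as ws:` followed by `async for msg in ws:`.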

@Dreamsorcerer
Member

Seems like it's probably not an issue with aiohttp, so without further info I'll close this.

@Dreamsorcerer Dreamsorcerer closed this as not planned Mar 1, 2024
@xldistance
Author

xldistance commented Mar 12, 2024

"Seems like it's probably not an issue with aiohttp, so without further info I'll close this."

You can be sure it's an aiohttp problem.

@Dreamsorcerer
Member

Dreamsorcerer commented Mar 12, 2024

Then provide a simple reproducer that makes it clear why this is a problem with aiohttp. The code you have provided is too complex and messes with low-level asyncio APIs while mixing them with threading. There are far too many (odd, bad practice etc.) things going on in that code to isolate any issue within aiohttp.

i.e. Reproduce this issue without using the threading module, with the entrypoint of your application using asyncio.run() and not using any of the low-level asyncio APIs. If you can only reproduce it without meeting these conditions, then you should know enough about low-level asyncio to provide a clear description of exactly what the problem is. If you don't know enough to be able to do that, then it's very likely a problem with your code.
