From fcdcc467308a5a965f9f9c8d232e7db42c3ffacd Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 20 Oct 2025 21:45:15 +0200 Subject: [PATCH] fix: only attempt to re-dial to peers seen less than a day ago Node operators are experiencing issues on Hetzner where nodes attempt to dial old nodes (last active 2 years ago). The likely cause is the IPFS reconnection job looping over these nodes non-stop without filtering. Added a filter that only returns nodes seen after a specific datetime and a config value to specify the accepted period. By default, the node will attempt to reconnect to other peers seen at most a day ago. --- src/aleph/config.py | 2 ++ src/aleph/db/accessors/peers.py | 22 +++++++++++++++++++++- src/aleph/jobs/reconnect_ipfs.py | 8 ++++++-- src/aleph/services/p2p/jobs.py | 9 ++++++--- tests/db/test_peers.py | 11 +++++++++++ 5 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/aleph/config.py b/src/aleph/config.py index 1c9a02472..0cac0159d 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -84,6 +84,8 @@ def get_defaults(): "mq_host": "rabbitmq", # Delay between connection attempts to other nodes on the network. "reconnect_delay": 60, + # Only peers seen within this many seconds will be dialed. Avoids dialing dead nodes. + "max_peer_age": 24 * 60 * 60, # P2P pubsub topic used for liveness checks. "alive_topic": "ALIVE", # Enabled P2P clients (HTTP and/or P2P). diff --git a/src/aleph/db/accessors/peers.py b/src/aleph/db/accessors/peers.py index 5d30533f9..f02b3f612 100644 --- a/src/aleph/db/accessors/peers.py +++ b/src/aleph/db/accessors/peers.py @@ -11,10 +11,30 @@ def get_all_addresses_by_peer_type( - session: DbSession, peer_type: PeerType + session: DbSession, peer_type: PeerType, last_seen: Optional[dt.datetime] = None, ) -> Sequence[str]: + """ + Fetches all peer addresses filtered by peer type and optionally by the last_seen + timestamp.
This function retrieves addresses for peers of a specific type from + the database. If a `last_seen` timestamp is provided, only peers with a `last_seen` + timestamp greater than or equal to the provided value are considered. + + Arguments: + session (DbSession): Database session for querying data. + peer_type (PeerType): Type of peer to filter the addresses. + last_seen (Optional[datetime.datetime], optional): Timestamp to filter peers + last seen after or equal to this value. Defaults to None. + + Returns: + Sequence[str]: List of addresses corresponding to the filtered peer type and + optional last_seen timestamp. + """ + select_peers_stmt = select(PeerDb.address).where(PeerDb.peer_type == peer_type) + if last_seen is not None: + select_peers_stmt = select_peers_stmt.where(PeerDb.last_seen >= last_seen) + addresses = session.execute(select_peers_stmt) return addresses.scalars().all() diff --git a/src/aleph/jobs/reconnect_ipfs.py b/src/aleph/jobs/reconnect_ipfs.py index ae8cdb5ae..5c4a80d18 100644 --- a/src/aleph/jobs/reconnect_ipfs.py +++ b/src/aleph/jobs/reconnect_ipfs.py @@ -3,6 +3,7 @@ """ import asyncio +import datetime as dt import logging import aioipfs @@ -17,10 +18,12 @@ async def reconnect_ipfs_job( - config: Config, session_factory: DbSessionFactory, ipfs_service: IpfsService + config: Config, session_factory: DbSessionFactory, ipfs_service: IpfsService ): from aleph.services.utils import get_IP + max_peer_age = dt.timedelta(seconds=config.p2p.max_peer_age.value) + my_ip = await get_IP() await asyncio.sleep(2) while True: @@ -34,9 +37,10 @@ async def reconnect_ipfs_job( except aioipfs.APIError: LOGGER.warning("Can't reconnect to %s" % peer) + last_seen = dt.datetime.now(dt.timezone.utc) - max_peer_age with session_factory() as session: peers = get_all_addresses_by_peer_type( - session=session, peer_type=PeerType.IPFS + session=session, peer_type=PeerType.IPFS, last_seen=last_seen ) for peer in peers: diff --git a/src/aleph/services/p2p/jobs.py 
b/src/aleph/services/p2p/jobs.py index 2203345dd..68cb06349 100644 --- a/src/aleph/services/p2p/jobs.py +++ b/src/aleph/services/p2p/jobs.py @@ -1,4 +1,5 @@ import asyncio +import datetime as dt import logging from dataclasses import dataclass from typing import Optional @@ -9,10 +10,9 @@ from aleph.db.accessors.peers import get_all_addresses_by_peer_type from aleph.db.models import PeerType from aleph.types.db_session import DbSessionFactory - -from ..cache.node_cache import NodeCache from .http import api_get_request from .peers import connect_peer +from ..cache.node_cache import NodeCache @dataclass @@ -30,14 +30,17 @@ async def reconnect_p2p_job( ) -> None: await asyncio.sleep(2) + max_peer_age = dt.timedelta(seconds=config.p2p.max_peer_age.value) + while True: try: peers = set(config.p2p.peers.value) + last_seen = dt.datetime.now(dt.timezone.utc) - max_peer_age with session_factory() as session: peers |= set( get_all_addresses_by_peer_type( - session=session, peer_type=PeerType.P2P + session=session, peer_type=PeerType.P2P, last_seen=last_seen ) ) diff --git a/tests/db/test_peers.py b/tests/db/test_peers.py index 3826b3997..2d6b31f55 100644 --- a/tests/db/test_peers.py +++ b/tests/db/test_peers.py @@ -58,6 +58,17 @@ async def test_get_all_addresses_by_peer_type(session_factory: DbSessionFactory) assert p2p_entries == [p2p_entry.address] assert ipfs_entries == [ipfs_entry.address] + with session_factory() as session: + recent_p2p_entries = get_all_addresses_by_peer_type( + session=session, peer_type=PeerType.P2P, last_seen=last_seen + ) + old_p2p_entries = get_all_addresses_by_peer_type( + session=session, peer_type=PeerType.P2P, last_seen=last_seen + dt.timedelta(days=1) + ) + assert recent_p2p_entries == [p2p_entry.address] + assert old_p2p_entries == [] + + @pytest.mark.asyncio @pytest.mark.parametrize("peer_type", (PeerType.HTTP, PeerType.P2P, PeerType.IPFS))