Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def get_defaults():
"mq_host": "rabbitmq",
# Delay between connection attempts to other nodes on the network.
"reconnect_delay": 60,
# Only peers seen more recently than this value will be dialed. Avoids dialing dead nodes.
"max_peer_age": 24 * 60 * 60,
# P2P pubsub topic used for liveness checks.
"alive_topic": "ALIVE",
# Enabled P2P clients (HTTP and/or P2P).
Expand Down
22 changes: 21 additions & 1 deletion src/aleph/db/accessors/peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,30 @@


def get_all_addresses_by_peer_type(
session: DbSession, peer_type: PeerType
session: DbSession, peer_type: PeerType, last_seen: Optional[dt.datetime] = None,
) -> Sequence[str]:
"""
Fetches all peer addresses filtered by peer type and optionally by the last_seen
timestamp. This function retrieves addresses for peers of a specific type from
the database. If a `last_seen` timestamp is provided, only peers with a `last_seen`
timestamp greater than or equal to the provided value are considered.

Arguments:
session (DbSession): Database session for querying data.
peer_type (PeerType): Type of peer to filter the addresses.
last_seen (Optional[datetime.datetime], optional): Timestamp to filter peers
last seen after or equal to this value. Defaults to None.

Returns:
Sequence[str]: List of addresses corresponding to the filtered peer type and
optional last_seen timestamp.
"""

select_peers_stmt = select(PeerDb.address).where(PeerDb.peer_type == peer_type)

if last_seen is not None:
select_peers_stmt = select_peers_stmt.where(PeerDb.last_seen >= last_seen)

addresses = session.execute(select_peers_stmt)
return addresses.scalars().all()

Expand Down
8 changes: 6 additions & 2 deletions src/aleph/jobs/reconnect_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
import datetime as dt
import logging

import aioipfs
Expand All @@ -17,10 +18,12 @@


async def reconnect_ipfs_job(
config: Config, session_factory: DbSessionFactory, ipfs_service: IpfsService
config: Config, session_factory: DbSessionFactory, ipfs_service: IpfsService
):
from aleph.services.utils import get_IP

max_peer_age = dt.timedelta(seconds=config.p2p.max_peer_age.value)

my_ip = await get_IP()
await asyncio.sleep(2)
while True:
Expand All @@ -34,9 +37,10 @@ async def reconnect_ipfs_job(
except aioipfs.APIError:
LOGGER.warning("Can't reconnect to %s" % peer)

last_seen = dt.datetime.now(dt.timezone.utc) - max_peer_age
with session_factory() as session:
peers = get_all_addresses_by_peer_type(
session=session, peer_type=PeerType.IPFS
session=session, peer_type=PeerType.IPFS, last_seen=last_seen
)

for peer in peers:
Expand Down
9 changes: 6 additions & 3 deletions src/aleph/services/p2p/jobs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import datetime as dt
import logging
from dataclasses import dataclass
from typing import Optional
Expand All @@ -9,10 +10,9 @@
from aleph.db.accessors.peers import get_all_addresses_by_peer_type
from aleph.db.models import PeerType
from aleph.types.db_session import DbSessionFactory

from ..cache.node_cache import NodeCache
from .http import api_get_request
from .peers import connect_peer
from ..cache.node_cache import NodeCache


@dataclass
Expand All @@ -30,14 +30,17 @@ async def reconnect_p2p_job(
) -> None:
await asyncio.sleep(2)

max_peer_age = dt.timedelta(seconds=config.p2p.max_peer_age.value)

while True:
try:
peers = set(config.p2p.peers.value)

last_seen = dt.datetime.now(dt.timezone.utc) - max_peer_age
with session_factory() as session:
peers |= set(
get_all_addresses_by_peer_type(
session=session, peer_type=PeerType.P2P
session=session, peer_type=PeerType.P2P, last_seen=last_seen
)
)

Expand Down
11 changes: 11 additions & 0 deletions tests/db/test_peers.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ async def test_get_all_addresses_by_peer_type(session_factory: DbSessionFactory)
assert p2p_entries == [p2p_entry.address]
assert ipfs_entries == [ipfs_entry.address]

with session_factory() as session:
recent_p2p_entries = get_all_addresses_by_peer_type(
session=session, peer_type=PeerType.P2P, last_seen=last_seen
)
old_p2p_entries = get_all_addresses_by_peer_type(
session=session, peer_type=PeerType.P2P, last_seen=last_seen + dt.timedelta(days=1)
)
assert recent_p2p_entries == [p2p_entry.address]
assert old_p2p_entries == []



@pytest.mark.asyncio
@pytest.mark.parametrize("peer_type", (PeerType.HTTP, PeerType.P2P, PeerType.IPFS))
Expand Down
Loading