diff --git a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py index 0e9b31a1ca2..673b6f0df54 100644 --- a/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py +++ b/src/tribler/core/components/libtorrent/download_manager/dht_health_manager.py @@ -65,7 +65,7 @@ def finalize_lookup(self, infohash): seeders = DHTHealthManager.get_size_from_bloomfilter(bf_seeders) peers = DHTHealthManager.get_size_from_bloomfilter(bf_peers) if not self.lookup_futures[infohash].done(): - health = HealthInfo(infohash, last_check=int(time.time()), seeders=seeders, leechers=peers) + health = HealthInfo(infohash, seeders=seeders, leechers=peers) self.lookup_futures[infohash].set_result(health) self.lookup_futures.pop(infohash, None) diff --git a/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py b/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py index 0d8d15aee93..38e7b3e9bf6 100644 --- a/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py +++ b/src/tribler/core/components/metadata_store/db/orm_bindings/torrent_state.py @@ -23,10 +23,9 @@ class TorrentState(db.Entity): @classmethod def from_health(cls, health: HealthInfo): return cls(infohash=health.infohash, seeders=health.seeders, leechers=health.leechers, - last_check=health.last_check) + last_check=health.last_check, self_checked=health.self_checked) def to_health(self) -> HealthInfo: - return HealthInfo(infohash=self.infohash, last_check=self.last_check, - seeders=self.seeders, leechers=self.leechers) + return HealthInfo(self.infohash, self.seeders, self.leechers, self.last_check, self.self_checked) return TorrentState diff --git a/src/tribler/core/components/metadata_store/db/store.py b/src/tribler/core/components/metadata_store/db/store.py index 0f4f75c78ba..78316ce0b4c 100644 --- 
a/src/tribler/core/components/metadata_store/db/store.py +++ b/src/tribler/core/components/metadata_store/db/store.py @@ -484,7 +484,7 @@ def process_torrent_health(self, health: HealthInfo) -> bool: torrent_state = self.TorrentState.get_for_update(infohash=health.infohash) - if torrent_state and health.should_update(torrent_state): + if torrent_state and health.should_replace(torrent_state.to_health()): self._logger.debug(f"Update health info {health}") torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check, self_checked=False) diff --git a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py b/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py index bb7a33a9ad4..b0179e00a7b 100644 --- a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py +++ b/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py @@ -23,8 +23,6 @@ from tribler.core.components.metadata_store.remote_query_community.payload_checker import ObjState from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler.core.components.metadata_store.utils import RequestTimeoutException -from tribler.core.components.knowledge.community.knowledge_validator import is_valid_resource -from tribler.core.components.knowledge.db.knowledge_db import ResourceType from tribler.core.utilities.pony_utils import run_threaded from tribler.core.utilities.unicode import hexlify diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index 3f904b2400f..650de81c1af 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ 
b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -32,8 +32,7 @@ def get_peers_for(health_status): return randint(101, 1000) return randint(1, 100) - return HealthInfo(random_infohash(), last_check=int(time.time()), - seeders=get_peers_for(status), leechers=get_peers_for(status)) + return HealthInfo(random_infohash(), seeders=get_peers_for(status), leechers=get_peers_for(status)) def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInfo]: @@ -89,7 +88,7 @@ async def test_torrents_health_gossip(self): """ Test whether torrent health information is correctly gossiped around """ - checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0, last_check=int(time.time())) + checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0) node0_db = self.nodes[0].overlay.mds.TorrentState node1_db2 = self.nodes[1].overlay.mds.TorrentState @@ -180,7 +179,7 @@ async def test_torrents_health_update(self): """ self.fill_database(self.nodes[1].overlay.mds) - checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0, last_check=int(time.time())) + checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0) await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5) # Check whether node 1 has new torrent health information @@ -197,7 +196,7 @@ async def test_unknown_torrent_query_back(self): with db_session: self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash) await self.init_first_node_and_gossip( - HealthInfo(infohash, seeders=200, leechers=0, last_check=int(time.time()))) + HealthInfo(infohash, seeders=200, leechers=0)) with db_session: assert self.nodes[1].overlay.mds.TorrentMetadata.get() @@ -209,5 +208,5 @@ async def test_skip_torrent_query_back_for_known_torrent(self): self.nodes[1].overlay.mds.TorrentMetadata(infohash=infohash) self.nodes[1].overlay.send_remote_select = Mock() await self.init_first_node_and_gossip( - HealthInfo(infohash, 
seeders=200, leechers=0, last_check=int(time.time()))) + HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() diff --git a/src/tribler/core/components/socks_servers/socks5/connection.py b/src/tribler/core/components/socks_servers/socks5/connection.py index 6eb34282db3..060fe4de425 100644 --- a/src/tribler/core/components/socks_servers/socks5/connection.py +++ b/src/tribler/core/components/socks_servers/socks5/connection.py @@ -41,6 +41,7 @@ class Socks5Connection(Protocol): def __init__(self, socksserver): super().__init__() self._logger = logging.getLogger(self.__class__.__name__) + self._logger.setLevel(logging.WARNING) self.socksserver = socksserver self.transport = None self.connect_to = None diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py b/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py index a25a909d34a..18dbab65ce0 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/dataclasses.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import time from dataclasses import dataclass, field from typing import List @@ -7,21 +9,29 @@ from tribler.core.utilities.unicode import hexlify -TOLERABLE_TIME_DRIFT = 60 # one minute -HOUR = 60 * 60 +MINUTE = 60 +HOUR = MINUTE * 60 +TOLERABLE_TIME_DRIFT = MINUTE # When receiving health from another peer, how far the timestamp can be in the future? +TORRENT_CHECK_WINDOW = MINUTE # When asking multiple trackers in parallel, we ignore this time difference in responses +HEALTH_FRESHNESS_SECONDS = 4 * HOUR # Number of seconds before a torrent health is considered stale. 
Default: 4 hours -@dataclass +@dataclass(order=True) class HealthInfo: infohash: bytes = field(repr=False) - last_check: int seeders: int = 0 leechers: int = 0 + last_check: int = field(default_factory=lambda: int(time.time())) + self_checked: bool = False def __repr__(self): infohash_repr = hexlify(self.infohash[:4]) - age = self._last_check_repr(self.last_check) - return f"{self.__class__.__name__}('{infohash_repr}', {self.seeders}/{self.leechers}, {age})" + status = self._last_check_repr(self.last_check) + if status == 'just checked' and self.self_checked: + status = 'just self-checked' + elif self.self_checked: + status += ', self-checked' + return f"{self.__class__.__name__}('{infohash_repr}', {self.seeders}/{self.leechers}, {status})" @staticmethod def _last_check_repr(last_check: int) -> str: @@ -46,19 +56,44 @@ def infohash_hex(self): def is_valid(self) -> bool: return self.last_check < int(time.time()) + TOLERABLE_TIME_DRIFT - def should_update(self, torrent_state, self_checked=False): - if self.last_check <= torrent_state.last_check: - # The torrent state in the DB is already fresher than this health + def old(self) -> bool: + now = int(time.time()) + return self.last_check < now - HEALTH_FRESHNESS_SECONDS + + def older_than(self, other: HealthInfo) -> bool: + return self.last_check < other.last_check - TORRENT_CHECK_WINDOW + + def much_older_than(self, other: HealthInfo) -> bool: + return self.last_check + HEALTH_FRESHNESS_SECONDS < other.last_check + + def should_replace(self, prev: HealthInfo) -> bool: + if self.infohash != prev.infohash: + raise ValueError('An attempt to compare health for different infohashes') + + if not self.is_valid(): + return False # Health info with future last_check time is ignored + + if self.self_checked: + return not prev.self_checked \ + or prev.older_than(self) \ + or (self.seeders, self.leechers) > (prev.seeders, prev.leechers) + + if self.older_than(prev): + # Always ignore a new health info if it is older than the 
previous health info return False - now = int(time.time()) - hour_ago = now - HOUR - if not self_checked and torrent_state.self_checked and hour_ago <= torrent_state.last_check <= now: - # The torrent state in the DB was locally checked just recently, - # and we trust this recent local check more than the new health info received remotely + if prev.self_checked and not prev.old(): + # The previous self-checked health info is fresh enough, do not replace it with a remote health info return False - return True + if prev.much_older_than(self): + # The previous health info (that can be self-checked or not) is very old, + # let's replace it with a more recent remote health info + return True + + # self is a remote health info that isn't older than previous health info, but isn't much fresher either + return (self.seeders, self.leechers) > (prev.seeders, prev.leechers) + @dataclass class TrackerResponse: diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_health_info_should_update.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_health_info_should_update.py new file mode 100644 index 00000000000..8e5f169e771 --- /dev/null +++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_health_info_should_update.py @@ -0,0 +1,104 @@ +import time + +import pytest + +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HEALTH_FRESHNESS_SECONDS, HealthInfo, \ + TOLERABLE_TIME_DRIFT, \ + TORRENT_CHECK_WINDOW + +INFOHASH = b'infohash_1' + + +def now() -> int: + return int(time.time()) + + +def test_different_infohashes(): + prev_health = HealthInfo(INFOHASH) + health = HealthInfo(infohash=b'infohash_2') + with pytest.raises(ValueError, match='^An attempt to compare health for different infohashes$'): + health.should_replace(prev_health) + + +def test_invalid_health(): + prev_health = HealthInfo(INFOHASH) + health = HealthInfo(INFOHASH, last_check=now() + TOLERABLE_TIME_DRIFT + 2) 
+ assert not health.is_valid() + assert not health.should_replace(prev_health) + + +def test_self_checked_health_update_remote_health(): + prev_health = HealthInfo(INFOHASH) + health = HealthInfo(INFOHASH, self_checked=True) + assert health.should_replace(prev_health) + + +def test_self_checked_health_torrent_state_outside_window(): + prev_health = HealthInfo(INFOHASH, last_check=now() - TORRENT_CHECK_WINDOW - 1, self_checked=True) + health = HealthInfo(INFOHASH, self_checked=True) + assert health.should_replace(prev_health) + + +def test_self_checked_health_inside_window_more_seeders(): + prev_health = HealthInfo(INFOHASH, 1, 2, last_check=now() - TORRENT_CHECK_WINDOW + 2, self_checked=True) + health = HealthInfo(INFOHASH, 2, 1, self_checked=True) + assert health > prev_health + assert health.should_replace(prev_health) + + +def test_self_checked_health_inside_window_fewer_seeders(): + prev_health = HealthInfo(INFOHASH, 2, 1, last_check=now() - TORRENT_CHECK_WINDOW + 2, self_checked=True) + health = HealthInfo(INFOHASH, 1, 2, self_checked=True) + assert health < prev_health + assert not health.should_replace(prev_health) + + +def test_self_checked_torrent_state_fresh_enough(): + prev_health = HealthInfo(INFOHASH, last_check=now() - HEALTH_FRESHNESS_SECONDS + 2, self_checked=True) + health = HealthInfo(INFOHASH) + assert not health.should_replace(prev_health) + + +def test_torrent_state_self_checked_long_ago(): + prev_health = HealthInfo(INFOHASH, last_check=now() - HEALTH_FRESHNESS_SECONDS - 2, self_checked=True) + health = HealthInfo(INFOHASH) + assert health.should_replace(prev_health) + + # should work the same way if time is not recent + big_time_offset = 1000000 + prev_health.last_check -= big_time_offset + health.last_check -= big_time_offset + assert health.should_replace(prev_health) + + +def test_more_recent_more_seeders(): + t = now() - 100 + prev_health = HealthInfo(INFOHASH, 1, 2, last_check=t) + + health = HealthInfo(INFOHASH, 2, 1, last_check=t-1) + 
assert abs(prev_health.last_check - health.last_check) <= TOLERABLE_TIME_DRIFT + assert health.should_replace(prev_health) + + health.last_check = t+1 + assert abs(prev_health.last_check - health.last_check) <= TOLERABLE_TIME_DRIFT + assert health.should_replace(prev_health) + + +def test_more_recent_fewer_seeders(): + t = now() - 100 + prev_health = HealthInfo(INFOHASH, 2, 1, last_check=t) + + health = HealthInfo(INFOHASH, last_check=t-1, seeders=1, leechers=2) + assert abs(prev_health.last_check - health.last_check) <= TOLERABLE_TIME_DRIFT + assert not health.should_replace(prev_health) + + health.last_check = t+1 + assert abs(prev_health.last_check - health.last_check) <= TOLERABLE_TIME_DRIFT + assert not health.should_replace(prev_health) + + +def test_less_recent_more_seeders(): + t = now() - 100 + prev_health = HealthInfo(INFOHASH, last_check=t) + health = HealthInfo(INFOHASH, 100, last_check=t - TOLERABLE_TIME_DRIFT - 1) + assert not health.should_replace(prev_health) diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py index df231c4cd65..d054a8daccd 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py @@ -1,15 +1,19 @@ +import logging import os import random import secrets import time -from unittest.mock import AsyncMock, MagicMock +from asyncio import CancelledError +from binascii import unhexlify +from unittest.mock import AsyncMock, MagicMock, Mock import pytest from ipv8.util import succeed from pony.orm import db_session import tribler.core.components.torrent_checker.torrent_checker.torrent_checker as torrent_checker_module -from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse +from 
tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TOLERABLE_TIME_DRIFT, \ + TrackerResponse from tribler.core.components.torrent_checker.torrent_checker.utils import aggregate_responses_for_infohash, \ filter_non_exceptions from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker @@ -20,13 +24,13 @@ # pylint: disable=protected-access -@pytest.fixture -def tracker_manager(tmp_path, metadata_store): +@pytest.fixture(name="tracker_manager") +def tracker_manager_fixture(tmp_path, metadata_store): return TrackerManager(state_dir=tmp_path, metadata_store=metadata_store) @pytest.fixture(name="torrent_checker") -async def fixture_torrent_checker(tribler_config, tracker_manager, metadata_store): +async def torrent_checker_fixture(tribler_config, tracker_manager, metadata_store): torrent_checker = TorrentChecker(config=tribler_config, download_manager=MagicMock(), notifier=MagicMock(), @@ -258,6 +262,7 @@ def test_update_health(torrent_checker: TorrentChecker): ] health = aggregate_responses_for_infohash(infohash, responses) + health.self_checked = True # Check that everything works fine even if the database contains no proper infohash updated = torrent_checker.update_torrent_health(health) @@ -377,3 +382,121 @@ def add_torrent_to_channel(infohash, last_check): # Health check requests are sent for all selected torrents result = await torrent_checker.check_torrents_in_user_channel() assert len(result) == len(selected_torrents) + + +async def test_get_tracker_response_cancelled_error(torrent_checker: TorrentChecker, caplog): + """ + Tests that CancelledError from session.connect_to_tracker() is handled correctly + """ + torrent_checker.clean_session = AsyncMock() + torrent_checker.update_torrent_health = Mock() + torrent_checker.tracker_manager.update_tracker_info = Mock() + + tracker_url = '' + session = Mock(tracker_url=tracker_url) + session.connect_to_tracker = 
AsyncMock(side_effect=CancelledError()) + + with pytest.raises(CancelledError): + await torrent_checker.get_tracker_response(session) + + torrent_checker.clean_session.assert_called_once() + torrent_checker.update_torrent_health.assert_not_called() + torrent_checker.tracker_manager.update_tracker_info.assert_not_called() + + assert caplog.record_tuples == [ + ('TorrentChecker', logging.INFO, 'Tracker session is being cancelled: ') + ] + + +async def test_get_tracker_response_other_error(torrent_checker: TorrentChecker, caplog): + """ + Tests that arbitrary exception from session.connect_to_tracker() is handled correctly + """ + torrent_checker.clean_session = AsyncMock() + torrent_checker.update_torrent_health = Mock() + torrent_checker.tracker_manager.update_tracker_info = Mock() + + tracker_url = '' + session = Mock(tracker_url=tracker_url) + session.connect_to_tracker = AsyncMock(side_effect=ValueError('error text')) + + with pytest.raises(ValueError, match='^error text$'): + await torrent_checker.get_tracker_response(session) + + torrent_checker.clean_session.assert_called_once() + torrent_checker.update_torrent_health.assert_not_called() + torrent_checker.tracker_manager.update_tracker_info.assert_called_once_with(tracker_url, False) + + assert caplog.record_tuples == [ + ('TorrentChecker', logging.WARNING, "Got session error for the tracker: \nerror text") + ] + + +async def test_get_tracker_response(torrent_checker: TorrentChecker, caplog): + """ + Tests that the result from session.connect_to_tracker() is handled correctly and passed to update_torrent_health() + """ + health = HealthInfo(unhexlify('abcd0123')) + tracker_url = '' + tracker_response = TrackerResponse(url=tracker_url, torrent_health_list=[health]) + + session = Mock(tracker_url=tracker_url) + session.connect_to_tracker = AsyncMock(return_value=tracker_response) + + torrent_checker.clean_session = AsyncMock() + torrent_checker.update_torrent_health = Mock() + results = await 
torrent_checker.get_tracker_response(session) + + assert results is tracker_response + torrent_checker.update_torrent_health.assert_called_once_with(health) + + assert "Got response from Mock" in caplog.text + + +def test_update_torrent_health_invalid_health(torrent_checker: TorrentChecker, caplog): + """ + Tests that invalid health is ignored in TorrentChecker.update_torrent_health() + """ + caplog.set_level(logging.WARNING) + now = int(time.time()) + health = HealthInfo(unhexlify('abcd0123'), last_check=now + TOLERABLE_TIME_DRIFT + 2) + assert not torrent_checker.update_torrent_health(health) + assert "Invalid health info ignored: " in caplog.text + + +def test_update_torrent_health_not_self_checked(torrent_checker: TorrentChecker, caplog): + """ + Tests that non-self-checked health is ignored in TorrentChecker.update_torrent_health() + """ + caplog.set_level(logging.ERROR) + health = HealthInfo(unhexlify('abcd0123')) + assert not torrent_checker.update_torrent_health(health) + assert "Self-checked torrent health expected" in caplog.text + + +def test_update_torrent_health_unknown_torrent(torrent_checker: TorrentChecker, caplog): + """ + Tests that unknown torrent's health is ignored in TorrentChecker.update_torrent_health() + """ + caplog.set_level(logging.WARNING) + health = HealthInfo(unhexlify('abcd0123'), 1, 2, self_checked=True) + assert not torrent_checker.update_torrent_health(health) + assert "Unknown torrent: abcd0123" in caplog.text + + +async def test_update_torrent_health_no_replace(torrent_checker, caplog): + """ + Tests that the TorrentChecker.notify() method is called even if the new health does not replace the old health + """ + now = int(time.time()) + torrent_checker.notify = Mock() + + with db_session: + torrent_state = torrent_checker.mds.TorrentState(infohash=unhexlify('abcd0123'), seeders=2, leechers=1, + last_check=now, self_checked=True) + prev_health = torrent_state.to_health() + + health = HealthInfo(unhexlify('abcd0123'), 1, 2, 
self_checked=True, last_check=now) + assert not torrent_checker.update_torrent_health(health) + assert "Skip health update, the health in the database is fresher or have more seeders" in caplog.text + torrent_checker.notify.assert_called_with(prev_health) diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py index e2588eefe21..20d9529f1ed 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker_session.py @@ -362,7 +362,7 @@ async def test_connect_to_tracker_bep33(bep33_session, mock_dlmgr): Test the metainfo lookup of the BEP33 DHT session """ infohash = b'a' * 20 - infohash_health = HealthInfo(infohash, last_check=int(time.time()), seeders=1, leechers=2) + infohash_health = HealthInfo(infohash, seeders=1, leechers=2) mock_dlmgr.dht_health_manager = Mock() mock_dlmgr.dht_health_manager.get_health = lambda *_, **__: succeed(infohash_health) diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py index be466007011..cc7b7bfcef7 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py @@ -15,9 +15,10 @@ from tribler.core.components.metadata_store.db.serialization import REGULAR_TORRENT from tribler.core.components.metadata_store.db.store import MetadataStore from tribler.core.components.torrent_checker.torrent_checker import DHT -from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo, TrackerResponse +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HEALTH_FRESHNESS_SECONDS, 
HealthInfo, \ + TrackerResponse from tribler.core.components.torrent_checker.torrent_checker.utils import aggregate_responses_for_infohash, \ - filter_non_exceptions, gather_coros, aggregate_health_by_infohash + filter_non_exceptions, gather_coros from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import \ FakeBep33DHTSession, FakeDHTSession, TrackerSession, UdpSocketManager, create_tracker_session from tribler.core.components.torrent_checker.torrent_checker.tracker_manager import MAX_TRACKER_FAILURES, TrackerManager @@ -36,7 +37,6 @@ TORRENT_SELECTION_POOL_SIZE = 2 # How many torrents to check (popular or random) during periodic check USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE = 5 # How many torrents to check from user's channel during periodic check -HEALTH_FRESHNESS_SECONDS = 4 * 3600 # Number of seconds before a torrent health is considered stale. Default: 4 hours TORRENTS_CHECKED_RETURN_SIZE = 240 # Estimated torrents checked on default 4 hours idle run @@ -158,12 +158,11 @@ async def check_random_tracker(self): else: health_list = response.torrent_health_list self._logger.info(f"Received {len(health_list)} health info results from tracker: {health_list}") - for health in aggregate_health_by_infohash(health_list): - self.update_torrent_health(health) async def get_tracker_response(self, session: TrackerSession) -> TrackerResponse: + t1 = time.time() try: - return await session.connect_to_tracker() + result = await session.connect_to_tracker() except CancelledError: self._logger.info(f"Tracker session is being cancelled: {session.tracker_url}") raise @@ -175,6 +174,15 @@ async def get_tracker_response(self, session: TrackerSession) -> TrackerResponse finally: await self.clean_session(session) + t2 = time.time() + self._logger.info(f"Got response from {session.__class__.__name__} in {t2-t1:.3f} seconds: {result}") + + with db_session: + for health in result.torrent_health_list: + self.update_torrent_health(health) + + return result 
+ @property def torrents_checked(self) -> Dict[bytes, HealthInfo]: if self._torrents_checked is None: @@ -196,8 +204,8 @@ def load_torrents_checked_from_db(self) -> Dict[bytes, HealthInfo]: .limit(TORRENTS_CHECKED_RETURN_SIZE)) for torrent in checked_torrents: - result[torrent.infohash] = HealthInfo( - torrent.infohash, seeders=torrent.seeders, leechers=torrent.leechers, last_check=torrent.last_check) + result[torrent.infohash] = HealthInfo(torrent.infohash, torrent.seeders, torrent.leechers, + last_check=torrent.last_check, self_checked=True) return result @db_session @@ -328,7 +336,10 @@ async def check_torrent_health(self, infohash: bytes, timeout=20, scrape_now=Fal self._logger.info(f'{len(responses)} responses for {infohash_hex} have been received: {responses}') successful_responses = filter_non_exceptions(responses) health = aggregate_responses_for_infohash(infohash, successful_responses) - self.update_torrent_health(health) + if health.last_check == 0: # if not zero, was already updated in get_tracker_response + health.last_check = int(time.time()) + health.self_checked = True + self.update_torrent_health(health) def _create_session_for_request(self, tracker_url, timeout=20) -> Optional[TrackerSession]: self._logger.debug(f'Creating a session for the request: {tracker_url}') @@ -366,6 +377,10 @@ def update_torrent_health(self, health: HealthInfo) -> bool: self._logger.warning(f'Invalid health info ignored: {health}') return False + if not health.self_checked: + self._logger.error(f'Self-checked torrent health expected. 
Got: {health}') + return False + self._logger.debug(f'Update torrent health: {health}') with db_session: # Update torrent state @@ -374,18 +389,24 @@ def update_torrent_health(self, health: HealthInfo) -> bool: self._logger.warning(f"Unknown torrent: {hexlify(health.infohash)}") return False - if not health.should_update(torrent_state, self_checked=True): - self._logger.info("Skip health update, the health in the database is fresher") + prev_health = torrent_state.to_health() + if not health.should_replace(prev_health): + self._logger.info("Skip health update, the health in the database is fresher or have more seeders") + self.notify(prev_health) # to update UI state from "Checking..." return False torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check, self_checked=True) - if health.seeders > 0: + if health.seeders > 0 or health.leechers > 0: self.torrents_checked[health.infohash] = health else: self.torrents_checked.pop(health.infohash, None) + self.notify(health) + return True + + def notify(self, health: HealthInfo): self.notifier[notifications.channel_entity_updated]({ 'infohash': health.infohash_hex, 'num_seeders': health.seeders, @@ -393,4 +414,3 @@ def update_torrent_health(self, health: HealthInfo) -> bool: 'last_tracker_check': health.last_check, 'health': 'updated' }) - return True diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py index e2cd0786de3..f93efe9f63e 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrentchecker_session.py @@ -161,14 +161,15 @@ def _process_scrape_response(self, body) -> TrackerResponse: leechers = file_info.get(b'incomplete', 0) unprocessed_infohashes.discard(infohash) - health_list.append(HealthInfo(infohash, last_check=now, 
seeders=seeders, leechers=leechers)) + health_list.append(HealthInfo(infohash, seeders, leechers, last_check=now, self_checked=True)) elif b'failure reason' in response_dict: self._logger.info("%s Failure as reported by tracker [%s]", self, repr(response_dict[b'failure reason'])) self.failed(msg=repr(response_dict[b'failure reason'])) # handle the infohashes with no result (seeders/leechers = 0/0) - health_list.extend(HealthInfo(infohash=infohash, last_check=now) for infohash in unprocessed_infohashes) + health_list.extend(HealthInfo(infohash=infohash, last_check=now, self_checked=True) + for infohash in unprocessed_infohashes) self.is_finished = True return TrackerResponse(url=self.tracker_url, torrent_health_list=health_list) @@ -397,7 +398,8 @@ async def scrape(self) -> TrackerResponse: # Store the information in the hash dict to be returned. # Sow complete as seeders. "complete: number of peers with the entire file, i.e. seeders (integer)" # - https://wiki.theory.org/BitTorrentSpecification#Tracker_.27scrape.27_Convention - response_list.append(HealthInfo(infohash, last_check=now, seeders=complete, leechers=incomplete)) + response_list.append(HealthInfo(infohash, seeders=complete, leechers=incomplete, + last_check=now, self_checked=True)) # close this socket and remove its transaction ID from the list self.remove_transaction_id() @@ -422,7 +424,8 @@ async def connect_to_tracker(self) -> TrackerResponse: now = int(time.time()) for infohash in self.infohash_list: metainfo = await self.download_manager.get_metainfo(infohash, timeout=self.timeout, raise_errors=True) - health = HealthInfo(infohash, last_check=now, seeders=metainfo[b'seeders'], leechers=metainfo[b'leechers']) + health = HealthInfo(infohash, seeders=metainfo[b'seeders'], leechers=metainfo[b'leechers'], + last_check=now, self_checked=True) health_list.append(health) return TrackerResponse(url=DHT, torrent_health_list=health_list) diff --git 
a/src/tribler/core/components/torrent_checker/torrent_checker/utils.py b/src/tribler/core/components/torrent_checker/torrent_checker/utils.py index 6afde5bf860..30dc8857ebc 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/utils.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/utils.py @@ -31,18 +31,6 @@ def aggregate_responses_for_infohash(infohash: bytes, responses: List[TrackerRes result = HealthInfo(infohash, last_check=0) for response in responses: for health in response.torrent_health_list: - if health.infohash == infohash and health.seeders > result.seeders: + if health.infohash == infohash and health > result: result = health return result - - -def aggregate_health_by_infohash(health_list: List[HealthInfo]) -> List[HealthInfo]: - """ - For each infohash in the health list, finds the "best" health info (with the max number of seeders) - """ - d: Dict[bytes, HealthInfo] = {} - for health in health_list: - infohash = health.infohash - if infohash not in d or health.seeders > d[infohash].seeders: - d[infohash] = health - return list(d.values()) diff --git a/src/tribler/core/utilities/notifier.py b/src/tribler/core/utilities/notifier.py index 8c9022fbd62..c7b7d2d2843 100644 --- a/src/tribler/core/utilities/notifier.py +++ b/src/tribler/core/utilities/notifier.py @@ -107,6 +107,7 @@ def __init__(self, loop: AbstractEventLoop = None): self.logger = logging.getLogger(self.__class__.__name__) self.topics_by_name: Dict[str, Callable] = {} + self.unknown_topic_names = set() # We use the dict type for `self.observers` and `set.generic_observers` instead of the set type to provide # the deterministic ordering of callbacks. In Python, dictionaries are ordered while sets aren't. # Therefore, `value: bool` here is unnecessary and is never used. 
@@ -204,7 +205,9 @@ def notify_by_topic_name(self, topic_name: str, *args, **kwargs): with self.lock: topic = self.topics_by_name.get(topic_name) if topic is None: - self.logger.warning(f'Topic with name `{topic_name}` not found') + if topic_name not in self.unknown_topic_names: + self.unknown_topic_names.add(topic_name) + self.logger.warning(f'Topic with name `{topic_name}` not found') else: self.notify(topic, *args, **kwargs) diff --git a/src/tribler/gui/widgets/tablecontentmodel.py b/src/tribler/gui/widgets/tablecontentmodel.py index 3870db1d9fb..43e849b50e9 100644 --- a/src/tribler/gui/widgets/tablecontentmodel.py +++ b/src/tribler/gui/widgets/tablecontentmodel.py @@ -534,6 +534,9 @@ def item_txt(self, index, role, is_editing: bool = False): time_without_microseconds = str(td).partition('.')[0] return f'Checked: {time_without_microseconds} ago' + if role == Qt.ToolTipRole and column_type == Column.NAME: + return f'{item["infohash"][:8]}' + # The 'name' column is special in a sense that we want to draw the title and tags ourselves. # At the same time, we want to name this column to not break the renaming of torrent files, hence this check. if column_type == Column.NAME and not is_editing: