Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and speed up health checks #7313

Merged
merged 19 commits into from Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f91db36
Remove unused function
kozlovsky Mar 9, 2023
377fc85
The notifier should warn only once about unknown topic names to reduc…
kozlovsky Mar 6, 2023
2d05736
Show the first part of the torrent's infohash in the tooltip for easi…
kozlovsky Mar 6, 2023
4b54372
Set Socks5Connection log level to WARNING to reduce log spam
kozlovsky Mar 6, 2023
1b2e764
Remove duplicate import
kozlovsky Mar 6, 2023
e099241
Add HealthInfo.seeders_leechers_last_check property to make compariso…
kozlovsky Mar 6, 2023
ac4cc68
Fix determining the best health info in aggregate_responses_for_infoh…
kozlovsky Mar 6, 2023
10a8357
Log response time in TorrentChecker.get_tracker_response()
kozlovsky Mar 8, 2023
5676699
Fixes #7287: Fix the health info comparison algorithm
kozlovsky Mar 8, 2023
1598755
Fix: add torrents with non-zero leechers to torrents_checked
kozlovsky Mar 8, 2023
a74bd04
UI speedup: update health info without waiting responses from all tra…
kozlovsky Mar 8, 2023
a9acf8f
Refactoring: extract TorrentChecker.notify() method
kozlovsky Mar 8, 2023
2ea10b6
Fix: send notification to GUI even if the new health info is ignored
kozlovsky Mar 8, 2023
b2d51b2
Move the `HealthInfo.last_check` field to the end (to be able to use …
kozlovsky Mar 9, 2023
cdfabc2
Remove HealthInfo.seeders_leechers_last_check property, compare Healt…
kozlovsky Mar 9, 2023
bf46b3d
Simplify tests
kozlovsky Mar 9, 2023
a2efecf
Add HealthInfo.self_checked field, pass HealthInfo to should_replace …
kozlovsky Mar 10, 2023
2891092
Add health.old(), health.older_than(other), health.much_older_than(ot…
kozlovsky Mar 9, 2023
f10d2dd
Add tests for better coverage
kozlovsky Mar 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -65,7 +65,7 @@ def finalize_lookup(self, infohash):
seeders = DHTHealthManager.get_size_from_bloomfilter(bf_seeders)
peers = DHTHealthManager.get_size_from_bloomfilter(bf_peers)
if not self.lookup_futures[infohash].done():
health = HealthInfo(infohash, last_check=int(time.time()), seeders=seeders, leechers=peers)
health = HealthInfo(infohash, seeders=seeders, leechers=peers)
self.lookup_futures[infohash].set_result(health)

self.lookup_futures.pop(infohash, None)
Expand Down
Expand Up @@ -23,10 +23,9 @@ class TorrentState(db.Entity):
@classmethod
def from_health(cls, health: HealthInfo):
return cls(infohash=health.infohash, seeders=health.seeders, leechers=health.leechers,
last_check=health.last_check)
last_check=health.last_check, self_checked=health.self_checked)

def to_health(self) -> HealthInfo:
return HealthInfo(infohash=self.infohash, last_check=self.last_check,
seeders=self.seeders, leechers=self.leechers)
return HealthInfo(self.infohash, self.seeders, self.leechers, self.last_check, self.self_checked)

return TorrentState
2 changes: 1 addition & 1 deletion src/tribler/core/components/metadata_store/db/store.py
Expand Up @@ -484,7 +484,7 @@ def process_torrent_health(self, health: HealthInfo) -> bool:

torrent_state = self.TorrentState.get_for_update(infohash=health.infohash)

if torrent_state and health.should_update(torrent_state):
if torrent_state and health.should_replace(torrent_state.to_health()):
self._logger.debug(f"Update health info {health}")
torrent_state.set(seeders=health.seeders, leechers=health.leechers, last_check=health.last_check,
self_checked=False)
Expand Down
Expand Up @@ -23,8 +23,6 @@
from tribler.core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
from tribler.core.components.metadata_store.utils import RequestTimeoutException
from tribler.core.components.knowledge.community.knowledge_validator import is_valid_resource
from tribler.core.components.knowledge.db.knowledge_db import ResourceType
from tribler.core.utilities.pony_utils import run_threaded
from tribler.core.utilities.unicode import hexlify

Expand Down
Expand Up @@ -32,8 +32,7 @@ def get_peers_for(health_status):
return randint(101, 1000)
return randint(1, 100)

return HealthInfo(random_infohash(), last_check=int(time.time()),
seeders=get_peers_for(status), leechers=get_peers_for(status))
return HealthInfo(random_infohash(), seeders=get_peers_for(status), leechers=get_peers_for(status))


def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInfo]:
Expand Down Expand Up @@ -89,7 +88,7 @@ async def test_torrents_health_gossip(self):
"""
Test whether torrent health information is correctly gossiped around
"""
checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0, last_check=int(time.time()))
checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0)
node0_db = self.nodes[0].overlay.mds.TorrentState
node1_db2 = self.nodes[1].overlay.mds.TorrentState

Expand Down Expand Up @@ -180,7 +179,7 @@ async def test_torrents_health_update(self):
"""
self.fill_database(self.nodes[1].overlay.mds)

checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0, last_check=int(time.time()))
checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0)
await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5)

# Check whether node 1 has new torrent health information
Expand All @@ -197,7 +196,7 @@ async def test_unknown_torrent_query_back(self):
with db_session:
self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash)
await self.init_first_node_and_gossip(
HealthInfo(infohash, seeders=200, leechers=0, last_check=int(time.time())))
HealthInfo(infohash, seeders=200, leechers=0))
with db_session:
assert self.nodes[1].overlay.mds.TorrentMetadata.get()

Expand All @@ -209,5 +208,5 @@ async def test_skip_torrent_query_back_for_known_torrent(self):
self.nodes[1].overlay.mds.TorrentMetadata(infohash=infohash)
self.nodes[1].overlay.send_remote_select = Mock()
await self.init_first_node_and_gossip(
HealthInfo(infohash, seeders=200, leechers=0, last_check=int(time.time())))
HealthInfo(infohash, seeders=200, leechers=0))
self.nodes[1].overlay.send_remote_select.assert_not_called()
Expand Up @@ -41,6 +41,7 @@ class Socks5Connection(Protocol):
def __init__(self, socksserver):
super().__init__()
self._logger = logging.getLogger(self.__class__.__name__)
self._logger.setLevel(logging.WARNING)
self.socksserver = socksserver
self.transport = None
self.connect_to = None
Expand Down
@@ -1,3 +1,5 @@
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import List
Expand All @@ -7,21 +9,29 @@
from tribler.core.utilities.unicode import hexlify


TOLERABLE_TIME_DRIFT = 60 # one minute
HOUR = 60 * 60
MINUTE = 60
HOUR = MINUTE * 60
TOLERABLE_TIME_DRIFT = MINUTE # When receiving health from another peer, how far the timestamp can be in the future?
TORRENT_CHECK_WINDOW = MINUTE # When asking multiple trackers in parallel, we ignore this time difference in responses
HEALTH_FRESHNESS_SECONDS = 4 * HOUR # Number of seconds before a torrent health is considered stale. Default: 4 hours
drew2a marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
@dataclass(order=True)
class HealthInfo:
infohash: bytes = field(repr=False)
last_check: int
seeders: int = 0
leechers: int = 0
last_check: int = field(default_factory=lambda: int(time.time()))
self_checked: bool = False

def __repr__(self):
infohash_repr = hexlify(self.infohash[:4])
age = self._last_check_repr(self.last_check)
return f"{self.__class__.__name__}('{infohash_repr}', {self.seeders}/{self.leechers}, {age})"
status = self._last_check_repr(self.last_check)
if status == 'just checked' and self.self_checked:
status = 'just self-checked'
elif self.self_checked:
status += ', self-checked'
return f"{self.__class__.__name__}('{infohash_repr}', {self.seeders}/{self.leechers}, {status})"

@staticmethod
def _last_check_repr(last_check: int) -> str:
Expand All @@ -46,19 +56,44 @@ def infohash_hex(self):
def is_valid(self) -> bool:
return self.last_check < int(time.time()) + TOLERABLE_TIME_DRIFT

def should_update(self, torrent_state, self_checked=False):
if self.last_check <= torrent_state.last_check:
# The torrent state in the DB is already fresher than this health
def old(self) -> bool:
now = int(time.time())
return self.last_check < now - HEALTH_FRESHNESS_SECONDS

def older_than(self, other: HealthInfo) -> bool:
return self.last_check < other.last_check - TORRENT_CHECK_WINDOW

def much_older_than(self, other: HealthInfo) -> bool:
return self.last_check + HEALTH_FRESHNESS_SECONDS < other.last_check

def should_replace(self, prev: HealthInfo) -> bool:
if self.infohash != prev.infohash:
raise ValueError('An attempt to compare health for different infohashes')

if not self.is_valid():
return False # Health info with future last_check time is ignored

if self.self_checked:
return not prev.self_checked \
or prev.older_than(self) \
or (self.seeders, self.leechers) > (prev.seeders, prev.leechers)

if self.older_than(prev):
# Always ignore a new health info if it is older than the previous health info
return False

now = int(time.time())
hour_ago = now - HOUR
if not self_checked and torrent_state.self_checked and hour_ago <= torrent_state.last_check <= now:
# The torrent state in the DB was locally checked just recently,
# and we trust this recent local check more than the new health info received remotely
if prev.self_checked and not prev.old():
# The previous self-checked health info is fresh enough, do not replace it with a remote health info
return False

return True
if prev.much_older_than(self):
# The previous health info (that can be self-checked or not) is very old,
# let's replace it with a more recent remote health info
return True

# self is a remote health info that isn't older than the previous health info, but isn't much fresher either
return (self.seeders, self.leechers) > (prev.seeders, prev.leechers)


@dataclass
class TrackerResponse:
Expand Down
@@ -0,0 +1,104 @@
import time

import pytest

from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HEALTH_FRESHNESS_SECONDS, HealthInfo, \
TOLERABLE_TIME_DRIFT, \
TORRENT_CHECK_WINDOW

INFOHASH = b'infohash_1'


def now() -> int:
    """Return the current Unix timestamp truncated to whole seconds."""
    current = time.time()
    return int(current)


def test_different_infohashes():
    """should_replace() raises ValueError when the two health infos have different infohashes."""
    stored = HealthInfo(INFOHASH)
    incoming = HealthInfo(infohash=b'infohash_2')
    expected_message = '^An attempt to compare health for different infohashes$'
    with pytest.raises(ValueError, match=expected_message):
        incoming.should_replace(stored)


def test_invalid_health():
    """A health info timestamped beyond the tolerable time drift is invalid and does not replace another."""
    stored = HealthInfo(INFOHASH)
    future_timestamp = now() + TOLERABLE_TIME_DRIFT + 2
    incoming = HealthInfo(INFOHASH, last_check=future_timestamp)
    assert not incoming.is_valid()
    assert not incoming.should_replace(stored)


def test_self_checked_health_update_remote_health():
    """A self-checked health info replaces a previous one that was not self-checked."""
    remote = HealthInfo(INFOHASH)
    local = HealthInfo(INFOHASH, self_checked=True)
    assert local.should_replace(remote)


def test_self_checked_health_torrent_state_outside_window():
    """A self-checked health info replaces a self-checked one that is older than the check window."""
    outside_window = now() - TORRENT_CHECK_WINDOW - 1
    earlier = HealthInfo(INFOHASH, last_check=outside_window, self_checked=True)
    latest = HealthInfo(INFOHASH, self_checked=True)
    assert latest.should_replace(earlier)


def test_self_checked_health_inside_window_more_seeders():
    """Within the check window, a self-checked health info with more seeders replaces the previous one."""
    inside_window = now() - TORRENT_CHECK_WINDOW + 2
    earlier = HealthInfo(INFOHASH, 1, 2, last_check=inside_window, self_checked=True)
    latest = HealthInfo(INFOHASH, 2, 1, self_checked=True)
    assert latest > earlier
    assert latest.should_replace(earlier)


def test_self_checked_health_inside_window_fewer_seeders():
    """Within the check window, a self-checked health info with fewer seeders does not replace the previous one."""
    inside_window = now() - TORRENT_CHECK_WINDOW + 2
    earlier = HealthInfo(INFOHASH, 2, 1, last_check=inside_window, self_checked=True)
    latest = HealthInfo(INFOHASH, 1, 2, self_checked=True)
    assert latest < earlier
    assert not latest.should_replace(earlier)


def test_self_checked_torrent_state_fresh_enough():
    """A health info that is not self-checked does not replace a self-checked one that is still fresh."""
    still_fresh = now() - HEALTH_FRESHNESS_SECONDS + 2
    local = HealthInfo(INFOHASH, last_check=still_fresh, self_checked=True)
    remote = HealthInfo(INFOHASH)
    assert not remote.should_replace(local)


def test_torrent_state_self_checked_long_ago():
    """A health info that is not self-checked replaces a self-checked one older than the freshness period."""
    stale_local = HealthInfo(INFOHASH, last_check=now() - HEALTH_FRESHNESS_SECONDS - 2, self_checked=True)
    remote = HealthInfo(INFOHASH)
    assert remote.should_replace(stale_local)

    # The outcome must be the same when both timestamps lie far in the past
    shift = 1000000
    for item in (stale_local, remote):
        item.last_check -= shift
    assert remote.should_replace(stale_local)


def test_more_recent_more_seeders():
    """Within the check window, a health info with more seeders replaces the previous one.

    Both a slightly earlier and a slightly later timestamp are checked.
    """
    t = now() - 100
    prev_health = HealthInfo(INFOHASH, 1, 2, last_check=t)
    health = HealthInfo(INFOHASH, 2, 1, last_check=t - 1)

    for last_check in (t - 1, t + 1):
        health.last_check = last_check
        # should_replace() judges age via older_than(), which tolerates TORRENT_CHECK_WINDOW,
        # so the precondition is checked against that constant rather than TOLERABLE_TIME_DRIFT
        assert abs(prev_health.last_check - health.last_check) <= TORRENT_CHECK_WINDOW
        assert health.should_replace(prev_health)


def test_more_recent_fewer_seeders():
    """Within the check window, a health info with fewer seeders does not replace the previous one.

    Both a slightly earlier and a slightly later timestamp are checked.
    """
    t = now() - 100
    prev_health = HealthInfo(INFOHASH, 2, 1, last_check=t)
    health = HealthInfo(INFOHASH, last_check=t - 1, seeders=1, leechers=2)

    for last_check in (t - 1, t + 1):
        health.last_check = last_check
        # should_replace() judges age via older_than(), which tolerates TORRENT_CHECK_WINDOW,
        # so the precondition is checked against that constant rather than TOLERABLE_TIME_DRIFT
        assert abs(prev_health.last_check - health.last_check) <= TORRENT_CHECK_WINDOW
        assert not health.should_replace(prev_health)


def test_less_recent_more_seeders():
    """A health info older than the previous one by more than the check window is ignored despite more seeders."""
    t = now() - 100
    prev_health = HealthInfo(INFOHASH, last_check=t)
    # older_than() compares against TORRENT_CHECK_WINDOW, so the offset is built from that constant
    # to place the new health info just outside the window
    health = HealthInfo(INFOHASH, 100, last_check=t - TORRENT_CHECK_WINDOW - 1)
    assert not health.should_replace(prev_health)