Skip to content

Commit

Permalink
Merge pull request #5668 from drew2a/feature/5658
Browse files Browse the repository at this point in the history
Gossip only healthy torrents
  • Loading branch information
drew2a committed Oct 24, 2020
2 parents 08bb535 + 4a00014 commit edfde4a
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import heapq
import random
from binascii import unhexlify

Expand All @@ -11,13 +13,20 @@
from tribler_core.modules.popularity.payload import TorrentsHealthPayload
from tribler_core.utilities.unicode import hexlify

PUBLISH_INTERVAL = 5


class PopularityCommunity(Community):
"""
Community for disseminating the content across the network. Follows publish-subscribe model.
Community for disseminating the content across the network.
Every 5 seconds it gossips 5 the most popular torrents and 5 random torrents to
a random peer.
Gossiping is for checked torrents only.
"""
GOSSIP_INTERVAL = 5
GOSSIP_POPULAR_TORRENT_COUNT = 5
GOSSIP_RANDOM_TORRENT_COUNT = 5

community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648')

def __init__(self, *args, **kwargs):
Expand All @@ -29,36 +38,79 @@ def __init__(self, *args, **kwargs):

self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health)

self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid))
self.register_task("publish", self.gossip_torrents_health, interval=PUBLISH_INTERVAL)
self.logger.info('Popularity Community initialized (peer mid %s)',
hexlify(self.my_peer.mid))
self.register_task("gossip", self.gossip_torrents_health,
interval=PopularityCommunity.GOSSIP_INTERVAL)

@staticmethod
def select_torrents_to_gossip(torrents) -> (set, set):
""" Select torrents to gossip.
Select top 5 popular torrents, and 5 random torrents.
Args:
torrents: set of tuples (infohash, seeders, leechers, last_check)
Returns:
tuple (set(popular), set(random))
"""
# select the torrents that have seeders
alive = set((_, seeders, *rest) for (_, seeders, *rest) in torrents
if seeders > 0)
if not alive:
return {}, {}

# select 5 most popular from alive torrents, using `seeders` as a key
count = PopularityCommunity.GOSSIP_POPULAR_TORRENT_COUNT
popular = set(heapq.nlargest(count, alive, key=lambda t: t[1]))

# select 5 random torrents from the rest of the list
rest = alive - popular
count = min(PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT, len(rest))
rand = set(random.sample(rest, count))

return popular, rand

@db_session
def gossip_torrents_health(self):
"""
Gossip torrent health information to another peer.
"""
if not self.get_peers() or not self.torrent_checker:
return

num_torrents_checked = len(self.torrent_checker.torrents_checked)
random_torrents_checked = random.sample(self.torrent_checker.torrents_checked, min(num_torrents_checked, 5))
popular_torrents_checked = sorted(self.torrent_checker.torrents_checked - set(random_torrents_checked),
key=lambda tup: tup[1], reverse=True)[:5]
checked = self.torrent_checker.torrents_checked
if not checked:
return

popular, rand = PopularityCommunity.select_torrents_to_gossip(checked)
if not popular and not rand:
self.logger.info(f'No torrents to gossip. Checked torrents count: '
f'{len(checked)}')
return

random_peer = random.choice(self.get_peers())

self.logger.info(
f'Gossip torrent health information for {len(random_torrents_checked)}'
f' random torrents and {len(popular_torrents_checked)} checked torrents', )
self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents_checked, popular_torrents_checked))
f'Gossip torrent health information for {len(rand)}'
f' random torrents and {len(popular)} popular torrents')

self.ez_send(random_peer, TorrentsHealthPayload.create(rand, popular))

@lazy_wrapper(TorrentsHealthPayload)
async def on_torrents_health(self, peer, payload):
self.logger.info("Received torrent health information for %d random torrents and %d checked torrents",
len(payload.random_torrents), len(payload.torrents_checked))
all_torrents = payload.random_torrents + payload.torrents_checked
self.logger.info(f"Received torrent health information for "
f"{len(payload.torrents_checked)} popular torrents and"
f" {len(payload.random_torrents)} random torrents")

torrents = payload.random_torrents + payload.torrents_checked
asyncio.create_task(self.process_torrents_health(peer, torrents))

async def process_torrents_health(self, peer, torrent_healths):
infohashes_to_resolve = []
with db_session:
for infohash, seeders, leechers, last_check in all_torrents:
for infohash, seeders, leechers, last_check in torrent_healths:
torrent_state = self.metadata_store.TorrentState.get(infohash=infohash)
if torrent_state and last_check > torrent_state.last_check:
# Replace current information
Expand All @@ -70,5 +122,13 @@ async def on_torrents_health(self, peer, payload):
self.metadata_store.TorrentState(infohash=infohash, seeders=seeders,
leechers=leechers, last_check=last_check)
self.logger.info(f"{hexlify(infohash)} added ({seeders},{leechers})")
if self.notifier:
self.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT, peer, infohash)
infohashes_to_resolve.append(infohash)

if not self.notifier:
return

# `self.notifier.notify` has been extracted from `with db_session:` to
# prevent issues related to nested db_sessions inside notifier callbacks
for infohash in infohashes_to_resolve:
self.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
peer, infohash)
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import logging
import time
from random import randint
from types import SimpleNamespace

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8

from pony.orm import db_session

import pytest

from tribler_common.simpledefs import NTFY

from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.modules.popularity.popularity_community import PopularityCommunity
from tribler_core.notifier import Notifier
from tribler_core.tests.tools.base_test import MockObject
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.random_utils import random_infohash


class TestPopularityCommunity(TestBase):
Expand Down Expand Up @@ -54,11 +60,22 @@ async def test_torrents_health_gossip(self):
Test whether torrent health information is correctly gossiped around
"""
checked_torrent_info = (b'a' * 20, 200, 0, int(time.time()))
db1 = self.nodes[0].overlay.metadata_store.TorrentState
db2 = self.nodes[1].overlay.metadata_store.TorrentState

with db_session:
assert db1.select().count() == 0
assert db2.select().count() == 0

await self.init_first_node_and_gossip(checked_torrent_info)

# Check whether node 1 has new torrent health information
with db_session:
self.assertEqual(len(self.nodes[1].overlay.metadata_store.TorrentState.select()), 1)
torrent = db2.select().first()
assert torrent.infohash == checked_torrent_info[0]
assert torrent.seeders == checked_torrent_info[1]
assert torrent.leechers == checked_torrent_info[2]
assert torrent.last_check == checked_torrent_info[3]

async def test_torrents_health_override(self):
"""
Expand Down Expand Up @@ -94,7 +111,103 @@ def on_torrent_state_added(self, peer, torrent_hash):
assert not remote_query_community.added_peer
assert not remote_query_community.torrent_hash

# check that no NPE when notifier is None
self.nodes[1].overlay.notifier = None
await self.init_first_node_and_gossip((b'1' * 20, 200, 0, int(time.time())))
assert not remote_query_community.added_peer
assert not remote_query_community.torrent_hash

# check that notifications are works
self.nodes[1].overlay.notifier = notifier
await self.init_first_node_and_gossip((b'2' * 20, 200, 0, int(time.time())))

assert remote_query_community.added_peer
assert remote_query_community.torrent_hash


@pytest.mark.asyncio
async def test_select_torrents_to_gossip_small_list():
torrents = [
# infohash, seeders, leechers, last_check
(b'0' * 20, 0, 0, None),
(b'1' * 20, 1, 0, None),
(b'1' * 20, 2, 0, None),
]

popular, rand = PopularityCommunity.select_torrents_to_gossip(set(torrents))
assert torrents[1] in popular
assert torrents[2] in popular
assert not rand


@pytest.mark.asyncio
async def test_select_torrents_to_gossip_big_list():
# torrent structure is (infohash, seeders, leechers, last_check)
dead_torrents = {(random_infohash(), 0, randint(1, 10), None)
for _ in range(10)}

alive_torrents = {(random_infohash(), randint(1, 10), randint(1, 10), None)
for _ in range(10)}

top5_popular_torrents = {(random_infohash(), randint(11, 100), randint(1, 10), None)
for _ in range(PopularityCommunity.GOSSIP_POPULAR_TORRENT_COUNT)}

all_torrents = dead_torrents | alive_torrents | top5_popular_torrents

popular, rand = PopularityCommunity.select_torrents_to_gossip(all_torrents)
assert len(popular) <= PopularityCommunity.GOSSIP_POPULAR_TORRENT_COUNT
assert popular == top5_popular_torrents

assert len(rand) <= PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT
assert rand <= alive_torrents


@pytest.mark.asyncio
async def test_no_alive_torrents():
torrents = {(random_infohash(), 0, randint(1, 10), None)
for _ in range(10)}

popular, rand = PopularityCommunity.select_torrents_to_gossip(torrents)
assert not popular
assert not rand


# pylint: disable=super-init-not-called
@pytest.mark.asyncio
async def test_gossip_torrents_health_returns():
class MockPopularityCommunity(PopularityCommunity):
def __init__(self):
self.is_ez_send_has_been_called = False
self.torrent_checker = None
self.logger = logging.getLogger()

def gossip_torrents_health(self):
PopularityCommunity.gossip_torrents_health(self)

def ez_send(self, peer, *payloads, **kwargs):
self.is_ez_send_has_been_called = True

def get_peers(self):
return [None]

community = MockPopularityCommunity()

community.gossip_torrents_health()
assert not community.torrent_checker
assert not community.is_ez_send_has_been_called

community.torrent_checker = SimpleNamespace()
community.torrent_checker.torrents_checked = None
community.gossip_torrents_health()
assert not community.is_ez_send_has_been_called

community.torrent_checker.torrents_checked = {(b'0' * 20, 0, 0, None),
(b'1' * 20, 0, 0, None)}

community.gossip_torrents_health()
assert not community.is_ez_send_has_been_called

community.torrent_checker.torrents_checked = {(b'0' * 20, 1, 0, None),
(b'1' * 20, 1, 0, None)}
community.gossip_torrents_health()
assert community.is_ez_send_has_been_called

0 comments on commit edfde4a

Please sign in to comment.