Skip to content

Commit

Permalink
Merge pull request #5657 from drew2a/feature/5642
Browse files Browse the repository at this point in the history
Request a torrent info after new torrent health has been received
  • Loading branch information
drew2a committed Oct 20, 2020
2 parents 64fcff7 + 59e4ad2 commit 3e69384
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/tribler-common/tribler_common/simpledefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,4 @@ class NTFY(Enum):
LOW_SPACE = "low_space"
EVENTS_START = "events_start"
TRIBLER_EXCEPTION = "tribler_exception"
POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT = "PopularityCommunity:added_unknown_torrent"
3 changes: 2 additions & 1 deletion src/tribler-core/tribler_core/modules/ipv8_module_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class MarketTestnetCommunityLauncher(TestnetMixIn, MarketCommunityLauncher):

@overlay('tribler_core.modules.popularity.popularity_community', 'PopularityCommunity')
@precondition('session.config.get_popularity_community_enabled()')
@kwargs(metadata_store='session.mds', torrent_checker='session.torrent_checker')
@kwargs(metadata_store='session.mds', torrent_checker='session.torrent_checker',
notifier='session.notifier')
@walk_strategy('ipv8.peerdiscovery.discovery', 'RandomWalk')
@set_in_session('popularity_community')
class PopularityCommunityLauncher(IPv8CommunityLauncher):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from ipv8.community import Community
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.peer import Peer
from ipv8.requestcache import RandomNumberCache, RequestCache

from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY
Expand All @@ -22,6 +21,12 @@ def sanitize_query(query_dict, cap=100):
first = first or 0
last = last if (last is not None and last <= (first + cap)) else (first + cap)
query_dict.update({"first": first, "last": last})

# convert hex infohash to binary
infohash = query_dict.get('infohash', None)
if infohash:
query_dict['infohash'] = unhexlify(infohash)

return query_dict


Expand Down Expand Up @@ -84,11 +89,19 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, no
self.queried_subscribed_channels_peers = set()
self.queried_peers_limit = 1000

if self.notifier:
self.notifier.add_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.on_pc_add_unknown_torrent)

self.add_message_handler(RemoteSelectPayload, self.on_remote_select)
self.add_message_handler(SelectResponsePayload, self.on_remote_select_response)

self.request_cache = RequestCache()

def on_pc_add_unknown_torrent(self, peer, infohash):
query = {'infohash': hexlify(infohash)}
self.send_remote_select(peer, **query)

def get_random_peers(self, sample_size=None):
# Randomly sample sample_size peers from the complete list of our peers
all_peers = self.get_peers()
Expand All @@ -99,6 +112,8 @@ def get_random_peers(self, sample_size=None):
def send_remote_select(self, peer, **kwargs):
request = SelectRequest(self.request_cache, hexlify(peer.mid), kwargs)
self.request_cache.add(request)

self.logger.info(f"Select to {hexlify(peer.mid)} with ({kwargs})")
self.ez_send(peer, RemoteSelectPayload(request.number, json.dumps(kwargs).encode('utf8')))

def send_remote_select_to_many(self, **kwargs):
Expand Down Expand Up @@ -127,6 +142,7 @@ async def on_remote_select(self, peer, request):

@lazy_wrapper(SelectResponsePayload)
async def on_remote_select_response(self, peer, response):
self.logger.info(f"Response from {hexlify(peer.mid)}")

request = self.request_cache.get(hexlify(peer.mid), response.id)
if request is None:
Expand All @@ -142,6 +158,8 @@ async def on_remote_select_response(self, peer, response):
peer_vote = peer if request.request_kwargs.get("subscribed", None) is True else None

result = await self.mds.process_compressed_mdblob_threaded(response.raw_blob, peer_vote_for_channels=peer_vote)

self.logger.info(f"Response result: {result}")
# Maybe move this callback to MetadataStore side?
if self.notifier:
new_channels = [
Expand All @@ -166,6 +184,10 @@ async def unload(self):
await self.request_cache.shutdown()
await super(RemoteQueryCommunity, self).unload()

if self.notifier:
self.notifier.remove_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.on_pc_add_unknown_torrent)


class RemoteQueryTestnetCommunity(RemoteQueryCommunity):
community_id = unhexlify('ad8cece0dfdb0e03344b59a4d31a38fe9812da9d')
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.db
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

from pony.orm import db_session

from tribler_common.simpledefs import NTFY

from tribler_core.modules.metadata_store.community.remote_query_community import RemoteQueryCommunity, sanitize_query
from tribler_core.modules.metadata_store.orm_bindings.channel_node import NEW
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT, REGULAR_TORRENT
from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.tests.tools.base_test import MockObject
from tribler_core.notifier import Notifier
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.random_utils import random_infohash, random_string
from tribler_core.utilities.unicode import hexlify


def add_random_torrent(metadata_cls, name="test", channel=None):
Expand Down Expand Up @@ -39,6 +42,7 @@ def create_node(self, *args, **kwargs):
disable_sync=True,
)
kwargs['metadata_store'] = metadata_store
kwargs['notifier'] = Notifier()
node = super(TestRemoteQueryCommunity, self).create_node(*args, **kwargs)
self.count += 1
return node
Expand Down Expand Up @@ -101,7 +105,7 @@ def mock_notify(overlay, args):
overlay.notified_results = True
self.assertTrue("results" in args[0])

self.nodes[1].overlay.notifier = MockObject()
self.nodes[1].overlay.notifier = Notifier()
self.nodes[1].overlay.notifier.notify = lambda sub, args: mock_notify(self.nodes[1].overlay, args)

with db_session:
Expand Down Expand Up @@ -200,3 +204,90 @@ def test_sanitize_query(self):
]
for req, resp in req_response_list:
self.assertDictEqual(sanitize_query(req), resp)

def test_sanitize_query_infohash(self):
infohash_in_b = b'0' * 20
infohash_in_hex = hexlify(infohash_in_b)

query = {'infohash': infohash_in_hex}
sanitize_query(query)
assert query['infohash'] == infohash_in_b

# assert no exception raises when 'infohash' is missed
sanitize_query({})

async def test_infohash_select(self):
db1 = self.nodes[0].overlay.mds.TorrentMetadata
db2 = self.nodes[1].overlay.mds.TorrentMetadata

torrent_infohash = b'0' * 20
torrent_title = 'title'

def has_testing_infohash(t):
return t.infohash == torrent_infohash

with db_session:
db1.from_dict({"infohash": torrent_infohash, "title": torrent_title}).sign()

torrent_has_been_added_to_db1 = db1.select(has_testing_infohash).count() == 1
torrent_not_presented_on_db2 = db2.select(has_testing_infohash).count() == 0

assert torrent_has_been_added_to_db1
assert torrent_not_presented_on_db2

remote_query = {"infohash": hexlify(torrent_infohash)}
self.nodes[1].overlay.send_remote_select_to_many(**remote_query)

await self.deliver_messages(timeout=0.5)
with db_session:
torrents = list(db2.select(has_testing_infohash))

torrent_is_presented_on_db2 = len(torrents) == 1
torrent_has_valid_title = torrents[0].title == torrent_title

assert torrent_is_presented_on_db2
assert torrent_has_valid_title

async def test_add_unknown_torrent(self):
db1 = self.nodes[0].overlay.mds.TorrentMetadata
db2 = self.nodes[1].overlay.mds.TorrentMetadata

torrent_infohash = b'0' * 20

def has_testing_infohash(t):
return t.infohash == torrent_infohash

with db_session:
db1.from_dict({"infohash": torrent_infohash, "title": 'title'}).sign()

torrent_has_been_added_to_db1 = db1.select(has_testing_infohash).count() == 1
torrent_not_presented_on_db2 = db2.select(has_testing_infohash).count() == 0

assert torrent_has_been_added_to_db1
assert torrent_not_presented_on_db2

# notify second node that new torrent hash has been received from the first node
self.nodes[1].overlay.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.nodes[0].my_peer,
torrent_infohash)

await self.deliver_messages(timeout=0.5)
with db_session:
torrent_is_presented_on_db2 = db2.select(has_testing_infohash).count() == 1

assert torrent_is_presented_on_db2

async def test_unknown_query_attribute(self):
rqc_node2 = self.nodes[1].overlay

# only the new attribute
rqc_node2.send_remote_select_to_many(**{'new_attribute': 'some_value'})
await self.deliver_messages(timeout=0.1)

# mixed: the old and a new attribute
rqc_node2.send_remote_select_to_many(**{'infohash': hexlify(b'0' * 20),
'new_attribute': 'some_value'})
await self.deliver_messages(timeout=0.1)

# no exception have been raised
assert True
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def get_entries_query(
subscribed=None,
category=None,
attribute_ranges=None,
infohash=None,
id_=None,
):
"""
Expand Down Expand Up @@ -109,6 +110,7 @@ def get_entries_query(
pony_query = pony_query.where(lambda g: g.status != TODELETE) if exclude_deleted else pony_query
pony_query = pony_query.where(lambda g: g.xxx == 0) if hide_xxx else pony_query
pony_query = pony_query.where(lambda g: g.status != LEGACY_ENTRY) if exclude_legacy else pony_query
pony_query = pony_query.where(lambda g: g.infohash == infohash) if infohash else pony_query

# Sort the query
if sort_by == "HEALTH":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import random
from asyncio import get_event_loop
from binascii import unhexlify

from ipv8.community import Community
from ipv8.lazy_community import lazy_wrapper

from pony.orm import db_session

from tribler_common.simpledefs import NTFY

from tribler_core.modules.popularity.payload import TorrentsHealthPayload
from tribler_core.utilities.unicode import hexlify

Expand All @@ -22,6 +23,7 @@ class PopularityCommunity(Community):
def __init__(self, *args, **kwargs):
self.metadata_store = kwargs.pop('metadata_store')
self.torrent_checker = kwargs.pop('torrent_checker', None)
self.notifier = kwargs.pop('notifier', None)

super(PopularityCommunity, self).__init__(*args, **kwargs)

Expand All @@ -44,27 +46,29 @@ def gossip_torrents_health(self):
key=lambda tup: tup[1], reverse=True)[:5]

random_peer = random.choice(self.get_peers())

self.logger.info(
f'Gossip torrent health information for {len(random_torrents_checked)}'
f' random torrents and {len(popular_torrents_checked)} checked torrents', )
self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents_checked, popular_torrents_checked))

@lazy_wrapper(TorrentsHealthPayload)
async def on_torrents_health(self, _, payload):
async def on_torrents_health(self, peer, payload):
self.logger.info("Received torrent health information for %d random torrents and %d checked torrents",
len(payload.random_torrents), len(payload.torrents_checked))
all_torrents = payload.random_torrents + payload.torrents_checked

def _put_health_entries_in_db():
with db_session(immediate=True):
for infohash, seeders, leechers, last_check in all_torrents:
torrent_state = self.metadata_store.TorrentState.get(infohash=infohash)
if torrent_state and last_check > torrent_state.last_check:
# Replace current information
torrent_state.seeders = seeders
torrent_state.leechers = leechers
torrent_state.last_check = last_check
elif not torrent_state:
_ = self.metadata_store.TorrentState(infohash=infohash, seeders=seeders,
leechers=leechers, last_check=last_check)

self.metadata_store.disconnect_thread()
await get_event_loop().run_in_executor(None, _put_health_entries_in_db)
with db_session:
for infohash, seeders, leechers, last_check in all_torrents:
torrent_state = self.metadata_store.TorrentState.get(infohash=infohash)
if torrent_state and last_check > torrent_state.last_check:
# Replace current information
torrent_state.seeders = seeders
torrent_state.leechers = leechers
torrent_state.last_check = last_check
self.logger.info(f"{hexlify(infohash)} updated ({seeders},{leechers})")
elif not torrent_state:
self.metadata_store.TorrentState(infohash=infohash, seeders=seeders,
leechers=leechers, last_check=last_check)
self.logger.info(f"{hexlify(infohash)} added ({seeders},{leechers})")
if self.notifier:
self.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT, peer, infohash)
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

from pony.orm import db_session

from tribler_common.simpledefs import NTFY

from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.modules.popularity.popularity_community import PopularityCommunity
from tribler_core.notifier import Notifier
from tribler_core.tests.tools.base_test import MockObject
from tribler_core.utilities.path_util import Path

Expand All @@ -28,7 +31,8 @@ def create_node(self, *args, **kwargs):
torrent_checker = MockObject()
torrent_checker.torrents_checked = set()

return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds, torrent_checker=torrent_checker)
return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds,
torrent_checker=torrent_checker, notifier=Notifier())

@db_session
def fill_database(self, metadata_store, last_check_now=False):
Expand All @@ -37,17 +41,20 @@ def fill_database(self, metadata_store, last_check_now=False):
metadata_store.TorrentState(
infohash=str(torrent_ind).encode() * 20, seeders=torrent_ind + 1, last_check=last_check)

async def test_torrents_health_gossip(self):
"""
Test whether torrent health information is correctly gossiped around
"""
checked_torrent_info = (b'a' * 20, 200, 0, int(time.time()))
async def init_first_node_and_gossip(self, checked_torrent_info, deliver_timeout=.1):
self.nodes[0].overlay.torrent_checker.torrents_checked.add(checked_torrent_info)
await self.introduce_nodes()

self.nodes[0].overlay.gossip_torrents_health()

await self.deliver_messages()
await self.deliver_messages(timeout=deliver_timeout)

async def test_torrents_health_gossip(self):
"""
Test whether torrent health information is correctly gossiped around
"""
checked_torrent_info = (b'a' * 20, 200, 0, int(time.time()))
await self.init_first_node_and_gossip(checked_torrent_info)

# Check whether node 1 has new torrent health information
with db_session:
Expand All @@ -60,14 +67,34 @@ async def test_torrents_health_override(self):
self.fill_database(self.nodes[1].overlay.metadata_store)

checked_torrent_info = (b'0' * 20, 200, 0, int(time.time()))
self.nodes[0].overlay.torrent_checker.torrents_checked.add(checked_torrent_info)
await self.introduce_nodes()

self.nodes[0].overlay.gossip_torrents_health()

await self.deliver_messages(timeout=0.5)
await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5)

# Check whether node 1 has new torrent health information
with db_session:
state = self.nodes[1].overlay.metadata_store.TorrentState.get(infohash=b'0' * 20)
self.assertIsNot(state.last_check, 0)

async def test_unknown_torrent_notification(self):
"""Test Popularity Community publish event about receiving an unknown torrent
"""
notifier = self.nodes[1].overlay.notifier

class MockRemoteQueryCommunity:
added_peer = None
torrent_hash = None

def on_torrent_state_added(self, peer, torrent_hash):
self.added_peer = peer
self.torrent_hash = torrent_hash

remote_query_community = MockRemoteQueryCommunity()
notifier.add_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
remote_query_community.on_torrent_state_added)

assert not remote_query_community.added_peer
assert not remote_query_community.torrent_hash

await self.init_first_node_and_gossip((b'1' * 20, 200, 0, int(time.time())))

assert remote_query_community.added_peer
assert remote_query_community.torrent_hash

0 comments on commit 3e69384

Please sign in to comment.