Skip to content

Commit

Permalink
Merge pull request #5860 from drew2a/feature/5828
Browse files Browse the repository at this point in the history
Fast channel discovery
  • Loading branch information
drew2a committed Dec 21, 2020
2 parents 73a8f73 + 6613efc commit e53c872
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging

from ipv8.peerdiscovery.discovery import EdgeWalk


class DiscoveryBooster:
"""This class is designed for increasing the speed of peers' discovery during a limited time.
It can be applied to any community.
"""
def __init__(self, timeout_in_sec=10.0, max_peers=200, take_step_interval_in_sec=0.05, walker=None):
"""
Args:
timeout_in_sec: DiscoveryBooster work timeout. When this timeout will be reached,
`finish` function will be called.
max_peers: Temporary value of max peers that will be set to the community during DiscoveryBooster work.
take_step_interval_in_sec: Сall frequency of walker's `take_step` function.
walker: walker that will be used during boost period.
"""
self.logger = logging.getLogger(self.__class__.__name__)

self.timeout_in_sec = timeout_in_sec
self.max_peers = max_peers
self.take_step_interval_in_sec = take_step_interval_in_sec
self.walker = walker

self.community = None
self.saved_max_peers = None

self._take_step_task_name = 'take step'

def apply(self, community):
"""Apply DiscoveryBooster to the community
Args:
community: community to implement DiscoveryBooster
Returns: None
"""
if not community:
return

self.logger.info(
f'Apply. Timeout: {self.timeout_in_sec}s, '
f'Max peers: {self.max_peers}, '
f'Take step interval: {self.take_step_interval_in_sec}s'
)

self.community = community
self.saved_max_peers = community.max_peers

community.max_peers = self.max_peers

if not self.walker:
# values for neighborhood_size and edge_length were found empirically to
# maximize peer count at the end of a 30 seconds period
self.walker = EdgeWalk(community, neighborhood_size=25, edge_length=25)

community.register_task(self._take_step_task_name, self.take_step, interval=self.take_step_interval_in_sec)
community.register_task('finish', self.finish, delay=self.timeout_in_sec)

def finish(self):
"""Finish DiscoveryBooster work.
This function returns defaults max_peers to the community.
Will be called automatically from community's task manager.
Returns: None
"""
self.logger.info(
f'Finish. Set self.max_peers={self.saved_max_peers}. Cancel pending task: {self._take_step_task_name}'
)
self.community.max_peers = self.saved_max_peers
self.community.cancel_pending_task(self._take_step_task_name)

def take_step(self):
"""Take a step by invoke `walker.take_step()`
Will be called automatically from community's task manager.
Returns: None
"""
self.logger.debug('Take a step')
self.walker.take_step()
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY

from tribler_core.modules.metadata_store.community.discovery_booster import DiscoveryBooster
from tribler_core.modules.metadata_store.community.remote_query_community import (
RemoteQueryCommunity,
RemoteQueryCommunitySettings,
Expand Down Expand Up @@ -73,6 +74,10 @@ def __init__(self, my_peer, endpoint, network, metadata_store, **kwargs):
# TODO: use Bloom filter here instead. We actually *want* it to be all-false-positives eventually.
self.queried_peers = set()

self.discovery_booster = DiscoveryBooster()
self.discovery_booster.apply(self)


def get_random_peers(self, sample_size=None):
# Randomly sample sample_size peers from the complete list of our peers
all_peers = self.get_peers()
Expand Down Expand Up @@ -135,7 +140,6 @@ def __init__(self, *args, **kwargs):

# Register legacy payload
self.add_message_handler(LegacySelectResponsePayload, self.legacy_on_remote_select_response)

self.new_style_peers = set()

def legacy_send_remote_select_subscribed_channels(self, peer):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import pytest

from tribler_core.modules.metadata_store.community.discovery_booster import DiscoveryBooster

TEST_BOOSTER_MAX_PEERS = 100
TEST_BOOSTER_TIMEOUT_IN_SEC = 10
TEST_BOOSTER_TAKE_STEP_INTERVAL_IN_SEC = 1

TEST_COMMUNITY_MAX_PEERS = 30


@pytest.fixture(name="booster") # this workaround implemented only for pylint
def fixture_booster():
class MockWalker:
def __init__(self):
self.take_step_called = False

def take_step(self):
self.take_step_called = True

return DiscoveryBooster(
timeout_in_sec=TEST_BOOSTER_TIMEOUT_IN_SEC,
max_peers=TEST_BOOSTER_MAX_PEERS,
take_step_interval_in_sec=TEST_BOOSTER_TAKE_STEP_INTERVAL_IN_SEC,
walker=MockWalker(),
)


@pytest.fixture(name="community") # this workaround implemented only for pylint
def fixture_community():
class MockCommunity:
def __init__(self):
self.max_peers = TEST_COMMUNITY_MAX_PEERS
self.tasks = []

def register_task(
self, name, task, *args, delay=None, interval=None, ignore=()
): # pylint: disable=unused-argument
self.tasks.append(name)

def cancel_pending_task(self, name):
self.tasks.remove(name)

return MockCommunity()


def test_init(booster):
assert booster.max_peers == TEST_BOOSTER_MAX_PEERS
assert booster.timeout_in_sec == TEST_BOOSTER_TIMEOUT_IN_SEC
assert booster.take_step_interval_in_sec == TEST_BOOSTER_TAKE_STEP_INTERVAL_IN_SEC

assert booster.community is None
assert booster.saved_max_peers is None
assert booster.walker is not None


def test_apply(booster, community):
booster.apply(None)
assert booster.community is None

booster.apply(community)
assert booster.community == community
assert booster.saved_max_peers == TEST_COMMUNITY_MAX_PEERS
assert booster.walker is not None

assert community.max_peers == booster.max_peers
assert len(community.tasks) == 2


def test_finish(booster, community):
booster.apply(community)
booster.finish()
assert community.max_peers == TEST_COMMUNITY_MAX_PEERS
assert len(community.tasks) == 1


def test_take_step(booster):
booster.take_step()
assert booster.walker.take_step_called
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from unittest import mock
from unittest.mock import Mock

from ipv8.database import database_blob
Expand Down Expand Up @@ -35,7 +36,8 @@ def create_node(self, *args, **kwargs):
disable_sync=True,
)
kwargs['metadata_store'] = metadata_store
node = super().create_node(*args, **kwargs)
with mock.patch('tribler_core.modules.metadata_store.community.gigachannel_community.DiscoveryBooster'):
node = super().create_node(*args, **kwargs)
self.count += 1
return node

Expand Down

0 comments on commit e53c872

Please sign in to comment.