Skip to content

Commit

Permalink
Merge branch 'release-7.5' into devel
Browse files Browse the repository at this point in the history
  • Loading branch information
ichorid committed Oct 29, 2020
2 parents 9a00834 + 1fce01f commit 5c04aa4
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 52 deletions.
8 changes: 8 additions & 0 deletions src/tribler-common/tribler_common/simpledefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,11 @@ class CHANNEL_STATE(Enum):
DOWNLOADING = "Downloading"
PREVIEW = "Preview"
METAINFO_LOOKUP = "Searching for metainfo"


# Max download or upload rate limit for libtorrent.
# On Win64, the compiled version of libtorrent only supported 2^31 - 1
# as rate limit values instead of sys.maxsize or 2^63 -1. Since 2^31
# is a sufficiently large value for download/upload rate limit,
# here we set the max values for these parameters.
MAX_LIBTORRENT_RATE_LIMIT = 2 ** 31 - 1 # bytes per second
15 changes: 15 additions & 0 deletions src/tribler-core/tribler_core/config/test_tribler_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path

from tribler_common.simpledefs import MAX_LIBTORRENT_RATE_LIMIT

from tribler_core.config.tribler_config import CONFIG_FILENAME, TriblerConfig
from tribler_core.utilities.osutils import get_home_dir

Expand Down Expand Up @@ -191,6 +193,19 @@ def test_get_set_methods_libtorrent(tribler_config):
tribler_config.set_libtorrent_dht_enabled(False)
assert not tribler_config.get_libtorrent_dht_enabled()

# Add tests for setting libtorrent rate limits
rate_limit = MAX_LIBTORRENT_RATE_LIMIT - 1024 # lower than the max value set
tribler_config.set_libtorrent_max_upload_rate(rate_limit)
assert tribler_config.get_libtorrent_max_upload_rate() == rate_limit
tribler_config.set_libtorrent_max_download_rate(rate_limit)
assert tribler_config.get_libtorrent_max_download_rate() == rate_limit

rate_limit = MAX_LIBTORRENT_RATE_LIMIT + 1024 # higher than the max value set
tribler_config.set_libtorrent_max_upload_rate(rate_limit)
assert tribler_config.get_libtorrent_max_upload_rate() == MAX_LIBTORRENT_RATE_LIMIT
tribler_config.set_libtorrent_max_download_rate(rate_limit)
assert tribler_config.get_libtorrent_max_download_rate() == MAX_LIBTORRENT_RATE_LIMIT


def test_get_set_methods_tunnel_community(tribler_config):
"""
Expand Down
6 changes: 4 additions & 2 deletions src/tribler-core/tribler_core/config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from validate import Validator

from tribler_common.simpledefs import MAX_LIBTORRENT_RATE_LIMIT

from tribler_core.exceptions import InvalidConfigException
from tribler_core.modules.libtorrent.download_config import get_default_dest_dir
from tribler_core.utilities import path_util
Expand Down Expand Up @@ -442,7 +444,7 @@ def get_libtorrent_max_upload_rate(self):
:return: the maximum upload rate in kB / s
"""
return self.config['libtorrent'].as_int('max_upload_rate')
return min(self.config['libtorrent'].as_int('max_upload_rate'), MAX_LIBTORRENT_RATE_LIMIT)

def set_libtorrent_max_download_rate(self, value):
"""
Expand All @@ -459,7 +461,7 @@ def get_libtorrent_max_download_rate(self):
:return: the maximum download rate in kB / s
"""
return self.config['libtorrent'].as_int('max_download_rate')
return min(self.config['libtorrent'].as_int('max_download_rate'), MAX_LIBTORRENT_RATE_LIMIT)

def set_libtorrent_dht_enabled(self, value):
self.config['libtorrent']['dht'] = value
Expand Down
3 changes: 2 additions & 1 deletion src/tribler-core/tribler_core/modules/ipv8_module_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class GigaChannelTestnetCommunityLauncher(TestnetMixIn, GigaChannelCommunityLaun
@set_in_session('remote_query_community')
@overlay('tribler_core.modules.metadata_store.community.remote_query_community', 'RemoteQueryCommunity')
@kwargs(metadata_store='session.mds', notifier='session.notifier')
@walk_strategy('ipv8.peerdiscovery.discovery', 'RandomWalk', target_peers=50)
@walk_strategy('ipv8.peerdiscovery.discovery', 'RandomWalk', target_peers=30)
@walk_strategy('tribler_core.modules.metadata_store.community.sync_strategy', 'RemovePeers', target_peers=-1)
class RemoteQueryCommunityLauncher(IPv8CommunityLauncher):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from ipv8.community import Community
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.peerdiscovery.network import Network
from ipv8.requestcache import RandomNumberCache, RequestCache

from pony.orm.dbapiprovider import OperationalError

from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY

from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import entries_to_chunk
Expand Down Expand Up @@ -73,10 +76,9 @@ class RemoteQueryCommunity(Community):
community_id = unhexlify('dc43e3465cbd83948f30d3d3e8336d71cce33aa7')

def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, notifier=None):
super(RemoteQueryCommunity, self).__init__(my_peer, endpoint, network)
super().__init__(my_peer, endpoint, Network())

self.notifier = notifier
self.max_peers = 60

self.settings = settings or RemoteQueryCommunitySettings()

Expand All @@ -90,8 +92,7 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, no
self.queried_peers_limit = 1000

if self.notifier:
self.notifier.add_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.on_pc_add_unknown_torrent)
self.notifier.add_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT, self.on_pc_add_unknown_torrent)

# this flag enable or disable https://github.com/Tribler/tribler/pull/5657
# it can be changed in runtime
Expand Down Expand Up @@ -133,17 +134,30 @@ def send_remote_select_subscribed_channels(self, peer):
}
self.send_remote_select(peer, **request_dict)

async def process_rpc_query(self, json_bytes: bytes):
"""
Retrieve the result of a database query from a third party, encoded as raw JSON bytes (through `dumps`).
:raises TypeError: if the JSON contains invalid keys.
:raises ValueError: if no JSON could be decoded.
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.settings.max_response_size)
return await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)

@lazy_wrapper(RemoteSelectPayload)
async def on_remote_select(self, peer, request):
request_sanitized = sanitize_query(json.loads(request.json), self.settings.max_response_size)
db_results = await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)
if not db_results:
return

index = 0
while index < len(db_results):
data, index = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=index)
self.ez_send(peer, SelectResponsePayload(request.id, data))
try:
db_results = await self.process_rpc_query(request.json)
if not db_results:
return

index = 0
while index < len(db_results):
data, index = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=index)
self.ez_send(peer, SelectResponsePayload(request.id, data))
except (OperationalError, TypeError, ValueError) as error:
self.logger.error(f"Remote select. The error occurred: {error}")

@lazy_wrapper(SelectResponsePayload)
async def on_remote_select_response(self, peer, response):
Expand Down Expand Up @@ -190,8 +204,7 @@ async def unload(self):
await super(RemoteQueryCommunity, self).unload()

if self.notifier:
self.notifier.remove_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.on_pc_add_unknown_torrent)
self.notifier.remove_observer(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT, self.on_pc_add_unknown_torrent)


class RemoteQueryTestnetCommunity(RemoteQueryCommunity):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,17 @@ def take_step(self):
if peers:
peer = choice(peers)
self.overlay.send_random_to(peer)


class RemovePeers(DiscoveryStrategy):
"""
Synchronization strategy for remote query community.
Remove a random peer, if we have enough peers to walk to.
"""

def take_step(self):
with self.walk_lock:
peers = self.overlay.get_peers()
if peers and len(peers) > 20:
self.overlay.network.remove_peer(choice(peers))
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from datetime import datetime
from json import dumps

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase

from pony.orm import db_session
from pony.orm.dbapiprovider import OperationalError

from tribler_common.simpledefs import NTFY

Expand Down Expand Up @@ -33,6 +37,7 @@ def setUp(self):
super(TestRemoteQueryCommunity, self).setUp()
self.count = 0
self.initialize(RemoteQueryCommunity, 2)
self.torrent_template = {"title": "", "infohash": b"", "torrent_date": datetime(1970, 1, 1), "tags": "video"}

def create_node(self, *args, **kwargs):
metadata_store = MetadataStore(
Expand All @@ -47,6 +52,15 @@ def create_node(self, *args, **kwargs):
self.count += 1
return node

def channel_metadata(self, i):
return self.nodes[i].overlay.mds.ChannelMetadata

def torrent_metadata(self, i):
return self.nodes[i].overlay.mds.TorrentMetadata

def overlay(self, i):
return self.nodes[i].overlay

async def test_remote_select(self):
# Fill Node 0 DB with channels and torrents entries
with db_session:
Expand Down Expand Up @@ -235,6 +249,7 @@ def has_testing_infohash(t):
assert torrent_has_been_added_to_db1
assert torrent_not_presented_on_db2

await self.introduce_nodes()
remote_query = {"infohash": hexlify(torrent_infohash)}
self.nodes[1].overlay.send_remote_select_to_many(**remote_query)

Expand Down Expand Up @@ -273,9 +288,7 @@ def has_testing_infohash(t):
assert torrent_not_presented_on_db2

# notify second node that new torrent hash has been received from the first node
rqc2.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT,
self.nodes[0].my_peer,
torrent_infohash)
rqc2.notifier.notify(NTFY.POPULARITY_COMMUNITY_ADD_UNKNOWN_TORRENT, self.nodes[0].my_peer, torrent_infohash)

await self.deliver_messages(timeout=0.5)
with db_session:
Expand All @@ -298,9 +311,81 @@ async def test_unknown_query_attribute(self):
await self.deliver_messages(timeout=0.1)

# mixed: the old and a new attribute
rqc_node2.send_remote_select_to_many(**{'infohash': hexlify(b'0' * 20),
'new_attribute': 'some_value'})
rqc_node2.send_remote_select_to_many(**{'infohash': hexlify(b'0' * 20), 'new_attribute': 'some_value'})
await self.deliver_messages(timeout=0.1)

# no exception have been raised
assert True

async def test_process_rpc_query_match_many(self):
"""
Check if a correct query with a match in our database returns a result.
"""
with db_session:
channel = self.channel_metadata(0).create_channel("a channel", "")
add_random_torrent(self.torrent_metadata(0), name="a torrent", channel=channel)

results = await self.overlay(0).process_rpc_query(dumps({}))
self.assertEqual(2, len(results))

channel_md, torrent_md = results if isinstance(results[0], self.channel_metadata(0)) else results[::-1]
self.assertEqual("a channel", channel_md.title)
self.assertEqual("a torrent", torrent_md.title)

async def test_process_rpc_query_match_one(self):
"""
Check if a correct query with one match in our database returns one result.
"""
with db_session:
self.channel_metadata(0).create_channel("a channel", "")

results = await self.overlay(0).process_rpc_query(dumps({}))
self.assertEqual(1, len(results))

channel_md, = results
self.assertEqual("a channel", channel_md.title)

async def test_process_rpc_query_match_none(self):
"""
Check if a correct query with no match in our database returns no result.
"""
results = await self.overlay(0).process_rpc_query(dumps({}))
self.assertEqual(0, len(results))

async def test_process_rpc_query_match_empty_json(self):
"""
Check if processing an empty request causes a ValueError (JSONDecodeError) to be raised.
"""
with self.assertRaises(ValueError):
await self.overlay(0).process_rpc_query(b'')

async def test_process_rpc_query_match_illegal_json(self):
"""
Check if processing a request with illegal JSON causes a UnicodeDecodeError to be raised.
"""
with self.assertRaises(UnicodeDecodeError):
await self.overlay(0).process_rpc_query(b'{"akey":\x80}')

async def test_process_rpc_query_match_invalid_json(self):
"""
Check if processing a request with invalid JSON causes a ValueError to be raised.
"""
with db_session:
self.channel_metadata(0).create_channel("a channel", "")
query = b'{"id_":' + b'\x31' * 200 + b'}'
with self.assertRaises(ValueError):
await self.overlay(0).process_rpc_query(query)

async def test_process_rpc_query_match_invalid_key(self):
"""
Check if processing a request with invalid flags causes a UnicodeDecodeError to be raised.
"""
with self.assertRaises(TypeError):
await self.overlay(0).process_rpc_query(b'{"bla":":("}')

async def test_process_rpc_query_no_column(self):
"""
Check if processing a request with no database columns causes an OperationalError.
"""
with self.assertRaises(OperationalError):
await self.overlay(0).process_rpc_query(b'{"txt_filter":{"key":"bla"}}')
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.peerdiscovery.network import Network
from ipv8.test.base import TestBase

import pytest

from tribler_core.modules.metadata_store.community.sync_strategy import SyncChannels
from tribler_core.modules.metadata_store.community.sync_strategy import RemovePeers, SyncChannels


class MockCommunity(object):
def __init__(self):
self.fetch_next_called = False
self.send_random_to_called = []
self.get_peers_return = []
self.network = Network()

def send_random_to(self, peer):
self.send_random_to_called.append(peer)
Expand Down Expand Up @@ -65,3 +68,43 @@ def test_strategy_multi_peer(mock_community, strategy):

assert len(mock_community.send_random_to_called) == 1
assert mock_community.send_random_to_called[0] in mock_community.get_peers_return


class TestRemovePeers(TestBase):
def setUp(self):
self.community = MockCommunity()
self.strategy = RemovePeers(self.community)
return super().setUp()

def test_strategy_no_peers(self):
"""
If we have no peers, nothing should happen.
"""
self.strategy.take_step()

self.assertSetEqual(set(), self.community.network.verified_peers)

def test_strategy_one_peer(self):
"""
If we have one peer, it should not be removed.
"""
test_peer = Peer(default_eccrypto.generate_key(u"very-low"))
self.community.network.add_verified_peer(test_peer)
self.community.get_peers_return.append(test_peer)

self.strategy.take_step()

self.assertSetEqual({test_peer}, self.community.network.verified_peers)

def test_strategy_multi_peer(self):
"""
If we have over 20 peers, one should be removed.
"""
for _ in range(21):
test_peer = Peer(default_eccrypto.generate_key(u"very-low"))
self.community.network.add_verified_peer(test_peer)
self.community.get_peers_return.append(test_peer)

self.strategy.take_step()

self.assertEqual(20, len(self.community.network.verified_peers))

0 comments on commit 5c04aa4

Please sign in to comment.