Skip to content

Commit

Permalink
Made TorrentDef.load async and threaded (#7666)
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Nov 14, 2023
1 parent d018d66 commit f610c82
Show file tree
Hide file tree
Showing 21 changed files with 245 additions and 309 deletions.
6 changes: 3 additions & 3 deletions src/tribler/core/components/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ def mock_dlmgr(state_dir):


@pytest.fixture
def video_tdef():
return TorrentDef.load(TESTS_DATA_DIR / 'video.avi.torrent')
async def video_tdef():
return await TorrentDef.load(TESTS_DATA_DIR / 'video.avi.torrent')


@pytest.fixture
Expand All @@ -90,7 +90,7 @@ async def video_seeder(tmp_path_factory, video_tdef):
dlmgr.initialize()
dscfg_seed = DownloadConfig()
dscfg_seed.set_dest_dir(TESTS_DATA_DIR)
upload = dlmgr.start_download(tdef=video_tdef, config=dscfg_seed)
upload = await dlmgr.start_download(tdef=video_tdef, config=dscfg_seed)
await upload.wait_for_status(DownloadStatus.SEEDING)
yield dlmgr
await dlmgr.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ async def download_channel(self, channel):
dcfg.set_channel_download(True)
tdef = TorrentDef(metainfo=metainfo)

download = self.download_manager.start_download(tdef=tdef, config=dcfg, hidden=True)
download = await self.download_manager.start_download(tdef=tdef, config=dcfg, hidden=True)
try:
await download.future_finished
except CancelledError:
Expand All @@ -279,7 +279,8 @@ def _process_download():
if updated_channel:
self.notifier[notifications.channel_entity_updated](channel_dict)

def updated_my_channel(self, tdef):
@task
async def updated_my_channel(self, tdef):
"""
Notify the core that we updated our channel.
"""
Expand All @@ -293,7 +294,7 @@ def updated_my_channel(self, tdef):
dcfg = DownloadConfig(state_dir=self.state_dir)
dcfg.set_dest_dir(self.mds.channels_dir)
dcfg.set_channel_download(True)
return self.download_manager.start_download(tdef=tdef, config=dcfg)
return await self.download_manager.start_download(tdef=tdef, config=dcfg)

@db_session
def clean_unsubscribed_channels(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def torrent_template():


@pytest.fixture
def personal_channel(metadata_store):
async def personal_channel(metadata_store):
global update_metainfo
tdef = await TorrentDef.load(TORRENT_UBUNTU_FILE)
with db_session:
chan = metadata_store.ChannelMetadata.create_channel(title="my test chan", description="test")
tdef = TorrentDef.load(TORRENT_UBUNTU_FILE)
chan.add_torrent_to_channel(tdef, None)
update_metainfo = chan.commit_channel_torrent()
return chan
Expand Down Expand Up @@ -108,11 +108,11 @@ async def mock_remove_download(download_obj, **_):
gigachannel_manager.updated_my_channel.assert_called_once()


def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir):
async def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir):
tdef = TorrentDef.load_from_dict(update_metainfo)
gigachannel_manager.download_manager.start_download = MagicMock()
gigachannel_manager.download_manager.start_download = AsyncMock()
gigachannel_manager.download_manager.download_exists = lambda *_: False
gigachannel_manager.updated_my_channel(tdef)
await gigachannel_manager.updated_my_channel(tdef)
gigachannel_manager.download_manager.start_download.assert_called_once()


Expand Down Expand Up @@ -388,7 +388,7 @@ def mock_get_metainfo_good(*args, **kwargs):

initiated_download = False

def mock_download_from_tdef(*_, **__):
async def mock_download_from_tdef(*_, **__):
global initiated_download
initiated_download = True
mock_dl = MockObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from shutil import rmtree
from typing import Callable, Dict, List, Optional

from ipv8.taskmanager import TaskManager, task
from ipv8.taskmanager import TaskManager

from tribler.core import notifications
from tribler.core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager
Expand Down Expand Up @@ -491,7 +491,7 @@ async def get_metainfo(self, infohash: bytes, timeout: float = 30, hops: Optiona
dcfg.set_upload_mode(True) # Upload mode should prevent libtorrent from creating files
dcfg.set_dest_dir(self.metadata_tmpdir)
try:
download = self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
download = await self.start_download(tdef=tdef, config=dcfg, hidden=True, checkpoint_disabled=True)
except TypeError as e:
self._logger.warning(e)
if raise_errors:
Expand Down Expand Up @@ -550,7 +550,7 @@ async def start_download_from_uri(self, uri, config=None):

if scheme in (HTTP_SCHEME, HTTPS_SCHEME):
tdef = await TorrentDef.load_from_url(uri)
return self.start_download(tdef=tdef, config=config)
return await self.start_download(tdef=tdef, config=config)
if scheme == MAGNET_SCHEME:
name, infohash, _ = parse_magnetlink(uri)
if infohash is None:
Expand All @@ -559,14 +559,14 @@ async def start_download_from_uri(self, uri, config=None):
tdef = TorrentDef.load_from_dict(self.metainfo_cache[infohash]['meta_info'])
else:
tdef = TorrentDefNoMetainfo(infohash, "Unknown name" if name is None else name, url=uri)
return self.start_download(tdef=tdef, config=config)
return await self.start_download(tdef=tdef, config=config)
if scheme == FILE_SCHEME:
file = url_to_path(uri)
return self.start_download(torrent_file=file, config=config)
return await self.start_download(torrent_file=file, config=config)
raise Exception("invalid uri")

def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig = None, checkpoint_disabled=False,
hidden=False) -> Download:
async def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig = None,
checkpoint_disabled=False, hidden=False) -> Download:
self._logger.debug(f'Starting download: filename: {torrent_file}, torrent def: {tdef}')
if config is None:
config = DownloadConfig.from_defaults(self.download_defaults)
Expand All @@ -578,7 +578,7 @@ def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig =
if torrent_file is None:
raise ValueError("Torrent file must be provided if tdef is not given")
# try to get the torrent from the given torrent file
tdef = TorrentDef.load(torrent_file)
tdef = await TorrentDef.load(torrent_file)

assert tdef is not None, "tdef MUST not be None after loading torrent"

Expand Down Expand Up @@ -617,10 +617,9 @@ def start_download(self, torrent_file=None, tdef=None, config: DownloadConfig =
if infohash not in self.metainfo_requests or self.metainfo_requests[infohash][0] == download:
self.downloads[infohash] = download
if not self.dummy_mode:
self.start_handle(download, atp)
await self.start_handle(download, atp)
return download

@task
async def start_handle(self, download, atp):
atp_resume_data_skipped = atp.copy()
resume_data = atp.get('resume_data')
Expand Down Expand Up @@ -662,7 +661,6 @@ async def start_handle(self, download, atp):
except asyncio.TimeoutError:
self._logger.warning("Timeout waiting for libtorrent DHT getting enough peers")
ltsession.async_add_torrent(encode_atp(atp))
return await download.future_added

def get_libtorrent_version(self):
try:
Expand Down Expand Up @@ -768,7 +766,7 @@ async def update_hops(self, download, new_hops):
# If the user wants to change the hop count to 0, don't automatically bump this up to 1 anymore
config.set_safe_seeding(False)

self.start_download(tdef=download.tdef, config=config)
await self.start_download(tdef=download.tdef, config=config)

def update_trackers(self, infohash, trackers):
""" Update the trackers for a download.
Expand Down Expand Up @@ -862,13 +860,13 @@ async def load_checkpoints(self):
checkpoint_filenames = list(self.get_checkpoint_dir().glob('*.conf'))
self.checkpoints_count = len(checkpoint_filenames)
for i, filename in enumerate(checkpoint_filenames, start=1):
self.load_checkpoint(filename)
await self.load_checkpoint(filename)
self.checkpoints_loaded = i
await sleep(.01)
self.all_checkpoints_are_loaded = True
self._logger.info("Checkpoints are loaded")

def load_checkpoint(self, filename):
async def load_checkpoint(self, filename):
try:
config = DownloadConfig.load(filename)
except Exception:
Expand Down Expand Up @@ -910,7 +908,7 @@ def load_checkpoint(self, filename):
if self.download_exists(tdef.get_infohash()):
self._logger.info("Not resuming checkpoint because download has already been added")
else:
self.start_download(tdef=tdef, config=config)
await self.start_download(tdef=tdef, config=config)
except Exception:
self._logger.exception("Not resume checkpoint due to exception while adding download")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ async def create_torrent(self, request):
download_config = DownloadConfig()
download_config.set_dest_dir(result['base_dir'])
download_config.set_hops(self.download_manager.download_defaults.number_hops)
self.download_manager.start_download(tdef=TorrentDef(metainfo_dict), config=download_config)
await self.download_manager.start_download(tdef=TorrentDef(metainfo_dict), config=download_config)

return RESTResponse(json.dumps({"torrent": base64.b64encode(result['metainfo']).decode('utf-8')}))
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async def get_torrent_info(self, request):
if scheme == FILE_SCHEME:
file = url_to_path(uri)
try:
tdef = TorrentDef.load(file)
tdef = await TorrentDef.load(file)
metainfo = tdef.metainfo
except (FileNotFoundError, TypeError, ValueError, RuntimeError):
return RESTResponse({"error": f"error while decoding torrent file: {file}"},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from asyncio import Future, gather, get_event_loop, sleep
from unittest.mock import MagicMock
from unittest.mock import MagicMock, AsyncMock

import pytest
from ipv8.util import succeed
Expand Down Expand Up @@ -65,7 +65,7 @@ async def test_get_metainfo_valid_metadata(fake_dlmgr):
download_impl.future_metainfo = succeed(metainfo)

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock(return_value=download_impl)
fake_dlmgr.start_download = AsyncMock(return_value=download_impl)
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))

Expand All @@ -86,7 +86,7 @@ async def test_get_metainfo_add_fail(fake_dlmgr):
download_impl.tdef.get_metainfo = lambda: None

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock()
fake_dlmgr.start_download = AsyncMock()
fake_dlmgr.start_download.side_effect = TypeError
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove = MagicMock(return_value=succeed(None))
Expand All @@ -109,7 +109,7 @@ async def test_get_metainfo_duplicate_request(fake_dlmgr):
get_event_loop().call_later(0.1, download_impl.future_metainfo.set_result, metainfo)

fake_dlmgr.initialize()
fake_dlmgr.start_download = MagicMock(return_value=download_impl)
fake_dlmgr.start_download = AsyncMock(return_value=download_impl)
fake_dlmgr.download_defaults.number_hops = 1
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))

Expand All @@ -134,7 +134,7 @@ async def test_get_metainfo_with_already_added_torrent(fake_dlmgr):
Testing metainfo fetching for a torrent which is already in session.
"""
sample_torrent = TESTS_DATA_DIR / "bak_single.torrent"
torrent_def = TorrentDef.load(sample_torrent)
torrent_def = await TorrentDef.load(sample_torrent)

download_impl = MagicMock()
download_impl.future_metainfo = succeed(bencode(torrent_def.get_metainfo()))
Expand Down Expand Up @@ -164,10 +164,10 @@ async def test_start_download_while_getting_metainfo(fake_dlmgr):
fake_dlmgr.get_session = lambda *_: metainfo_session
fake_dlmgr.downloads[infohash] = metainfo_dl
fake_dlmgr.metainfo_requests[infohash] = [metainfo_dl, 1]
fake_dlmgr.remove_download = MagicMock(return_value=succeed(None))
fake_dlmgr.remove_download = AsyncMock(return_value=succeed(None))

tdef = TorrentDefNoMetainfo(infohash, 'name', f'magnet:?xt=urn:btih:{hexlify(infohash)}&')
download = fake_dlmgr.start_download(tdef=tdef, checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=tdef, checkpoint_disabled=True)
assert metainfo_dl != download
await sleep(.1)
assert fake_dlmgr.downloads[infohash] == download
Expand Down Expand Up @@ -199,7 +199,7 @@ async def test_start_download(fake_dlmgr):

fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, ''), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, ''), checkpoint_disabled=True)
handle = await download.get_handle()
assert handle == mock_handle
fake_dlmgr.downloads.clear()
Expand Down Expand Up @@ -249,14 +249,14 @@ async def test_start_download_existing_handle(fake_dlmgr):

fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
handle = await download.get_handle()
assert handle == mock_handle
fake_dlmgr.downloads.clear()
await download.shutdown()


def test_start_download_existing_download(fake_dlmgr):
async def test_start_download_existing_download(fake_dlmgr):
"""
Testing the addition of a torrent to the libtorrent manager, if there is a pre-existing download.
"""
Expand All @@ -270,18 +270,18 @@ def test_start_download_existing_download(fake_dlmgr):
fake_dlmgr.downloads[infohash] = mock_download
fake_dlmgr.get_session = lambda *_: mock_ltsession

download = fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
download = await fake_dlmgr.start_download(tdef=TorrentDefNoMetainfo(infohash, 'name'), checkpoint_disabled=True)
assert download == mock_download
fake_dlmgr.downloads.clear()


def test_start_download_no_ti_url(fake_dlmgr):
async def test_start_download_no_ti_url(fake_dlmgr):
"""
Test whether a ValueError is raised if we try to add a torrent without infohash or url
"""
fake_dlmgr.initialize()
with pytest.raises(ValueError):
fake_dlmgr.start_download()
await fake_dlmgr.start_download()


def test_remove_unregistered_torrent(fake_dlmgr):
Expand Down Expand Up @@ -349,27 +349,27 @@ def test_post_session_stats(fake_dlmgr):
mock_lt_session.post_session_stats.assert_called_once()


def test_load_checkpoint(fake_dlmgr):
async def test_load_checkpoint(fake_dlmgr):
good = []

def mock_start_download(*_, **__):
async def mock_start_download(*_, **__):
good.append(1)

fake_dlmgr.start_download = mock_start_download

# Try opening real state file
state = TESTS_DATA_DIR / "config_files/13a25451c761b1482d3e85432f07c4be05ca8a56.conf"
fake_dlmgr.load_checkpoint(state)
await fake_dlmgr.load_checkpoint(state)
assert good

# Try opening nonexistent file
good = []
fake_dlmgr.load_checkpoint("nonexistent_file")
await fake_dlmgr.load_checkpoint("nonexistent_file")
assert not good

# Try opening corrupt file
config_file_path = TESTS_DATA_DIR / "config_files/corrupt_session_config.conf"
fake_dlmgr.load_checkpoint(config_file_path)
await fake_dlmgr.load_checkpoint(config_file_path)
assert not good


Expand All @@ -380,19 +380,19 @@ async def test_download_manager_start(fake_dlmgr):
assert fake_dlmgr.all_checkpoints_are_loaded


def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
async def test_load_empty_checkpoint(fake_dlmgr, tmpdir):
"""
Test whether download resumes with faulty pstate file.
"""
fake_dlmgr.get_downloads_pstate_dir = lambda: tmpdir
fake_dlmgr.start_download = MagicMock()
fake_dlmgr.start_download = AsyncMock()

# Empty pstate file
pstate_filename = fake_dlmgr.get_downloads_pstate_dir() / 'abcd.state'
with open(pstate_filename, 'wb') as state_file:
state_file.write(b"")

fake_dlmgr.load_checkpoint(pstate_filename)
await fake_dlmgr.load_checkpoint(pstate_filename)
fake_dlmgr.start_download.assert_not_called()


Expand All @@ -401,7 +401,7 @@ async def test_load_checkpoints(fake_dlmgr, tmpdir):
Test whether we are resuming downloads after loading checkpoints
"""

def mocked_load_checkpoint(filename):
async def mocked_load_checkpoint(filename):
assert str(filename).endswith('abcd.conf')
mocked_load_checkpoint.called = True

Expand Down Expand Up @@ -442,8 +442,8 @@ async def mocked_update_hops(*_):
await readd_future


def test_get_downloads_by_name(fake_dlmgr):
dl = fake_dlmgr.start_download(torrent_file=TORRENT_UBUNTU_FILE, checkpoint_disabled=True)
async def test_get_downloads_by_name(fake_dlmgr):
dl = await fake_dlmgr.start_download(torrent_file=TORRENT_UBUNTU_FILE, checkpoint_disabled=True)
assert fake_dlmgr.get_downloads_by_name("ubuntu-15.04-desktop-amd64.iso")
assert not fake_dlmgr.get_downloads_by_name("ubuntu-15.04-desktop-amd64.iso", channels_only=True)
assert not fake_dlmgr.get_downloads_by_name("bla")
Expand Down

0 comments on commit f610c82

Please sign in to comment.