Skip to content

Commit

Permalink
Merge pull request #5628 from egbertbouman/anon_checking
Browse files Browse the repository at this point in the history
Added anonymization support to TorrentChecker
  • Loading branch information
egbertbouman committed Oct 17, 2020
2 parents 62108df + 44498c7 commit 8ec6975
Show file tree
Hide file tree
Showing 17 changed files with 768 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def test_task_select_tracker(enable_chant, torrent_checker, session):
tracker = session.mds.TrackerState(url="http://localhost/tracker")
session.mds.TorrentState(infohash=b'a' * 20, seeders=5, leechers=10, trackers={tracker})

controlled_session = HttpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 5)
controlled_session = HttpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 5, None)
controlled_session.connect_to_tracker = lambda: succeed(None)

torrent_checker._create_session_for_request = lambda *args, **kwargs: controlled_session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import socket
import struct
from asyncio import CancelledError, DatagramProtocol, Future, ensure_future, get_event_loop, start_server
from asyncio import CancelledError, DatagramProtocol, Future, ensure_future, get_event_loop, sleep, start_server
from unittest.mock import Mock

from aiohttp.web_exceptions import HTTPBadRequest
Expand Down Expand Up @@ -32,8 +32,8 @@ def send_request(self, *args):
return succeed(self.response)


@pytest.fixture
def fake_udp_socket_manager():
@pytest.fixture(name='fake_udp_socket_manager')
def fixture_fake_udp_socket_manager():
return FakeUdpSocketManager()


Expand All @@ -53,7 +53,7 @@ async def fake_dht_session(session):

@pytest.mark.asyncio
async def test_httpsession_scrape_no_body():
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5, None)
session._infohash_list = []
with pytest.raises(ValueError):
session._process_scrape_response(None)
Expand All @@ -63,7 +63,7 @@ async def test_httpsession_scrape_no_body():

@pytest.mark.asyncio
async def test_httpsession_bdecode_fails():
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5, None)
session._infohash_list = []
with pytest.raises(ValueError):
session._process_scrape_response(bencode({}))
Expand All @@ -73,7 +73,7 @@ async def test_httpsession_bdecode_fails():

@pytest.mark.asyncio
async def test_httpsession_code_not_200():
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5, None)

def fake_request(_):
raise HTTPBadRequest()
Expand All @@ -86,7 +86,7 @@ def fake_request(_):

@pytest.mark.asyncio
async def test_httpsession_failure_reason_in_dict():
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5, None)
session._infohash_list = []
with pytest.raises(ValueError):
session._process_scrape_response(bencode({'failure reason': 'test'}))
Expand All @@ -97,7 +97,7 @@ async def test_httpsession_failure_reason_in_dict():
@pytest.mark.asyncio
async def test_httpsession_unicode_err():
session = HttpTrackerSession("retracker.local", ("retracker.local", 80),
"/announce?comment=%26%23%3B%28%2C%29%5B%5D%E3%5B%D4%E8%EB%FC%EC%EE%E2", 5)
"/announce?comment=%26%23%3B%28%2C%29%5B%5D%E3%5B%D4%E8%EB%FC%EC%EE%E2", 5, None)

with pytest.raises(UnicodeEncodeError):
await session.connect_to_tracker()
Expand All @@ -114,7 +114,7 @@ async def _client_connected(_, writer):

server = await start_server(_client_connected, host='localhost', port=0, family=socket.AF_INET)
_, port = server.sockets[0].getsockname()
session = HttpTrackerSession("localhost", ("localhost", port), "/announce", 1)
session = HttpTrackerSession("localhost", ("localhost", port), "/announce", .1, None)
with pytest.raises(ValueError):
await session.connect_to_tracker()
sleep_task.set_result(None)
Expand All @@ -130,7 +130,7 @@ async def test_udpsession_timeout(fake_udp_socket_manager):
local_addr=('127.0.0.1', 0),
family=socket.AF_INET)
_, port = transport.get_extra_info('sockname')
session = UdpTrackerSession("localhost", ("127.0.0.1", port), "/announce", 1, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("127.0.0.1", port), "/announce", .1, None, fake_udp_socket_manager)
with pytest.raises(ValueError):
await session.connect_to_tracker()
transport.close()
Expand All @@ -141,25 +141,44 @@ async def test_pop_finished_transaction():
"""
Test that receiving a datagram for an already finished tracker session does not result in InvalidStateError
"""

mgr = UdpSocketManager()
transaction_id = 123
mgr = UdpSocketManager()
mgr.connection_made(Mock())
mock_tracker_session = Mock()
mock_tracker_session.transaction_id = transaction_id
mgr.send_request(Mock(), mock_tracker_session)
task = ensure_future(mgr.send_request(Mock(), Mock(proxy=None, transaction_id=transaction_id)))
await sleep(0)
assert mgr.tracker_sessions

mgr.tracker_sessions[transaction_id].cancel()

data = struct.pack("!iiq", 124, transaction_id, 126)
mgr.datagram_received(data, None)
assert not mgr.tracker_sessions
await task
assert task.done()


@pytest.mark.asyncio
async def test_proxy_transport():
"""
Test that the UdpSocketManager uses a proxy if specified
"""
mgr = UdpSocketManager()
mgr.connection_made(Mock())
mgr.proxy_transports['proxy_url'] = Mock()
ensure_future(mgr.send_request(b'', Mock(proxy='proxy_url', transaction_id=123)))
await sleep(0)
mgr.proxy_transports['proxy_url'].sendto.assert_called_once()
mgr.transport.assert_not_called()
mgr.tracker_sessions[123].cancel()

ensure_future(mgr.send_request(b'', Mock(proxy=None, transaction_id=123)))
await sleep(0)
mgr.proxy_transports['proxy_url'].sendto.assert_called_once()
mgr.transport.sendto.assert_called_once()
mgr.tracker_sessions[123].cancel()


@pytest.mark.asyncio
async def test_httpsession_cancel_operation():
session = HttpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 5, None)
task = ensure_future(session.connect_to_tracker())
with pytest.raises(CancelledError):
task.cancel()
Expand All @@ -169,7 +188,7 @@ async def test_httpsession_cancel_operation():

@pytest.mark.asyncio
async def test_udpsession_cancel_operation(fake_udp_socket_manager):
session = UdpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("127.0.0.1", ("localhost", 8475), "/announce", 0, None, fake_udp_socket_manager)
task = ensure_future(session.connect_to_tracker())
with pytest.raises(CancelledError):
task.cancel()
Expand All @@ -179,7 +198,7 @@ async def test_udpsession_cancel_operation(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_handle_response_wrong_len(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = b"too short"
with pytest.raises(ValueError):
Expand All @@ -196,7 +215,7 @@ async def test_udpsession_handle_response_wrong_len(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_no_port(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.transport = None
with pytest.raises(ValueError):
Expand All @@ -206,7 +225,7 @@ async def test_udpsession_no_port(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_handle_connection_wrong_action_transaction(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = struct.pack("!qq4s", 123, 123, b"test")
with pytest.raises(ValueError):
Expand All @@ -216,7 +235,7 @@ async def test_udpsession_handle_connection_wrong_action_transaction(fake_udp_so

@pytest.mark.asyncio
async def test_udpsession_handle_packet(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
session.action = 123
session.transaction_id = 124
assert not session.is_failed
Expand All @@ -227,7 +246,7 @@ async def test_udpsession_handle_packet(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_handle_wrong_action_transaction(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = struct.pack("!qq4s", 123, 123, b"test")
with pytest.raises(ValueError):
Expand All @@ -237,7 +256,7 @@ async def test_udpsession_handle_wrong_action_transaction(fake_udp_socket_manage

@pytest.mark.asyncio
async def test_udpsession_mismatch(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
session.action = 123
session.transaction_id = 124
session.infohash_list = [b'\x00' * 20]
Expand All @@ -250,7 +269,7 @@ async def test_udpsession_mismatch(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_response_too_short(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = struct.pack("!i", 123)
with pytest.raises(ValueError):
Expand All @@ -260,7 +279,7 @@ async def test_udpsession_response_too_short(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_response_wrong_transaction_id(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = struct.pack("!ii", 0, 1337)
with pytest.raises(ValueError):
Expand All @@ -270,7 +289,7 @@ async def test_udpsession_response_wrong_transaction_id(fake_udp_socket_manager)

@pytest.mark.asyncio
async def test_udpsession_response_list_len_mismatch(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 0, None, fake_udp_socket_manager)
session.action = 123
session.transaction_id = 123
assert not session.is_failed
Expand All @@ -284,7 +303,7 @@ async def test_udpsession_response_list_len_mismatch(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_udpsession_correct_handle(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 5, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 4782), "/announce", 5, None, fake_udp_socket_manager)
session.ip_address = "127.0.0.1"
session.infohash_list.append(b'test')
fake_udp_socket_manager.response = struct.pack("!iiq", 0, session.transaction_id, 2)
Expand All @@ -297,7 +316,7 @@ async def test_udpsession_correct_handle(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_big_correct_run(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("192.168.1.1", 1234), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("192.168.1.1", 1234), "/announce", 0, None, fake_udp_socket_manager)
assert not session.is_failed
fake_udp_socket_manager.response = struct.pack("!iiq", session.action, session.transaction_id, 126)
await session.connect()
Expand All @@ -310,7 +329,7 @@ async def test_big_correct_run(fake_udp_socket_manager):

@pytest.mark.asyncio
async def test_http_unprocessed_infohashes():
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession("localhost", ("localhost", 8475), "/announce", 5, None)
session.infohash_list.append(b"test")
response = bencode({"files": {b"a" * 20: {"complete": 10, "incomplete": 10}}})
session._process_scrape_response(response)
Expand All @@ -320,14 +339,14 @@ async def test_http_unprocessed_infohashes():

@pytest.mark.asyncio
async def test_failed_unicode():
session = HttpTrackerSession(u"localhost", ("localhost", 8475), "/announce", 5)
session = HttpTrackerSession(u"localhost", ("localhost", 8475), "/announce", 5, None)
with pytest.raises(ValueError):
session._process_scrape_response(bencode({'failure reason': '\xe9'}))
await session.cleanup()


def test_failed_unicode_udp(fake_udp_socket_manager):
session = UdpTrackerSession("localhost", ("localhost", 8475), "/announce", 0, fake_udp_socket_manager)
session = UdpTrackerSession("localhost", ("localhost", 8475), "/announce", 0, None, fake_udp_socket_manager)
with pytest.raises(ValueError):
session.failed('\xd0')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
TORRENT_CHECK_RETRY_INTERVAL = 30 # Interval when the torrent was successfully checked for the last time
MAX_TORRENTS_CHECKED_PER_SESSION = 50


class TorrentChecker(TaskManager):

def __init__(self, session):
Expand Down Expand Up @@ -285,9 +286,13 @@ async def check_torrent_health(self, infohash, timeout=20, scrape_now=False):
# get torrent's tracker list from DB
tracker_set = self.get_valid_trackers_of_torrent(torrent_id)

hops = self.tribler_session.config.get_default_number_hops()
socks_listen_ports = self.tribler_session.config.get_tunnel_community_socks5_listen_ports()
proxy = ('127.0.0.1', socks_listen_ports[hops - 1]) if hops > 0 else None

tasks = []
for tracker_url in tracker_set:
session = self._create_session_for_request(tracker_url, timeout=timeout)
session = self._create_session_for_request(tracker_url, timeout=timeout, proxy=proxy)
session.add_infohash(infohash)
tasks.append(self.connect_to_tracker(session))

Expand All @@ -305,8 +310,8 @@ async def check_torrent_health(self, infohash, timeout=20, scrape_now=False):
res = await gather(*tasks, return_exceptions=True)
return self.on_torrent_health_check_completed(infohash, res)

def _create_session_for_request(self, tracker_url, timeout=20):
session = create_tracker_session(tracker_url, timeout, self.socket_mgr)
def _create_session_for_request(self, tracker_url, timeout=20, proxy=None):
session = create_tracker_session(tracker_url, timeout, proxy, self.socket_mgr)

if tracker_url not in self._session_list:
self._session_list[tracker_url] = []
Expand Down

0 comments on commit 8ec6975

Please sign in to comment.