From 18335337b7fb4baa355cc1d5b8a17550878a6d63 Mon Sep 17 00:00:00 2001 From: drew2a Date: Fri, 22 Apr 2022 12:57:23 +0200 Subject: [PATCH] Add an incoming array --- .../core/components/ipv8/eva_protocol.py | 170 +++++++++++------- .../ipv8/tests/test_eva_protocol.py | 94 +++++----- 2 files changed, 153 insertions(+), 111 deletions(-) diff --git a/src/tribler/core/components/ipv8/eva_protocol.py b/src/tribler/core/components/ipv8/eva_protocol.py index 571512b1d7f..002700199c4 100644 --- a/src/tribler/core/components/ipv8/eva_protocol.py +++ b/src/tribler/core/components/ipv8/eva_protocol.py @@ -31,7 +31,7 @@ >>> async def on_error(self, peer, exception): ... self.logger.error(f'Error has been occurred: {exception}') """ -__version__ = '2.0.0' +from __future__ import annotations import asyncio import logging @@ -42,12 +42,14 @@ from dataclasses import dataclass from enum import Enum, auto from random import SystemRandom -from typing import Awaitable, Callable, Dict, Optional, Type +from typing import Awaitable, Callable, Dict, List, Optional, Type from ipv8.lazy_community import lazy_wrapper from ipv8.messaging.lazy_payload import VariablePayload, vp_compile from ipv8.types import Peer +__version__ = '2.1.0' + logger = logging.getLogger('EVA') MAX_U64 = 0xFFFFFFFF @@ -68,7 +70,7 @@ class Acknowledgement(VariablePayload): @vp_compile class Data(VariablePayload): format_list = ['I', 'I', 'raw'] - names = ['block_number', 'nonce', 'data'] + names = ['number', 'nonce', 'data'] @vp_compile @@ -144,6 +146,36 @@ class TransferResult: data: bytes nonce: int + def __str__(self): + return f'TransferResult(peer={self.peer}, info: {self.info}, data hash: {hash(self.data)}, nonce={self.nonce})' + + +class TransferWindow: + def __init__(self, start: int, size: int): + self.blocks: List[Optional[bytes]] = [None] * size + + self.start = start + self.processed: int = 0 + self.last_window: bool = False + + def add(self, index: int, block: bytes): + if self.blocks[index] is not None: + return + self.blocks[index] = block + self.processed += 1 + + def is_finished(self) -> bool: + return self.processed == len(self.blocks) + + def consecutive_blocks(self): + for block in self.blocks: + if block is None: + break + yield block + + def __str__(self): + return f'{{start: {self.start}, processed: {self.processed}, size: {len(self.blocks)}}}' + class Transfer: # pylint: disable=too-many-instance-attributes """The class describes an incoming or an outgoing transfer""" @@ -151,25 +183,25 @@ class Transfer: # pylint: disable=too-many-instance-attributes NONE = -1 def __init__(self, transfer_type: TransferType, info: bytes, data: bytes, data_size: int, block_count: int, - nonce: int, peer: Peer, protocol, future: Optional[Future] = None, window_size: int = 0, - updated: float = 0): + nonce: int, peer: Peer, protocol: EVAProtocol, future: Optional[Future] = None, updated: float = 0): """ This class has been used internally by the EVA protocol""" self.type = transfer_type self.info = info self.data = data + self.data_list: List[bytes] = [] # for incoming transfers only self.data_size = data_size self.block_count = block_count - self.block_number = Transfer.NONE self.future = future self.peer = peer self.nonce = nonce - self.window_size = window_size self.updated = updated self.protocol = protocol + self.window: Optional[TransferWindow] = None + self.attempt = 0 - self.acknowledgement_number = 0 self.terminated = False + self.acknowledgement_received = False def finish(self): result = TransferResult(peer=self.peer, info=self.info, data=self.data, nonce=self.nonce) @@ -179,7 +211,7 @@ def terminate(self, result: Optional[TransferResult] = None, exception: Optional if self.terminated: return - logger.debug(f'Terminate. Peer: {self.peer}. Transfer: {self}') + logger.debug(f'Terminate. Result: {result}.') container = self.protocol.incoming if self.type == TransferType.INCOMING else self.protocol.outgoing container.pop(self.peer, None) @@ -194,9 +226,29 @@ def terminate(self, result: Optional[TransferResult] = None, exception: Optional self.peer = None self.protocol = None self.future = None + self.data_list = None + self.window = None self.terminated = True + def create_window(self): + if not self.window: + self.window = TransferWindow(start=0, size=self.protocol.window_size) + logger.debug(f'New window: {self.window}') + return + + self.data_list.extend(self.window.consecutive_blocks()) + self.window = TransferWindow( + start=len(self.data_list), + size=self.protocol.window_size + ) + logger.debug(f'Next window: {self.window}') + + def get_block(self, number: int) -> bytes: + start_position = number * self.protocol.block_size + stop_position = start_position + self.protocol.block_size + return self.data[start_position:stop_position] + def _terminate_with_result(self, result: TransferResult): if self.future: self.future.set_result(result) @@ -217,10 +269,7 @@ def _terminate_with_exception(self, exception: Exception): asyncio.create_task(callback(self.peer, exception)) def __str__(self): - return ( - f'Type: {self.type}. Info: {self.info}. Block: {self.block_number}({self.block_count}). ' - f'Window size: {self.window_size}. Updated: {self.updated}' - ) + return f'Type: {self.type}. Info: {self.info}. Window: {self.window}. Updated: {self.updated}' class TransferException(Exception): @@ -318,7 +367,8 @@ def __init__( # pylint: disable=too-many-arguments f'Binary size limit: {binary_size_limit}.' ) - def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: Optional[int] = None) -> Future: + def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: Optional[int] = None) -> \ + Awaitable[TransferResult]: """Send a big binary data. Due to ipv8 specifics, we can use only one socket port per one peer. @@ -462,7 +512,6 @@ async def on_write_request(self, peer: Peer, payload: WriteRequest): nonce=payload.nonce, future=None, peer=peer, - window_size=self.window_size, updated=time.time(), protocol=self ) @@ -471,16 +520,16 @@ async def on_write_request(self, peer: Peer, payload: WriteRequest): self._terminate_by_error(transfer, ValueException('Data size can not be less or equal to 0')) return - if self._is_simultaneously_served_transfers_limit_exceeded(): - exception = TransferLimitException('Maximum simultaneous transfers limit exceeded') - self._terminate_by_error(transfer, exception) - return - if payload.data_size > self.binary_size_limit: e = SizeException(f'Current data size limit({self.binary_size_limit}) has been exceeded', transfer) self._terminate_by_error(transfer, e) return + if self._is_simultaneously_served_transfers_limit_exceeded(): + exception = TransferLimitException('Maximum simultaneous transfers limit exceeded') + self._terminate_by_error(transfer, exception) + return + self.incoming[peer] = transfer self.community.register_anonymous_task('eva_terminate_by_timeout', self._terminate_by_timeout_task, transfer) @@ -492,77 +541,71 @@ async def on_acknowledgement(self, peer: Peer, payload: Acknowledgement): transfer = self.outgoing.get(peer) if not transfer: - logger.warning("No outgoing transfer found with peer %s associated with incoming ackowledgement.", peer) - return - - if transfer.block_number > payload.number: - logger.warning("Cannot handle incoming acknowledgement from peer %s - ack num mismatch (%d - %d)", - peer, transfer.block_number, payload.number) + logger.warning(f'No outgoing transfer found with peer {peer} associated with incoming acknowledgement.') return if transfer.nonce != payload.nonce: - logger.warning("Cannot handle incoming acknowledgement from peer %s - nonce mismatch", peer) + logger.warning(f'Cannot handle incoming acknowledgement from peer {peer} - nonce mismatch.') return - transfer.block_number = payload.number - if transfer.block_number > transfer.block_count: + transfer.acknowledgement_received = True + transfer.updated = time.time() + + is_final_acknowledgement = payload.number > transfer.block_count + if is_final_acknowledgement: transfer.finish() self.send_scheduled() return - transfer.window_size = max(self.MIN_WINDOWS_SIZE, min(payload.window_size, self.binary_size_limit)) - transfer.updated = time.time() - - for block_number in range(transfer.block_number, transfer.block_number + transfer.window_size): - start_position = block_number * self.block_size - stop_position = start_position + self.block_size - data = transfer.data[start_position:stop_position] - logger.debug(f'Transmit({block_number}). Peer: {peer}.') - self.community.eva_send_message(peer, Data(block_number, transfer.nonce, data)) - if len(data) == 0: + for number in range(payload.number, payload.number + payload.window_size): + block = transfer.get_block(number) + logger.debug(f'Transmit({number}). Peer: {peer}.') + self.community.eva_send_message(peer, Data(number, transfer.nonce, block)) + if len(block) == 0: break async def on_data(self, peer, payload): - logger.debug(f'On data({payload.block_number}). Peer: {peer}. Data hash: {hash(payload.data)}') + logger.debug(f'On data({payload.number}). Peer: {peer}. Data hash: {hash(payload.data)}') transfer = self.incoming.get(peer) if not transfer: return - can_be_handled = transfer.block_number == payload.block_number - 1 + window_index = payload.number - transfer.window.start + # The packet can be handled if payload number within [window_start..window_start+window_size) + can_be_handled = 0 <= window_index < len(transfer.window.blocks) if not can_be_handled or transfer.nonce != payload.nonce: return - transfer.block_number = payload.block_number - is_final_data_packet = len(payload.data) == 0 if is_final_data_packet: - self.send_acknowledgement(transfer) - transfer.finish() - self.send_scheduled() - return + logger.debug(f'Last packet is ({payload.number})') - data_size = len(transfer.data) + len(payload.data) - if data_size > self.binary_size_limit: - e = SizeException(f'Current data size limit({self.binary_size_limit}) has been exceeded', transfer) - self._terminate_by_error(transfer, e) - return - - transfer.data += payload.data + transfer.window.last_window = True + # cut current windows to the end of transfer + transfer.window.blocks = transfer.window.blocks[:window_index + 1] + transfer.window.add(window_index, payload.data) transfer.attempt = 0 transfer.updated = time.time() - time_to_acknowledge = transfer.acknowledgement_number + transfer.window_size <= transfer.block_number + 1 - if time_to_acknowledge: + if transfer.window.is_finished(): + logger.debug(f'Window finished: {transfer.window}') + if not transfer.window.last_window: + self.send_acknowledgement(transfer) + return + + logger.debug('Finish transfer') self.send_acknowledgement(transfer) + transfer.data = b''.join(transfer.data_list) + transfer.finish() + self.send_scheduled() def send_acknowledgement(self, transfer: Transfer): - ack = transfer.block_number + 1 + transfer.create_window() - transfer.acknowledgement_number = ack - logger.debug(f'Ack ({ack}). Window size: {transfer.window_size}. Peer: {transfer.peer}') + logger.debug(f'Ack ({transfer.window.start}). Peer: {transfer.peer}') - acknowledgement = Acknowledgement(ack, transfer.window_size, transfer.nonce) + acknowledgement = Acknowledgement(transfer.window.start, len(transfer.window.blocks), transfer.nonce) self.community.eva_send_message(transfer.peer, acknowledgement) async def on_error(self, peer: Peer, error: Error): @@ -636,9 +679,7 @@ async def _resend_acknowledge_task(self, transfer: Transfer): remaining_time = self.retransmit_interval_in_sec current_attempt = f'{transfer.attempt + 1}/{self.retransmit_attempt_count}' - logger.debug(f'Re-ack ({transfer.acknowledgement_number}). ' - f'Attempt: {current_attempt} for peer: {transfer.peer}') - + logger.debug(f'Re-ack. Attempt: {current_attempt} for peer: {transfer.peer}') self.send_acknowledgement(transfer) async def _send_write_request_task(self, transfer: Transfer): @@ -650,8 +691,9 @@ async def _send_write_request_task(self, transfer: Transfer): for attempt in range(self.retransmit_attempt_count): await asyncio.sleep(self.retransmit_interval_in_sec) - if transfer.terminated or transfer.block_number != Transfer.NONE: + if transfer.terminated or transfer.acknowledgement_received: return + current_attempt = f'{attempt + 1}/{self.retransmit_attempt_count}' logger.debug(f'Re-write request. Attempt: {current_attempt} for peer: {transfer.peer}') self.send_write_request(transfer) diff --git a/src/tribler/core/components/ipv8/tests/test_eva_protocol.py b/src/tribler/core/components/ipv8/tests/test_eva_protocol.py index fcfce9c090e..522d21ba39d 100644 --- a/src/tribler/core/components/ipv8/tests/test_eva_protocol.py +++ b/src/tribler/core/components/ipv8/tests/test_eva_protocol.py @@ -23,7 +23,7 @@ Transfer, TransferException, TransferLimitException, TransferResult, - TransferType, ValueException, + TransferType, TransferWindow, ValueException, WriteRequest, ) @@ -346,9 +346,10 @@ async def test_survive_when_multiply_packets_lost(self): packet_loss_probability = lost_packets_count_estimation / (block_count * data_set_count) self.bob.eva.retransmit_attempt_count = lost_packets_count_estimation + self.alice.eva.retransmit_interval_in_sec = 1 + self.bob.eva.retransmit_interval_in_sec = 0.1 for participant in [self.alice, self.bob]: - participant.eva.retransmit_interval_in_sec = 0 participant.eva.block_size = 3 participant.eva.window_size = 10 @@ -370,6 +371,7 @@ async def fake_bob_on_data(peer, payload): if chance_to_fake and not max_count_reached and not is_last_packet: self.test_store.actual_packets_lost += 1 + logging.info(f'Lost packet ({payload.number})') return await bob_on_data(peer, payload) @@ -431,14 +433,13 @@ async def test_dynamically_changed_window_size(self): bob_send_acknowledgement = self.bob.eva.send_acknowledgement - def bob_fake_send_acknowledgement(transfer): - if transfer.window_size == 1: + def bob_fake_send_acknowledgement(transfer: Transfer): + if self.bob.eva.window_size == 1: # go up self.test_store.window_size_increment = 2 - transfer.window_size += self.test_store.window_size_increment + self.bob.eva.window_size += self.test_store.window_size_increment - self.test_store.actual_window_size = transfer.window_size bob_send_acknowledgement(transfer) self.bob.eva.send_acknowledgement = bob_fake_send_acknowledgement @@ -451,31 +452,36 @@ async def test_cheating_send_over_size(self): # Alice will try to send b`extra` binary data over the original size. self.bob.eva.binary_size_limit = 5 - await self.send_sequence_from_alice_to_bob( WriteRequest(4, 1, b'info'), Data(0, 1, b'data'), - Data(1, 1, b'extra') + Data(100, 1, b'extra'), # over the window, should be ignored + + Data(1, 1, b''), ) - assert isinstance(self.bob.most_recent_received_exception, SizeException) + assert self.bob.most_recent_received_data == (b'info', b'data', 1) async def test_wrong_message_order(self): # In this test we send a single transfer from Alice to Bob. # Alice will try to send packets in invalid order. These packets - # should be dropped + # should be delivered. self.bob.eva.block_size = 2 - + expected_data = b'ABCDEFJHI' await self.send_sequence_from_alice_to_bob( - WriteRequest(4, 1, b'info'), - Data(0, 1, b'da'), - Data(2, 1, b'xx'), # should be dropped - Data(1, 1, b'ta'), + WriteRequest(len(expected_data), 1, b'info'), + Data(0, 1, b'ABC'), + Data(2, 1, b'JHI'), + Data(1, 1, b'DEF'), + + Data(1, 1, b'xx'), # should be ignored + Data(2, 2, b'xx'), # should be ignored + Data(2, 1, b''), ) - assert self.bob.most_recent_received_data == (b'info', b'data', 1) + assert self.bob.most_recent_received_data == (b'info', expected_data, 1) async def test_wrong_message_order_and_wrong_nonce(self): # In this test we send a single transfer from Alice to Bob. @@ -523,6 +529,13 @@ def peer(): return Mock() +def create_transfer() -> Transfer: + block_size = 10 + data_size = 100 + return Transfer(transfer_type=TransferType.INCOMING, info=b'', data=b'd' * data_size, data_size=data_size, + block_count=10, nonce=0, peer=Mock(), protocol=EVAProtocol(Mock(), block_size=block_size)) + + @pytest.mark.asyncio async def test_on_write_request_data_size_le0(eva: EVAProtocol, peer): # validate that data_size can not be less or equal to 0 @@ -533,37 +546,6 @@ async def test_on_write_request_data_size_le0(eva: EVAProtocol, peer): assert method_mock.call_count == 2 -@pytest.mark.asyncio -async def test_on_acknowledgement_window_size_attr(eva: EVAProtocol, peer): - # This test ensures that `window_size` will be always within the limits: - # 0 < window_size < binary_size_limit - nonce = 1 - transfer = Transfer( - transfer_type=TransferType.OUTGOING, - info=b'', - data=b'', - data_size=0, - block_count=10, - nonce=nonce, - future=None, - peer=Mock(), - window_size=0, - protocol=eva - ) - - eva.outgoing[peer] = transfer - window_size = 0 - - # validate that window_size can not be less or equal to 0 - await eva.on_acknowledgement(peer, Acknowledgement(1, window_size, nonce)) - assert transfer.window_size == eva.MIN_WINDOWS_SIZE - - # validate that window_size can not be greater than binary_size_limit - window_size = eva.binary_size_limit + 1 - await eva.on_acknowledgement(peer, Acknowledgement(1, window_size, nonce)) - assert transfer.window_size == eva.binary_size_limit - - def test_is_simultaneously_served_transfers_limit_exceeded(eva: EVAProtocol): # In this test we will try to exceed `max_simultaneous_transfers` limit. eva.max_simultaneous_transfers = 3 @@ -671,3 +653,21 @@ def test_shutdown(eva: EVAProtocol): eva.shutdown() assert all(t.terminate.called for t in [transfer1, transfer2, transfer3]) + + +def test_block(): + transfer = create_transfer() + + first_block = b'd' * 10 + assert transfer.get_block(0) == first_block + + last_block = transfer.get_block(10) + assert last_block == b'' + + +def test_finished(): + window = TransferWindow(start=0, size=10) + assert not window.is_finished() + + window.processed = 10 + assert window.is_finished()