Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA: refactor transfer class #6882

Merged
merged 1 commit into from Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 106 additions & 64 deletions src/tribler/core/components/ipv8/eva_protocol.py
Expand Up @@ -31,7 +31,7 @@
>>> async def on_error(self, peer, exception):
... self.logger.error(f'Error has been occurred: {exception}')
"""
__version__ = '2.0.0'
from __future__ import annotations

import asyncio
import logging
Expand All @@ -42,12 +42,14 @@
from dataclasses import dataclass
from enum import Enum, auto
from random import SystemRandom
from typing import Awaitable, Callable, Dict, Optional, Type
from typing import Awaitable, Callable, Dict, List, Optional, Type

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.types import Peer

__version__ = '2.1.0'

logger = logging.getLogger('EVA')

MAX_U64 = 0xFFFFFFFF
Expand All @@ -68,7 +70,7 @@ class Acknowledgement(VariablePayload):
@vp_compile
class Data(VariablePayload):
format_list = ['I', 'I', 'raw']
names = ['block_number', 'nonce', 'data']
names = ['number', 'nonce', 'data']


@vp_compile
Expand Down Expand Up @@ -144,32 +146,62 @@ class TransferResult:
data: bytes
nonce: int

def __str__(self):
return f'TransferResult(peer={self.peer}, info: {self.info}, data hash: {hash(self.data)}, nonce={self.nonce})'


class TransferWindow:
def __init__(self, start: int, size: int):
self.blocks: List[Optional[bytes]] = [None] * size

self.start = start
self.processed: int = 0
self.last_window: bool = False

def add(self, index: int, block: bytes):
if self.blocks[index] is not None:
return
self.blocks[index] = block
self.processed += 1

def is_finished(self) -> bool:
return self.processed == len(self.blocks)

def consecutive_blocks(self):
for block in self.blocks:
if block is None:
break
yield block

def __str__(self):
return f'{{start: {self.start}, processed: {self.processed}, size: {len(self.blocks)}}}'


class Transfer: # pylint: disable=too-many-instance-attributes
"""The class describes an incoming or an outgoing transfer"""

NONE = -1

def __init__(self, transfer_type: TransferType, info: bytes, data: bytes, data_size: int, block_count: int,
nonce: int, peer: Peer, protocol, future: Optional[Future] = None, window_size: int = 0,
updated: float = 0):
nonce: int, peer: Peer, protocol: EVAProtocol, future: Optional[Future] = None, updated: float = 0):
""" This class has been used internally by the EVA protocol"""
self.type = transfer_type
self.info = info
self.data = data
self.data_list: List[bytes] = [] # for incoming transfers only
self.data_size = data_size
self.block_count = block_count
self.block_number = Transfer.NONE
self.future = future
self.peer = peer
self.nonce = nonce
self.window_size = window_size
self.updated = updated
self.protocol = protocol

self.window: Optional[TransferWindow] = None

self.attempt = 0
self.acknowledgement_number = 0
self.terminated = False
self.acknowledgement_received = False

def finish(self):
result = TransferResult(peer=self.peer, info=self.info, data=self.data, nonce=self.nonce)
Expand All @@ -179,7 +211,7 @@ def terminate(self, result: Optional[TransferResult] = None, exception: Optional
if self.terminated:
return

logger.debug(f'Terminate. Peer: {self.peer}. Transfer: {self}')
logger.debug(f'Terminate. Result: {result}.')

container = self.protocol.incoming if self.type == TransferType.INCOMING else self.protocol.outgoing
container.pop(self.peer, None)
Expand All @@ -194,9 +226,29 @@ def terminate(self, result: Optional[TransferResult] = None, exception: Optional
self.peer = None
self.protocol = None
self.future = None
self.data_list = None
self.window = None

self.terminated = True

def create_window(self):
if not self.window:
self.window = TransferWindow(start=0, size=self.protocol.window_size)
logger.debug(f'New window: {self.window}')
return

self.data_list.extend(self.window.consecutive_blocks())
self.window = TransferWindow(
start=len(self.data_list),
size=self.protocol.window_size
)
logger.debug(f'Next window: {self.window}')

def get_block(self, number: int) -> bytes:
start_position = number * self.protocol.block_size
stop_position = start_position + self.protocol.block_size
return self.data[start_position:stop_position]

def _terminate_with_result(self, result: TransferResult):
if self.future:
self.future.set_result(result)
Expand All @@ -217,10 +269,7 @@ def _terminate_with_exception(self, exception: Exception):
asyncio.create_task(callback(self.peer, exception))

def __str__(self):
return (
f'Type: {self.type}. Info: {self.info}. Block: {self.block_number}({self.block_count}). '
f'Window size: {self.window_size}. Updated: {self.updated}'
)
return f'Type: {self.type}. Info: {self.info}. Window: {self.window}. Updated: {self.updated}'


class TransferException(Exception):
Expand Down Expand Up @@ -318,7 +367,8 @@ def __init__( # pylint: disable=too-many-arguments
f'Binary size limit: {binary_size_limit}.'
)

def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: Optional[int] = None) -> Future:
def send_binary(self, peer: Peer, info: bytes, data: bytes, nonce: Optional[int] = None) -> \
Awaitable[TransferResult]:
"""Send a big binary data.

Due to ipv8 specifics, we can use only one socket port per one peer.
Expand Down Expand Up @@ -462,7 +512,6 @@ async def on_write_request(self, peer: Peer, payload: WriteRequest):
nonce=payload.nonce,
future=None,
peer=peer,
window_size=self.window_size,
updated=time.time(),
protocol=self
)
Expand All @@ -471,16 +520,16 @@ async def on_write_request(self, peer: Peer, payload: WriteRequest):
self._terminate_by_error(transfer, ValueException('Data size can not be less or equal to 0'))
return

if self._is_simultaneously_served_transfers_limit_exceeded():
exception = TransferLimitException('Maximum simultaneous transfers limit exceeded')
self._terminate_by_error(transfer, exception)
return

if payload.data_size > self.binary_size_limit:
e = SizeException(f'Current data size limit({self.binary_size_limit}) has been exceeded', transfer)
self._terminate_by_error(transfer, e)
return

if self._is_simultaneously_served_transfers_limit_exceeded():
exception = TransferLimitException('Maximum simultaneous transfers limit exceeded')
self._terminate_by_error(transfer, exception)
return

self.incoming[peer] = transfer

self.community.register_anonymous_task('eva_terminate_by_timeout', self._terminate_by_timeout_task, transfer)
Expand All @@ -492,77 +541,71 @@ async def on_acknowledgement(self, peer: Peer, payload: Acknowledgement):

transfer = self.outgoing.get(peer)
if not transfer:
logger.warning("No outgoing transfer found with peer %s associated with incoming ackowledgement.", peer)
return

if transfer.block_number > payload.number:
logger.warning("Cannot handle incoming acknowledgement from peer %s - ack num mismatch (%d - %d)",
peer, transfer.block_number, payload.number)
logger.warning(f'No outgoing transfer found with peer {peer} associated with incoming acknowledgement.')
return

if transfer.nonce != payload.nonce:
logger.warning("Cannot handle incoming acknowledgement from peer %s - nonce mismatch", peer)
logger.warning(f'Cannot handle incoming acknowledgement from peer {peer} - nonce mismatch.')
return

transfer.block_number = payload.number
if transfer.block_number > transfer.block_count:
transfer.acknowledgement_received = True
transfer.updated = time.time()

is_final_acknowledgement = payload.number > transfer.block_count
if is_final_acknowledgement:
transfer.finish()
self.send_scheduled()
return

transfer.window_size = max(self.MIN_WINDOWS_SIZE, min(payload.window_size, self.binary_size_limit))
transfer.updated = time.time()

for block_number in range(transfer.block_number, transfer.block_number + transfer.window_size):
start_position = block_number * self.block_size
stop_position = start_position + self.block_size
data = transfer.data[start_position:stop_position]
logger.debug(f'Transmit({block_number}). Peer: {peer}.')
self.community.eva_send_message(peer, Data(block_number, transfer.nonce, data))
if len(data) == 0:
for number in range(payload.number, payload.number + payload.window_size):
block = transfer.get_block(number)
logger.debug(f'Transmit({number}). Peer: {peer}.')
self.community.eva_send_message(peer, Data(number, transfer.nonce, block))
if len(block) == 0:
break

async def on_data(self, peer, payload):
logger.debug(f'On data({payload.block_number}). Peer: {peer}. Data hash: {hash(payload.data)}')
logger.debug(f'On data({payload.number}). Peer: {peer}. Data hash: {hash(payload.data)}')
transfer = self.incoming.get(peer)
if not transfer:
return

can_be_handled = transfer.block_number == payload.block_number - 1
window_index = payload.number - transfer.window.start
# The packet can be handled if payload number within [window_start..window_start+window_size)
can_be_handled = 0 <= window_index < len(transfer.window.blocks)
if not can_be_handled or transfer.nonce != payload.nonce:
return

transfer.block_number = payload.block_number

is_final_data_packet = len(payload.data) == 0
if is_final_data_packet:
self.send_acknowledgement(transfer)
transfer.finish()
self.send_scheduled()
return
logger.debug(f'Last packet is ({payload.number})')

data_size = len(transfer.data) + len(payload.data)
if data_size > self.binary_size_limit:
e = SizeException(f'Current data size limit({self.binary_size_limit}) has been exceeded', transfer)
self._terminate_by_error(transfer, e)
return

transfer.data += payload.data
transfer.window.last_window = True
# cut current windows to the end of transfer
transfer.window.blocks = transfer.window.blocks[:window_index + 1]

transfer.window.add(window_index, payload.data)
transfer.attempt = 0
transfer.updated = time.time()

time_to_acknowledge = transfer.acknowledgement_number + transfer.window_size <= transfer.block_number + 1
if time_to_acknowledge:
if transfer.window.is_finished():
logger.debug(f'Window finished: {transfer.window}')
if not transfer.window.last_window:
self.send_acknowledgement(transfer)
return

logger.debug('Finish transfer')
self.send_acknowledgement(transfer)
transfer.data = b''.join(transfer.data_list)
transfer.finish()
self.send_scheduled()

def send_acknowledgement(self, transfer: Transfer):
ack = transfer.block_number + 1
transfer.create_window()

transfer.acknowledgement_number = ack
logger.debug(f'Ack ({ack}). Window size: {transfer.window_size}. Peer: {transfer.peer}')
logger.debug(f'Ack ({transfer.window.start}). Peer: {transfer.peer}')

acknowledgement = Acknowledgement(ack, transfer.window_size, transfer.nonce)
acknowledgement = Acknowledgement(transfer.window.start, len(transfer.window.blocks), transfer.nonce)
self.community.eva_send_message(transfer.peer, acknowledgement)

async def on_error(self, peer: Peer, error: Error):
Expand Down Expand Up @@ -636,9 +679,7 @@ async def _resend_acknowledge_task(self, transfer: Transfer):
remaining_time = self.retransmit_interval_in_sec

current_attempt = f'{transfer.attempt + 1}/{self.retransmit_attempt_count}'
logger.debug(f'Re-ack ({transfer.acknowledgement_number}). '
f'Attempt: {current_attempt} for peer: {transfer.peer}')

logger.debug(f'Re-ack. Attempt: {current_attempt} for peer: {transfer.peer}')
self.send_acknowledgement(transfer)

async def _send_write_request_task(self, transfer: Transfer):
Expand All @@ -650,8 +691,9 @@ async def _send_write_request_task(self, transfer: Transfer):
for attempt in range(self.retransmit_attempt_count):
await asyncio.sleep(self.retransmit_interval_in_sec)

if transfer.terminated or transfer.block_number != Transfer.NONE:
if transfer.terminated or transfer.acknowledgement_received:
return

current_attempt = f'{attempt + 1}/{self.retransmit_attempt_count}'
logger.debug(f'Re-write request. Attempt: {current_attempt} for peer: {transfer.peer}')
self.send_write_request(transfer)
Expand Down