From be25e61cb7a5dc92d2ea29e6cd93ac0f38bf9fda Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Fri, 5 Jan 2018 14:07:14 +0200 Subject: [PATCH 01/11] handle case where normal peer sends a piece before joystream peer Fixes issue #35 --- sources/src/TorrentPlugin.cpp | 37 ++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 5152892..52afcd8 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -735,22 +735,27 @@ protocol_session::RemovedConnectionCallbackHandler TorrentP protocol_session::FullPieceArrived TorrentPlugin::fullPieceArrived() { return [this](const libtorrent::peer_id & peerId, const protocol_wire::PieceData & pieceData, int index) -> void { - - // Make sure no outstanding calls exist for this index - assert(!_outstandingFullPieceArrivedCalls.count(index)); - - _outstandingFullPieceArrivedCalls[index] = peerId; - - // Tell libtorrent to validate piece - // last argument is a flag which presently seems to only test - // flags & torrent::overwrite_existing, which seems to be whether - // the piece should be overwritten if it is already present - // - // libtorrent::torrent_plugin::on_piece_pass() - // libtorrent::torrent_plugin::on_piece_failed() - // processes result of checking - - torrent()->add_piece(index, pieceData.piece().get(), 0); + if (!torrent()->have_piece(index)) { + // Make sure no outstanding calls exist for this index + assert(!_outstandingFullPieceArrivedCalls.count(index)); + + _outstandingFullPieceArrivedCalls[index] = peerId; + + // Tell libtorrent to validate piece + // last argument is a flag which presently seems to only test + // flags & torrent::overwrite_existing, which seems to be whether + // the piece should be overwritten if it is already present + // + // libtorrent::torrent_plugin::on_piece_pass() + // libtorrent::torrent_plugin::on_piece_failed() + // processes result of checking + torrent()->add_piece(index, pieceData.piece().get(), 0); + } else { + // We already received the piece from another peer (most likely a non joystream peer) + // For now we ignore the validity of the piece data + // tell session about endpoint and piece + _session.validPieceReceivedOnConnection(peerId, index); + } }; } From aee486006e9237a738f3938800e4d83ba5942d66 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Mon, 8 Jan 2018 17:40:25 +0200 Subject: [PATCH 02/11] Add AllSellersGone event and protocol session AllSellersGone callback handler --- sources/include/extension/Alert.hpp | 13 +++++++++++++ sources/include/extension/TorrentPlugin.hpp | 1 + sources/src/TorrentPlugin.cpp | 14 +++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sources/include/extension/Alert.hpp b/sources/include/extension/Alert.hpp index 55850e6..1bf70af 100644 --- a/sources/include/extension/Alert.hpp +++ b/sources/include/extension/Alert.hpp @@ -548,6 +548,19 @@ namespace alert { const Coin::KeyPair contractKeyPair; const Coin::PubKeyHash finalPkHash; }; + + struct AllSellersGone : public libtorrent::torrent_alert { + + AllSellersGone(libtorrent::aux::stack_allocator& alloc, + const libtorrent::torrent_handle & h) + : libtorrent::torrent_alert(alloc, h) {} + + TORRENT_DEFINE_ALERT(DownloadStarted, libtorrent::user_alert_id + 32) + static const int static_category = alert::status_notification; + virtual std::string message() const override { + return torrent_alert::message() + " all sellers gone"; + } + }; } } } diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index c02fd65..5c3f579 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -215,6 +215,7 @@ class TorrentPlugin : public libtorrent::torrent_plugin { protocol_session::AnchorAnnounced anchorAnnounced(); protocol_session::ReceivedValidPayment receivedValidPayment(); protocol_session::SentPayment sentPayment(); + protocol_session::AllSellersGone allSellersGone(); /// Members diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 52afcd8..7acb3ab 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -466,7 +466,8 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { fullPieceArrived(), sentPayment(), terms, - torrentPieceInformation()); + torrentPieceInformation(), + allSellersGone()); // Send notification _alertManager->emplace_alert(_torrent, terms); @@ -863,6 +864,17 @@ protocol_session::SentPayment TorrentPlugin::sentPayment() } +protocol_session::AllSellersGone TorrentPlugin::allSellersGone() { + // Get alert manager and handle for torrent + libtorrent::torrent * t = torrent(); + libtorrent::alert_manager & manager = t->alerts(); + libtorrent::torrent_handle h = t->get_handle(); + + return [&manager, h, this](void) -> void { + manager.emplace_alert(h); + }; +} + int TorrentPlugin::pickNextPiece(const std::vector> * pieces) { libtorrent::torrent * t = torrent(); From 9404169faf586918c1ef6780c57b6bda850844d3 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Mon, 8 Jan 2018 17:40:25 +0200 Subject: [PATCH 03/11] Add AllSellersGone event and protocol session AllSellersGone callback handler --- sources/include/extension/Alert.hpp | 13 +++++++++++++ sources/include/extension/TorrentPlugin.hpp | 1 + sources/src/TorrentPlugin.cpp | 14 +++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sources/include/extension/Alert.hpp b/sources/include/extension/Alert.hpp index 55850e6..1bf70af 100644 --- a/sources/include/extension/Alert.hpp +++ b/sources/include/extension/Alert.hpp @@ -548,6 +548,19 @@ namespace alert { const Coin::KeyPair contractKeyPair; const Coin::PubKeyHash finalPkHash; }; + + struct AllSellersGone : public libtorrent::torrent_alert { + + AllSellersGone(libtorrent::aux::stack_allocator& alloc, + const libtorrent::torrent_handle & h) + : libtorrent::torrent_alert(alloc, h) {} + + TORRENT_DEFINE_ALERT(DownloadStarted, libtorrent::user_alert_id + 32) + static const int static_category = alert::status_notification; + virtual std::string message() const override { + return torrent_alert::message() + " all sellers gone"; + } + }; } } } diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index c02fd65..5c3f579 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -215,6 +215,7 @@ class TorrentPlugin : public libtorrent::torrent_plugin { protocol_session::AnchorAnnounced anchorAnnounced(); protocol_session::ReceivedValidPayment receivedValidPayment(); protocol_session::SentPayment sentPayment(); + protocol_session::AllSellersGone allSellersGone(); /// Members diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 5152892..90b0175 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -466,7 +466,8 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { fullPieceArrived(), sentPayment(), terms, - torrentPieceInformation()); + torrentPieceInformation(), + allSellersGone()); // Send notification _alertManager->emplace_alert(_torrent, terms); @@ -858,6 +859,17 @@ protocol_session::SentPayment TorrentPlugin::sentPayment() } +protocol_session::AllSellersGone TorrentPlugin::allSellersGone() { + // Get alert manager and handle for torrent + libtorrent::torrent * t = torrent(); + libtorrent::alert_manager & manager = t->alerts(); + libtorrent::torrent_handle h = t->get_handle(); + + return [&manager, h, this](void) -> void { + manager.emplace_alert(h); + }; +} + int TorrentPlugin::pickNextPiece(const std::vector> * pieces) { libtorrent::torrent * t = torrent(); From 05a4e4c4ad01f281734180872f0942d1e1c2aba3 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Fri, 26 Jan 2018 16:44:20 +0200 Subject: [PATCH 04/11] drop connections if protocol major versions are different --- sources/src/PeerPlugin.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sources/src/PeerPlugin.cpp b/sources/src/PeerPlugin.cpp index bfde3b1..e4442b0 100644 --- a/sources/src/PeerPlugin.cpp +++ b/sources/src/PeerPlugin.cpp @@ -241,6 +241,20 @@ namespace extension { return true; } + // Only communicate with peers with same protocol major version number + if(_protocolVersionOfPeer.major() != protocol_statemachine::CBStateMachine::protocolVersion.major()) { + // Mark peer as not supporting this extension + _peerPaymentBEPSupportStatus = BEPSupportStatus::not_supported; + + // Remove peer + std::clog << "Dropping Peer: (incompatible protocol vesrion)" << std::endl; + libtorrent::error_code ec; + drop(ec); + + // Keep us around + return true; + } + // Try to extract m key, if its not present, then we are done libtorrent::bdecode_node m = handshake.dict_find_dict("m"); From bbc0665554ff8273cc34856dce70473ef1b8b91f Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Fri, 26 Jan 2018 19:04:40 +0200 Subject: [PATCH 05/11] synchronously validate full piece when received from seller --- sources/src/TorrentPlugin.cpp | 102 ++++++++-------------------------- 1 file changed, 22 insertions(+), 80 deletions(-) diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 7acb3ab..75284fd 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -14,6 +14,7 @@ #include #include #include // print_endpoint +#include namespace joystream { @@ -164,60 +165,12 @@ void TorrentPlugin::on_piece_pass(int index) { // Make sure we are in correct mode, as mode changed may have occured if(_session.mode() == protocol_session::SessionMode::buying) { - - auto it = _outstandingFullPieceArrivedCalls.find(index); - - // If this validation is not due to us - if(it == _outstandingFullPieceArrivedCalls.cend()) { - - // then just tell session about it - _session.pieceDownloaded(index); - - } else { - auto peerId = it->second; - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); - - _alertManager->emplace_alert(_torrent, endPoint, peerId, index); - - // if its due to us, then tell session about endpoint and piece - _session.validPieceReceivedOnConnection(peerId, index); - - // and remove call - _outstandingFullPieceArrivedCalls.erase(it); - } + _session.pieceDownloaded(index); } } void TorrentPlugin::on_piece_failed(int index) { - // Make sure we are in correct mode, as mode changed may have occured - if(_session.mode() == protocol_session::SessionMode::buying) { - - auto it = _outstandingFullPieceArrivedCalls.find(index); - - // If this validation is not due to us - if(it == _outstandingFullPieceArrivedCalls.cend()) { - - // then there is nothing to do - - } else { - - // if its due to us, then - - auto peerId = it->second; - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); - - _alertManager->emplace_alert(_torrent, endPoint, peerId, index); - - // tell session about endpoint and piece - _session.invalidPieceReceivedOnConnection(peerId, index); - - // and remove call - _outstandingFullPieceArrivedCalls.erase(it); - } - } } void TorrentPlugin::tick() { @@ -407,8 +360,6 @@ void TorrentPlugin::toObserveMode() { // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) _outstandingLoadPieceForBuyerCalls.clear(); - else if(_session.mode() == protocol_session::SessionMode::buying) - _outstandingFullPieceArrivedCalls.clear(); _session.toObserveMode(removeConnection()); @@ -421,11 +372,6 @@ void TorrentPlugin::toSellMode(const protocol_wire::SellerTerms & terms) { // Should have been cleared before assert(_outstandingLoadPieceForBuyerCalls.empty()); - // Clear relevant mappings - // NB: We are doing clearing regardless of whether operation is successful! - if(_session.mode() == protocol_session::SessionMode::buying) - _outstandingFullPieceArrivedCalls.clear(); - if(_torrent.status().state != libtorrent::torrent_status::state_t::seeding) { throw exception::InvalidModeTransition(); } @@ -450,9 +396,6 @@ void TorrentPlugin::toSellMode(const protocol_wire::SellerTerms & terms) { void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { - // Should have been cleared before - assert(_outstandingFullPieceArrivedCalls.empty()); - // Clear relevant mappings // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) @@ -686,18 +629,6 @@ void TorrentPlugin::removeFromSession(PeerPlugin* peerPlugin) { protocol_session::RemovedConnectionCallbackHandler TorrentPlugin::removeConnection() { return [this](const libtorrent::peer_id & peerId, protocol_session::DisconnectCause cause) { - // remove call for peerId from _outstandingFullPieceArrivedCalls - typedef std::map OutstandingFullPieceArrivedCallsMap; - - auto call = std::find_if( - std::begin(_outstandingFullPieceArrivedCalls), - std::end(_outstandingFullPieceArrivedCalls), - boost::bind(&OutstandingFullPieceArrivedCallsMap::value_type::second, _1) == peerId - ); - - if(call != std::end(_outstandingFullPieceArrivedCalls)) - _outstandingFullPieceArrivedCalls.erase(call); - // remove call for peer_id from _outstandingLoadPieceForBuyerCalls for(auto mapping : _outstandingLoadPieceForBuyerCalls) { auto calls = mapping.second; @@ -735,14 +666,24 @@ protocol_session::RemovedConnectionCallbackHandler TorrentP protocol_session::FullPieceArrived TorrentPlugin::fullPieceArrived() { - return [this](const libtorrent::peer_id & peerId, const protocol_wire::PieceData & pieceData, int index) -> void { - if (!torrent()->have_piece(index)) { - // Make sure no outstanding calls exist for this index - assert(!_outstandingFullPieceArrivedCalls.count(index)); + return [this](const libtorrent::peer_id & peerId, const protocol_wire::PieceData & pieceData, int index) -> bool { + auto peerPlugin = peer(peerId); + auto endPoint = peerPlugin->endPoint(); + + const auto ti = torrent()->torrent_file(); + + // test if piece data is valid + const libtorrent::sha1_hash expected = ti.hash_for_piece(index); + const libtorrent::sha1_hash computed = libtorrent::hasher(pieceData.piece().get(), pieceData.length()).final(); - _outstandingFullPieceArrivedCalls[index] = peerId; + if (computed != expected) { + _alertManager->emplace_alert(_torrent, endPoint, peerId, index); + return false; + } + + if (!torrent()->have_piece(index)) { - // Tell libtorrent to validate piece + // Tell libtorrent to add and validate piece // last argument is a flag which presently seems to only test // flags & torrent::overwrite_existing, which seems to be whether // the piece should be overwritten if it is already present @@ -753,10 +694,11 @@ protocol_session::FullPieceArrived TorrentPlugin::fullPiece torrent()->add_piece(index, pieceData.piece().get(), 0); } else { // We already received the piece from another peer (most likely a non joystream peer) - // For now we ignore the validity of the piece data - // tell session about endpoint and piece - _session.validPieceReceivedOnConnection(peerId, index); } + + _alertManager->emplace_alert(_torrent, endPoint, peerId, index); + + return true; }; } From 39491aa72df5ab6b301c9bd144aba39f2d3a934d Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Sat, 27 Jan 2018 11:25:12 +0200 Subject: [PATCH 06/11] remove binding of loadPiece to specific peer --- sources/include/extension/TorrentPlugin.hpp | 9 +- sources/src/TorrentPlugin.cpp | 95 ++++++++------------- 2 files changed, 37 insertions(+), 67 deletions(-) diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index 5c3f579..a318f69 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -298,16 +298,11 @@ class TorrentPlugin : public libtorrent::torrent_plugin { /// Sell mode spesific state - // While selling, this maintains mapping between piece index and peers that are - // waiting for this piece to be read from disk. - // Will typically just be one, but may be multiple - hence set is used - std::map > _outstandingLoadPieceForBuyerCalls; + // While selling, this maintains set of pieces peers are waiting for to be read from disk. + std::set _outstandingLoadPieceForBuyers; /// Buy mode spesific state - // While buying, this maintains mapping between piece index and the single - // peer waiting for it to be validated and stored. - std::map _outstandingFullPieceArrivedCalls; /// Utilities diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 75284fd..41c076d 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -231,40 +231,31 @@ void TorrentPlugin::on_add_peer(const libtorrent::tcp::endpoint & endPoint, int void TorrentPlugin::pieceRead(const libtorrent::read_piece_alert * alert) { - // There should be at least one peer registered for this piece, unless they have disconnected - auto it = _outstandingLoadPieceForBuyerCalls.find(alert->piece); + // There should be a registeration for this piece, unless we have left selling mode + auto it = _outstandingLoadPieceForBuyers.find(alert->piece); - if(it == _outstandingLoadPieceForBuyerCalls.cend()) { + if(it == _outstandingLoadPieceForBuyers.cend()) { std::clog << "Ignoring piece read, must be for some other purpose." << std::endl; return; } - // Make a callback for each peer registered - const std::set & peers = it->second; + // Remove registeration + _outstandingLoadPieceForBuyers.erase(it); - // Iterate peers - for(auto peerId : peers) { - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); + // Make sure reading worked + if(alert->ec) { - // Make sure reading worked - if(alert->ec) { + std::clog << "Failed reading piece" << alert->piece << std::endl; + assert(false); - std::clog << "Failed reading piece" << alert->piece << "for" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; - assert(false); + } else { - } else { + std::clog << "Read piece" << alert->piece << std::endl; - std::clog << "Read piece" << alert->piece << "for" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; - - // tell session - _session.pieceLoaded(peerId, protocol_wire::PieceData(alert->buffer, alert->size), alert->piece); - } + // tell session + _session.pieceLoaded(protocol_wire::PieceData(alert->buffer, alert->size), alert->piece); } - - // Remove all peers registered for this piece - _outstandingLoadPieceForBuyerCalls.erase(it); } void TorrentPlugin::start() { @@ -359,7 +350,7 @@ void TorrentPlugin::toObserveMode() { // Clear relevant mappings // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) - _outstandingLoadPieceForBuyerCalls.clear(); + _outstandingLoadPieceForBuyers.clear(); _session.toObserveMode(removeConnection()); @@ -370,7 +361,7 @@ void TorrentPlugin::toObserveMode() { void TorrentPlugin::toSellMode(const protocol_wire::SellerTerms & terms) { // Should have been cleared before - assert(_outstandingLoadPieceForBuyerCalls.empty()); + assert(_outstandingLoadPieceForBuyers.empty()); if(_torrent.status().state != libtorrent::torrent_status::state_t::seeding) { throw exception::InvalidModeTransition(); @@ -399,7 +390,7 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { // Clear relevant mappings // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) - _outstandingLoadPieceForBuyerCalls.clear(); + _outstandingLoadPieceForBuyers.clear(); if(_torrent.status().state != libtorrent::torrent_status::state_t::downloading) { throw exception::InvalidModeTransition(); @@ -629,16 +620,6 @@ void TorrentPlugin::removeFromSession(PeerPlugin* peerPlugin) { protocol_session::RemovedConnectionCallbackHandler TorrentPlugin::removeConnection() { return [this](const libtorrent::peer_id & peerId, protocol_session::DisconnectCause cause) { - // remove call for peer_id from _outstandingLoadPieceForBuyerCalls - for(auto mapping : _outstandingLoadPieceForBuyerCalls) { - auto calls = mapping.second; - - auto call = calls.find(peerId); - - if(call != calls.end()) - calls.erase(call); - } - // Send notification auto peerPlugin = peer(peerId); auto endPoint = peerPlugin->endPoint(); @@ -702,46 +683,40 @@ protocol_session::FullPieceArrived TorrentPlugin::fullPiece }; } +/// protocol_session::LoadPieceForBuyer TorrentPlugin::loadPieceForBuyer() { return [this](const libtorrent::peer_id & peerId, int index) -> void { + // See if we have previous calls for this piece + auto it = this->_outstandingLoadPieceForBuyers.find(index); - // Get reference to, possibly new - and hence empty, set of calls for given piece index - std::set & callSet = this->_outstandingLoadPieceForBuyerCalls[index]; - - // Was there no previous calls for this piece? - const bool noPreviousCalls = callSet.empty(); - - // Remember to notify this endpoint when piece is loaded - // NB it is important the callSet be updated before call to read_piece below as a piece could be read in the - // same call triggering re-entry into hanlding read_piece_alert which checks this set then erases it - callSet.insert(peerId); + bool noPreviousCall = it == this->_outstandingLoadPieceForBuyers.end(); auto endPoint = peer(peerId)->endPoint(); - if(noPreviousCalls) { - std::clog << "Requested piece " - << index - << " by" - << libtorrent::print_address(endPoint.address()).c_str() - << std::endl; + if(noPreviousCall) { + // Remember to notify session when piece is loaded + // NB it is important the set be updated before call to read_piece below as a piece could be read in the + // same call triggering re-entry into hanlding read_piece_alert which checks this set + this->_outstandingLoadPieceForBuyers.insert(index); - // Make first call - torrent()->read_piece(index); + std::clog << "Requested piece " + << index + << " by" + << libtorrent::print_address(endPoint.address()).c_str() + << std::endl; - } else { + // Make first call + torrent()->read_piece(index); - // otherwise we dont need to make a new call, a response will come from libtorrent - std::clog << "[" - << _outstandingLoadPieceForBuyerCalls[index].size() - << "] Skipping reading requeted piece " + } else { + // We dont need to make a new call, a response will come from libtorrent + std::clog << "Skipping reading of requested piece " << index << " by" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; - } - }; } From 0b5b9202fd173989b668f2828b74781b5155e7cc Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Sat, 27 Jan 2018 11:29:33 +0200 Subject: [PATCH 07/11] bump version to 0.2.0, update dependencies for protocol upgrade --- conan_package/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conan_package/base.py b/conan_package/base.py index fe7dd1d..4292408 100644 --- a/conan_package/base.py +++ b/conan_package/base.py @@ -3,14 +3,14 @@ class ExtensionBase(ConanFile): name = "Extension" - version = "0.1.6" + version = "0.2.0" license = "(c) JoyStream Inc. 2016-2017" url = "https://github.com/JoyStream/extension-cpp.git" repo_ssh_url = "git@github.com:JoyStream/extension-cpp.git" repo_https_url = "https://github.com/JoyStream/extension-cpp.git" settings = "os", "compiler", "build_type", "arch" generators = "cmake" - requires = "ProtocolSession/0.1.4@joystream/stable", "Libtorrent/1.1.1@joystream/stable" + requires = "ProtocolSession/0.2.0@joystream/stable", "Libtorrent/1.1.1@joystream/stable", "Common/0.1.3@joystream/stable" build_policy = "missing" def source(self): From 1c671bb87b99fc19296498ade2dcd2485e9cfa46 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 31 Jan 2018 08:48:33 +0200 Subject: [PATCH 08/11] disconnect slow sellers when buying --- sources/include/extension/Common.hpp | 11 +++++++++++ sources/include/extension/TorrentPlugin.hpp | 6 ++++++ sources/src/Common.cpp | 15 ++++++++++++++ sources/src/TorrentPlugin.cpp | 22 +++++++++++++++++++-- 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/sources/include/extension/Common.hpp b/sources/include/extension/Common.hpp index 193202a..133e966 100644 --- a/sources/include/extension/Common.hpp +++ b/sources/include/extension/Common.hpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace std { @@ -27,4 +28,14 @@ struct hash { }; } +namespace joystream { +namespace extension { + +std::chrono::duration +calculatePieceTimeout(const double & pieceLengthBytes, + const double & targetRateBytesPerSecond, + const double & minTimeoutSeconds); + +} +} #endif // JOYSTREAM_EXTENSION_COMMON_HPP diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index a318f69..8aabcc0 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace joystream { namespace extension { @@ -249,6 +250,11 @@ class TorrentPlugin : public libtorrent::torrent_plugin { // a) PeerPlugin::on_extension_handshake: sent extended message, despite claiming not to support BEP10 std::set _misbehavedPeers; + // An upper bound on the amount of time to allow a seller to service one piece request before we + // ask the session to disconnect them. This is set to a reasonable low value based on + // size of the torrent piece when we go to buy mode. value of zero means sellers will not be timed out. + std::chrono::duration _maxTimeToServicePiece; + // Torrent info hash const libtorrent::sha1_hash _infoHash; diff --git a/sources/src/Common.cpp b/sources/src/Common.cpp index 392bbee..43a51e9 100644 --- a/sources/src/Common.cpp +++ b/sources/src/Common.cpp @@ -24,3 +24,18 @@ size_t hash::operator()(const libtorrent::peer_id & peerId) } } + +namespace joystream { +namespace extension { + + std::chrono::duration + calculatePieceTimeout(const double & pieceLengthBytes, + const double & targetRateBytesPerSecond, + const double & minTimeoutSeconds) { + + double targetTimeout = std::ceil(pieceLengthBytes / targetRateBytesPerSecond); + int timeout = std::max(targetTimeout, minTimeoutSeconds); + return std::chrono::seconds(timeout); + } +} +} diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 41c076d..0999014 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,6 +17,8 @@ #include // print_endpoint #include +#include // std::max + namespace joystream { namespace protocol_session { @@ -47,7 +50,8 @@ TorrentPlugin::TorrentPlugin(Plugin * plugin, , _alertManager(alertManager) , _policy(policy) , _libtorrentInteraction(libtorrentInteraction) - , _infoHash(torrent.info_hash()) { + , _infoHash(torrent.info_hash()) + , _maxTimeToServicePiece(std::chrono::duration::zero()) { } TorrentPlugin::~TorrentPlugin() { @@ -176,8 +180,15 @@ void TorrentPlugin::on_piece_failed(int index) { void TorrentPlugin::tick() { // Asynch processing in session if its setup - if(_session.mode() != protocol_session::SessionMode::not_set) + if(_session.mode() != protocol_session::SessionMode::not_set) { _session.tick(); + } + + if(_session.mode() == protocol_session::SessionMode::buying && + _maxTimeToServicePiece != std::chrono::duration::zero()) { + + _session.disconnectSlowSellers(_maxTimeToServicePiece); + } } bool TorrentPlugin::on_resume() { @@ -403,6 +414,13 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { torrentPieceInformation(), allSellersGone()); + // Set maxium time to service a piece based on its size, using a target download rate + // Assuming uniform piece size across torrent + const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes + const double targetRate = 10000; // Bytes/s + const double minTimeout = 3; // lower bound + _maxTimeToServicePiece = calculatePieceTimeout(pieceSize, targetRate, minTimeout); + // Send notification _alertManager->emplace_alert(_torrent, terms); } From b1e4697c5b7e482c9665b733ff8a4207d149d88f Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 31 Jan 2018 09:38:47 +0200 Subject: [PATCH 09/11] don't ban timed out peers --- sources/src/TorrentPlugin.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 0999014..05073ad 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -650,9 +650,12 @@ protocol_session::RemovedConnectionCallbackHandler TorrentP if(cause == protocol_session::DisconnectCause::client) return; else { - std::clog << "Adding peer to misbehavedPeers list: " << endPoint << " cause: " << (int)cause << std::endl; - // all other reasons are considered misbehaviour - _misbehavedPeers.insert(endPoint); + // Add peer to banlist unless it was just due to timeout + if (cause != protocol_session::DisconnectCause::seller_servicing_piece_has_timed_out) { + std::clog << "Adding peer to misbehavedPeers list: " << endPoint << " cause: " << (int)cause << std::endl; + // all other reasons are considered misbehaviour + _misbehavedPeers.insert(endPoint); + } } // *** Record cause for some purpose? *** From 45e5a9a5eeda897ddfeb7eeaa8e4851d1bcbc704 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Fri, 2 Feb 2018 13:19:41 +0200 Subject: [PATCH 10/11] let Buyer session handle disconnecting timedout sellers --- conan_package/base.py | 2 +- sources/include/extension/Common.hpp | 11 -------- sources/src/Common.cpp | 15 ----------- sources/src/TorrentPlugin.cpp | 40 +++++++++++++++++----------- 4 files changed, 25 insertions(+), 43 deletions(-) diff --git a/conan_package/base.py b/conan_package/base.py index 4292408..76876aa 100644 --- a/conan_package/base.py +++ b/conan_package/base.py @@ -10,7 +10,7 @@ class ExtensionBase(ConanFile): repo_https_url = "https://github.com/JoyStream/extension-cpp.git" settings = "os", "compiler", "build_type", "arch" generators = "cmake" - requires = "ProtocolSession/0.2.0@joystream/stable", "Libtorrent/1.1.1@joystream/stable", "Common/0.1.3@joystream/stable" + requires = "ProtocolSession/0.2.1@joystream/stable", "Libtorrent/1.1.1@joystream/stable", "Common/0.1.3@joystream/stable" build_policy = "missing" def source(self): diff --git a/sources/include/extension/Common.hpp b/sources/include/extension/Common.hpp index 133e966..193202a 100644 --- a/sources/include/extension/Common.hpp +++ b/sources/include/extension/Common.hpp @@ -11,7 +11,6 @@ #include #include #include -#include namespace std { @@ -28,14 +27,4 @@ struct hash { }; } -namespace joystream { -namespace extension { - -std::chrono::duration -calculatePieceTimeout(const double & pieceLengthBytes, - const double & targetRateBytesPerSecond, - const double & minTimeoutSeconds); - -} -} #endif // JOYSTREAM_EXTENSION_COMMON_HPP diff --git a/sources/src/Common.cpp b/sources/src/Common.cpp index 43a51e9..392bbee 100644 --- a/sources/src/Common.cpp +++ b/sources/src/Common.cpp @@ -24,18 +24,3 @@ size_t hash::operator()(const libtorrent::peer_id & peerId) } } - -namespace joystream { -namespace extension { - - std::chrono::duration - calculatePieceTimeout(const double & pieceLengthBytes, - const double & targetRateBytesPerSecond, - const double & minTimeoutSeconds) { - - double targetTimeout = std::ceil(pieceLengthBytes / targetRateBytesPerSecond); - int timeout = std::max(targetTimeout, minTimeoutSeconds); - return std::chrono::seconds(timeout); - } -} -} diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 05073ad..f694086 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -38,6 +38,16 @@ namespace protocol_session { namespace extension { +std::chrono::duration +calculatePieceTimeout(const double & pieceLengthBytes, + const double & targetRateBytesPerSecond, + const double & minTimeoutSeconds) { + + double targetTimeout = std::ceil(pieceLengthBytes / targetRateBytesPerSecond); + int timeout = std::max(targetTimeout, minTimeoutSeconds); + return std::chrono::seconds(timeout); +} + TorrentPlugin::TorrentPlugin(Plugin * plugin, const libtorrent::torrent_handle & torrent, uint minimumMessageId, @@ -183,12 +193,6 @@ void TorrentPlugin::tick() { if(_session.mode() != protocol_session::SessionMode::not_set) { _session.tick(); } - - if(_session.mode() == protocol_session::SessionMode::buying && - _maxTimeToServicePiece != std::chrono::duration::zero()) { - - _session.disconnectSlowSellers(_maxTimeToServicePiece); - } } bool TorrentPlugin::on_resume() { @@ -205,8 +209,18 @@ void TorrentPlugin::on_files_checked() { // nothing to do } -void TorrentPlugin::on_state(int) { - // nothing to do +void TorrentPlugin::on_state(int state) { + + // When the torrent goes into downloading state we calculate the default maximum time to service + // a piece, which is given to the buying session to know when to timeout sellers. + if (state == libtorrent::torrent_status::state_t::downloading) { + // Set maxium time to service a piece based on its size, using a target download rate + // Assuming uniform piece size across torrent + const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes + const double targetRate = 10000; // Bytes/s + const double minTimeout = 3; // lower bound + _maxTimeToServicePiece = calculatePieceTimeout(pieceSize, targetRate, minTimeout); + } } void TorrentPlugin::on_add_peer(const libtorrent::tcp::endpoint & endPoint, int /*src*/, int /*flags*/) { @@ -412,14 +426,8 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { sentPayment(), terms, torrentPieceInformation(), - allSellersGone()); - - // Set maxium time to service a piece based on its size, using a target download rate - // Assuming uniform piece size across torrent - const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes - const double targetRate = 10000; // Bytes/s - const double minTimeout = 3; // lower bound - _maxTimeToServicePiece = calculatePieceTimeout(pieceSize, targetRate, minTimeout); + allSellersGone(), + _maxTimeToServicePiece); // Send notification _alertManager->emplace_alert(_torrent, terms); From 58751648531fffbb78666882564033b50df38be4 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Fri, 2 Feb 2018 16:50:54 +0200 Subject: [PATCH 11/11] calculate max time to service piece when going to buy mode --- sources/include/extension/TorrentPlugin.hpp | 5 ----- sources/src/TorrentPlugin.cpp | 25 +++++++++------------ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index 8aabcc0..88fb289 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -250,11 +250,6 @@ class TorrentPlugin : public libtorrent::torrent_plugin { // a) PeerPlugin::on_extension_handshake: sent extended message, despite claiming not to support BEP10 std::set _misbehavedPeers; - // An upper bound on the amount of time to allow a seller to service one piece request before we - // ask the session to disconnect them. This is set to a reasonable low value based on - // size of the torrent piece when we go to buy mode. value of zero means sellers will not be timed out. - std::chrono::duration _maxTimeToServicePiece; - // Torrent info hash const libtorrent::sha1_hash _infoHash; diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index f694086..087d9ff 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -60,8 +60,7 @@ TorrentPlugin::TorrentPlugin(Plugin * plugin, , _alertManager(alertManager) , _policy(policy) , _libtorrentInteraction(libtorrentInteraction) - , _infoHash(torrent.info_hash()) - , _maxTimeToServicePiece(std::chrono::duration::zero()) { + , _infoHash(torrent.info_hash()) { } TorrentPlugin::~TorrentPlugin() { @@ -210,17 +209,7 @@ void TorrentPlugin::on_files_checked() { } void TorrentPlugin::on_state(int state) { - - // When the torrent goes into downloading state we calculate the default maximum time to service - // a piece, which is given to the buying session to know when to timeout sellers. - if (state == libtorrent::torrent_status::state_t::downloading) { - // Set maxium time to service a piece based on its size, using a target download rate - // Assuming uniform piece size across torrent - const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes - const double targetRate = 10000; // Bytes/s - const double minTimeout = 3; // lower bound - _maxTimeToServicePiece = calculatePieceTimeout(pieceSize, targetRate, minTimeout); - } + // nothing to do } void TorrentPlugin::on_add_peer(const libtorrent::tcp::endpoint & endPoint, int /*src*/, int /*flags*/) { @@ -421,13 +410,21 @@ void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { throw exception::InvalidModeTransition(); } + // An upper bound on the amount of time to allow a seller to service one piece request before we + // the session should disconnect them. + // Set maxium time to service a piece based on its size, using a target download rate + // Assuming uniform piece size across torrent + const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes + const double targetRate = 10000; // Bytes/s + const double minTimeout = 3; // lower bound + _session.toBuyMode(removeConnection(), fullPieceArrived(), sentPayment(), terms, torrentPieceInformation(), allSellersGone(), - _maxTimeToServicePiece); + calculatePieceTimeout(pieceSize, targetRate, minTimeout)); // Send notification _alertManager->emplace_alert(_torrent, terms);