diff --git a/conan_package/base.py b/conan_package/base.py index fe7dd1d..76876aa 100644 --- a/conan_package/base.py +++ b/conan_package/base.py @@ -3,14 +3,14 @@ class ExtensionBase(ConanFile): name = "Extension" - version = "0.1.6" + version = "0.2.0" license = "(c) JoyStream Inc. 2016-2017" url = "https://github.com/JoyStream/extension-cpp.git" repo_ssh_url = "git@github.com:JoyStream/extension-cpp.git" repo_https_url = "https://github.com/JoyStream/extension-cpp.git" settings = "os", "compiler", "build_type", "arch" generators = "cmake" - requires = "ProtocolSession/0.1.4@joystream/stable", "Libtorrent/1.1.1@joystream/stable" + requires = "ProtocolSession/0.2.1@joystream/stable", "Libtorrent/1.1.1@joystream/stable", "Common/0.1.3@joystream/stable" build_policy = "missing" def source(self): diff --git a/sources/include/extension/Alert.hpp b/sources/include/extension/Alert.hpp index 55850e6..1bf70af 100644 --- a/sources/include/extension/Alert.hpp +++ b/sources/include/extension/Alert.hpp @@ -548,6 +548,19 @@ namespace alert { const Coin::KeyPair contractKeyPair; const Coin::PubKeyHash finalPkHash; }; + + struct AllSellersGone : public libtorrent::torrent_alert { + + AllSellersGone(libtorrent::aux::stack_allocator& alloc, + const libtorrent::torrent_handle & h) + : libtorrent::torrent_alert(alloc, h) {} + + TORRENT_DEFINE_ALERT(DownloadStarted, libtorrent::user_alert_id + 32) + static const int static_category = alert::status_notification; + virtual std::string message() const override { + return torrent_alert::message() + " all sellers gone"; + } + }; } } } diff --git a/sources/include/extension/TorrentPlugin.hpp b/sources/include/extension/TorrentPlugin.hpp index c02fd65..88fb289 100644 --- a/sources/include/extension/TorrentPlugin.hpp +++ b/sources/include/extension/TorrentPlugin.hpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace joystream { namespace extension { @@ -215,6 +216,7 @@ class TorrentPlugin : public libtorrent::torrent_plugin { protocol_session::AnchorAnnounced anchorAnnounced(); protocol_session::ReceivedValidPayment receivedValidPayment(); protocol_session::SentPayment sentPayment(); + protocol_session::AllSellersGone allSellersGone(); /// Members @@ -297,16 +299,11 @@ class TorrentPlugin : public libtorrent::torrent_plugin { /// Sell mode spesific state - // While selling, this maintains mapping between piece index and peers that are - // waiting for this piece to be read from disk. - // Will typically just be one, but may be multiple - hence set is used - std::map > _outstandingLoadPieceForBuyerCalls; + // While selling, this maintains set of pieces peers are waiting for to be read from disk. + std::set _outstandingLoadPieceForBuyers; /// Buy mode spesific state - // While buying, this maintains mapping between piece index and the single - // peer waiting for it to be validated and stored. - std::map _outstandingFullPieceArrivedCalls; /// Utilities diff --git a/sources/src/PeerPlugin.cpp b/sources/src/PeerPlugin.cpp index bfde3b1..e4442b0 100644 --- a/sources/src/PeerPlugin.cpp +++ b/sources/src/PeerPlugin.cpp @@ -241,6 +241,20 @@ namespace extension { return true; } + // Only communicate with peers with same protocol major version number + if(_protocolVersionOfPeer.major() != protocol_statemachine::CBStateMachine::protocolVersion.major()) { + // Mark peer as not supporting this extension + _peerPaymentBEPSupportStatus = BEPSupportStatus::not_supported; + + // Remove peer + std::clog << "Dropping Peer: (incompatible protocol vesrion)" << std::endl; + libtorrent::error_code ec; + drop(ec); + + // Keep us around + return true; + } + // Try to extract m key, if its not present, then we are done libtorrent::bdecode_node m = handshake.dict_find_dict("m"); diff --git a/sources/src/TorrentPlugin.cpp b/sources/src/TorrentPlugin.cpp index 5152892..087d9ff 100644 --- a/sources/src/TorrentPlugin.cpp +++ b/sources/src/TorrentPlugin.cpp @@ -9,11 +9,15 @@ #include #include #include +#include #include #include #include #include #include // print_endpoint +#include + +#include // std::max namespace joystream { @@ -34,6 +38,16 @@ namespace protocol_session { namespace extension { +std::chrono::duration +calculatePieceTimeout(const double & pieceLengthBytes, + const double & targetRateBytesPerSecond, + const double & minTimeoutSeconds) { + + double targetTimeout = std::ceil(pieceLengthBytes / targetRateBytesPerSecond); + int timeout = std::max(targetTimeout, minTimeoutSeconds); + return std::chrono::seconds(timeout); +} + TorrentPlugin::TorrentPlugin(Plugin * plugin, const libtorrent::torrent_handle & torrent, uint minimumMessageId, @@ -164,67 +178,20 @@ void TorrentPlugin::on_piece_pass(int index) { // Make sure we are in correct mode, as mode changed may have occured if(_session.mode() == protocol_session::SessionMode::buying) { - - auto it = _outstandingFullPieceArrivedCalls.find(index); - - // If this validation is not due to us - if(it == _outstandingFullPieceArrivedCalls.cend()) { - - // then just tell session about it - _session.pieceDownloaded(index); - - } else { - auto peerId = it->second; - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); - - _alertManager->emplace_alert(_torrent, endPoint, peerId, index); - - // if its due to us, then tell session about endpoint and piece - _session.validPieceReceivedOnConnection(peerId, index); - - // and remove call - _outstandingFullPieceArrivedCalls.erase(it); - } + _session.pieceDownloaded(index); } } void TorrentPlugin::on_piece_failed(int index) { - // Make sure we are in correct mode, as mode changed may have occured - if(_session.mode() == protocol_session::SessionMode::buying) { - - auto it = _outstandingFullPieceArrivedCalls.find(index); - - // If this validation is not due to us - if(it == _outstandingFullPieceArrivedCalls.cend()) { - - // then there is nothing to do - - } else { - - // if its due to us, then - - auto peerId = it->second; - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); - - _alertManager->emplace_alert(_torrent, endPoint, peerId, index); - - // tell session about endpoint and piece - _session.invalidPieceReceivedOnConnection(peerId, index); - - // and remove call - _outstandingFullPieceArrivedCalls.erase(it); - } - } } void TorrentPlugin::tick() { // Asynch processing in session if its setup - if(_session.mode() != protocol_session::SessionMode::not_set) + if(_session.mode() != protocol_session::SessionMode::not_set) { _session.tick(); + } } bool TorrentPlugin::on_resume() { @@ -241,7 +208,7 @@ void TorrentPlugin::on_files_checked() { // nothing to do } -void TorrentPlugin::on_state(int) { +void TorrentPlugin::on_state(int state) { // nothing to do } @@ -278,40 +245,31 @@ void TorrentPlugin::on_add_peer(const libtorrent::tcp::endpoint & endPoint, int void TorrentPlugin::pieceRead(const libtorrent::read_piece_alert * alert) { - // There should be at least one peer registered for this piece, unless they have disconnected - auto it = _outstandingLoadPieceForBuyerCalls.find(alert->piece); + // There should be a registeration for this piece, unless we have left selling mode + auto it = _outstandingLoadPieceForBuyers.find(alert->piece); - if(it == _outstandingLoadPieceForBuyerCalls.cend()) { + if(it == _outstandingLoadPieceForBuyers.cend()) { std::clog << "Ignoring piece read, must be for some other purpose." << std::endl; return; } - // Make a callback for each peer registered - const std::set & peers = it->second; - - // Iterate peers - for(auto peerId : peers) { - auto peerPlugin = peer(peerId); - auto endPoint = peerPlugin->endPoint(); + // Remove registeration + _outstandingLoadPieceForBuyers.erase(it); - // Make sure reading worked - if(alert->ec) { + // Make sure reading worked + if(alert->ec) { - std::clog << "Failed reading piece" << alert->piece << "for" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; - assert(false); + std::clog << "Failed reading piece" << alert->piece << std::endl; + assert(false); - } else { + } else { - std::clog << "Read piece" << alert->piece << "for" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; + std::clog << "Read piece" << alert->piece << std::endl; - // tell session - _session.pieceLoaded(peerId, protocol_wire::PieceData(alert->buffer, alert->size), alert->piece); - } + // tell session + _session.pieceLoaded(protocol_wire::PieceData(alert->buffer, alert->size), alert->piece); } - - // Remove all peers registered for this piece - _outstandingLoadPieceForBuyerCalls.erase(it); } void TorrentPlugin::start() { @@ -406,9 +364,7 @@ void TorrentPlugin::toObserveMode() { // Clear relevant mappings // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) - _outstandingLoadPieceForBuyerCalls.clear(); - else if(_session.mode() == protocol_session::SessionMode::buying) - _outstandingFullPieceArrivedCalls.clear(); + _outstandingLoadPieceForBuyers.clear(); _session.toObserveMode(removeConnection()); @@ -419,12 +375,7 @@ void TorrentPlugin::toObserveMode() { void TorrentPlugin::toSellMode(const protocol_wire::SellerTerms & terms) { // Should have been cleared before - assert(_outstandingLoadPieceForBuyerCalls.empty()); - - // Clear relevant mappings - // NB: We are doing clearing regardless of whether operation is successful! - if(_session.mode() == protocol_session::SessionMode::buying) - _outstandingFullPieceArrivedCalls.clear(); + assert(_outstandingLoadPieceForBuyers.empty()); if(_torrent.status().state != libtorrent::torrent_status::state_t::seeding) { throw exception::InvalidModeTransition(); @@ -450,23 +401,30 @@ void TorrentPlugin::toSellMode(const protocol_wire::SellerTerms & terms) { void TorrentPlugin::toBuyMode(const protocol_wire::BuyerTerms & terms) { - // Should have been cleared before - assert(_outstandingFullPieceArrivedCalls.empty()); - // Clear relevant mappings // NB: We are doing clearing regardless of whether operation is successful! if(_session.mode() == protocol_session::SessionMode::selling) - _outstandingLoadPieceForBuyerCalls.clear(); + _outstandingLoadPieceForBuyers.clear(); if(_torrent.status().state != libtorrent::torrent_status::state_t::downloading) { throw exception::InvalidModeTransition(); } + // An upper bound on the amount of time to allow a seller to service one piece request before we + // the session should disconnect them. + // Set maxium time to service a piece based on its size, using a target download rate + // Assuming uniform piece size across torrent + const double pieceSize = torrent()->torrent_file().piece_length(); // Bytes + const double targetRate = 10000; // Bytes/s + const double minTimeout = 3; // lower bound + _session.toBuyMode(removeConnection(), fullPieceArrived(), sentPayment(), terms, - torrentPieceInformation()); + torrentPieceInformation(), + allSellersGone(), + calculatePieceTimeout(pieceSize, targetRate, minTimeout)); // Send notification _alertManager->emplace_alert(_torrent, terms); @@ -685,28 +643,6 @@ void TorrentPlugin::removeFromSession(PeerPlugin* peerPlugin) { protocol_session::RemovedConnectionCallbackHandler TorrentPlugin::removeConnection() { return [this](const libtorrent::peer_id & peerId, protocol_session::DisconnectCause cause) { - // remove call for peerId from _outstandingFullPieceArrivedCalls - typedef std::map OutstandingFullPieceArrivedCallsMap; - - auto call = std::find_if( - std::begin(_outstandingFullPieceArrivedCalls), - std::end(_outstandingFullPieceArrivedCalls), - boost::bind(&OutstandingFullPieceArrivedCallsMap::value_type::second, _1) == peerId - ); - - if(call != std::end(_outstandingFullPieceArrivedCalls)) - _outstandingFullPieceArrivedCalls.erase(call); - - // remove call for peer_id from _outstandingLoadPieceForBuyerCalls - for(auto mapping : _outstandingLoadPieceForBuyerCalls) { - auto calls = mapping.second; - - auto call = calls.find(peerId); - - if(call != calls.end()) - calls.erase(call); - } - // Send notification auto peerPlugin = peer(peerId); auto endPoint = peerPlugin->endPoint(); @@ -719,9 +655,12 @@ protocol_session::RemovedConnectionCallbackHandler TorrentP if(cause == protocol_session::DisconnectCause::client) return; else { - std::clog << "Adding peer to misbehavedPeers list: " << endPoint << " cause: " << (int)cause << std::endl; - // all other reasons are considered misbehaviour - _misbehavedPeers.insert(endPoint); + // Add peer to banlist unless it was just due to timeout + if (cause != protocol_session::DisconnectCause::seller_servicing_piece_has_timed_out) { + std::clog << "Adding peer to misbehavedPeers list: " << endPoint << " cause: " << (int)cause << std::endl; + // all other reasons are considered misbehaviour + _misbehavedPeers.insert(endPoint); + } } // *** Record cause for some purpose? *** @@ -734,66 +673,76 @@ protocol_session::RemovedConnectionCallbackHandler TorrentP protocol_session::FullPieceArrived TorrentPlugin::fullPieceArrived() { - return [this](const libtorrent::peer_id & peerId, const protocol_wire::PieceData & pieceData, int index) -> void { + return [this](const libtorrent::peer_id & peerId, const protocol_wire::PieceData & pieceData, int index) -> bool { + auto peerPlugin = peer(peerId); + auto endPoint = peerPlugin->endPoint(); + + const auto ti = torrent()->torrent_file(); + + // test if piece data is valid + const libtorrent::sha1_hash expected = ti.hash_for_piece(index); + const libtorrent::sha1_hash computed = libtorrent::hasher(pieceData.piece().get(), pieceData.length()).final(); - // Make sure no outstanding calls exist for this index - assert(!_outstandingFullPieceArrivedCalls.count(index)); + if (computed != expected) { + _alertManager->emplace_alert(_torrent, endPoint, peerId, index); + return false; + } - _outstandingFullPieceArrivedCalls[index] = peerId; + if (!torrent()->have_piece(index)) { + + // Tell libtorrent to add and validate piece + // last argument is a flag which presently seems to only test + // flags & torrent::overwrite_existing, which seems to be whether + // the piece should be overwritten if it is already present + // + // libtorrent::torrent_plugin::on_piece_pass() + // libtorrent::torrent_plugin::on_piece_failed() + // processes result of checking + torrent()->add_piece(index, pieceData.piece().get(), 0); + } else { + // We already received the piece from another peer (most likely a non joystream peer) + } - // Tell libtorrent to validate piece - // last argument is a flag which presently seems to only test - // flags & torrent::overwrite_existing, which seems to be whether - // the piece should be overwritten if it is already present - // - // libtorrent::torrent_plugin::on_piece_pass() - // libtorrent::torrent_plugin::on_piece_failed() - // processes result of checking + _alertManager->emplace_alert(_torrent, endPoint, peerId, index); - torrent()->add_piece(index, pieceData.piece().get(), 0); + return true; }; } +/// protocol_session::LoadPieceForBuyer TorrentPlugin::loadPieceForBuyer() { return [this](const libtorrent::peer_id & peerId, int index) -> void { + // See if we have previous calls for this piece + auto it = this->_outstandingLoadPieceForBuyers.find(index); - // Get reference to, possibly new - and hence empty, set of calls for given piece index - std::set & callSet = this->_outstandingLoadPieceForBuyerCalls[index]; - - // Was there no previous calls for this piece? - const bool noPreviousCalls = callSet.empty(); - - // Remember to notify this endpoint when piece is loaded - // NB it is important the callSet be updated before call to read_piece below as a piece could be read in the - // same call triggering re-entry into hanlding read_piece_alert which checks this set then erases it - callSet.insert(peerId); + bool noPreviousCall = it == this->_outstandingLoadPieceForBuyers.end(); auto endPoint = peer(peerId)->endPoint(); - if(noPreviousCalls) { - std::clog << "Requested piece " - << index - << " by" - << libtorrent::print_address(endPoint.address()).c_str() - << std::endl; + if(noPreviousCall) { + // Remember to notify session when piece is loaded + // NB it is important the set be updated before call to read_piece below as a piece could be read in the + // same call triggering re-entry into hanlding read_piece_alert which checks this set + this->_outstandingLoadPieceForBuyers.insert(index); - // Make first call - torrent()->read_piece(index); + std::clog << "Requested piece " + << index + << " by" + << libtorrent::print_address(endPoint.address()).c_str() + << std::endl; - } else { + // Make first call + torrent()->read_piece(index); - // otherwise we dont need to make a new call, a response will come from libtorrent - std::clog << "[" - << _outstandingLoadPieceForBuyerCalls[index].size() - << "] Skipping reading requeted piece " + } else { + // We dont need to make a new call, a response will come from libtorrent + std::clog << "Skipping reading of requested piece " << index << " by" << libtorrent::print_address(endPoint.address()).c_str() << std::endl; - } - }; } @@ -858,6 +807,17 @@ protocol_session::SentPayment TorrentPlugin::sentPayment() } +protocol_session::AllSellersGone TorrentPlugin::allSellersGone() { + // Get alert manager and handle for torrent + libtorrent::torrent * t = torrent(); + libtorrent::alert_manager & manager = t->alerts(); + libtorrent::torrent_handle h = t->get_handle(); + + return [&manager, h, this](void) -> void { + manager.emplace_alert(h); + }; +} + int TorrentPlugin::pickNextPiece(const std::vector> * pieces) { libtorrent::torrent * t = torrent();