diff --git a/conan_package/base.py b/conan_package/base.py index 723f3d3..e66a305 100644 --- a/conan_package/base.py +++ b/conan_package/base.py @@ -3,7 +3,7 @@ class ProtocolSessionBase(ConanFile): name = "ProtocolSession" - version = "0.2.0" + version = "0.2.1" license = "(c) JoyStream Inc. 2016-2017" url = "https://github.com/JoyStream/protocol_session-cpp.git" repo_ssh_url = "git@github.com:JoyStream/protocol_session-cpp.git" diff --git a/sources/include/protocol_session/Session.cpp b/sources/include/protocol_session/Session.cpp index 21a5645..67c37cf 100644 --- a/sources/include/protocol_session/Session.cpp +++ b/sources/include/protocol_session/Session.cpp @@ -166,7 +166,8 @@ namespace protocol_session { const SentPayment & sentPayment, const protocol_wire::BuyerTerms & terms, const TorrentPieceInformation & information, - const AllSellersGone & allSellersGone) { + const AllSellersGone & allSellersGone, + std::chrono::duration maxTimeToServicePiece) { // Prepare for exiting current state switch(_mode) { @@ -210,7 +211,8 @@ namespace protocol_session { sentPayment, terms, information, - allSellersGone); + allSellersGone, + maxTimeToServicePiece); } template diff --git a/sources/include/protocol_session/Session.hpp b/sources/include/protocol_session/Session.hpp index b86c0df..cb702fe 100644 --- a/sources/include/protocol_session/Session.hpp +++ b/sources/include/protocol_session/Session.hpp @@ -15,6 +15,7 @@ #include #include +#include // ConnectionIdType: Type for identifying connections. // 1) must be possible to use as key in std::map @@ -87,7 +88,8 @@ namespace detail { const SentPayment &, const protocol_wire::BuyerTerms &, const TorrentPieceInformation &, - const AllSellersGone &); + const AllSellersGone &, + std::chrono::duration = std::chrono::duration::zero()); /** * Warning: Do not call any of these operations diff --git a/sources/include/protocol_session/detail/Buying.cpp b/sources/include/protocol_session/detail/Buying.cpp index 1042f9b..8db6238 100644 --- a/sources/include/protocol_session/detail/Buying.cpp +++ b/sources/include/protocol_session/detail/Buying.cpp @@ -28,7 +28,8 @@ namespace detail { const SentPayment & sentPayment, const protocol_wire::BuyerTerms & terms, const TorrentPieceInformation & information, - const AllSellersGone & allSellersGone) + const AllSellersGone & allSellersGone, + std::chrono::duration maxTimeToServicePiece) : _session(session) , _removedConnection(removedConnection) , _fullPieceArrived(fullPieceArrived) @@ -37,7 +38,8 @@ namespace detail { , _terms(terms) , _numberOfMissingPieces(0) , _allSellersGone(allSellersGone) - , _maxConcurrentRequests(4) { + , _maxConcurrentRequests(4) + , _maxTimeToServicePiece(maxTimeToServicePiece) { //, _lastStartOfSendingInvitations(0) { // Setup pieces @@ -86,20 +88,6 @@ namespace detail { removeConnection(id, DisconnectCause::client); } - template - void Buying::disconnectSlowSellers(const std::chrono::duration & limit) { - - for(auto mapping : _sellers) { - auto seller = mapping.second; - - if (seller.isGone()) continue; - - if (seller.servicingPieceHasTimedOut(limit)) { - removeConnection(seller.connection()->connectionId(), DisconnectCause::seller_servicing_piece_has_timed_out); - } - } - } - template void Buying::validPieceReceivedOnConnection(detail::Seller &seller, int index) { // Cannot happen when stopped, as there are no connections @@ -286,8 +274,8 @@ namespace detail { // Only process if we are active if(_session->_state == SessionState::started) { + // Disconnect timed out sellers // Allocate pieces if we are downloading - // Timeout sellers if they have not seviced a piece in time. // Reset state to allow restarting downloading after all sellers are gone if(_state == BuyingState::downloading) { @@ -296,8 +284,16 @@ namespace detail { // Reference to seller detail::Seller & s = mapping.second; + if (s.isGone()) continue; + + // Disconnect if seller timed-out servicing request + if (s.servicingPieceHasTimedOut(_maxTimeToServicePiece)) { + removeConnection(s.connection()->connectionId(), DisconnectCause::seller_servicing_piece_has_timed_out); + continue; + } + // A seller may be waiting to be assigned a new piece - if(!s.isGone() && s.piecesAwaitingArrival().size() == 0) { + if(s.piecesAwaitingArrival().size() == 0) { // This can happen when a seller has previously uploaded a valid piece, // but there were no unassigned pieces at that time, diff --git a/sources/include/protocol_session/detail/Buying.hpp b/sources/include/protocol_session/detail/Buying.hpp index 03822f4..2cb01ee 100644 --- a/sources/include/protocol_session/detail/Buying.hpp +++ b/sources/include/protocol_session/detail/Buying.hpp @@ -48,7 +48,8 @@ class Buying { const SentPayment &, const protocol_wire::BuyerTerms &, const TorrentPieceInformation &, - const AllSellersGone &); + const AllSellersGone &, + std::chrono::duration = std::chrono::duration::zero()); //// Connection level client events @@ -58,9 +59,6 @@ class Buying { // Remove connection void removeConnection(const ConnectionIdType &); - // Disconnect seller connections if it has taken longer than `limit` to service next expected piece - void disconnectSlowSellers(const std::chrono::duration & limit); - // Transition to BuyingState::sending_invitations void startDownloading(const Coin::Transaction & contractTx, const PeerToStartDownloadInformationMap & peerToStartDownloadInformationMap); @@ -188,6 +186,8 @@ class Buying { // Maximum number of concurrent requests to send before waiting for piece responses // The optimum value depends on many factors. It is hardcoded to 4 for now. const int _maxConcurrentRequests; + + std::chrono::duration _maxTimeToServicePiece; }; } diff --git a/sources/include/protocol_session/detail/Seller.cpp b/sources/include/protocol_session/detail/Seller.cpp index 29c70b0..0b67e45 100644 --- a/sources/include/protocol_session/detail/Seller.cpp +++ b/sources/include/protocol_session/detail/Seller.cpp @@ -127,6 +127,9 @@ namespace detail { if(_piecesAwaitingArrival.size() == 0) return false; + if(timeOutLimit == std::chrono::duration::zero()) + return false; + // Get current time auto now = std::chrono::high_resolution_clock::now();