Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conan_package/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class ProtocolSessionBase(ConanFile):
name = "ProtocolSession"
version = "0.2.0"
version = "0.2.1"
license = "(c) JoyStream Inc. 2016-2017"
url = "https://github.com/JoyStream/protocol_session-cpp.git"
repo_ssh_url = "git@github.com:JoyStream/protocol_session-cpp.git"
Expand Down
6 changes: 4 additions & 2 deletions sources/include/protocol_session/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ namespace protocol_session {
const SentPayment<ConnectionIdType> & sentPayment,
const protocol_wire::BuyerTerms & terms,
const TorrentPieceInformation & information,
const AllSellersGone & allSellersGone) {
const AllSellersGone & allSellersGone,
std::chrono::duration<double> maxTimeToServicePiece) {

// Prepare for exiting current state
switch(_mode) {
Expand Down Expand Up @@ -210,7 +211,8 @@ namespace protocol_session {
sentPayment,
terms,
information,
allSellersGone);
allSellersGone,
maxTimeToServicePiece);
}

template <class ConnectionIdType>
Expand Down
4 changes: 3 additions & 1 deletion sources/include/protocol_session/Session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <protocol_session/SessionState.hpp>

#include <unordered_map>
#include <chrono>

// ConnectionIdType: Type for identifying connections.
// 1) must be possible to use as key in std::map
Expand Down Expand Up @@ -87,7 +88,8 @@ namespace detail {
const SentPayment<ConnectionIdType> &,
const protocol_wire::BuyerTerms &,
const TorrentPieceInformation &,
const AllSellersGone &);
const AllSellersGone &,
std::chrono::duration<double> = std::chrono::duration<double>::zero());

/**
* Warning: Do not call any of these operations
Expand Down
32 changes: 14 additions & 18 deletions sources/include/protocol_session/detail/Buying.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ namespace detail {
const SentPayment<ConnectionIdType> & sentPayment,
const protocol_wire::BuyerTerms & terms,
const TorrentPieceInformation & information,
const AllSellersGone & allSellersGone)
const AllSellersGone & allSellersGone,
std::chrono::duration<double> maxTimeToServicePiece)
: _session(session)
, _removedConnection(removedConnection)
, _fullPieceArrived(fullPieceArrived)
Expand All @@ -37,7 +38,8 @@ namespace detail {
, _terms(terms)
, _numberOfMissingPieces(0)
, _allSellersGone(allSellersGone)
, _maxConcurrentRequests(4) {
, _maxConcurrentRequests(4)
, _maxTimeToServicePiece(maxTimeToServicePiece) {
//, _lastStartOfSendingInvitations(0) {

// Setup pieces
Expand Down Expand Up @@ -86,20 +88,6 @@ namespace detail {
removeConnection(id, DisconnectCause::client);
}

template <class ConnectionIdType>
void Buying<ConnectionIdType>::disconnectSlowSellers(const std::chrono::duration<double> & limit) {

for(auto mapping : _sellers) {
auto seller = mapping.second;

if (seller.isGone()) continue;

if (seller.servicingPieceHasTimedOut(limit)) {
removeConnection(seller.connection()->connectionId(), DisconnectCause::seller_servicing_piece_has_timed_out);
}
}
}

template <class ConnectionIdType>
void Buying<ConnectionIdType>::validPieceReceivedOnConnection(detail::Seller<ConnectionIdType> &seller, int index) {
// Cannot happen when stopped, as there are no connections
Expand Down Expand Up @@ -286,8 +274,8 @@ namespace detail {
// Only process if we are active
if(_session->_state == SessionState::started) {

// Disconnect timed out sellers
// Allocate pieces if we are downloading
// Timeout sellers if they have not seviced a piece in time.
// Reset state to allow restarting downloading after all sellers are gone
if(_state == BuyingState::downloading) {

Expand All @@ -296,8 +284,16 @@ namespace detail {
// Reference to seller
detail::Seller<ConnectionIdType> & s = mapping.second;

if (s.isGone()) continue;

// Disconnect if seller timed-out servicing request
if (s.servicingPieceHasTimedOut(_maxTimeToServicePiece)) {
removeConnection(s.connection()->connectionId(), DisconnectCause::seller_servicing_piece_has_timed_out);
continue;
}

// A seller may be waiting to be assigned a new piece
if(!s.isGone() && s.piecesAwaitingArrival().size() == 0) {
if(s.piecesAwaitingArrival().size() == 0) {

// This can happen when a seller has previously uploaded a valid piece,
// but there were no unassigned pieces at that time,
Expand Down
8 changes: 4 additions & 4 deletions sources/include/protocol_session/detail/Buying.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class Buying {
const SentPayment<ConnectionIdType> &,
const protocol_wire::BuyerTerms &,
const TorrentPieceInformation &,
const AllSellersGone &);
const AllSellersGone &,
std::chrono::duration<double> = std::chrono::duration<double>::zero());

//// Connection level client events

Expand All @@ -58,9 +59,6 @@ class Buying {
// Remove connection
void removeConnection(const ConnectionIdType &);

// Disconnect seller connections if it has taken longer than `limit` to service next expected piece
void disconnectSlowSellers(const std::chrono::duration<double> & limit);

// Transition to BuyingState::sending_invitations
void startDownloading(const Coin::Transaction & contractTx,
const PeerToStartDownloadInformationMap<ConnectionIdType> & peerToStartDownloadInformationMap);
Expand Down Expand Up @@ -188,6 +186,8 @@ class Buying {
// Maximum number of concurrent requests to send before waiting for piece responses
// The optimum value depends on many factors. It is hardcoded to 4 for now.
const int _maxConcurrentRequests;

std::chrono::duration<double> _maxTimeToServicePiece;
};

}
Expand Down
3 changes: 3 additions & 0 deletions sources/include/protocol_session/detail/Seller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ namespace detail {
if(_piecesAwaitingArrival.size() == 0)
return false;

if(timeOutLimit == std::chrono::duration<double>::zero())
return false;

// Get current time
auto now = std::chrono::high_resolution_clock::now();

Expand Down