Skip to content

Commit

Permalink
Fixed bug where load balancing would receive a throttle interval of i…
Browse files Browse the repository at this point in the history
…nt max resulting in throttled cameras never updating.
  • Loading branch information
StefanFabian committed Sep 30, 2021
1 parent a859383 commit 922281b
Showing 1 changed file with 39 additions and 62 deletions.
101 changes: 39 additions & 62 deletions src/image_transport_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ Q_OBJECT
LoadBalancer()
{
connect( &timer_, &QTimer::timeout, this, &ImageTransportManager::LoadBalancer::onTimeout );
timer_.setInterval( 33 );
timer_.setSingleShot( false );
timer_.start();
}

void setEnabled( bool value )
{
load_balancing_enabled_ = value;
}

void notifyImageReceived( ImageTransportManager::Subscription *subscription );
void notifyImageReceived( ImageTransportManager::Subscription *subscription, long throttle_interval );

void registerThrottledSubscription( ImageTransportManager::Subscription *subscription );

Expand All @@ -57,32 +60,13 @@ private slots:
std::vector<ImageTransportManager::Subscription *> ready;
for ( ; i < timeouts_.size(); ++i )
{
if ( timeouts_[i].first > now + 50 ) break; // Trigger all subscriptions that would be ready in the next 50 ms
if ( timeouts_[i].first > now ) break;
ready.push_back( timeouts_[i].second );
}
for ( auto &sub : ready ) triggerSubscription( sub );
if ( timeouts_.empty()) return;
int msec = static_cast<int>(timeouts_[0].first - now);
QMetaObject::invokeMethod( this, "startTimer", Qt::AutoConnection, Q_ARG( int, msec ));
for ( auto &sub: ready ) triggerSubscription( sub );
}

private:

Q_INVOKABLE void startTimer( int msec )
{
timer_.start( msec );
}

void startTimerIfNotActive()
{
if ( timer_.isActive() || timeouts_.empty()) return;
using namespace std::chrono;
long now = duration_cast<std::chrono::milliseconds>( system_clock::now().time_since_epoch()).count();
long msec = static_cast<int>( timeouts_[0].first - now);
if ( msec > 0 ) QMetaObject::invokeMethod( this, "startTimer", Qt::AutoConnection, Q_ARG( int, msec ));
else onTimeout();
}

void triggerSubscription( ImageTransportManager::Subscription *subscription );

void insertTimeout( long desired_throttle_interval, ImageTransportManager::Subscription *subscription );
Expand Down Expand Up @@ -112,13 +96,15 @@ Q_OBJECT

int getThrottleInterval() const
{
return std::accumulate( subscription_handles_.begin(), subscription_handles_.end(), std::numeric_limits<int>::max(),
[]( int current, const std::weak_ptr<ImageTransportSubscriptionHandle> &sub_weak )
{
std::shared_ptr<ImageTransportSubscriptionHandle> sub = sub_weak.lock();
if ( sub == nullptr ) return current;
return std::min( current, sub->throttle_interval );
} );
int interval = std::accumulate( subscription_handles_.begin(), subscription_handles_.end(),
std::numeric_limits<int>::max(),
[]( int current, const std::weak_ptr<ImageTransportSubscriptionHandle> &sub_weak )
{
std::shared_ptr<ImageTransportSubscriptionHandle> sub = sub_weak.lock();
if ( sub == nullptr ) return current;
return std::min( current, sub->throttle_interval );
} );
return interval == std::numeric_limits<int>::max() ? current_throttle_interval_ : interval;
}

void updateThrottling()
Expand All @@ -139,10 +125,6 @@ Q_OBJECT

void subscribe()
{
{
std::lock_guard<std::mutex> lock( subscribe_mutex_ );
if ( subscriber_ ) return;
}
// Subscribing on background thread to reduce load on UI thread
std::thread( [ this ]()
{
Expand Down Expand Up @@ -215,7 +197,7 @@ Q_OBJECT
std::lock_guard<std::mutex> lock( subscribe_mutex_ );
subscriber_.shutdown();
}
manager->load_balancer_->notifyImageReceived( this );
manager->load_balancer_->notifyImageReceived( this, interval );
}
else
{
Expand Down Expand Up @@ -286,7 +268,7 @@ Q_OBJECT
{
// This nested lock makes sure that our destruction of the subscription pointer will not lead to a deadlock
std::lock_guard<std::mutex> subscriptions_lock( subscriptions_mutex_ );
for ( const auto &sub_weak : subscription_handles_ )
for ( const auto &sub_weak: subscription_handles_ )
{
if ( sub_weak.expired()) continue;
subscribers.push_back( sub_weak.lock());
Expand All @@ -298,13 +280,13 @@ Q_OBJECT
network_latency_average_.add( network_latency );
int processing_latency = static_cast<int>((ros::Time::now() - received).toSec() * 1000);
processing_latency_average_.add( processing_latency );
image_interval_average_.add(base_interval);
for ( const auto &sub : subscribers )
image_interval_average_.add( base_interval );
for ( const auto &sub: subscribers )
{
if ( sub == nullptr || !sub->callback ) continue;
sub->network_latency = network_latency_average_;
sub->processing_latency = processing_latency_average_;
sub->framerate_ = std::round(1000.0 / image_interval_average_ * 10) / 10;
sub->framerate_ = std::round( 1000.0 / image_interval_average_ * 10 ) / 10;
sub->callback( frame );
}
}
Expand All @@ -313,7 +295,7 @@ Q_OBJECT
void updateSupportedFormats()
{
bool first = true;
for ( const auto &sub_weak : subscription_handles_ )
for ( const auto &sub_weak: subscription_handles_ )
{
std::shared_ptr<ImageTransportSubscriptionHandle> sub = sub_weak.lock();
if ( sub == nullptr || sub->surface == nullptr ) continue;
Expand Down Expand Up @@ -352,37 +334,32 @@ Q_OBJECT


// ============================= Load Balancing =============================
void ImageTransportManager::LoadBalancer::notifyImageReceived( ImageTransportManager::Subscription *subscription )
void ImageTransportManager::LoadBalancer::notifyImageReceived( ImageTransportManager::Subscription *subscription,
long throttle_interval )
{
std::lock_guard<std::mutex> lock( mutex_ );
waiting_subscriptions_.erase( subscription );
insertTimeout( throttle_interval, subscription );
if ( !waiting_subscriptions_.empty() && !new_subscriptions_.empty())
{
std::lock_guard<std::mutex> lock( mutex_ );
waiting_subscriptions_.erase( subscription );
insertTimeout( subscription->getThrottleInterval(), subscription );
if ( !waiting_subscriptions_.empty() && !new_subscriptions_.empty())
{
// Can trigger the next camera if we have new subscriptions
triggerSubscription( new_subscriptions_.front());
}
// Can trigger the next camera if we have new subscriptions
triggerSubscription( new_subscriptions_.front());
}
startTimerIfNotActive();
}

void ImageTransportManager::LoadBalancer::registerThrottledSubscription( Subscription *subscription )
{
{
std::lock_guard<std::mutex> lock( mutex_ );
for ( auto &timeout : timeouts_ )
if ( timeout.second == subscription )
return;
insertTimeout( subscription->getThrottleInterval(), subscription );
if ( !waiting_subscriptions_.empty())
{
new_subscriptions_.push_back( subscription );
std::lock_guard<std::mutex> lock( mutex_ );
for ( auto &timeout: timeouts_ )
if ( timeout.second == subscription )
return;
}
triggerSubscription( subscription );
insertTimeout( subscription->getThrottleInterval(), subscription );
if ( !waiting_subscriptions_.empty())
{
new_subscriptions_.push_back( subscription );
return;
}
startTimerIfNotActive();
triggerSubscription( subscription );
}

void ImageTransportManager::LoadBalancer::unregister( ImageTransportManager::Subscription *subscription )
Expand Down Expand Up @@ -411,7 +388,7 @@ void ImageTransportManager::LoadBalancer::triggerSubscription( ImageTransportMan
void ImageTransportManager::LoadBalancer::insertTimeout( const long desired_throttle_interval,
ImageTransportManager::Subscription *subscription )
{
long now = std::chrono::duration_cast<std::chrono::milliseconds>(
const auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
long timeout = now + desired_throttle_interval;
if ( !load_balancing_enabled_ )
Expand Down

0 comments on commit 922281b

Please sign in to comment.