Skip to content

Commit

Permalink
Added camera information including framerate, processing and network …
Browse files Browse the repository at this point in the history
…latency.

Improved reassignment of ImageSubscriber to other video surface.
Fixed race condition where throttled camera would receive no images.
  • Loading branch information
StefanFabian committed Jul 21, 2021
1 parent a0ab0d7 commit 7da46ce
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 23 deletions.
43 changes: 43 additions & 0 deletions include/qml_ros_plugin/helpers/rolling_average.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//
// Created by stefan on 21.07.21.
//

#ifndef QML_ROS_PLUGIN_ROLLING_AVERAGE_H
#define QML_ROS_PLUGIN_ROLLING_AVERAGE_H

#include <array>

namespace qml_ros_plugin
{

template<typename T, int COUNT>
class RollingAverage
{
public:
RollingAverage()
{
values_.fill( 0 );
}

void add( const T &value )
{
if (count_values_ == COUNT) sum_ -= values_[index_];
values_[index_] = value;
sum_ += value;
if ( ++index_ == COUNT ) index_ = 0;
if (count_values_ != COUNT) ++count_values_;
}

T value() const { return count_values_ == 0 ? 0 : sum_ / count_values_; }

operator T() const { return value(); }

private:
std::array<T, COUNT> values_;
T sum_ = 0;
size_t count_values_ = 0;
size_t index_ = 0;
};
}

#endif //QML_ROS_PLUGIN_ROLLING_AVERAGE_H
20 changes: 18 additions & 2 deletions include/qml_ros_plugin/image_transport_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,29 @@ class ImageTransportSubscriptionHandle
//! The images may still arrive at a higher rate if other subscriptions request it.
void updateThrottleInterval( int interval );

std::string getTopic();
//! The subscribed topic. Once subscribed this is the full topic name without the transport.
std::string getTopic() const;

//! The full latency (in ms) from camera to your display excluding drawing time.
int latency() const;

//! The latency (in ms) from the camera to the reception of the image in this node.
int networkLatency() const;

//! The latency (in ms) from the reception of the image until it is in a displayable format.
int processingLatency() const;

//! The framerate (in frames per second).
double framerate() const;

private:
int throttle_interval = 0;
std::shared_ptr<ImageTransportManager::Subscription> subscription;
QAbstractVideoSurface *surface = nullptr;
std::function<void( const QVideoFrame & )> callback;
double framerate_ = 0;
int throttle_interval = 0;
int network_latency = -1;
int processing_latency = -1;

friend class ImageTransportManager;
};
Expand Down
29 changes: 29 additions & 0 deletions include/qml_ros_plugin/image_transport_subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ Q_OBJECT
Q_PROPERTY(QString defaultTransport READ defaultTransport WRITE setDefaultTransport NOTIFY defaultTransportChanged)
//! Whether or not this ImageTransportSubscriber is subscribed to the given topic
Q_PROPERTY(bool subscribed READ subscribed NOTIFY subscribedChanged)
//! The latency from the sender to the received time in ms not including the conversion latency before displaying.
//! This latency is based on the ROS time of the sending and receiving machines, hence, they need to be synchronized.
Q_PROPERTY(int networkLatency READ networkLatency NOTIFY networkLatencyChanged)
//! The latency (in ms) from the reception of the image until it is in a displayable format.
Q_PROPERTY(int processingLatency READ processingLatency NOTIFY processingLatencyChanged)
//! The full latency (in ms) from the camera to your display excluding drawing time.
Q_PROPERTY(int latency READ latency NOTIFY latencyChanged)
//! The framerate of the received camera frames in frames per second.
Q_PROPERTY(double framerate READ framerate NOTIFY framerateChanged)
//! The timeout when no image is received until a blank frame is served. Set to 0 to disable and always show last frame.
//! Default is 3000 ms.
Q_PROPERTY(int timeout READ timeout WRITE setTimeout NOTIFY timeoutChanged)
Expand Down Expand Up @@ -68,6 +77,14 @@ Q_OBJECT

void setThrottleRate( double value );

double framerate() const;

int latency() const;

int networkLatency() const;

int processingLatency() const;

signals:

void topicChanged();
Expand All @@ -80,6 +97,14 @@ Q_OBJECT

void throttleRateChanged();

void framerateChanged();

void latencyChanged();

void networkLatencyChanged();

void processingLatencyChanged();

private slots:

void onNodeHandleReady();
Expand All @@ -99,12 +124,16 @@ private slots:
QVideoSurfaceFormat format_;
QString topic_;
QString default_transport_;
QVideoFrame last_frame_;
NodeHandle::Ptr nh_;
std::shared_ptr<ImageTransportSubscriptionHandle> subscription_;
QAbstractVideoSurface *surface_ = nullptr;
ros::Time last_frame_timestamp_;
double last_framerate_ = 0;
quint32 queue_size_;
int throttle_interval_ = 0;
int last_network_latency_ = -1;
int last_processing_latency_ = -1;
int timeout_ = 3000;
bool subscribed_ = false;
};
Expand Down
76 changes: 64 additions & 12 deletions src/image_transport_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//

#include "qml_ros_plugin/image_transport_manager.h"

#include "qml_ros_plugin/helpers/rolling_average.h"
#include "qml_ros_plugin/image_buffer.h"

#include <QTimer>
Expand Down Expand Up @@ -137,7 +139,10 @@ Q_OBJECT

void subscribe()
{
if ( subscriber_ ) return;
{
std::lock_guard<std::mutex> lock( subscribe_mutex_ );
if ( subscriber_ ) return;
}
// Subscribing on background thread to reduce load on UI thread
std::thread( [ this ]()
{
Expand Down Expand Up @@ -204,10 +209,13 @@ Q_OBJECT
int interval = getThrottleInterval();
if ( throttled_ )
{
manager->load_balancer_->notifyImageReceived( this );
if ( interval != 0 && interval > camera_base_interval_ )
{
subscriber_.shutdown();
{
std::lock_guard<std::mutex> lock( subscribe_mutex_ );
subscriber_.shutdown();
}
manager->load_balancer_->notifyImageReceived( this );
}
else
{
Expand All @@ -216,10 +224,13 @@ Q_OBJECT
throttled_ = false;
}
}
else if ( interval != 0 && interval > camera_base_interval_ * 1.1 )
else if ( interval != 0 && interval > camera_base_interval_ * 1.1 ) // 10% off for hysteresis to prevent oscillation
{
// Need to throttle now
subscriber_.shutdown();
{
std::lock_guard<std::mutex> lock( subscribe_mutex_ );
subscriber_.shutdown();
}
manager->load_balancer_->registerThrottledSubscription( this );
throttled_ = true;
}
Expand All @@ -242,6 +253,10 @@ Q_OBJECT
camera_base_interval_ = static_cast<int>((image->header.stamp - last_image_->header.stamp).toNSec() /
(1000 * 1000));
}
else if ( throttled_ )
{
current_throttle_interval_ = interval;
}
last_received_stamp_ = received_stamp;
last_image_ = image;
last_buffer_ = buffer;
Expand All @@ -254,11 +269,15 @@ Q_OBJECT
{
ImageBuffer *buffer;
sensor_msgs::ImageConstPtr image;
ros::Time received;
int base_interval;
{
std::lock_guard<std::mutex> image_lock( image_mutex_ );
if ( last_buffer_ == nullptr || last_image_ == nullptr ) return;
buffer = last_buffer_;
image = last_image_;
received = last_received_stamp_;
base_interval = throttled_ ? current_throttle_interval_ : camera_base_interval_;
last_buffer_ = nullptr;
}
QVideoFrame frame( buffer, QSize( image->width, image->height ), buffer->format());
Expand All @@ -272,9 +291,19 @@ Q_OBJECT
subscribers.push_back( sub_weak.lock());
}
}
const ros::Time &image_stamp = image->header.stamp;
int network_latency = !image_stamp.isZero() ? static_cast<int>((received - image_stamp).toSec() * 1000)
: -1;
network_latency_average_.add( network_latency );
int processing_latency = static_cast<int>((ros::Time::now() - received).toSec() * 1000);
processing_latency_average_.add( processing_latency );
if ( base_interval > 0 ) framerate_average_.add( 1000.0 / base_interval );
for ( const auto &sub : subscribers )
{
if ( sub == nullptr || !sub->callback ) continue;
sub->network_latency = network_latency_average_;
sub->processing_latency = processing_latency_average_;
sub->framerate_ = std::round(framerate_average_ * 10) / 10;
sub->callback( frame );
}
}
Expand Down Expand Up @@ -309,10 +338,14 @@ Q_OBJECT
std::vector<ImageTransportSubscriptionHandle *> subscriptions_;
std::vector<std::weak_ptr<ImageTransportSubscriptionHandle>> subscription_handles_;
QList<QVideoFrame::PixelFormat> supported_formats_;
ImageBuffer *last_buffer_ = nullptr;
sensor_msgs::ImageConstPtr last_image_;
RollingAverage<double, 10> framerate_average_;
RollingAverage<int, 10> network_latency_average_;
RollingAverage<int, 10> processing_latency_average_;
ros::Time last_received_stamp_;
sensor_msgs::ImageConstPtr last_image_;
ImageBuffer *last_buffer_ = nullptr;
int camera_base_interval_ = 0;
int current_throttle_interval_ = 0;
bool throttled_ = false;
};

Expand All @@ -337,7 +370,9 @@ void ImageTransportManager::LoadBalancer::registerThrottledSubscription( Subscri
{
{
std::lock_guard<std::mutex> lock( mutex_ );
for ( auto &timeout : timeouts_ ) if ( timeout.second == subscription ) return;
for ( auto &timeout : timeouts_ )
if ( timeout.second == subscription )
return;
insertTimeout( subscription->getThrottleInterval(), subscription );
if ( !waiting_subscriptions_.empty())
{
Expand Down Expand Up @@ -370,9 +405,6 @@ void ImageTransportManager::LoadBalancer::triggerSubscription( ImageTransportMan

waiting_subscriptions_.insert( subscription );
subscription->subscribe();
long now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
std::cout << now << std::endl;
}

void ImageTransportManager::LoadBalancer::insertTimeout( const long desired_throttle_interval,
Expand Down Expand Up @@ -438,11 +470,31 @@ void ImageTransportSubscriptionHandle::updateThrottleInterval( int interval )
subscription->updateThrottling();
}

std::string ImageTransportSubscriptionHandle::getTopic()
std::string ImageTransportSubscriptionHandle::getTopic() const
{
return subscription->getTopic();
}

int ImageTransportSubscriptionHandle::latency() const
{
return network_latency + processing_latency;
}

int ImageTransportSubscriptionHandle::networkLatency() const
{
return network_latency;
}

int ImageTransportSubscriptionHandle::processingLatency() const
{
return processing_latency;
}

double ImageTransportSubscriptionHandle::framerate() const
{
return framerate_;
}

ImageTransportManager::ImageTransportManager()
{
load_balancer_ = std::make_unique<ImageTransportManager::LoadBalancer>();
Expand Down
51 changes: 42 additions & 9 deletions src/image_transport_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ QAbstractVideoSurface *ImageTransportSubscriber::videoSurface() const { return s
void ImageTransportSubscriber::setVideoSurface( QAbstractVideoSurface *surface )
{
if ( surface == surface_ ) return;

bool subscribed = subscribed_;
blockSignals( true );
shutdownSubscriber();
if ( surface_ != nullptr && surface_->isActive())
surface_->stop();
surface_ = surface;
initSubscriber();
blockSignals( false );
if ( subscribed != subscribed_ ) emit subscribedChanged();
if ( surface_ == nullptr && subscribed_ ) {
shutdownSubscriber();
return;
}
if ( !subscribed_ ) initSubscriber();
if ( last_frame_.isValid()) presentFrame( last_frame_ );
}

void ImageTransportSubscriber::onNodeHandleReady()
Expand Down Expand Up @@ -67,8 +68,10 @@ void ImageTransportSubscriber::initSubscriber()
// TODO Transport hints
image_transport::TransportHints transport_hints( default_transport_.toStdString());
subscription_ = ImageTransportManager::getInstance().subscribe( nh_, topic_, queue_size_, transport_hints,
std::bind( &ImageTransportSubscriber::presentFrame,
this, std::placeholders::_1 ),
[ this ]( const QVideoFrame &frame )
{
presentFrame( frame );
},
surface_, throttle_interval_ );
subscribed_ = subscription_ != nullptr;
if ( !was_subscribed ) emit subscribedChanged();
Expand Down Expand Up @@ -147,8 +150,18 @@ void ImageTransportSubscriber::presentFrame( const QVideoFrame &frame )
return;
}
}
last_frame_ = frame;
surface_->present( frame );
bool network_latency_changed = last_network_latency_ != subscription_->networkLatency();
bool processing_latency_changed = last_processing_latency_ != subscription_->processingLatency();
if ( network_latency_changed ) emit networkLatencyChanged();
if ( processing_latency_changed ) emit processingLatencyChanged();
if ( network_latency_changed || processing_latency_changed ) emit latencyChanged();
if ( std::abs( last_framerate_ - subscription_->framerate()) > 0.1 ) emit framerateChanged();
last_framerate_ = subscription_->framerate();
last_frame_timestamp_ = ros::Time::now();
last_network_latency_ = subscription_->networkLatency();
last_processing_latency_ = subscription_->processingLatency();
if ( timeout_ != 0 )
{
no_image_timer_.start( throttle_interval_ + timeout_ );
Expand Down Expand Up @@ -201,4 +214,24 @@ void ImageTransportSubscriber::setThrottleRate( double value )
}
emit throttleRateChanged();
}

double ImageTransportSubscriber::framerate() const
{
return subscription_ == nullptr ? 0 : subscription_->framerate();
}

int ImageTransportSubscriber::latency() const
{
return subscription_ == nullptr ? -1 : subscription_->latency();
}

int ImageTransportSubscriber::networkLatency() const
{
return subscription_ == nullptr ? -1 : subscription_->networkLatency();
}

int ImageTransportSubscriber::processingLatency() const
{
return subscription_ == nullptr ? -1 : subscription_->processingLatency();
}
}

0 comments on commit 7da46ce

Please sign in to comment.