Skip to content

Commit

Permalink
Added throttle rate to subscribers to prevent race conditions with hi…
Browse files Browse the repository at this point in the history
…gh frequency topics.
  • Loading branch information
StefanFabian committed Feb 10, 2022
1 parent da343af commit a9d4931
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/pages/Subscribers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ has more properties to give you more fine-grained control.
id: mySubscriber
ns: "~" // Namespace
topic: "/intval"
throttleRate: 30 // Update rate of message property in Hz
queueSize: 10
running: true
onNewMessage: doStuff(message)
Expand All @@ -62,6 +63,11 @@ the ``ros::NodeHandle`` that is created to subscribe to the given topic.
The ``queueSize`` property controls how many incoming messages are queued for
processing before the oldest are dropped.

The ``throttleRate`` limits the rate in which QML receives updates from the given topic.
By default the Subscriber polls with 20 Hz on the UI thread and will notify of property changes with at most this rate.
This is to reduce load and prevent race conditions that could otherwise update the message while QML is using it since
the subscriber is receiving messages in a background thread by default.

Using the ``running`` property, the subscriber can be enabled and disabled.
If the property is set to ``false``, the subscriber is shut down until it is
set to ``true`` again and subscribes to the topic again.
Expand Down
23 changes: 21 additions & 2 deletions include/qml_ros_plugin/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <QVariant>
#include <QMap>
#include <QTimer>

#include <ros_babel_fish/babel_fish.h>
#include <ros_babel_fish/babel_fish_message.h>
Expand Down Expand Up @@ -37,6 +38,9 @@ Q_OBJECT
//! The type of the last received message, e.g., geometry_msgs/Pose.
Q_PROPERTY( QString messageType READ messageType NOTIFY messageTypeChanged )

//! Limits the frequency in which the notification for an updated message is emitted. Default: 20 Hz
Q_PROPERTY( int throttleRate READ throttleRate WRITE setThrottleRate NOTIFY throttleRateChanged )

//! Controls whether or not the subscriber is currently running, i.e., able to receive messages. Default: true
Q_PROPERTY( bool running READ running WRITE setRunning NOTIFY runningChanged )
// @formatter:on
Expand All @@ -63,6 +67,10 @@ Q_OBJECT

void setRunning( bool value );

int throttleRate() const;

void setThrottleRate( int value );

const QVariant &message() const;

const QString &messageType() const;
Expand All @@ -81,6 +89,9 @@ Q_OBJECT
//! Emitted when the namespace ns changed.
void nsChanged();

//! Emitted when the throttle rate was changed.
void throttleRateChanged();

//! Emitted if the running state of this subscriber changed.
void runningChanged();

Expand All @@ -100,7 +111,11 @@ protected slots:

void subscribe();

void updateMessage();

protected:
void initTimer();

void onRosInitialized() override;

void shutdown();
Expand All @@ -110,13 +125,17 @@ protected slots:
NodeHandle::Ptr nh_;
ros::Subscriber subscriber_;
ros_babel_fish::BabelFish babel_fish_;
ros_babel_fish::BabelFishMessage::ConstPtr last_message_;
ros_babel_fish::BabelFishMessage::ConstPtr current_message_;
QTimer throttle_timer_;
bool running_;
bool is_subscribed_;

QString topic_;
quint32 queue_size_;
QVariant message_;
QString message_type_;
QVariant message_;
quint32 queue_size_;
int throttle_rate_ = 20;
};
} // qml_ros_plugin

Expand Down
32 changes: 29 additions & 3 deletions src/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Subscriber::Subscriber() : running_( true ), is_subscribed_( false ), queue_size
nh_ = std::make_shared<NodeHandle>();
babel_fish_ = BabelFishDispenser::getBabelFish();
connect( nh_.get(), &NodeHandle::ready, this, &Subscriber::subscribe );
initTimer();
}

Subscriber::Subscriber( NodeHandle::Ptr nh, QString topic, quint32 queue_size, bool running )
Expand All @@ -25,11 +26,20 @@ Subscriber::Subscriber( NodeHandle::Ptr nh, QString topic, quint32 queue_size, b
{
babel_fish_ = BabelFishDispenser::getBabelFish();
connect( nh_.get(), &NodeHandle::ready, this, &Subscriber::subscribe );
initTimer();
if ( nh_->isReady()) subscribe();
}

Subscriber::~Subscriber() = default;

void Subscriber::initTimer()
{
connect( &throttle_timer_, &QTimer::timeout, this, &Subscriber::updateMessage );
throttle_timer_.setSingleShot( false );
throttle_timer_.setInterval( 50 );
throttle_timer_.start();
}

QString Subscriber::topic() const { return QString::fromStdString( subscriber_.getTopic()); }

void Subscriber::setTopic( const QString &value )
Expand Down Expand Up @@ -71,6 +81,15 @@ void Subscriber::setRunning( bool value )
emit runningChanged();
}

int Subscriber::throttleRate() const { return throttle_rate_; }

void Subscriber::setThrottleRate( int value )
{
throttle_rate_ = value;
throttle_timer_.setInterval( 1000 / throttle_rate_ );
emit throttleRateChanged();
}

const QVariant &Subscriber::message() const { return message_; }

const QString &Subscriber::messageType() const { return message_type_; }
Expand Down Expand Up @@ -104,11 +123,18 @@ void Subscriber::shutdown()

void Subscriber::messageCallback( const ros_babel_fish::BabelFishMessage::ConstPtr &msg )
{
TranslatedMessage::Ptr translated = babel_fish_.translateMessage( msg );
last_message_ = msg;
}

void Subscriber::updateMessage()
{
if (current_message_ == last_message_) return;
current_message_ = last_message_;
TranslatedMessage::Ptr translated = babel_fish_.translateMessage( current_message_ );
message_ = msgToMap( translated, translated->translated_message->as<CompoundMessage>());
if ( msg->dataType() != message_type_.toStdString())
if (current_message_->dataType() != message_type_.toStdString())
{
message_type_ = QString::fromStdString( msg->dataType());
message_type_ = QString::fromStdString(current_message_->dataType());
emit messageTypeChanged();
}
emit messageChanged();
Expand Down

0 comments on commit a9d4931

Please sign in to comment.