Skip to content
This repository has been archived by the owner on Sep 23, 2021. It is now read-only.

Unregister non-used subscribers #6

Merged
merged 3 commits into from Mar 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/mjpeg_server/mjpeg_server.h
Expand Up @@ -147,6 +147,7 @@ class MJPEGServer
private:
typedef std::map<std::string, ImageBuffer*> ImageBufferMap;
typedef std::map<std::string, image_transport::Subscriber> ImageSubscriberMap;
typedef std::map<std::string, size_t> ImageSubscriberCountMap;
typedef std::map<std::string, std::string> ParameterMap;

std::string header;
Expand Down Expand Up @@ -284,6 +285,24 @@ class MJPEGServer
*/
void decodeParameter(const std::string& parameter, ParameterMap& parameter_map);

/**
* @brief increase the number of the subscribers of the specified topic
* @param topic name string
*/
void decreaseSubscriberCount(const std::string topic);

/**
* @brief decrease the number of the subscribers of the specified topic
* @param topic name string
*/
void increaseSubscriberCount(const std::string topic);

/**
* @brief remove ros::Subscriber if the number of the subscribers of the topic is equal to 0
* @param topic name string
*/
void unregisterSubscriberIfPossible(const std::string topic);

ros::NodeHandle node_;
image_transport::ImageTransport image_transport_;
int port_;
Expand All @@ -296,6 +315,7 @@ class MJPEGServer

ImageBufferMap image_buffers_;
ImageSubscriberMap image_subscribers_;
ImageSubscriberCountMap image_subscribers_count_;
boost::mutex image_maps_mutex_;

};
Expand Down
58 changes: 57 additions & 1 deletion src/mjpeg_server.cpp
Expand Up @@ -474,6 +474,7 @@ void MJPEGServer::sendStream(int fd, const char *parameter)
return;

std::string topic = itp->second;
increaseSubscriberCount(topic);
ImageBuffer* image_buffer = getImageBuffer(topic);

ROS_DEBUG("preparing header");
Expand Down Expand Up @@ -613,6 +614,9 @@ void MJPEGServer::sendStream(int fd, const char *parameter)
}

free(frame);
decreaseSubscriberCount(topic);
unregisterSubscriberIfPossible(topic);

}

void MJPEGServer::sendSnapshot(int fd, const char *parameter)
Expand All @@ -633,6 +637,7 @@ void MJPEGServer::sendSnapshot(int fd, const char *parameter)
return;

std::string topic = itp->second;
increaseSubscriberCount(topic);
ImageBuffer* image_buffer = getImageBuffer(topic);

/* wait for fresh frames */
Expand Down Expand Up @@ -735,6 +740,8 @@ void MJPEGServer::sendSnapshot(int fd, const char *parameter)
}

free(frame);
decreaseSubscriberCount(topic);
unregisterSubscriberIfPossible(topic);
}

void MJPEGServer::client(int fd)
Expand Down Expand Up @@ -879,7 +886,6 @@ void MJPEGServer::client(int fd)

close(fd);
freeRequest(&req);

ROS_INFO("Disconnecting HTTP client");
return;
}
Expand Down Expand Up @@ -1056,6 +1062,56 @@ void MJPEGServer::stop()
stop_requested_ = true;
}

void MJPEGServer::decreaseSubscriberCount(const std::string topic)
{
boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
if (it != image_subscribers_count_.end())
{
if (image_subscribers_count_[topic] == 1) {
image_subscribers_count_.erase(it);
ROS_INFO("no subscribers for %s", topic.c_str());
}
else if (image_subscribers_count_[topic] > 0) {
image_subscribers_count_[topic] = image_subscribers_count_[topic] - 1;
ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
}
}
else
{
ROS_INFO("no subscribers counter for %s", topic.c_str());
}
}

void MJPEGServer::increaseSubscriberCount(const std::string topic)
{
boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
if (it == image_subscribers_count_.end())
{
image_subscribers_count_.insert(ImageSubscriberCountMap::value_type(topic, 1));
}
else {
image_subscribers_count_[topic] = image_subscribers_count_[topic] + 1;
}
ROS_INFO("%lu subscribers for %s", image_subscribers_count_[topic], topic.c_str());
}

void MJPEGServer::unregisterSubscriberIfPossible(const std::string topic)
{
boost::unique_lock<boost::mutex> lock(image_maps_mutex_);
ImageSubscriberCountMap::iterator it = image_subscribers_count_.find(topic);
if (it == image_subscribers_count_.end() ||
image_subscribers_count_[topic] == 0)
{
ImageSubscriberMap::iterator sub_it = image_subscribers_.find(topic);
if (sub_it != image_subscribers_.end())
{
ROS_INFO("Unsubscribing from %s", topic.c_str());
image_subscribers_.erase(sub_it);
}
}
}
}

int main(int argc, char** argv)
Expand Down