Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

many 2 one block should have a worker per input

  • Loading branch information...
commit b13a60f4c87d6d95fcebe2ebf38be36a68ba90da 1 parent 3bda241
@guruofquality authored
Showing with 67 additions and 23 deletions.
  1. +2 −2 include/gnuradio/extras/msg_many_to_one.h
  2. +65 −21 lib/msg_many_to_one.cc
View
4 include/gnuradio/extras/msg_many_to_one.h
@@ -23,11 +23,11 @@
#define INCLUDED_GR_EXTRAS_MSG_MANY_TO_ONE_H
#include <gnuradio/extras/api.h>
-#include <gnuradio/block.h>
+#include <gr_hier_block2.h>
namespace gnuradio{ namespace extras{
-class GR_EXTRAS_API msg_many_to_one : virtual public block{
+class GR_EXTRAS_API msg_many_to_one : virtual public gr_hier_block2{
public:
typedef boost::shared_ptr<msg_many_to_one> sptr;
View
86 lib/msg_many_to_one.cc
@@ -20,46 +20,90 @@
*/
#include <gnuradio/extras/msg_many_to_one.h>
+#include <gnuradio/block.h>
+#include <gruel/thread.h>
#include <gr_io_signature.h>
#include <boost/foreach.hpp>
+#include <queue>
using namespace gnuradio::extras;
-class msg_many_to_one_impl : public msg_many_to_one
+class m21_output : public gnuradio::block
{
public:
- msg_many_to_one_impl(const size_t num_inputs):
- block(
- "message many to one",
- gr_make_io_signature(num_inputs, num_inputs, 1),
- gr_make_io_signature(1, 1, 1)
- )
- {
- _tags.reserve(1024); //something reasonably large so we dont malloc
- }
+ m21_output(void):
+ gnuradio::block("m21 output", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, 1))
+ {}
int work(
const InputItems &input_items,
const OutputItems &output_items
){
- std::vector<gr_tag_t> tags;
- for (size_t i = 0; i < input_items.size(); i++)
+ gruel::scoped_lock lock(mutex);
+ while (queue.empty())
{
- const uint64_t nread = this->nitems_read(i); //number of items read on port i
+ cond.wait(lock);
+ }
+ gr_tag_t tag = queue.front();
+ queue.pop();
+ tag.offset = this->nitems_written(0);
+ this->add_item_tag(0, tag);
+ return output_items[0].size();
+ }
- //read all tags associated with port i for items in this work function
- this->get_tags_in_range(_tags, i, nread, nread+input_items[i].size());
+ std::queue<gr_tag_t> queue;
+ gruel::mutex mutex;
+ gruel::condition_variable cond;
+};
- BOOST_FOREACH(const gr_tag_t &tag, _tags)
- {
- this->add_item_tag(0, tag);
- }
- }
+class m21_input : public gnuradio::block
+{
+public:
+ m21_input(void):
+ gnuradio::block("m21 input", gr_make_io_signature(1, 1, 1), gr_make_io_signature(0, 0, 0))
+ {}
+ int work(
+ const InputItems &input_items,
+ const OutputItems &output_items
+ ){
+ const uint64_t nread = this->nitems_read(0);
+ this->get_tags_in_range(tags, 0, nread, nread+input_items[0].size());
+ BOOST_FOREACH(const gr_tag_t &tag, tags)
+ {
+ gruel::scoped_lock lock(output->mutex);
+ output->queue.push(tag);
+ output->cond.notify_one();
+ }
return output_items[0].size();
}
- std::vector<gr_tag_t> _tags;
+ boost::shared_ptr<m21_output> output;
+ std::vector<gr_tag_t> tags;
+};
+
+class msg_many_to_one_impl : public msg_many_to_one
+{
+public:
+ msg_many_to_one_impl(const size_t num_inputs):
+ gr_hier_block2(
+ "message many to one",
+ gr_make_io_signature(num_inputs, num_inputs, 1),
+ gr_make_io_signature(1, 1, 1)
+ )
+ {
+ output = gnuradio::get_initial_sptr(new m21_output());
+ this->connect(output, 0, this->self(), 0);
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ inputs.push_back(gnuradio::get_initial_sptr(new m21_input()));
+ inputs[i]->output = output;
+ this->connect(this->self(), i, inputs[i], 0);
+ }
+ }
+
+ std::vector<boost::shared_ptr<m21_input> > inputs;
+ boost::shared_ptr<m21_output> output;
};
msg_many_to_one::sptr msg_many_to_one::make(const size_t num_inputs)
Please sign in to comment.
Something went wrong with that request. Please try again.