Skip to content
Browse files

blocks can have arbitrary number of msg inputs

  • Loading branch information...
1 parent e83aaff commit e36ebfbd310bcc3b40095d0b03e461c9db549061 @guruofquality committed Aug 31, 2012
Showing with 84 additions and 61 deletions.
  1. +15 −3 include/gnuradio/block.h
  2. +51 −38 lib/block.cc
  3. +2 −2 lib/block_gateway.cc
  4. +1 −1 lib/block_gateway.h
  5. +15 −17 python/block_gateway.py
View
18 include/gnuradio/block.h
@@ -57,13 +57,25 @@ template <typename PtrType> struct Buffer
//! Message signature describes the inputs and outputs of message passing
struct GR_EXTRAS_API msg_signature
{
- msg_signature(const bool has_input = false, const size_t num_outputs = 0):
- has_input(has_input), num_outputs(num_outputs)
+ msg_signature(void):
+ num_inputs(0), num_outputs(0)
{
//NOP
}
- bool has_input;
+ msg_signature(const bool has_input, const size_t num_outputs):
+ num_inputs(has_input?1:0), num_outputs(num_outputs)
+ {
+ //NOP
+ }
+
+ msg_signature(const size_t num_inputs, const size_t num_outputs):
+ num_inputs(num_inputs), num_outputs(num_outputs)
+ {
+ //NOP
+ }
+
+ size_t num_inputs;
size_t num_outputs;
};
View
89 lib/block.cc
@@ -39,6 +39,43 @@ static int mylround(double x)
return int(x + 0.5);
}
+template <typename T>
+struct MyQueue
+{
+
+ void push(const T &msg)
+ {
+ boost::mutex::scoped_lock lock(_msg_queue_mutex);
+ _msg_queue.push(msg);
+ lock.unlock();
+ _msg_queue_condition_variable.notify_one();
+ }
+
+ bool ready(void)
+ {
+ boost::mutex::scoped_lock lock(_msg_queue_mutex);
+ return !_msg_queue.empty();
+ }
+
+ T pop(void)
+ {
+ boost::mutex::scoped_lock lock(_msg_queue_mutex);
+ while (_msg_queue.empty())
+ {
+ _msg_queue_condition_variable.wait(lock);
+ }
+
+ T msg = _msg_queue.front();
+ _msg_queue.front() = T(); //resets ref counts
+ _msg_queue.pop();
+ return msg;
+ }
+
+ std::queue<T> _msg_queue;
+ boost::mutex _msg_queue_mutex;
+ boost::condition_variable _msg_queue_condition_variable;
+};
+
/***********************************************************************
* The message sourcer object
**********************************************************************/
@@ -140,47 +177,20 @@ class msg_sinker : public gr_block
//push the tags in the queue
BOOST_FOREACH(gr_tag_t &msg, _tags)
{
- this->push_msg_queue(msg);
+ msg.offset = this->index;
+ this->queue->push(msg);
msg = gr_tag_t(); //resets PMT ref in _tags
}
//return produced
return 0;
}
- void push_msg_queue(const gr_tag_t &msg)
- {
- boost::mutex::scoped_lock lock(_msg_queue_mutex);
- _msg_queue.push(msg);
- lock.unlock();
- _msg_queue_condition_variable.notify_one();
- }
-
- bool check_msg_queue(void)
- {
- boost::mutex::scoped_lock lock(_msg_queue_mutex);
- return !_msg_queue.empty();
- }
-
- gr_tag_t pop_msg_queue(void)
- {
- boost::mutex::scoped_lock lock(_msg_queue_mutex);
- while (_msg_queue.empty())
- {
- _msg_queue_condition_variable.wait(lock);
- }
-
- gr_tag_t msg = _msg_queue.front();
- _msg_queue.front() = gr_tag_t(); //resets PMT ref counts
- _msg_queue.pop();
- return msg;
- }
+ size_t index;
+ MyQueue<gr_tag_t> *queue;
private:
std::vector<gr_tag_t> _tags;
- std::queue<gr_tag_t> _msg_queue;
- boost::mutex _msg_queue_mutex;
- boost::condition_variable _msg_queue_condition_variable;
};
/***********************************************************************
@@ -342,9 +352,10 @@ class master_block : public gr_block
struct block::impl
{
boost::shared_ptr<master_block> master;
- boost::shared_ptr<msg_sinker> sinker;
+ std::vector<boost::shared_ptr<msg_sinker> > sinkers;
std::vector<boost::shared_ptr<msg_sourcer> > sourcers;
gr_null_sink_sptr null_sink;
+ MyQueue<gr_tag_t> queue;
};
static gr_io_signature_sptr extend_sig(gr_io_signature_sptr sig, const size_t num){
@@ -372,7 +383,7 @@ block::block(
):
gr_hier_block2(
name + " wrapper",
- extend_sig(in_sig, msg_sig.has_input? 1 : 0),
+ extend_sig(in_sig, msg_sig.num_inputs),
extend_sig(out_sig, msg_sig.num_outputs)
)
{
@@ -406,10 +417,12 @@ block::block(
}
//connect sinker to upper port
- if (msg_sig.has_input)
+ for (size_t i = 0; i < msg_sig.num_inputs; i++)
{
- _impl->sinker = boost::make_shared<msg_sinker>();
- this->connect(this->self(), in_sig->max_streams(), _impl->sinker, 0);
+ _impl->sinkers.push_back(boost::make_shared<msg_sinker>());
+ _impl->sinkers.back()->queue = &_impl->queue;
+ _impl->sinkers.back()->index = i;
+ this->connect(this->self(), in_sig->max_streams(), _impl->sinkers.back(), 0);
}
//connect sourcer to upper ports
@@ -559,12 +572,12 @@ void block::get_tags_in_range(
bool block::check_msg_queue(void)
{
- return _impl->sinker->check_msg_queue();
+ return _impl->queue.ready();
}
gr_tag_t block::pop_msg_queue(void)
{
- return _impl->sinker->pop_msg_queue();
+ return _impl->queue.pop();
}
void block::post_msg(const size_t port, const gr_tag_t &msg)
View
4 lib/block_gateway.cc
@@ -131,10 +131,10 @@ block_gateway::sptr block_gateway::make(
const std::string &name,
gr_io_signature_sptr in_sig,
gr_io_signature_sptr out_sig,
- const bool has_msg_input,
+ const size_t num_msg_inputs,
const size_t num_msg_outputs
){
return gnuradio::get_initial_sptr(
- new block_gateway_impl(handler, name, in_sig, out_sig, msg_signature(has_msg_input, num_msg_outputs))
+ new block_gateway_impl(handler, name, in_sig, out_sig, msg_signature(num_msg_inputs, num_msg_outputs))
);
}
View
2 lib/block_gateway.h
@@ -85,7 +85,7 @@ class GR_EXTRAS_API block_gateway : virtual public gnuradio::block{
const std::string &name,
gr_io_signature_sptr in_sig,
gr_io_signature_sptr out_sig,
- const bool has_msg_input,
+ const size_t num_msg_inputs,
const size_t num_msg_outputs
);
View
32 python/block_gateway.py
@@ -62,7 +62,10 @@ def eval(self, arg):
########################################################################
class gateway_block(object):
- def __init__(self, name, in_sig, out_sig, has_msg_input, num_msg_outputs):
+ def __init__(self, name, in_sig, out_sig, num_msg_inputs=0, num_msg_outputs=0, has_msg_input=False):
+
+ #backwards compat to handle has_msg_input
+ if has_msg_input: num_msg_inputs += 1
#ensure that the sigs are iterable dtypes
def sig_to_dtype_sig(sig):
@@ -86,7 +89,7 @@ def sig_to_gr_io_sigv(sig):
self.__handler = gateway_handler()
self.__handler.init(self.__gr_block_handle)
self.__gateway = block_gateway(
- self.__handler, name, gr_in_sig, gr_out_sig, has_msg_input, num_msg_outputs)
+ self.__handler, name, gr_in_sig, gr_out_sig, num_msg_inputs, num_msg_outputs)
self.__message = self.__gateway.gr_block_message()
#register gr_block functions
@@ -157,13 +160,12 @@ def stop(self): return True
# Wrappers for the user to inherit from
########################################################################
class block(gateway_block):
- def __init__(self, name, in_sig, out_sig, has_msg_input=False, num_msg_outputs=0):
+ def __init__(self, name, in_sig, out_sig, **kwargs):
gateway_block.__init__(self,
name=name,
in_sig=in_sig,
out_sig=out_sig,
- has_msg_input=has_msg_input,
- num_msg_outputs=num_msg_outputs,
+ **kwargs
)
#inject into gr namespace
@@ -173,46 +175,42 @@ def __init__(self, name, in_sig, out_sig, has_msg_input=False, num_msg_outputs=0
# DEPRECATED: Wrappers for the user to inherit from
########################################################################
class basic_block(gateway_block):
- def __init__(self, name, in_sig, out_sig, has_msg_input=False, num_msg_outputs=0):
+ def __init__(self, name, in_sig, out_sig, **kwargs):
gateway_block.__init__(self,
name=name,
in_sig=in_sig,
out_sig=out_sig,
- has_msg_input=has_msg_input,
- num_msg_outputs=num_msg_outputs,
+ **kwargs
)
self.set_auto_consume(False)
self.work = self.general_work #makes it backwards compatible since we only call work
class sync_block(gateway_block):
- def __init__(self, name, in_sig, out_sig, has_msg_input=False, num_msg_outputs=0):
+ def __init__(self, name, in_sig, out_sig, **kwargs):
gateway_block.__init__(self,
name=name,
in_sig=in_sig,
out_sig=out_sig,
- has_msg_input=has_msg_input,
- num_msg_outputs=num_msg_outputs,
+ **kwargs
)
class decim_block(gateway_block):
- def __init__(self, name, in_sig, out_sig, decim, has_msg_input=False, num_msg_outputs=0):
+ def __init__(self, name, in_sig, out_sig, decim, **kwargs):
gateway_block.__init__(self,
name=name,
in_sig=in_sig,
out_sig=out_sig,
- has_msg_input=has_msg_input,
- num_msg_outputs=num_msg_outputs,
+ **kwargs
)
self.set_relative_rate(1.0/decim)
class interp_block(gateway_block):
- def __init__(self, name, in_sig, out_sig, interp, has_msg_input=False, num_msg_outputs=0):
+ def __init__(self, name, in_sig, out_sig, interp, **kwargs):
gateway_block.__init__(self,
name=name,
in_sig=in_sig,
out_sig=out_sig,
- has_msg_input=has_msg_input,
- num_msg_outputs=num_msg_outputs,
+ **kwargs
)
self.set_relative_rate(1.0*interp)

0 comments on commit e36ebfb

Please sign in to comment.
Something went wrong with that request. Please try again.