Permalink
Fetching contributors…
Cannot retrieve contributors at this time
409 lines (351 sloc) 13.9 KB
/* -*- c++ -*- */
/*
* Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
* GNU Radio is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
*
* GNU Radio is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with GNU Radio; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
* Boston, MA 02110-1301, USA.
*/
#ifndef INCLUDED_GR_BASIC_BLOCK_H
#define INCLUDED_GR_BASIC_BLOCK_H
#include <gnuradio/api.h>
#include <gnuradio/sptr_magic.h>
#include <gnuradio/msg_accepter.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/io_signature.h>
#include <gnuradio/thread/thread.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/condition_variable.hpp>
#include <iostream>
#include <string>
#include <deque>
#include <map>
#ifdef GR_CTRLPORT
#include <gnuradio/rpcregisterhelpers.h>
#endif
namespace gr {
/*!
* \brief The abstract base class for all signal processing blocks.
* \ingroup internal
*
* Basic blocks are the bare abstraction of an entity that has a
* name, a set of inputs and outputs, and a message queue. These
* are never instantiated directly; rather, this is the abstract
* parent class of both gr_hier_block, which is a recursive
* container, and block, which implements actual signal
* processing functions.
*/
class GR_RUNTIME_API basic_block : public msg_accepter,
public boost::enable_shared_from_this<basic_block>
{
typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
private:
typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comparator> d_msg_handlers_t;
d_msg_handlers_t d_msg_handlers;
typedef std::deque<pmt::pmt_t> msg_queue_t;
typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator> msg_queue_map_t;
typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator>::iterator msg_queue_map_itr;
std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comparator> msg_queue_ready;
gr::thread::mutex mutex; //< protects all vars
protected:
friend class flowgraph;
friend class flat_flowgraph; // TODO: will be redundant
friend class tpb_thread_body;
enum vcolor { WHITE, GREY, BLACK };
std::string d_name;
gr::io_signature::sptr d_input_signature;
gr::io_signature::sptr d_output_signature;
long d_unique_id;
long d_symbolic_id;
std::string d_symbol_name;
std::string d_symbol_alias;
vcolor d_color;
bool d_rpc_set;
msg_queue_map_t msg_queue;
std::vector<boost::any> d_rpc_vars; // container for all RPC variables
basic_block(void) {} // allows pure virtual interface sub-classes
//! Protected constructor prevents instantiation by non-derived classes
basic_block(const std::string &name,
gr::io_signature::sptr input_signature,
gr::io_signature::sptr output_signature);
//! may only be called during constructor
void set_input_signature(gr::io_signature::sptr iosig) {
d_input_signature = iosig;
}
//! may only be called during constructor
void set_output_signature(gr::io_signature::sptr iosig) {
d_output_signature = iosig;
}
/*!
* \brief Allow the flowgraph to set for sorting and partitioning
*/
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
/*!
* \brief Tests if there is a handler attached to port \p which_port
*/
virtual bool has_msg_handler(pmt::pmt_t which_port) {
return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
}
/*
* This function is called by the runtime system to dispatch messages.
*
* The thread-safety guarantees mentioned in set_msg_handler are
* implemented by the callers of this method.
*/
virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
{
// AA Update this
if(has_msg_handler(which_port)) { // Is there a handler?
d_msg_handlers[which_port](msg); // Yes, invoke it.
}
}
// Message passing interface
pmt::pmt_t d_message_subscribers;
public:
pmt::pmt_t message_subscribers(pmt::pmt_t port);
virtual ~basic_block();
long unique_id() const { return d_unique_id; }
long symbolic_id() const { return d_symbolic_id; }
/*! The name of the block */
std::string name() const { return d_name; }
/*!
* The sybolic name of the block, which is used in the
* block_registry. The name is assigned by the block's constructor
* and never changes during the life of the block.
*/
std::string symbol_name() const { return d_symbol_name; }
gr::io_signature::sptr input_signature() const { return d_input_signature; }
gr::io_signature::sptr output_signature() const { return d_output_signature; }
basic_block_sptr to_basic_block(); // Needed for Python type coercion
/*!
* True if the block has an alias (see set_block_alias).
*/
bool alias_set() { return !d_symbol_alias.empty(); }
/*!
* Returns the block's alias as a string.
*/
std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
/*!
* Returns the block's alias as PMT.
*/
pmt::pmt_t alias_pmt(){ return pmt::intern(alias()); }
/*!
* Set's a new alias for the block; also adds an entry into the
* block_registry to get the block using either the alias or the
* original symbol name.
*/
void set_block_alias(std::string name);
// ** Message passing interface **
void message_port_register_in(pmt::pmt_t port_id);
void message_port_register_out(pmt::pmt_t port_id);
void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; return false; }
virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; return false; }
virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; return false; }
/*!
* \brief Get input message port names.
*
* Returns the available input message ports for a block. The
* return object is a PMT vector that is filled with PMT symbols.
*/
pmt::pmt_t message_ports_in();
/*!
* \brief Get output message port names.
*
* Returns the available output message ports for a block. The
* return object is a PMT vector that is filled with PMT symbols.
*/
pmt::pmt_t message_ports_out();
/*!
* Accept msg, place in queue, arrange for thread to be awakened if it's not already.
*/
void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
//! is the queue empty?
bool empty_p(pmt::pmt_t which_port) {
if(msg_queue.find(which_port) == msg_queue.end())
throw std::runtime_error("port does not exist!");
return msg_queue[which_port].empty();
}
bool empty_p() {
bool rv = true;
BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
rv &= msg_queue[i.first].empty();
}
return rv;
}
//! are all msg ports with handlers empty?
bool empty_handled_p(pmt::pmt_t which_port){
return (empty_p(which_port) || !has_msg_handler(which_port));
}
bool empty_handled_p() {
bool rv = true;
BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
rv &= empty_handled_p(i.first);
}
return rv;
}
//! How many messages in the queue?
size_t nmsgs(pmt::pmt_t which_port) {
if(msg_queue.find(which_port) == msg_queue.end())
throw std::runtime_error("port does not exist!");
return msg_queue[which_port].size();
}
//| Acquires and release the mutex
void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
/*!
* \returns returns pmt at head of queue or pmt::pmt_t() if empty.
*/
pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
/*!
* \param[in] which_port The message port from which to get the message.
* \param[in] millisec Optional timeout value (0=no timeout).
* \returns returns pmt at head of queue or pmt::pmt_t() if empty.
*/
pmt::pmt_t delete_head_blocking(pmt::pmt_t which_port, unsigned int millisec = 0);
msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) {
return msg_queue[which_port].begin();
}
void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) {
msg_queue[which_port].erase(it);
}
virtual bool has_msg_port(pmt::pmt_t which_port) {
if(msg_queue.find(which_port) != msg_queue.end()) {
return true;
}
if(pmt::dict_has_key(d_message_subscribers, which_port)) {
return true;
}
return false;
}
const msg_queue_map_t& get_msg_map(void) const {
return msg_queue;
}
#ifdef GR_CTRLPORT
/*!
* \brief Add an RPC variable (get or set).
*
* Using controlport, we create new getters/setters and need to
* store them. Each block has a vector to do this, and these never
* need to be accessed again once they are registered with the RPC
* backend. This function takes a
* boost::shared_sptr<rpcbasic_base> so that when the block is
* deleted, all RPC registered variables are cleaned up.
*
* \param s an rpcbasic_sptr of the new RPC variable register to store.
*/
void add_rpc_variable(rpcbasic_sptr s)
{
d_rpc_vars.push_back(s);
}
#endif /* GR_CTRLPORT */
/*!
* \brief Set up the RPC registered variables.
*
* This must be overloaded by a block that wants to use
* controlport. This is where rpcbasic_register_{get,set} pointers
* are created, which then get wrapped as shared pointers
* (rpcbasic_sptr(...)) and stored using add_rpc_variable.
*/
virtual void setup_rpc() {};
/*!
* \brief Ask if this block has been registered to the RPC.
*
* We can only register a block once, so we use this to protect us
* from calling it multiple times.
*/
bool is_rpc_set() { return d_rpc_set; }
/*!
* \brief When the block is registered with the RPC, set this.
*/
void rpc_set() { d_rpc_set = true; }
/*!
* \brief Confirm that ninputs and noutputs is an acceptable combination.
*
* \param ninputs number of input streams connected
* \param noutputs number of output streams connected
*
* \returns true if this is a valid configuration for this block.
*
* This function is called by the runtime system whenever the
* topology changes. Most classes do not need to override this.
* This check is in addition to the constraints specified by the
* input and output gr::io_signatures.
*/
virtual bool check_topology(int ninputs, int noutputs) {
(void)ninputs;
(void)noutputs;
return true;
}
/*!
* \brief Set the callback that is fired when messages are available.
*
* \p msg_handler can be any kind of function pointer or function object
* that has the signature:
* <pre>
* void msg_handler(pmt::pmt msg);
* </pre>
*
* (You may want to use boost::bind to massage your callable into
* the correct form. See gr::blocks::nop for an example that sets
* up a class method as the callback.)
*
* Blocks that desire to handle messages must call this method in
* their constructors to register the handler that will be invoked
* when messages are available.
*
* If the block inherits from block, the runtime system will
* ensure that msg_handler is called in a thread-safe manner, such
* that work and msg_handler will never be called concurrently.
* This allows msg_handler to update state variables without
* having to worry about thread-safety issues with work,
* general_work or another invocation of msg_handler.
*
* If the block inherits from hier_block2, the runtime system
* will ensure that no reentrant calls are made to msg_handler.
*/
template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler) {
if(msg_queue.find(which_port) == msg_queue.end()) {
throw std::runtime_error("attempt to set_msg_handler() on bad input message port!");
}
d_msg_handlers[which_port] = msg_handler_t(msg_handler);
}
virtual void set_processor_affinity(const std::vector<int> &mask)
{ (void) mask;
throw std::runtime_error("set_processor_affinity not overloaded in child class."); }
virtual void unset_processor_affinity()
{ throw std::runtime_error("unset_processor_affinity not overloaded in child class."); }
virtual std::vector<int> processor_affinity()
{ throw std::runtime_error("processor_affinity not overloaded in child class."); }
};
inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs)
{
return lhs->unique_id() < rhs->unique_id();
}
typedef std::vector<basic_block_sptr> basic_block_vector_t;
typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t;
GR_RUNTIME_API long basic_block_ncurrently_allocated();
inline std::ostream &operator << (std::ostream &os, basic_block_sptr basic_block)
{
os << basic_block->name() << "(" << basic_block->unique_id() << ")";
return os;
}
} /* namespace gr */
#endif /* INCLUDED_GR_BASIC_BLOCK_H */