Skip to content
Browse files

work on socket msg block with in and out msg ports

  • Loading branch information...
1 parent c417d80 commit 65df505e5d2709eefd0f4125d06b13eb0e30980e @guruofquality committed Sep 13, 2012
Showing with 289 additions and 0 deletions.
  1. +1 −0 include/gnuradio/extras/CMakeLists.txt
  2. +53 −0 include/gnuradio/extras/socket_msg.h
  3. +1 −0 lib/CMakeLists.txt
  4. +234 −0 lib/socket_msg.cc
View
1 include/gnuradio/extras/CMakeLists.txt
@@ -48,6 +48,7 @@ list(APPEND include_sources
stream_to_blob.h
tuntap.h
msg_many_to_one.h
+ socket_msg.h
)
install(
View
53 include/gnuradio/extras/socket_msg.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_EXTRAS_SOCKET_MSG_H
+#define INCLUDED_GR_EXTRAS_SOCKET_MSG_H
+
+#include <gnuradio/extras/api.h>
+#include <gr_hier_block2.h>
+
+namespace gnuradio{ namespace extras{
+
+class GR_EXTRAS_API socket_msg : virtual public gr_hier_block2{
+public:
+ typedef boost::shared_ptr<socket_msg> sptr;
+
+ /*!
+ * \brief Make a socket block with message blobs in and out
+ *
+ * The TCP socket will listen on addr/port, and accept the first connection.
+ *
+ * \param proto the protocol "TCP" only for now
+ * \param addr the resolvable interface address of the socket
+ * \param port the resolvable interface port of the socket
+ * \param mtu the max bytes in an incoming packet, 0 for default
+ * \return a new socket message block
+ */
+ static sptr make(
+ const std::string &proto, const std::string &addr, const std::string &port, const size_t mtu = 0
+ );
+
+};
+
+}}
+
+#endif /* INCLUDED_GR_EXTRAS_SOCKET_MSG_H */
View
1 lib/CMakeLists.txt
@@ -57,6 +57,7 @@ list(APPEND gr_extras_sources
stream_to_blob.cc
tuntap.cc
msg_many_to_one.cc
+ socket_msg.cc
)
list(APPEND gr_extras_libs
View
234 lib/socket_msg.cc
@@ -0,0 +1,234 @@
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <gnuradio/extras/socket_msg.h>
+#include <gnuradio/block.h>
+#include <gr_io_signature.h>
+#include <boost/asio.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/weak_ptr.hpp>
+#include <iostream>
+
+namespace asio = boost::asio;
+
+using namespace gnuradio::extras;
+
+static const long timeout_us = 100*1000; //100ms
+static const pmt::pmt_t BLOB_KEY = pmt::pmt_string_to_symbol("blob_stream");
+static const size_t POOL_SIZE = 64; //num pre-allocated blobs to acquire at once
+
+static bool wait_for_recv_ready(int sock_fd)
+{
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = timeout_us;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(sock_fd, &rset);
+
+ //call select with timeout on receive socket
+ return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0;
+}
+
+/***********************************************************************
+ * produces messages to stream that recvd from socket
+ **********************************************************************/
+struct socket_msg_producer : gnuradio::block
+{
+ socket_msg_producer(const size_t mtu):
+ gnuradio::block(
+ "socket_msg_producer",
+ gr_make_io_signature(0, 0, 0),
+ gr_make_io_signature(1, 1, 1)
+ ),
+ _mtu(mtu)
+ {
+ std::stringstream str;
+ str << name() << unique_id();
+ _id = pmt::pmt_string_to_symbol(str.str());
+
+ //pre-allocate blobs
+ _mgr = pmt::pmt_mgr::make();
+ for (size_t i = 0; i < POOL_SIZE; i++){
+ _mgr->set(pmt::pmt_make_blob(mtu));
+ }
+ }
+
+ int work(
+ const InputItems &,
+ const OutputItems &
+ ){
+ while (!boost::this_thread::interruption_requested())
+ {
+ //wait for socket to be init'd
+ boost::shared_ptr<asio::ip::tcp::socket> _socket = weak_socket.lock();
+ if (not _socket)
+ {
+ boost::this_thread::sleep(boost::posix_time::microseconds(timeout_us));
+ continue;
+ }
+
+ if (!wait_for_recv_ready(_socket->native())) continue;
+
+ //perform a blocking receive
+ pmt::pmt_t blob = _mgr->acquire(true /*block*/);
+ pmt::pmt_blob_resize(blob, _mtu);
+ size_t num_bytes = 0;
+ try{
+ num_bytes = _socket->receive(asio::buffer(
+ pmt::pmt_blob_rw_data(blob), _mtu
+ ));
+ }catch(...){
+ std::cerr << "socket msg block, socket receive error, continuing..." << std::endl;
+ }
+
+ //post the message to downstream subscribers
+ pmt::pmt_blob_resize(blob, num_bytes);
+ this->post_msg(0, BLOB_KEY, blob, _id);
+ }
+ return -1;
+ }
+
+ const size_t _mtu;
+ pmt::pmt_t _id;
+ pmt::pmt_mgr::sptr _mgr;
+
+ boost::weak_ptr<asio::ip::tcp::socket> weak_socket;
+};
+
+
+/***********************************************************************
+ * consumes messages from stream and send to socket
+ **********************************************************************/
+struct socket_msg_consumer : gnuradio::block
+{
+ socket_msg_consumer(void):
+ gnuradio::block(
+ "socket_msg_consumer",
+ gr_make_io_signature(1, 1, 1),
+ gr_make_io_signature(0, 0, 0)
+ )
+ {
+ //NOP
+ }
+
+ int work(
+ const InputItems &,
+ const OutputItems &
+ ){
+ //loop for blobs until this thread is interrupted
+ while (true){
+ gr_tag_t msg = this->pop_msg_queue();
+ if (!pmt::pmt_is_blob(msg.value)) continue;
+ if (pmt::pmt_blob_length(msg.value) == 0) break; //empty blob, we are done here
+
+ //if we dont have a socket, the message is dropped
+ boost::shared_ptr<asio::ip::tcp::socket> _socket = weak_socket.lock();
+ if (not _socket) continue;
+
+ try{
+ _socket->send(asio::buffer(
+ pmt::pmt_blob_data(msg.value),
+ pmt::pmt_blob_length(msg.value)
+ ));
+ }catch(...){
+ std::cerr << "socket msg block, socket send error, continuing..." << std::endl;
+ }
+ }
+
+ //when handle msgs finished, work is marked done
+ return -1;
+ }
+
+ boost::weak_ptr<asio::ip::tcp::socket> weak_socket;
+};
+
+/***********************************************************************
+ * Hier block that combines it all
+ **********************************************************************/
+class socket_msg_impl : public socket_msg{
+public:
+ socket_msg_impl(const std::string &addr, const std::string &port, const size_t mtu):
+ gr_hier_block2(
+ "socket_msg",
+ gr_make_io_signature(1, 1, 1),
+ gr_make_io_signature(1, 1, 1)
+ )
+ {
+ //setup tcp listen service
+ asio::ip::tcp::resolver resolver(_io_service);
+ asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), addr, port);
+ asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
+ _acceptor = boost::shared_ptr<asio::ip::tcp::acceptor>(new asio::ip::tcp::acceptor(_io_service, endpoint));
+ _tg.create_thread(boost::bind(&socket_msg_impl::serve, this));
+
+ //make the blocks
+ _consumer = boost::make_shared<socket_msg_consumer>();
+ _producer = boost::make_shared<socket_msg_producer>(mtu);
+
+ //connect
+ this->connect(this->self(), 0, _consumer, 0);
+ this->connect(_producer, 0, this->self(), 0);
+ }
+
+ ~socket_msg_impl(void)
+ {
+ _tg.interrupt_all();
+ _tg.join_all();
+ }
+
+private:
+
+ void serve(void)
+ {
+ while (not boost::this_thread::interruption_requested())
+ {
+ if (!wait_for_recv_ready(_acceptor->native())) continue;
+ _socket = boost::shared_ptr<asio::ip::tcp::socket>(new asio::ip::tcp::socket(_io_service));
+ _acceptor->accept(*_socket);
+
+ //a synchronous switchover to a new client socket
+ _consumer->weak_socket = _socket;
+ _producer->weak_socket = _socket;
+ }
+ }
+
+ boost::thread_group _tg;
+ asio::io_service _io_service;
+ boost::shared_ptr<asio::ip::tcp::socket> _socket;
+ boost::shared_ptr<asio::ip::tcp::acceptor> _acceptor;
+
+ boost::shared_ptr<socket_msg_consumer> _consumer;
+ boost::shared_ptr<socket_msg_producer> _producer;
+};
+
+/***********************************************************************
+ * Factory function
+ **********************************************************************/
+socket_msg::sptr socket_msg::make(const std::string &proto, const std::string &addr, const std::string &port, const size_t mtu)
+{
+ if (proto != "TCP") throw std::invalid_argument("unknown protocol for socket msg: " + proto);
+ return gnuradio::get_initial_sptr(new socket_msg_impl(addr, port, mtu));
+}

0 comments on commit 65df505

Please sign in to comment.
Something went wrong with that request. Please try again.