Permalink
Browse files

Successfully reading tile data asynchronously.

Headers are coming through with some extra gibberish at the end.
  • Loading branch information...
1 parent 3d896c1 commit c71c7aac0374d84d2d7d99e7fee2aea160181ec4 @chadrik committed Mar 26, 2012
@@ -128,14 +128,14 @@ driver_open
const char* filename = "foo.socket";
AiMsgInfo("[driver_socket] Connecting");
- ImageOutput* out = ImageOutput::create(filename);
+ ImageOutput* out = v1_1::ImageOutput::create(filename);
if (!out)
{
AiMsgError("[driver_socket] %s", out->geterror().c_str());
return;
}
- ImageSpec spec = ImageSpec();
+ ImageSpec spec = v1_1::ImageSpec();
const char *name = "";
int pixel_type;
@@ -214,7 +214,6 @@ driver_prepare_bucket
driver_write_bucket
{
AiMsgInfo("[driver_socket] write bucket (%d, %d)", bucket_xo, bucket_yo);
- std::cout << "test" << std::endl;
int pixel_type;
const void* bucket_data;
View
@@ -53,11 +53,31 @@ OIIO_NAMESPACE_ENTER
using boost::asio::ip::tcp;
+typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
+typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
+
+class Session
+{
+public:
+ Session(boost::asio::io_service& io_service);
+ tcp::socket& socket();
+ void start();
+ void handle_read(const boost::system::error_code& error,
+ size_t bytes_transferred);
+
+private:
+ tcp::socket m_socket;
+ unsigned int m_header_length;
+ enum { max_length = 1024 };
+ char data_[max_length];
+};
+
class Server
{
public:
Server (boost::asio::io_service& io_service, short port, boost::function<void(std::string&)> accept_handler);
+ //void handle_accept (Session* session, const boost::system::error_code& error);
void handle_accept (const boost::system::error_code& error);
tcp::socket& get_socket ();
@@ -92,7 +112,8 @@ class ServerPool
void operator delete (void * /*todel*/) { }
static ServerPool* m_instance;
- boost::asio::io_service m_io_service;
+ io_service_ptr m_io_service;
+ work_ptr m_work;
std::vector<server_ptr> m_server_list;
std::map<int, server_ptr> m_server_map;
};
@@ -38,6 +38,46 @@ OIIO_NAMESPACE_ENTER
using boost::asio::ip::tcp;
+Session::Session(boost::asio::io_service& io_service)
+ : m_socket(io_service),
+ m_header_length(0)
+{
+}
+
+tcp::socket& Session::socket()
+{
+ return m_socket;
+}
+
+void Session::start()
+{
+// m_socket.async_read_some(boost::asio::buffer(data_, max_length),
+// boost::bind(&Session::handle_read, this,
+// boost::asio::placeholders::error,
+// boost::asio::placeholders::bytes_transferred));
+ std::cout << "Session::start" << std::endl;
+ boost::asio::async_read (m_socket,
+ boost::asio::buffer (reinterpret_cast<char *> (&m_header_length), sizeof (boost::uint32_t)),
+ boost::bind (&Session::handle_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void Session::handle_read(const boost::system::error_code& error,
+ size_t bytes_transferred)
+{
+ if (!error)
+ {
+ std::cout << "handle_read" << std::endl;
+ }
+ else
+ {
+ std::cout << "handle_read error" << std::endl;
+ delete this;
+ }
+}
+
+
Server::Server(boost::asio::io_service& io_service, short port, boost::function<void(std::string&)> accept_handler)
: m_socket(io_service),
m_io_service(io_service),
@@ -46,20 +86,24 @@ Server::Server(boost::asio::io_service& io_service, short port, boost::function<
{
m_filename = Strutil::format ("sockethandle?port=%d.socket", port);
std::cout << "setting up accept handler " << port << std::endl;
+ //Session* new_session = new Session(m_io_service);
m_acceptor.async_accept(m_socket,
- boost::bind(&Server::handle_accept, this,
+ boost::bind(&Server::handle_accept, this, //new_session,
boost::asio::placeholders::error));
}
void
+//Server::handle_accept(Session* session, const boost::system::error_code& error)
Server::handle_accept(const boost::system::error_code& error)
{
+ //delete session;
if (!error) {
std::cout << "handle accept" << std::endl;
m_accept_handler(m_filename);
+ //session->start();
// what to do wit the current image
//callback(newimage);
// std::cout << "setting up new accept handler " << std::endl;
@@ -85,7 +129,9 @@ Server::get_socket()
ServerPool* ServerPool::m_instance = NULL;
-ServerPool::ServerPool () : m_io_service()
+ServerPool::ServerPool () :
+ m_io_service(new boost::asio::io_service),
+ m_work(new boost::asio::io_service::work(*m_io_service))
{
}
@@ -122,7 +168,7 @@ bool
ServerPool::run ()
{
// try {
- m_io_service.run();
+ m_io_service->run();
// }
// catch (std::exception& e) {
// std::cerr << "Exception: " << e.what() << "\n";
@@ -136,7 +182,7 @@ ServerPool::run ()
void
ServerPool::add_server (short port, boost::function<void(std::string&)> accept_handler)
{
- server_ptr server(new Server(m_io_service, port, accept_handler));
+ server_ptr server(new Server(*m_io_service, port, accept_handler));
m_server_list.push_back(server);
m_server_map[port] = server;
}
@@ -145,7 +191,8 @@ ServerPool::add_server (short port, boost::function<void(std::string&)> accept_h
boost::asio::io_service&
ServerPool::get_io_service ()
{
- return m_io_service;
+ boost::asio::io_service& io_service = *m_io_service;
+ return io_service;
}
@@ -53,6 +53,7 @@
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
OIIO_PLUGIN_NAMESPACE_BEGIN
@@ -107,8 +108,9 @@ class SocketInput : public ImageInput {
// io_service io;
ip::tcp::socket* m_socket;
// boost::shared_ptr <ip::tcp::acceptor> acceptor;
- int m_header_length;
+ unsigned int m_header_length;
short m_port;
+ boost::thread m_thread;
bool accept_connection (const std::string &name);
bool get_spec_from_client (ImageSpec &spec);
@@ -114,6 +114,13 @@ SocketInput::open (const std::string &name, ImageSpec &newspec,
std::cout << "removed socket" << std::endl;
}
}
+// else {
+// std::cout << "running" << std::endl;
+// m_thread = boost::thread(
+// boost::bind(&boost::asio::io_service::run, &ServerPool::instance()->get_io_service()));
+//// ServerPool::instance()->get_io_service().run();
+//
+// }
} else {
if (m_socket) {
m_socket->close();
@@ -164,7 +171,7 @@ SocketInput::read_native_scanline (int y, int z, void *data)
bool
SocketInput::read_native_tile (int x, int y, int z, void *data)
{
- std::cout << "tile " << x << " " << y << std::endl;
+// std::cout << "tile " << x << " " << y << std::endl;
// try {
// boost::asio::read (*m_socket, buffer (reinterpret_cast<char *> (data),
// m_spec.tile_bytes ()));
@@ -268,8 +275,12 @@ SocketInput::listen_for_header_from_client ()
try {
boost::asio::async_read (*m_socket,
boost::asio::buffer (reinterpret_cast<char *> (&m_header_length), sizeof (boost::uint32_t)),
- boost::bind(&SocketInput::handle_read_header, this,
+ boost::bind (&SocketInput::handle_read_header, this,
placeholders::error));
+// m_socket->async_read_some (
+// boost::asio::buffer (reinterpret_cast<char *> (&m_header_length), sizeof (boost::uint32_t)),
+// boost::bind (&SocketInput::handle_read_header, this,
+// placeholders::error));
} catch (boost::system::system_error &err) {
error ("Error while reading: %s", err.what ());
@@ -282,18 +293,22 @@ SocketInput::listen_for_header_from_client ()
void
-SocketInput::handle_read_header(const boost::system::error_code& error)
+SocketInput::handle_read_header (const boost::system::error_code& error)
{
- std::cout << "handle_read_header" << std::endl;
+
if (!error) {
+ std::cout << "handle_read_header" << std::endl;
// try {
char *buf = new char[m_header_length + 1];
boost::asio::read (*m_socket, boost::asio::buffer (buf, m_header_length));
std::string header = buf;
- std::cout << header << std::endl;
+ std::cout << "TILE: " << header << std::endl;
delete [] buf;
+ // listen for next tile
+ listen_for_header_from_client();
+
// } catch (boost::system::system_error &err) {
// // FIXME: we have a memory leak if read fails and spec_xml is not deleted
// error ("Error while reading: %s", err.what ());
@@ -305,12 +320,13 @@ SocketInput::handle_read_header(const boost::system::error_code& error)
// boost::asio::placeholders::error));
}
else {
+ std::cout << "handle_read_header ERROR" << std::endl;
// room_.leave (shared_from_this ());
}
}
void
-SocketInput::handle_read_data(const boost::system::error_code& error)
+SocketInput::handle_read_data (const boost::system::error_code& error)
{
if (!error) {
// room_.deliver(read_msg_);
@@ -65,7 +65,7 @@ SocketOutput::open (const std::string &name, const ImageSpec &newspec,
if (! (connect_to_server (name) && send_spec_to_server (newspec))) {
return false;
}
- std::cout << "connection successful" << std::endl;
+ std::cout << "SocketOutput::open: connection successful" << std::endl;
m_next_scanline = 0;
m_spec = newspec;
@@ -112,7 +112,7 @@ SocketOutput::write_tile (int x, int y, int z,
// error ("Error while reading: %s", err.what ());
// return false;
// }
-// return true;
+ return true;
}
return false;
@@ -148,7 +148,7 @@ SocketOutput::send_spec_to_server (const ImageSpec& spec)
bool
SocketOutput::send_header_to_server (const std::string &header)
{
- int length = header.length ();
+ unsigned int length = header.length ();
try {
// first send the size of the header

0 comments on commit c71c7aa

Please sign in to comment.