Skip to content

Commit

Permalink
Completely revived UnixAPISocket. Now async functions are explicit.
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Feb 10, 2014
1 parent 1e29e43 commit 91a881c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 49 deletions.
89 changes: 52 additions & 37 deletions src/common/api/UnixAPISocket.cpp
Expand Up @@ -32,77 +32,92 @@ std::list<std::string> getSocketPathList() {
}

/* UnixSocket */
UnixAPISocket::UnixAPISocket(boost::asio::io_service& io_service) : session_socket(io_service), cleaning(false) {}
UnixAPISocket::UnixAPISocket(boost::asio::io_service& io_service) : session_socket(io_service) {}
UnixAPISocket::~UnixAPISocket() {
session_socket.shutdown(session_socket.shutdown_both);
session_socket.close();
}

std::string UnixAPISocket::generateSendString(std::string from){
MESSAGE_SIZE_TYPE native_size = from.size();
MESSAGE_SIZE_TYPE size_netlong = htonl(native_size);

auto charlen_s = std::string((char*)(&size_netlong), MESSAGE_SIZE_LENGTH);
return charlen_s+from;
}

stream_protocol::socket& UnixAPISocket::getSocket() {
return session_socket;
}

void UnixAPISocket::assignShutdownHandler(std::function< void() > shutdown_handler) {
m_shutdown_func = shutdown_handler;
void UnixAPISocket::send(api::APIMessage data, int& error_code){
auto send_string = generateSendString(data.SerializeAsString());

boost::asio::write(session_socket, boost::asio::buffer(send_string), error_code);
}

api::APIMessage UnixAPISocket::receive(int& error_code){
error_code = 0;
char* char_message_size = new char[MESSAGE_SIZE_LENGTH];
boost::asio::read(session_socket, boost::asio::buffer(char_message_size, MESSAGE_SIZE_LENGTH), error_code);

if(error_code){
return api::APIMessage();
}

MESSAGE_SIZE_TYPE size = ntohl(*reinterpret_cast<MESSAGE_SIZE_TYPE*>(char_message_size));
delete[] char_message_size;

char* buffer = new char[size];
boost::asio::read(session_socket, boost::asio::buffer(buffer, size), error_code);

if(error_code){
return api::APIMessage();
}

APIMessage message_proto;
message_proto.ParseFromArray(buffer, size);
delete[] buffer;

return message_proto;
}

void UnixAPISocket::assignReceiveHandler(std::function< void(APIMessage) > receive_handler) {
m_process_func = receive_handler;
void UnixAPISocket::asyncSend(api::APIMessage data, SendHandler send_handler) {
auto send_string = generateSendString(data.SerializeAsString());

boost::asio::async_write(session_socket, boost::asio::buffer(send_string),
std::bind(send_handler, this, std::placeholders::_1));
}

void UnixAPISocket::startReceive() {
void UnixAPISocket::asyncReceive(ReceiveHandler receive_handler) {
char* buffer = new char[MESSAGE_SIZE_LENGTH];
boost::asio::async_read(session_socket, boost::asio::buffer(buffer, MESSAGE_SIZE_LENGTH),
std::bind(&UnixAPISocket::handleReceiveSize, this, std::placeholders::_1, buffer));
std::bind(&UnixAPISocket::handleReceiveSize, this, std::placeholders::_1, buffer, receive_handler));
}

void UnixAPISocket::handleReceiveSize(const boost::system::error_code& error, char* char_message_size) {
void UnixAPISocket::handleReceiveSize(const boost::system::error_code& error, char* char_message_size, ReceiveHandler receive_handler) {
if(error){
cleanup();
session_socket.get_io_service().dispatch(std::bind(receive_handler, api::APIMessage(), error));
return;
}
MESSAGE_SIZE_TYPE size = ntohl(*reinterpret_cast<MESSAGE_SIZE_TYPE*>(char_message_size));
delete[] char_message_size;

char* buffer = new char[size];
boost::asio::async_read(session_socket, boost::asio::buffer(buffer, size),
std::bind(&UnixAPISocket::handleReceive, this, std::placeholders::_1, buffer, std::placeholders::_2));
std::bind(&UnixAPISocket::handleReceive, this, std::placeholders::_1, buffer, std::placeholders::_2, receive_handler));
}

void UnixAPISocket::handleReceive(const boost::system::error_code& error, char* message, uint32_t size) {
void UnixAPISocket::handleReceive(const boost::system::error_code& error, char* message, uint32_t size, ReceiveHandler receive_handler) {
if(error){
cleanup();
session_socket.get_io_service().dispatch(std::bind(receive_handler, api::APIMessage(), error));
return;
}
APIMessage message_proto;
message_proto.ParseFromArray(message, size);
delete[] message;

m_process_func(message_proto);
startReceive();
}

void UnixAPISocket::send(APIMessage message) {
auto native_size = message.ByteSize();
MESSAGE_SIZE_TYPE size_netlong = htonl(native_size);

auto charlen_s = std::string((char*)(&size_netlong), MESSAGE_SIZE_LENGTH);

boost::asio::async_write(session_socket, boost::asio::buffer(charlen_s+message.SerializeAsString()),
std::bind(&UnixAPISocket::handleSend, this, std::placeholders::_1));
}

void UnixAPISocket::handleSend(const boost::system::error_code& error) {
if(error){
cleanup();
}
}

void UnixAPISocket::cleanup(){
if(m_shutdown_func && !cleaning){
cleaning = true;
m_shutdown_func();
}
session_socket.get_io_service().dispatch(std::bind(receive_handler, message_proto, 0));
}

} /* namespace unix */
Expand Down
22 changes: 10 additions & 12 deletions src/common/api/UnixAPISocket.h
Expand Up @@ -31,29 +31,27 @@ namespace unix {
std::list<std::string> getSocketPathList();

class UnixAPISocket : protected Loggable {
typedef std::function<void(int)> SendHandler;
typedef std::function<void(api::APIMessage, int)> ReceiveHandler;

stream_protocol::socket session_socket;
std::function<void()> m_shutdown_func;
std::function<void(APIMessage)> m_process_func;

bool cleaning;
std::string generateSendString(std::string from);

public:
UnixAPISocket(boost::asio::io_service& io_service);
virtual ~UnixAPISocket();

stream_protocol::socket& getSocket();

void assignShutdownHandler(std::function<void()> shutdown_handler);
void assignReceiveHandler(std::function<void(APIMessage)> receive_handler);

void startReceive();
void handleReceiveSize(const boost::system::error_code& error, char* char_message_size);
void handleReceive(const boost::system::error_code& error, char* message, uint32_t size);
void send(api::APIMessage data, int& error_code);
api::APIMessage receive(int& error_code);

void send(APIMessage message);
void handleSend(const boost::system::error_code& error);
void asyncSend(api::APIMessage data, SendHandler send_handler);
void asyncReceive(ReceiveHandler receive_handler);

void cleanup();
void handleReceiveSize(const boost::system::error_code& error, char* char_message_size, ReceiveHandler receive_handler);
void handleReceive(const boost::system::error_code& error, char* message, uint32_t size, ReceiveHandler receive_handler);
};

} /* namespace unix */
Expand Down

0 comments on commit 91a881c

Please sign in to comment.