Skip to content

Commit

Permalink
Work in progress on SocketTestSuite
Browse files Browse the repository at this point in the history
There is an unexpected hangup
  • Loading branch information
Gei0r committed May 7, 2017
1 parent 6d6f6c0 commit c6079f4
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 51 deletions.
5 changes: 5 additions & 0 deletions rct/EventLoop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "Rct.h"
#include "SocketClient.h"
#include "Timer.h"
#include "Log.h"
#if defined(RCT_EVENTLOOP_CALLBACK_TIME_THRESHOLD) && RCT_EVENTLOOP_CALLBACK_TIME_THRESHOLD > 0
# include "Log.h"
# include "StopWatch.h"
Expand Down Expand Up @@ -467,6 +468,7 @@ inline bool EventLoop::sendTimers()

bool EventLoop::registerSocket(int fd, unsigned int mode, std::function<void(int, unsigned int)>&& func)
{
debug() << "registerSocket(fd=" << fd << " mode=" << mode << ")";
std::lock_guard<std::mutex> locker(mMutex);
mSockets[fd] = std::make_pair(mode, std::forward<std::function<void(int, unsigned int)> >(func));

Expand Down Expand Up @@ -586,6 +588,7 @@ bool EventLoop::updateSocket(int fd, unsigned int mode)

void EventLoop::unregisterSocket(int fd)
{
debug() << "unregisterSocket(fd=" << fd << ")";
std::lock_guard<std::mutex> locker(mMutex);
auto socket = mSockets.find(fd);
if (socket == mSockets.end())
Expand Down Expand Up @@ -715,6 +718,7 @@ unsigned int EventLoop::fireSocket(int fd, unsigned int mode)

unsigned int EventLoop::processSocketEvents(NativeEvent* events, int eventCount)
{
debug() << "process " << eventCount << "events...";
unsigned int all = 0;
int e;

Expand All @@ -739,6 +743,7 @@ unsigned int EventLoop::processSocketEvents(NativeEvent* events, int eventCount)
#if defined(HAVE_EPOLL)
const uint32_t ev = events[i].events;
const int fd = events[i].data.fd;
debug() << "event " << ev << " on fd=" << fd;
if (ev & (EPOLLERR|EPOLLHUP) && !(ev & EPOLLRDHUP)) {
// bad, take the fd out
epoll_ctl(mPollFd, EPOLL_CTL_DEL, fd, &events[i]);
Expand Down
9 changes: 8 additions & 1 deletion rct/SocketClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ SocketClient::SocketClient(int f, unsigned int mode)
: fd(f), socketPort(0), socketState(Connected), socketMode(mode),
wMode(Asynchronous), writeWait(false), mLogsEnabled(true), writeOffset(0)
{
debug() << "SocketClient ctor fd=" << fd;
assert(fd >= 0);
#ifdef HAVE_NOSIGPIPE
int flags = 1;
Expand Down Expand Up @@ -68,6 +69,7 @@ SocketClient::SocketClient(int f, unsigned int mode)

SocketClient::~SocketClient()
{
debug() << "SocketClient dtor (fd=" << fd << ")";
close();
}

Expand Down Expand Up @@ -252,7 +254,10 @@ bool SocketClient::connect(const String& path)
SocketClient::SharedPtr unixSocket = shared_from_this();

int e;
debug() << "socket client " << this << " (fd=" << fd
<< ") about to connect to " << path << "...";
eintrwrap(e, ::connect(fd, &addr, sizeof(addr_un)));
debug() << "After connect() call, returned " << e;
address = path;
if (e == 0) { // we're done
socketState = Connected;
Expand Down Expand Up @@ -507,7 +512,8 @@ bool SocketClient::writeTo(const String& host, uint16_t port, const unsigned cha
} else {
eintrwrap(e, ::write(fd, data + total, size - total));
}
DEBUG() << "SENT(2)" << (size - total) << "BYTES" << e << errno;
DEBUG() << "SENT(2)" << (size - total) << "BYTES to " << fd << ": " << e << errno;
DEBUG().flags();
if (e == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (wMode == Synchronous) {
Expand Down Expand Up @@ -738,6 +744,7 @@ bool SocketClient::init(unsigned int mode)
}

fd = ::socket(domain, type, 0);
debug() << "created client socket with fd=" << fd;
if (fd < 0) {
// bad
return false;
Expand Down
2 changes: 2 additions & 0 deletions rct/SocketClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class SocketClient : public std::enable_shared_from_this<SocketClient>
bool logsEnabled() const { return mLogsEnabled; }
void setLogsEnabled(bool on) { mLogsEnabled = on; }
private:
friend class SocketTestSuite;

bool init(unsigned int mode);

int fd;
Expand Down
2 changes: 2 additions & 0 deletions rct/SocketServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ bool SocketServer::listen(const Path &p)
close();

fd = ::socket(PF_UNIX, SOCK_STREAM, 0);
debug() << "Socket Server unix listen fd=" << fd;
if (fd < 0) {
// bad
serverError(this, InitializeError);
Expand Down Expand Up @@ -227,6 +228,7 @@ void SocketServer::socketCallback(int /*fd*/, int mode)
}

//EventLoop::eventLoop()->unregisterSocket( fd );
debug() << "SocketServer: got new connection with fd=" << e;
accepted.push(e);
serverNewConnection(this);
}
Expand Down
7 changes: 1 addition & 6 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@ endif ()
link_directories(${CPPUNIT_LIBRARY_DIRS} ${PROJECT_BINARY_DIR} ${RCT_BINARY_DIR})

set(RCT_TEST_SRCS main.cpp
PathTestSuite.cpp
ProcessTestSuite.cpp
MemoryMappedFileTestSuite.cpp
SHA256TestSuite.cpp
StringTestSuite.cpp
)

if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows")
list(APPEND RCT_TEST_SRCS DateTestSuite.cpp SocketTestSuite.cpp)
list(APPEND RCT_TEST_SRCS SocketTestSuite.cpp)
endif()

set(BINARY_NAME "rct_tests")
Expand Down
24 changes: 15 additions & 9 deletions tests/DeferredAssert.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@
#include <string>
#include <mutex>

#define DEFERRED_ASSERT(da, expr) (da).assert((expr), #expr);
#define DEFERRED_COMPARE(da, lhs, rhs) (da).compare(lhs, rhs, #lhs, #rhs)
#define DEFERRED_ASSERT(da, expr) (da).assert((expr), #expr, __LINE__);
#define DEFERRED_COMPARE(da, lhs, rhs) (da).compare(lhs, rhs, #lhs, #rhs, __LINE__)

class DeferredAsserter
{
public:
DeferredAsserter() : m_testSuccessful(true) {}

bool assert(bool value, const char* expr=nullptr)
bool assert(bool value, const char* expr=nullptr, int line=0)
{
if(!value)
{
std::lock_guard<std::mutex> g(m_mutex);
if(expr)
{
std::cerr << "TEST ERROR: " << expr
<< " is false" << std::endl;
std::cerr << "TEST ERROR ";
if(line) std::cerr << "(line " << line << ") ";
std::cerr << expr << " is false";

std::cerr << std::endl;
}
m_testSuccessful = false;
}
Expand All @@ -43,16 +46,19 @@ class DeferredAsserter
template<class L, class R>
bool compare(const L &lhs, const R &rhs,
const char* expr_lhs=nullptr,
const char *expr_rhs=nullptr)
const char *expr_rhs=nullptr,
int line=0)
{
if(lhs != rhs)
{
std::lock_guard<std::mutex> g(m_mutex);
if(expr_lhs && expr_rhs)
{
std::cerr << "TEST ERROR: "
<< expr_lhs << " (" << lhs << ") != "
<< expr_rhs << " (" << rhs << ")" << std::endl;
std::cerr << "TEST ERROR ";
if(line) std::cerr << "(line " << line << ") ";
std::cerr << expr_lhs << " (" << lhs << ") != "
<< expr_rhs << " (" << rhs << ")";
std::cerr << std::endl;
}
else
{
Expand Down
120 changes: 85 additions & 35 deletions tests/SocketTestSuite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <rct/EventLoop.h>
#include <rct/SocketServer.h>
#include <rct/Log.h>

#include <iostream>
#include <thread>
Expand All @@ -15,60 +16,109 @@ void SocketTestSuite::unixSockets()
unixPathToUse.rm();

DeferredAsserter da;
std::mutex m;

// We need an event loop to receive signals.
EventLoop::SharedPtr loop(new EventLoop);
loop->init(EventLoop::MainEventLoop);

SocketServer s;
SocketServer::SharedPtr s(new SocketServer);

CPPUNIT_ASSERT(!s.isListening());
CPPUNIT_ASSERT(!s->isListening());

// flag whether the newConnection slot was called.
bool gotNewConnection = false;

// flag whether the connected slot was called.
bool gotConnected = false;
// the following variables count how often certain slots are called:
int server_newConnection = 0;
int server_recv = 0;
int client_connected = 0;
int client_recv = 0;
SocketClient::SharedPtr serverSocket;

// the slot lambdas will be called from the serverThread's context.
s.newConnection().connect([&](SocketServer *s)
{
gotNewConnection = true;

auto client = s->nextConnection();

DEFERRED_COMPARE(da, client->mode(), SocketClient::Unix);
DEFERRED_COMPARE(da, client->state(), SocketClient::Connected);
});
s.error().connect([&](SocketServer*, SocketServer::Error e)
{
std::ostringstream oss;
oss << "Server error: " << e;
da.fail(oss.str());
});

CPPUNIT_ASSERT(s.listen(unixPathToUse));
CPPUNIT_ASSERT(s.isListening());

std::thread serverThread([&](){loop->exec(200);});
s->newConnection().connect([&](SocketServer *s)
{
server_newConnection++;

// important: serverSocket needs to survive beyond this lambda,
// otherwise the connection will be closed in its dtor.
serverSocket = s->nextConnection();

DEFERRED_COMPARE(da, serverSocket->mode(), SocketClient::Unix);
DEFERRED_COMPARE(da, serverSocket->state(), SocketClient::Connected);

serverSocket->write("msg frm server");

serverSocket->readyRead().connect(
[&](SocketClient::SharedPtr ptr, Buffer &&b)
{
server_recv++;
debug() << "Server received from socket " << ptr.get()
<< " fd=" << ptr->fd;
std::string recv(b.data(), b.end());
DEFERRED_COMPARE(da, b.size(), 15u);
DEFERRED_COMPARE(da, recv, "msg from client");
});
});

s->error().connect([&](SocketServer*, SocketServer::Error e)
{
std::ostringstream oss;
oss << "Server error: " << e;
da.fail(oss.str());
});

CPPUNIT_ASSERT(s->listen(unixPathToUse));
CPPUNIT_ASSERT(s->isListening());

std::thread serverThread([&](){loop->exec(300);});

realSleep(100);
debug() << "creating client...";
SocketClient::SharedPtr client(new SocketClient(SocketClient::Unix));

client->connected().connect([&](SocketClient::SharedPtr)
{
gotConnected = true;
});
debug() << "client created." << client.get();

client->connected().connect([&](SocketClient::SharedPtr c)
{
std::lock_guard<std::mutex> g(m);
debug() << "Client " << c.get()
<< " connected. Send message...";
client_connected++;
c->write("msg from client");
});
client->readyRead().connect([&](SocketClient::SharedPtr, Buffer &&b)
{
debug() << "client recv";
std::lock_guard<std::mutex> g(m);
client_recv++;
std::string receivedData(b.data(), b.end());
DEFERRED_COMPARE(da, receivedData, "msg from server");
});
client->error().connect([&](SocketClient::SharedPtr ptr,
SocketClient::Error e)
{
debug() << "client " << ptr.get() << " got error " << e;
});

client->disconnected().connect([&](SocketClient::SharedPtr ptr)
{
debug() << "Client " << ptr.get() << " disconnected";
});

CPPUNIT_ASSERT(client->state() == SocketClient::Disconnected);

debug() << "client about to connect to " << unixPathToUse << "...";
CPPUNIT_ASSERT(client->connect(unixPathToUse));
realSleep(50);
client->close();
debug() << "client connect done";

realSleep(100);
// client->close();

serverThread.join();

CPPUNIT_ASSERT(gotNewConnection);
CPPUNIT_ASSERT(gotConnected);
CPPUNIT_ASSERT(server_newConnection == 1);
CPPUNIT_ASSERT(client_connected == 1);
CPPUNIT_ASSERT(client_recv == 1);
CPPUNIT_ASSERT(server_recv == 1);
CPPUNIT_ASSERT(da.result());

}
Expand Down

0 comments on commit c6079f4

Please sign in to comment.