Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move GPIO writes to a separate thread. #565

Merged
merged 5 commits into from
Dec 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 66 additions & 54 deletions common/network/TCPConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,76 @@
namespace ola {
namespace network {

/*
* A TCP socket waiting to connect.
*/
class PendingTCPConnection: public ola::io::WriteFileDescriptor {
public:
PendingTCPConnection(TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnector::TCPConnectCallback *callback);

ola::io::DescriptorHandle WriteDescriptor() const { return m_handle; }

void PerformWrite();
void Close();

const IPV4Address ip_address;
TCPConnector::TCPConnectCallback *callback;
ola::thread::timeout_id timeout_id;

private:
TCPConnector *m_connector;
ola::io::DescriptorHandle m_handle;
};

PendingTCPConnection::PendingTCPConnection(
TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnector::TCPConnectCallback *callback)
: WriteFileDescriptor(),
ip_address(ip),
callback(callback),
timeout_id(ola::thread::INVALID_TIMEOUT),
m_connector(connector) {
#ifdef _WIN32
m_handle.m_handle.m_fd = fd;
m_handle.m_type = ola::io::SOCKET_DESCRIPTOR;
#else
m_handle = fd;
#endif
}

/*
* Close this connection
*/
void PendingTCPConnection::Close() {
#ifdef _WIN32
close(m_handle.m_handle.m_fd);
#else
close(m_handle);
#endif
}

/*
* Called when the socket becomes writeable
*/
void PendingTCPConnection::PerformWrite() {
m_connector->SocketWritable(this);
}

void DeleteConnection(PendingTCPConnection *connection) {
delete connection;
}

TCPConnector::TCPConnector(ola::io::SelectServerInterface *ss)
: m_ss(ss),
m_pending_callbacks(0) {
: m_ss(ss) {
}

TCPConnector::~TCPConnector() {
CancelAll();
if (m_pending_callbacks) {
m_ss->DrainCallbacks();
}
m_pending_callbacks++;
CleanUpOrphans();
}


Expand Down Expand Up @@ -181,9 +239,7 @@ void TCPConnector::SocketWritable(PendingTCPConnection *connection) {

// we're already within the PendingTCPConnection's call stack here
// schedule the deletion to run later
m_orphaned_connections.push_back(connection);
m_pending_callbacks++;
m_ss->Execute(ola::NewSingleCallback(this, &TCPConnector::CleanUpOrphans));
m_ss->Execute(ola::NewSingleCallback(DeleteConnection, connection));

if (error) {
OLA_WARN << "connect() to " << connection->ip_address << " returned: "
Expand Down Expand Up @@ -230,49 +286,5 @@ void TCPConnector::TimeoutEvent(PendingTCPConnection *connection) {
Timeout(iter);
m_connections.erase(iter);
}


TCPConnector::PendingTCPConnection::PendingTCPConnection(
TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnectCallback *callback)
: WriteFileDescriptor(),
ip_address(ip),
callback(callback),
timeout_id(ola::thread::INVALID_TIMEOUT),
m_connector(connector) {
#ifdef _WIN32
m_handle.m_handle.m_fd = fd;
m_handle.m_type = ola::io::SOCKET_DESCRIPTOR;
#else
m_handle = fd;
#endif
}


/**
* Close this connection
*/
void TCPConnector::PendingTCPConnection::Close() {
#ifdef _WIN32
close(m_handle.m_handle.m_fd);
#else
close(m_handle);
#endif
}


/**
* Called when the socket becomes writeable
*/
void TCPConnector::PendingTCPConnection::PerformWrite() {
m_connector->SocketWritable(this);
}

void TCPConnector::CleanUpOrphans() {
m_pending_callbacks--;
STLDeleteElements(&m_orphaned_connections);
}
} // namespace network
} // namespace ola
22 changes: 10 additions & 12 deletions common/rpc/RpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ using ola::network::IPV4SocketAddress;
using ola::network::TCPAcceptingSocket;
using ola::network::TCPSocket;

namespace {
void CleanupChannel(RpcChannel *channel,
ConnectedDescriptor *descriptor) {
delete channel;
delete descriptor;
}
} // namespace

const char RpcServer::K_CLIENT_VAR[] = "clients-connected";
const char RpcServer::K_RPC_PORT_VAR[] = "rpc-port";

Expand All @@ -58,10 +66,6 @@ RpcServer::RpcServer(ola::io::SelectServerInterface *ss,
}

RpcServer::~RpcServer() {
// Since we use ss->Execute() below, we need to ensure all pending callbacks
// are run.

m_ss->DrainCallbacks();
// Take a copy since calling the close handler will cause the socket to be
// removed from m_connected_sockets
ClientDescriptors sockets = m_connected_sockets;
Expand Down Expand Up @@ -168,14 +172,8 @@ void RpcServer::ChannelClosed(ConnectedDescriptor *descriptor,
// We're in the call stack of both the descriptor and the channel here.
// We schedule deletion during the next run of the event loop to break out of
// the stack.
m_ss->Execute(NewSingleCallback(this, &RpcServer::CleanupChannel,
session->Channel(), descriptor));
}

void RpcServer::CleanupChannel(RpcChannel *channel,
ConnectedDescriptor *descriptor) {
delete channel;
delete descriptor;
m_ss->Execute(
NewSingleCallback(CleanupChannel, session->Channel(), descriptor));
}
} // namespace rpc
} // namespace ola
3 changes: 0 additions & 3 deletions common/rpc/RpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ class RpcServer {
void ChannelClosed(ola::io::ConnectedDescriptor *socket,
class RpcSession *session);

void CleanupChannel(class RpcChannel *channel,
ola::io::ConnectedDescriptor *socket);

static const char K_CLIENT_VAR[];
static const char K_RPC_PORT_VAR[];
};
Expand Down
36 changes: 7 additions & 29 deletions include/ola/network/TCPConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,43 +100,21 @@ class TCPConnector {
*/
unsigned int ConnectionsPending() const { return m_connections.size(); }

private:
/**
* A TCP socket waiting to connect.
* @brief Called when the TCP socket connects.
* @param connection the connection that is now connected.
*/
class PendingTCPConnection: public ola::io::WriteFileDescriptor {
public:
PendingTCPConnection(TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnectCallback *callback);

ola::io::DescriptorHandle WriteDescriptor() const { return m_handle; }

void PerformWrite();
void Close();
void SocketWritable(class PendingTCPConnection *connection);

const IPV4Address ip_address;
TCPConnectCallback *callback;
ola::thread::timeout_id timeout_id;

private:
TCPConnector *m_connector;
ola::io::DescriptorHandle m_handle;
};

typedef std::set<PendingTCPConnection*> ConnectionSet;
typedef std::vector<PendingTCPConnection*> ConnectionList;
private:
typedef std::set<class PendingTCPConnection*> ConnectionSet;

ola::io::SelectServerInterface *m_ss;
ConnectionSet m_connections;
ConnectionList m_orphaned_connections;
unsigned int m_pending_callbacks;

void SocketWritable(PendingTCPConnection *connection);
void FreePendingConnection(PendingTCPConnection *connection);
void FreePendingConnection(class PendingTCPConnection *connection);
void Timeout(const ConnectionSet::iterator &iter);
void TimeoutEvent(PendingTCPConnection *connection);
void TimeoutEvent(class PendingTCPConnection *connection);
void CleanUpOrphans();

DISALLOW_COPY_AND_ASSIGN(TCPConnector);
Expand Down
6 changes: 0 additions & 6 deletions include/olad/Preferences.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,6 @@ class FilePreferenceSaverThread: public ola::thread::Thread {
private:
ola::io::SelectServer m_ss;

/**
* Perform the save
*/
void SaveToFile(const std::string *filename,
const PreferencesMap *preferences);

/**
* Notify the blocked thread we're done
*/
Expand Down
54 changes: 26 additions & 28 deletions olad/Preferences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ using std::pair;
using std::string;
using std::vector;

namespace {
void SavePreferencesToFile(
const string *filename_ptr,
const FilePreferenceSaverThread::PreferencesMap *pref_map_ptr) {
std::auto_ptr<const string> filename(filename_ptr);
std::auto_ptr<const FilePreferenceSaverThread::PreferencesMap> pref_map(
pref_map_ptr);

FilePreferenceSaverThread::PreferencesMap::const_iterator iter;
ofstream pref_file(filename->data());

if (!pref_file.is_open()) {
OLA_WARN << "Could not open " << *filename_ptr << ": " << strerror(errno);
return;
}

for (iter = pref_map->begin(); iter != pref_map->end(); ++iter) {
pref_file << iter->first << " = " << iter->second << std::endl;
}
pref_file.flush();
pref_file.close();
}
} // namespace

const char BoolValidator::ENABLED[] = "true";
const char BoolValidator::DISABLED[] = "false";

Expand Down Expand Up @@ -303,18 +327,14 @@ void FilePreferenceSaverThread::SavePreferences(
const PreferencesMap &preferences) {
const string *file_name_ptr = new string(file_name);
const PreferencesMap *save_map = new PreferencesMap(preferences);
SingleUseCallback0<void> *cb = NewSingleCallback(
this,
&FilePreferenceSaverThread::SaveToFile,
file_name_ptr,
save_map);
SingleUseCallback0<void> *cb =
NewSingleCallback(SavePreferencesToFile, file_name_ptr, save_map);
m_ss.Execute(cb);
}


void *FilePreferenceSaverThread::Run() {
m_ss.Run();
m_ss.DrainCallbacks();
return NULL;
}

Expand All @@ -338,28 +358,6 @@ void FilePreferenceSaverThread::Syncronize() {
}


void FilePreferenceSaverThread::SaveToFile(
const string *filename_ptr,
const PreferencesMap *pref_map_ptr) {
std::auto_ptr<const string> filename(filename_ptr);
std::auto_ptr<const PreferencesMap> pref_map(pref_map_ptr);

PreferencesMap::const_iterator iter;
ofstream pref_file(filename->data());

if (!pref_file.is_open()) {
OLA_WARN << "Could not open " << *filename_ptr << ": " << strerror(errno);
return;
}

for (iter = pref_map->begin(); iter != pref_map->end(); ++iter) {
pref_file << iter->first << " = " << iter->second << std::endl;
}
pref_file.flush();
pref_file.close();
}


void FilePreferenceSaverThread::CompleteSyncronization(
ConditionVariable *condition,
Mutex *mutex) {
Expand Down
Loading