Skip to content

Commit

Permalink
Merge pull request #565 from nomis52/opc
Browse files Browse the repository at this point in the history
Move GPIO writes to a separate thread.
  • Loading branch information
nomis52 committed Dec 8, 2014
2 parents 7a72ab7 + b2586ec commit 6fbfa46
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 147 deletions.
120 changes: 66 additions & 54 deletions common/network/TCPConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,76 @@
namespace ola {
namespace network {

/*
* A TCP socket waiting to connect.
*/
class PendingTCPConnection: public ola::io::WriteFileDescriptor {
public:
PendingTCPConnection(TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnector::TCPConnectCallback *callback);

ola::io::DescriptorHandle WriteDescriptor() const { return m_handle; }

void PerformWrite();
void Close();

const IPV4Address ip_address;
TCPConnector::TCPConnectCallback *callback;
ola::thread::timeout_id timeout_id;

private:
TCPConnector *m_connector;
ola::io::DescriptorHandle m_handle;
};

PendingTCPConnection::PendingTCPConnection(
TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnector::TCPConnectCallback *callback)
: WriteFileDescriptor(),
ip_address(ip),
callback(callback),
timeout_id(ola::thread::INVALID_TIMEOUT),
m_connector(connector) {
#ifdef _WIN32
m_handle.m_handle.m_fd = fd;
m_handle.m_type = ola::io::SOCKET_DESCRIPTOR;
#else
m_handle = fd;
#endif
}

/*
* Close this connection
*/
void PendingTCPConnection::Close() {
#ifdef _WIN32
close(m_handle.m_handle.m_fd);
#else
close(m_handle);
#endif
}

/*
* Called when the socket becomes writeable
*/
void PendingTCPConnection::PerformWrite() {
m_connector->SocketWritable(this);
}

void DeleteConnection(PendingTCPConnection *connection) {
delete connection;
}

TCPConnector::TCPConnector(ola::io::SelectServerInterface *ss)
: m_ss(ss),
m_pending_callbacks(0) {
: m_ss(ss) {
}

TCPConnector::~TCPConnector() {
CancelAll();
if (m_pending_callbacks) {
m_ss->DrainCallbacks();
}
m_pending_callbacks++;
CleanUpOrphans();
}


Expand Down Expand Up @@ -181,9 +239,7 @@ void TCPConnector::SocketWritable(PendingTCPConnection *connection) {

// we're already within the PendingTCPConnection's call stack here
// schedule the deletion to run later
m_orphaned_connections.push_back(connection);
m_pending_callbacks++;
m_ss->Execute(ola::NewSingleCallback(this, &TCPConnector::CleanUpOrphans));
m_ss->Execute(ola::NewSingleCallback(DeleteConnection, connection));

if (error) {
OLA_WARN << "connect() to " << connection->ip_address << " returned: "
Expand Down Expand Up @@ -230,49 +286,5 @@ void TCPConnector::TimeoutEvent(PendingTCPConnection *connection) {
Timeout(iter);
m_connections.erase(iter);
}


TCPConnector::PendingTCPConnection::PendingTCPConnection(
TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnectCallback *callback)
: WriteFileDescriptor(),
ip_address(ip),
callback(callback),
timeout_id(ola::thread::INVALID_TIMEOUT),
m_connector(connector) {
#ifdef _WIN32
m_handle.m_handle.m_fd = fd;
m_handle.m_type = ola::io::SOCKET_DESCRIPTOR;
#else
m_handle = fd;
#endif
}


/**
* Close this connection
*/
void TCPConnector::PendingTCPConnection::Close() {
#ifdef _WIN32
close(m_handle.m_handle.m_fd);
#else
close(m_handle);
#endif
}


/**
* Called when the socket becomes writeable
*/
void TCPConnector::PendingTCPConnection::PerformWrite() {
m_connector->SocketWritable(this);
}

void TCPConnector::CleanUpOrphans() {
m_pending_callbacks--;
STLDeleteElements(&m_orphaned_connections);
}
} // namespace network
} // namespace ola
22 changes: 10 additions & 12 deletions common/rpc/RpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ using ola::network::IPV4SocketAddress;
using ola::network::TCPAcceptingSocket;
using ola::network::TCPSocket;

namespace {
void CleanupChannel(RpcChannel *channel,
ConnectedDescriptor *descriptor) {
delete channel;
delete descriptor;
}
} // namespace

const char RpcServer::K_CLIENT_VAR[] = "clients-connected";
const char RpcServer::K_RPC_PORT_VAR[] = "rpc-port";

Expand All @@ -58,10 +66,6 @@ RpcServer::RpcServer(ola::io::SelectServerInterface *ss,
}

RpcServer::~RpcServer() {
// Since we use ss->Execute() below, we need to ensure all pending callbacks
// are run.

m_ss->DrainCallbacks();
// Take a copy since calling the close handler will cause the socket to be
// removed from m_connected_sockets
ClientDescriptors sockets = m_connected_sockets;
Expand Down Expand Up @@ -168,14 +172,8 @@ void RpcServer::ChannelClosed(ConnectedDescriptor *descriptor,
// We're in the call stack of both the descriptor and the channel here.
// We schedule deletion during the next run of the event loop to break out of
// the stack.
m_ss->Execute(NewSingleCallback(this, &RpcServer::CleanupChannel,
session->Channel(), descriptor));
}

void RpcServer::CleanupChannel(RpcChannel *channel,
ConnectedDescriptor *descriptor) {
delete channel;
delete descriptor;
m_ss->Execute(
NewSingleCallback(CleanupChannel, session->Channel(), descriptor));
}
} // namespace rpc
} // namespace ola
3 changes: 0 additions & 3 deletions common/rpc/RpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ class RpcServer {
void ChannelClosed(ola::io::ConnectedDescriptor *socket,
class RpcSession *session);

void CleanupChannel(class RpcChannel *channel,
ola::io::ConnectedDescriptor *socket);

static const char K_CLIENT_VAR[];
static const char K_RPC_PORT_VAR[];
};
Expand Down
36 changes: 7 additions & 29 deletions include/ola/network/TCPConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,43 +100,21 @@ class TCPConnector {
*/
unsigned int ConnectionsPending() const { return m_connections.size(); }

private:
/**
* A TCP socket waiting to connect.
* @brief Called when the TCP socket connects.
* @param connection the connection that is now connected.
*/
class PendingTCPConnection: public ola::io::WriteFileDescriptor {
public:
PendingTCPConnection(TCPConnector *connector,
const IPV4Address &ip,
int fd,
TCPConnectCallback *callback);

ola::io::DescriptorHandle WriteDescriptor() const { return m_handle; }

void PerformWrite();
void Close();
void SocketWritable(class PendingTCPConnection *connection);

const IPV4Address ip_address;
TCPConnectCallback *callback;
ola::thread::timeout_id timeout_id;

private:
TCPConnector *m_connector;
ola::io::DescriptorHandle m_handle;
};

typedef std::set<PendingTCPConnection*> ConnectionSet;
typedef std::vector<PendingTCPConnection*> ConnectionList;
private:
typedef std::set<class PendingTCPConnection*> ConnectionSet;

ola::io::SelectServerInterface *m_ss;
ConnectionSet m_connections;
ConnectionList m_orphaned_connections;
unsigned int m_pending_callbacks;

void SocketWritable(PendingTCPConnection *connection);
void FreePendingConnection(PendingTCPConnection *connection);
void FreePendingConnection(class PendingTCPConnection *connection);
void Timeout(const ConnectionSet::iterator &iter);
void TimeoutEvent(PendingTCPConnection *connection);
void TimeoutEvent(class PendingTCPConnection *connection);
void CleanUpOrphans();

DISALLOW_COPY_AND_ASSIGN(TCPConnector);
Expand Down
6 changes: 0 additions & 6 deletions include/olad/Preferences.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,6 @@ class FilePreferenceSaverThread: public ola::thread::Thread {
private:
ola::io::SelectServer m_ss;

/**
* Perform the save
*/
void SaveToFile(const std::string *filename,
const PreferencesMap *preferences);

/**
* Notify the blocked thread we're done
*/
Expand Down
54 changes: 26 additions & 28 deletions olad/Preferences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ using std::pair;
using std::string;
using std::vector;

namespace {
void SavePreferencesToFile(
const string *filename_ptr,
const FilePreferenceSaverThread::PreferencesMap *pref_map_ptr) {
std::auto_ptr<const string> filename(filename_ptr);
std::auto_ptr<const FilePreferenceSaverThread::PreferencesMap> pref_map(
pref_map_ptr);

FilePreferenceSaverThread::PreferencesMap::const_iterator iter;
ofstream pref_file(filename->data());

if (!pref_file.is_open()) {
OLA_WARN << "Could not open " << *filename_ptr << ": " << strerror(errno);
return;
}

for (iter = pref_map->begin(); iter != pref_map->end(); ++iter) {
pref_file << iter->first << " = " << iter->second << std::endl;
}
pref_file.flush();
pref_file.close();
}
} // namespace

const char BoolValidator::ENABLED[] = "true";
const char BoolValidator::DISABLED[] = "false";

Expand Down Expand Up @@ -303,18 +327,14 @@ void FilePreferenceSaverThread::SavePreferences(
const PreferencesMap &preferences) {
const string *file_name_ptr = new string(file_name);
const PreferencesMap *save_map = new PreferencesMap(preferences);
SingleUseCallback0<void> *cb = NewSingleCallback(
this,
&FilePreferenceSaverThread::SaveToFile,
file_name_ptr,
save_map);
SingleUseCallback0<void> *cb =
NewSingleCallback(SavePreferencesToFile, file_name_ptr, save_map);
m_ss.Execute(cb);
}


void *FilePreferenceSaverThread::Run() {
m_ss.Run();
m_ss.DrainCallbacks();
return NULL;
}

Expand All @@ -338,28 +358,6 @@ void FilePreferenceSaverThread::Syncronize() {
}


void FilePreferenceSaverThread::SaveToFile(
const string *filename_ptr,
const PreferencesMap *pref_map_ptr) {
std::auto_ptr<const string> filename(filename_ptr);
std::auto_ptr<const PreferencesMap> pref_map(pref_map_ptr);

PreferencesMap::const_iterator iter;
ofstream pref_file(filename->data());

if (!pref_file.is_open()) {
OLA_WARN << "Could not open " << *filename_ptr << ": " << strerror(errno);
return;
}

for (iter = pref_map->begin(); iter != pref_map->end(); ++iter) {
pref_file << iter->first << " = " << iter->second << std::endl;
}
pref_file.flush();
pref_file.close();
}


void FilePreferenceSaverThread::CompleteSyncronization(
ConditionVariable *condition,
Mutex *mutex) {
Expand Down
Loading

0 comments on commit 6fbfa46

Please sign in to comment.