Skip to content

Commit

Permalink
Merge pull request #562 from nomis52/opc
Browse files Browse the repository at this point in the history
Add support for the Open Pixel Control Protocol, Closes #502.
  • Loading branch information
nomis52 committed Dec 7, 2014
2 parents 18f5f8c + ca02b45 commit 7a72ab7
Show file tree
Hide file tree
Showing 47 changed files with 2,604 additions and 219 deletions.
4 changes: 3 additions & 1 deletion NEWS
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
x/y/2014 ola-0.9.4
Features:
* Added support for hotplugging USB devices using the usbdmx plugin, #374.
* Added support for setting the thread scheduling options, #399
* Added support for setting the thread scheduling options, #399.
* Added support for the Open Pixel Control protocol, #502.
* Added a GPIO Plugin, closes #299.

API:
*
Expand Down
8 changes: 8 additions & 0 deletions common/io/IOQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,14 @@ void IOQueue::AppendBlock(class MemoryBlock *block) {
m_blocks.push_back(block);
}

void IOQueue::AppendMove(IOQueue *other) {
BlockVector::const_iterator iter = other->m_blocks.begin();
for (; iter != other->m_blocks.end(); ++iter) {
m_blocks.push_back(*iter);
}
other->m_blocks.clear();
}


/**
* Remove all data from the IOQueue.
Expand Down
1 change: 1 addition & 0 deletions common/io/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ common_libolacommon_la_SOURCES += \
common/io/IOQueue.cpp \
common/io/IOStack.cpp \
common/io/IOUtils.cpp \
common/io/NonBlockingSender.cpp \
common/io/PollerInterface.cpp \
common/io/PollerInterface.h \
common/io/SelectServer.cpp \
Expand Down
96 changes: 96 additions & 0 deletions common/io/NonBlockingSender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* NonBlockingSender.cpp
* Copyright (C) 2013 Simon Newton
*/

#include "ola/Callback.h"
#include "ola/io/IOQueue.h"
#include "ola/io/IOStack.h"
#include "ola/io/NonBlockingSender.h"

namespace ola {
namespace io {

const unsigned int NonBlockingSender::DEFAULT_MAX_BUFFER_SIZE = 1024;

NonBlockingSender::NonBlockingSender(ola::io::ConnectedDescriptor *descriptor,
ola::io::SelectServerInterface *ss,
ola::io::MemoryBlockPool *memory_pool,
unsigned int max_buffer_size)
: m_descriptor(descriptor),
m_ss(ss),
m_output_buffer(memory_pool),
m_associated(false),
m_max_buffer_size(max_buffer_size) {
m_descriptor->SetOnWritable(
ola::NewCallback(this, &NonBlockingSender::PerformWrite));
}

NonBlockingSender::~NonBlockingSender() {
if (m_associated) {
m_ss->RemoveWriteDescriptor(m_descriptor);
}
m_descriptor->SetOnWritable(NULL);
}

bool NonBlockingSender::LimitReached() const {
return m_output_buffer.Size() >= m_max_buffer_size;
}

bool NonBlockingSender::SendMessage(ola::io::IOStack *stack) {
if (LimitReached()) {
return false;
}

stack->MoveToIOQueue(&m_output_buffer);
AssociateIfRequired();
return true;
}

bool NonBlockingSender::SendMessage(IOQueue *queue) {
if (LimitReached()) {
return false;
}

m_output_buffer.AppendMove(queue);
AssociateIfRequired();
return true;
}

/*
* Called when the descriptor is writeable, this does the actual write() call.
*/
void NonBlockingSender::PerformWrite() {
m_descriptor->Send(&m_output_buffer);
if (m_output_buffer.Empty() && m_associated) {
m_ss->RemoveWriteDescriptor(m_descriptor);
m_associated = false;
}
}

/*
* Associate our descriptor with the SelectServer if we have data to send.
*/
void NonBlockingSender::AssociateIfRequired() {
if (m_output_buffer.Empty()) {
return;
}
m_ss->AddWriteDescriptor(m_descriptor);
m_associated = true;
}
} // namespace io
} // namespace ola
21 changes: 11 additions & 10 deletions common/network/TCPConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ void TCPConnector::SocketWritable(PendingTCPConnection *connection) {
error = errno;
}

ConnectionSet::iterator iter = m_connections.find(connection);
if (iter != m_connections.end()) {
m_connections.erase(iter);
}

// we're already within the PendingTCPConnection's call stack here
// schedule the deletion to run later
m_orphaned_connections.push_back(connection);
m_pending_callbacks++;
m_ss->Execute(ola::NewSingleCallback(this, &TCPConnector::CleanUpOrphans));

if (error) {
OLA_WARN << "connect() to " << connection->ip_address << " returned: "
<< strerror(error);
Expand All @@ -186,16 +197,6 @@ void TCPConnector::SocketWritable(PendingTCPConnection *connection) {
connection->callback->Run(connection->WriteDescriptor(), 0);
#endif
}

ConnectionSet::iterator iter = m_connections.find(connection);
if (iter != m_connections.end())
m_connections.erase(iter);

// we're already within the PendingTCPConnection's call stack here
// schedule the deletion to run later
m_orphaned_connections.push_back(connection);
m_pending_callbacks++;
m_ss->Execute(ola::NewSingleCallback(this, &TCPConnector::CleanUpOrphans));
}

/**
Expand Down
4 changes: 3 additions & 1 deletion common/protocol/Ola.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ enum PluginIds {
OLA_PLUGIN_MILINST = 18;
OLA_PLUGIN_RENARD = 19;
OLA_PLUGIN_UARTDMX = 20;
OLA_PLUGIN_OPENPIXELCONTROL = 21;
OLA_PLUGIN_GPIO = 22;

/*
* To obtain a new plugin ID, open a ticket at
* https://github.com/OpenLightingProject/ola/issues/new
* Plugin IDs are usually assigned just prior to merging the code into the
* mainline. For development of plugins please use the value of
* mainline. For development of plugins please use the value of
* OLA_PLUGIN_EXPERIMENTAL in a plugin ID you define above.
*/
OLA_PLUGIN_EXPERIMENTAL = 10000;
Expand Down
2 changes: 2 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ PLUGIN_SUPPORT(dummy, USE_DUMMY)
PLUGIN_SUPPORT(e131, USE_E131)
PLUGIN_SUPPORT(espnet, USE_ESPNET)
PLUGIN_SUPPORT(ftdidmx, USE_FTDI, [$have_libftdi])
PLUGIN_SUPPORT(gpio, USE_GPIO)
PLUGIN_SUPPORT(karate, USE_KARATE)
PLUGIN_SUPPORT(kinet, USE_KINET)
PLUGIN_SUPPORT(milinst, USE_MILINST)
Expand All @@ -659,6 +660,7 @@ PLUGIN_SUPPORT(sandnet, USE_SANDNET)
PLUGIN_SUPPORT(shownet, USE_SHOWNET)
PLUGIN_SUPPORT(spi, USE_SPI, [$have_spi])
PLUGIN_SUPPORT(stageprofi, USE_STAGEPROFI)
PLUGIN_SUPPORT(openpixelcontrol, USE_OPENPIXELCONTROL)
PLUGIN_SUPPORT(uartdmx, USE_UART, [$have_uart])
PLUGIN_SUPPORT(usbpro, USE_USBPRO)
PLUGIN_SUPPORT(usbdmx, USE_LIBUSB, [$have_libusb])
Expand Down
6 changes: 6 additions & 0 deletions doxygen/namespaces.dox
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
* @namespace ola::plugin::ftdidmx
* @brief Code for FTDI devices.
*
* @namespace ola::plugin::gpio
* @brief The General Purpose digital I/O Plugin.
*
* @namespace ola::plugin::karate
* @brief Code for Karate devices.
*
Expand All @@ -62,6 +65,9 @@
* @namespace ola::plugin::opendmx
* @brief Code for the Enttec OpenDMX.
*
* @namespace ola::plugin::openpixelcontrol
* @brief The Open Pixel Control (OPC) plugin.
*
* @namespace ola::plugin::osc
* @brief Code for the OSC protocol.
*
Expand Down
6 changes: 6 additions & 0 deletions include/ola/io/IOQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ class IOQueue: public InputBufferInterface,
// Append a MemoryBlock to this IOQueue. Ownership of the block is taken.
void AppendBlock(class MemoryBlock *block);

/**
* @brief Move the contents of one IOQueue to another.
* @param other The IOQueue with the data to append to this IOQueue.
*/
void AppendMove(IOQueue *other);

void Clear();

// purge the underlying memory pool
Expand Down
1 change: 1 addition & 0 deletions include/ola/io/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ olaioinclude_HEADERS = \
include/ola/io/MemoryBlock.h \
include/ola/io/MemoryBlockPool.h \
include/ola/io/MemoryBuffer.h \
include/ola/io/NonBlockingSender.h \
include/ola/io/OutputBuffer.h \
include/ola/io/OutputStream.h \
include/ola/io/SelectServer.h \
Expand Down
129 changes: 129 additions & 0 deletions include/ola/io/NonBlockingSender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* NonBlockingSender.h
* Copyright (C) 2013 Simon Newton
*
* Explaination:
* If we just write IOStacks directly to ConnectedDescriptors, we may not be
* able to write the entire message. This can happen if the remote end is slow
* to ack and data builds up in the kernel socket buffer.
*
* This class abstracts the caller from having to deal with this situation. At
* construction time we specify the maximum number of message bytes we want to
* buffer. Once the buffer reaches this size subsequent calls to SendMessage
* will return false.
*/

#ifndef INCLUDE_OLA_IO_NONBLOCKINGSENDER_H_
#define INCLUDE_OLA_IO_NONBLOCKINGSENDER_H_

#include <ola/io/Descriptor.h>
#include <ola/io/IOQueue.h>
#include <ola/io/MemoryBlockPool.h>
#include <ola/io/OutputBuffer.h>
#include <ola/io/SelectServerInterface.h>

namespace ola {
namespace io {

/**
* @brief Write data to ConnectedDescriptors without blocking or losing data
*
* A NonBlockingSender handles writing data from IOStacks or IOQueues to a
* ConnectedDescriptor. On calling SendMessage() the data from the stack or
* queue is 0-copied to an internal buffer and then as much as possible is
* written to the ConnectedDescriptor using scatter/gather I/O calls (if
* available). If there is more data than fits in the descriptor's socket
* buffer, the remaining data is held in the internal buffer.
*
* The internal buffer has a limit on the size. Once the limit is
* exceeded, calls to SendMessage() will return false. The limit is a soft
* limit however, a call to SendMessage() may cause the buffer to exceed the
* internal limit, provided the limit has not already been reached.
*/
class NonBlockingSender {
public:
/**
* @brief Create a new NonBlockingSender.
* @param descriptor the ConnectedDescriptor to send on, ownership is not
* transferred.
* @param ss the SelectServer to use to register for on-write events.
* @param memory_pool the pool to return MemoryBlocks to
* @param max_buffer_size the maximum amount of data to buffer. Note that
* because the underlying MemoryBlocks may be partially used, this does not
* reflect the actual amount of memory used (in pathological cases we may
* allocate up to max_buffer_size * memory_block_size bytes.
*/
NonBlockingSender(ola::io::ConnectedDescriptor *descriptor,
ola::io::SelectServerInterface *ss,
ola::io::MemoryBlockPool *memory_pool,
unsigned int max_buffer_size = DEFAULT_MAX_BUFFER_SIZE);

/**
* @brief Destructor
*/
~NonBlockingSender();

/**
* @brief Check if the limit for the internal buffer has been reached.
* @return true if the limit has been reached, false otherwise.
*/
bool LimitReached() const;

/**
* @brief Send the contents of an IOStack on the ConnectedDescriptor.
* @param stack the IOStack to send. All data in this stack will be sent and
* the stack will be emptied.
* @returns true if the contents of the stack were buffered for transmit,
* false the limit for this NonBlockingSender had already been reached.
*/
bool SendMessage(class IOStack *stack);

/**
* @brief Send the contents of an IOQueue on the ConnectedDescriptor.
* @param queue the IOQueue to send. All data in this queue will be sent and
* the queue will be emptied.
* @returns true if the contents of the stack were buffered for transmit,
* false the limit for this NonBlockingSender had already been reached.
*/
bool SendMessage(IOQueue *queue);

/**
* @brief The default max internal buffer size.
*
* Once this limit has been reached, calls to SendMessage() will return
* false.
*
* 1k is probably enough for userspace. The Linux kernel default is 4k,
* tunable via /proc/sys/net/core/wmem_{max,default}.
*/
static const unsigned int DEFAULT_MAX_BUFFER_SIZE;

private:
ola::io::ConnectedDescriptor *m_descriptor;
ola::io::SelectServerInterface *m_ss;
ola::io::IOQueue m_output_buffer;
bool m_associated;
unsigned int m_max_buffer_size;

void PerformWrite();
void AssociateIfRequired();

DISALLOW_COPY_AND_ASSIGN(NonBlockingSender);
};
} // namespace io
} // namespace ola
#endif // INCLUDE_OLA_IO_NONBLOCKINGSENDER_H_
2 changes: 1 addition & 1 deletion include/ola/network/AdvancedTCPConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace network {
* the connection fails it will retry according to a given BackOffPolicy.
*
* Limitations:
* - This class only supports a single connection per IP:%Port.
* - This class only supports a single connection per IP:Port.
* - This class should work fine for a small number of TCP connections (100 or
* so). It'll need to be re-written if we want to support 1000s.
*/
Expand Down
Loading

0 comments on commit 7a72ab7

Please sign in to comment.