Skip to content

Commit

Permalink
Merge pull request #110 from AO-StreetArt/variableEventSizes
Browse files Browse the repository at this point in the history
check udp event size prior to pulling off the queue,
  • Loading branch information
AO-StreetArt committed Nov 10, 2018
2 parents 1cf718c + d1523b5 commit 0dc5c78
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/app/include/event_stream_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,31 @@ void event_stream(DeviceCache *cache, AOSSL::TieredApplicationProfile *config) {
config->get_opt(config->get_cluster_name() + \
std::string(".event.security.in.aes.iv"), aesin_iv_buffer);
if (aes_enabled_buffer.val == "true") aes_enabled = true;
// Build a buffer and recieve a message
char recv_buf[EVENT_LENGTH];
boost::asio::mutable_buffers_1 bbuffer = boost::asio::buffer(recv_buf);
boost::asio::ip::udp::endpoint remote_endpoint;
boost::system::error_code error;

// First, we call recieve_from on the socket with a null buffer,
// which returns when a message is on the recieve queue with the number
// of bytes in the message
socket.receive_from(boost::asio::null_buffers(), remote_endpoint, 0, error);
int available = socket.available();

// Build a buffer the size of the available data
char recv_buf[available];
boost::asio::mutable_buffers_1 bbuffer = boost::asio::buffer(recv_buf, available);

// Copy the data from the input queue to the buffer
int bytes_transferred = socket.receive_from(bbuffer, remote_endpoint, 0, error);
char* event_data_ptr = boost::asio::buffer_cast<char*>(bbuffer);
if (!(error && error != boost::asio::error::message_size && bytes_transferred > 0) \
&& is_app_running.load()) {
logger.debug("Recieved UDP Update");

// Copy the message buffer into dynamic memory
char *event_msg = new char[EVENT_LENGTH+1]();
char *event_msg = new char[available+1]();
memcpy(event_msg, event_data_ptr, bytes_transferred);
logger.debug(event_msg);

// Clear out any left-over event sender
if (evt_senders[sender_index]) delete evt_senders[sender_index];
// Build the new event sender
Expand All @@ -172,6 +183,7 @@ void event_stream(DeviceCache *cache, AOSSL::TieredApplicationProfile *config) {
} else {
evt_senders[sender_index] = new EventSender(cache, event_msg, io_service);
}

// Fire off another thread to actually send UDP messages
if (sender_index == 12) {
// If we have used up all the space in our array of senders,
Expand Down

0 comments on commit 0dc5c78

Please sign in to comment.