Skip to content

Commit

Permalink
CPublisher::Config used in all writers
Browse files Browse the repository at this point in the history
xxx_udp_mc.yy -> xxx_udp.yy
  • Loading branch information
rex-schilasky committed Apr 29, 2024
1 parent 0ee1477 commit be6cf11
Show file tree
Hide file tree
Showing 21 changed files with 99 additions and 112 deletions.
8 changes: 4 additions & 4 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ if(ECAL_CORE_PUBLISHER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_writer_src
src/readwrite/udp/ecal_writer_udp_mc.cpp
src/readwrite/udp/ecal_writer_udp_mc.h
src/readwrite/udp/ecal_writer_udp.cpp
src/readwrite/udp/ecal_writer_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand All @@ -249,8 +249,8 @@ if(ECAL_CORE_SUBSCRIBER)
)
if(ECAL_CORE_TRANSPORT_UDP)
list(APPEND ecal_reader_src
src/readwrite/udp/ecal_reader_udp_mc.cpp
src/readwrite/udp/ecal_reader_udp_mc.h
src/readwrite/udp/ecal_reader_udp.cpp
src/readwrite/udp/ecal_reader_udp.h
)
endif()
if(ECAL_CORE_TRANSPORT_TCP)
Expand Down
19 changes: 11 additions & 8 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,28 @@ namespace eCAL
{
public:

ECAL_API static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */
ECAL_API static constexpr long long DEFAULT_TIME_ARGUMENT = -1; /*!< Use DEFAULT_TIME_ARGUMENT in the `Send()` function to let eCAL determine the send timestamp */

struct ECAL_API SHMConfig
{
TLayer::eSendMode send_mode = TLayer::smode_auto; //!< shm layer send mode (default auto)
bool zero_copy_mode = false; //!< enable zero copy shared memory transport mode
long long acknowledge_timeout_ms = 0; /*!< force connected subscribers to send acknowledge event after processing the message
the publisher send call is blocked on this event with this timeout (0 == no handshake) */
long buffer_count = 1; //!< maximum number of used buffers (needs to be greater than 1, default = 1)
TLayer::eSendMode send_mode = TLayer::smode_auto; //!< shm layer send mode (default auto)
bool zero_copy_mode = false; //!< enable zero copy shared memory transport mode
int acknowledge_timeout_ms = 0; /*!< force connected subscribers to send acknowledge event after processing the message
the publisher send call is blocked on this event with this timeout (0 == no handshake) */
size_t memfile_min_size_bytes = 4096; //!< default memory file size for new publisher
size_t memfile_reserve_percent = 50; //!< dynamic file size reserve before recreating memory file if topic size changes
size_t memfile_buffer_count = 1; //!< maximum number of used buffers (needs to be greater than 1, default = 1)
};

struct ECAL_API UDPConfig
{
TLayer::eSendMode send_mode = TLayer::smode_auto; //!< udp layer send mode (default auto)
TLayer::eSendMode send_mode = TLayer::smode_auto; //!< udp layer send mode (default auto)
int sndbuf_size_bytes = (5*1024*1024); //!< udp send buffer size in bytes (default 5MB)
};

struct ECAL_API TCPConfig
{
TLayer::eSendMode send_mode = TLayer::smode_off; //!< tcp layer send mode (default off)
TLayer::eSendMode send_mode = TLayer::smode_off; //!< tcp layer send mode (default off)
};

struct ECAL_API Config
Expand Down
1 change: 1 addition & 0 deletions ecal/core/src/ecal_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* ========================= eCAL LICENSE =================================
*/

#include "ecal_def.h"
#include "ecal_globals.h"
#include "ecal_event.h"
#include "registration/ecal_registration_receiver.h"
Expand Down
16 changes: 10 additions & 6 deletions ecal/core/src/pubsub/ecal_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ namespace eCAL
share_topic_description(eCAL::Config::IsTopicDescriptionSharingEnabled())
{
// shm config
shm.send_mode = eCAL::Config::GetPublisherShmMode();
shm.buffer_count = eCAL::Config::GetMemfileBufferCount();
shm.zero_copy_mode = eCAL::Config::IsMemfileZerocopyEnabled();
shm.acknowledge_timeout_ms = eCAL::Config::GetMemfileAckTimeoutMs();
shm.send_mode = eCAL::Config::GetPublisherShmMode();
shm.zero_copy_mode = eCAL::Config::IsMemfileZerocopyEnabled();
shm.acknowledge_timeout_ms = eCAL::Config::GetMemfileAckTimeoutMs();

shm.memfile_min_size_bytes = eCAL::Config::GetMemfileMinsizeBytes();
shm.memfile_reserve_percent = eCAL::Config::GetMemfileOverprovisioningPercentage();
shm.memfile_buffer_count = eCAL::Config::GetMemfileBufferCount();

// udp config
udp.send_mode = eCAL::Config::GetPublisherUdpMulticastMode();
udp.send_mode = eCAL::Config::GetPublisherUdpMulticastMode();
udp.sndbuf_size_bytes = eCAL::Config::GetUdpMulticastSndBufSizeBytes();

// tcp config
tcp.send_mode = eCAL::Config::GetPublisherTcpMode();
tcp.send_mode = eCAL::Config::GetPublisherTcpMode();
}

CPublisher::CPublisher() :
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "ecal_reader_layer.h"

#if ECAL_CORE_TRANSPORT_UDP
#include "udp/ecal_reader_udp_mc.h"
#include "udp/ecal_reader_udp.h"
#endif

#if ECAL_CORE_TRANSPORT_SHM
Expand Down
74 changes: 27 additions & 47 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <string>
#include <utility>

#include "ecal_def.h"
#include "config/ecal_config_reader_hlp.h"

#if ECAL_CORE_REGISTRATION
Expand Down Expand Up @@ -76,21 +75,27 @@ namespace eCAL
m_topic_name(topic_name_),
m_topic_info(topic_info_),
m_topic_size(0),
m_buffering_shm(PUB_MEMFILE_BUF_COUNT),
m_zero_copy(PUB_MEMFILE_ZERO_COPY),
m_acknowledge_timeout_ms(PUB_MEMFILE_ACK_TO),
m_config(config_),
m_connected(false),
m_id(0),
m_clock(0),
m_frequency_calculator(0.0f),
m_loc_subscribed(false),
m_ext_subscribed(false),
m_share_ttype(true),
m_share_tdesc(true),
m_created(false)
{
// configure
Configure(config_);
// shm config
#if ECAL_CORE_TRANSPORT_SHM
m_writer.shm_mode.requested = config_.shm.send_mode;
#endif
// udp config
#if ECAL_CORE_TRANSPORT_UDP
m_writer.udp_mode.requested = config_.udp.send_mode;
#endif
// tcp config
#if ECAL_CORE_TRANSPORT_TCP
m_writer.tcp_mode.requested = config_.tcp.send_mode;
#endif

// build topic id
std::stringstream counter;
Expand Down Expand Up @@ -165,31 +170,6 @@ namespace eCAL
Unregister();
}

void CDataWriter::Configure(const CPublisher::Config& config_)
{
// shm config
#if ECAL_CORE_TRANSPORT_SHM
m_writer.shm_mode.requested = config_.shm.send_mode;
m_buffering_shm = config_.shm.buffer_count;
m_zero_copy = config_.shm.zero_copy_mode;
m_acknowledge_timeout_ms = config_.shm.acknowledge_timeout_ms;
#endif

// udp config
#if ECAL_CORE_TRANSPORT_UDP
m_writer.udp_mode.requested = config_.udp.send_mode;
#endif

// tcp config
#if ECAL_CORE_TRANSPORT_TCP
m_writer.tcp_mode.requested = config_.tcp.send_mode;
#endif

// allow to share topic type/description
m_share_ttype = config_.share_topic_type;
m_share_tdesc = config_.share_topic_description;
}

bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_)
{
// Does it even make sense to register if the info is the same???
Expand Down Expand Up @@ -297,7 +277,7 @@ namespace eCAL

// can we do a zero copy write ?
const bool allow_zero_copy =
m_zero_copy // zero copy mode activated by user
m_config.shm.zero_copy_mode // zero copy mode activated by user
&& m_writer.shm_mode.activated // shm layer active
&& !m_writer.udp_mode.activated
&& !m_writer.tcp_mode.activated;
Expand Down Expand Up @@ -336,9 +316,9 @@ namespace eCAL
wattr.clock = m_clock;
wattr.hash = snd_hash;
wattr.time = time_;
wattr.buffering = m_buffering_shm;
wattr.zero_copy = m_zero_copy;
wattr.acknowledge_timeout_ms = m_acknowledge_timeout_ms;
wattr.buffering = m_config.shm.memfile_buffer_count;
wattr.zero_copy = m_config.shm.zero_copy_mode;
wattr.acknowledge_timeout_ms = m_config.shm.acknowledge_timeout_ms;

// prepare send
if (m_writer.shm->PrepareWrite(wattr))
Expand Down Expand Up @@ -663,12 +643,12 @@ namespace eCAL
// topic_information
{
auto& ecal_reg_sample_tdatatype = ecal_reg_sample_topic.tdatatype;
if (m_share_ttype)
if (m_config.share_topic_type)
{
ecal_reg_sample_tdatatype.encoding = m_topic_info.encoding;
ecal_reg_sample_tdatatype.name = m_topic_info.name;
}
if (m_share_tdesc)
if (m_config.share_topic_description)
{
ecal_reg_sample_tdatatype.descriptor = m_topic_info.descriptor;
}
Expand Down Expand Up @@ -847,7 +827,7 @@ namespace eCAL
{
case TLayer::eSendMode::smode_auto:
case TLayer::eSendMode::smode_on:
m_writer.udp = std::make_unique<CDataWriterUdpMC>(m_host_name, m_topic_name, m_topic_id);
m_writer.udp = std::make_unique<CDataWriterUdpMC>(m_host_name, m_topic_name, m_topic_id, m_config.udp);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Create::UDP_MC_WRITER");
#endif
Expand All @@ -873,8 +853,8 @@ namespace eCAL
{
case TLayer::eSendMode::smode_auto:
case TLayer::eSendMode::smode_on:
m_writer.shm = std::make_unique<CDataWriterSHM>(m_host_name, m_topic_name, m_topic_id);
m_writer.shm->SetBufferCount(m_buffering_shm);
m_writer.shm = std::make_unique<CDataWriterSHM>(m_host_name, m_topic_name, m_topic_id, m_config.shm);
m_writer.shm->SetBufferCount(m_config.shm.memfile_buffer_count);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Create::SHM_WRITER");
#endif
Expand All @@ -900,7 +880,7 @@ namespace eCAL
{
case TLayer::eSendMode::smode_auto:
case TLayer::eSendMode::smode_on:
m_writer.tcp = std::make_unique<CDataWriterTCP>(m_host_name, m_topic_name, m_topic_id);
m_writer.tcp = std::make_unique<CDataWriterTCP>(m_host_name, m_topic_name, m_topic_id, m_config.tcp);
#ifndef NDEBUG
Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Create::TCP_WRITER - SUCCESS");
#endif
Expand All @@ -919,15 +899,15 @@ namespace eCAL
{
// if nothing is activated, we use defaults shm = auto, udp = auto
if ((m_writer.udp_mode.requested == TLayer::smode_off)
&& (m_writer.shm_mode.requested == TLayer::smode_off)
&& (m_writer.tcp_mode.requested == TLayer::smode_off)
&& (m_writer.shm_mode.requested == TLayer::smode_off)
&& (m_writer.tcp_mode.requested == TLayer::smode_off)
)
{
#if ECAL_CORE_TRANSPORT_UDP
m_writer.udp_mode.requested = TLayer::smode_auto;
#endif
#if ECAL_CORE_TRANSPORT_SHM
m_writer.shm_mode.requested = TLayer::smode_auto;
m_writer.shm_mode.requested = TLayer::smode_auto;
#endif
}

Expand Down Expand Up @@ -970,7 +950,7 @@ namespace eCAL
}
}

if ( (m_writer.tcp_mode.requested == TLayer::smode_auto)
if ( (m_writer.tcp_mode.requested == TLayer::smode_auto)
&& (m_writer.udp_mode.requested == TLayer::smode_auto)
)
{
Expand Down
12 changes: 2 additions & 10 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
#include <ecal/ecal_types.h>
#include <tuple>

#include "ecal_def.h"
#include "util/ecal_expmap.h"
#include <util/frequency_calculator.h>

#if ECAL_CORE_TRANSPORT_UDP
#include "udp/ecal_writer_udp_mc.h"
#include "udp/ecal_writer_udp.h"
#endif

#if ECAL_CORE_TRANSPORT_SHM
Expand Down Expand Up @@ -123,8 +122,6 @@ namespace eCAL
const SDataTypeInformation& GetDataTypeInformation() const { return m_topic_info; }

protected:
void Configure(const CPublisher::Config& config_);

bool Register(bool force_);
bool Unregister();

Expand All @@ -151,10 +148,7 @@ namespace eCAL
SDataTypeInformation m_topic_info;
std::map<std::string, std::string> m_attr;
size_t m_topic_size;

size_t m_buffering_shm;
bool m_zero_copy;
long long m_acknowledge_timeout_ms;
CPublisher::Config m_config;

std::vector<char> m_payload_buffer;

Expand Down Expand Up @@ -204,8 +198,6 @@ namespace eCAL
};
SWriter m_writer;

bool m_share_ttype;
bool m_share_tdesc;
std::atomic<bool> m_created;
};
}
27 changes: 14 additions & 13 deletions ecal/core/src/readwrite/shm/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
**/

#include <ecal/ecal.h>
#include <ecal/ecal_config.h>
#include <ecal/ecal_log.h>
#include <string>

Expand All @@ -33,18 +32,13 @@ namespace eCAL
{
const std::string CDataWriterSHM::m_memfile_base_name = "ecal_";

CDataWriterSHM::CDataWriterSHM(const std::string& host_name_, const std::string& topic_name_, const std::string& topic_id_)
CDataWriterSHM::CDataWriterSHM(const std::string& host_name_, const std::string& topic_name_, const std::string& topic_id_, const CPublisher::SHMConfig& shm_config_) :
m_config(shm_config_)
{
m_topic_name = topic_name_;

// set attributes
m_memory_file_attr.min_size = Config::GetMemfileMinsizeBytes();
m_memory_file_attr.reserve = Config::GetMemfileOverprovisioningPercentage();
m_memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO;
m_memory_file_attr.timeout_ack_ms = Config::GetMemfileAckTimeoutMs();

// initialize memory file buffer
SetBufferCount(m_buffer_count /*= 1*/);
SetBufferCount(m_config.memfile_buffer_count);
}

SWriterInfo CDataWriterSHM::GetInfo()
Expand Down Expand Up @@ -74,6 +68,13 @@ namespace eCAL
return false;
}

// prepare memfile attributes
SSyncMemoryFileAttr memory_file_attr;
memory_file_attr.min_size = m_config.memfile_min_size_bytes;
memory_file_attr.reserve = m_config.memfile_reserve_percent;
memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO;
memory_file_attr.timeout_ack_ms = m_config.acknowledge_timeout_ms;

// retrieve the memory file size of existing files
size_t memory_file_size(0);
if (!m_memory_file_vec.empty())
Expand All @@ -82,14 +83,14 @@ namespace eCAL
}
else
{
memory_file_size = m_memory_file_attr.min_size;
memory_file_size = memory_file_attr.min_size;
}

// create memory file vector
m_memory_file_vec.clear();
while (m_memory_file_vec.size() < buffer_count_)
{
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, m_memory_file_attr);
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, memory_file_attr);
if (sync_memfile->IsCreated())
{
m_memory_file_vec.push_back(sync_memfile);
Expand All @@ -112,12 +113,12 @@ namespace eCAL
bool ret_state(false);

// adapt number of used memory files if needed
if (attr_.buffering != m_buffer_count)
if (attr_.buffering != m_config.memfile_buffer_count)
{
SetBufferCount(attr_.buffering);

// store new buffer count and flag change
m_buffer_count = attr_.buffering;
m_config.memfile_buffer_count = attr_.buffering;
ret_state |= true;
}

Expand Down
Loading

0 comments on commit be6cf11

Please sign in to comment.