Skip to content

Commit

Permalink
Control SD sync/async behaviour with env var on QNX
Browse files Browse the repository at this point in the history
This is a change intended for systems like QNX which do not generally
have a service like netlink monitoring the network status that can
easily be tied into.  This is not setup to be used/enabled on Linux/Android.

SD will use the new asynchronous behaviour if the env var
VSOMEIP_USE_ASYNCHRONOUS_SD is set.  If not set, SD will use a
synchronous behaviour and only wait for the interface if the env var
VSOMEIP_WAIT_FOR_INTERFACE exists.  The latter (without waiting) is very
close to the upstream behaviour, the difference being that a callback is
still sent to sd::start() from routing_manager_impl, rather than
executing that code directly in routing_manager_impl.

Notes:
- mutexes
  - sd_impl::endpoint_ is now mutex protected
  - rm_impl::pending_sd_offers_mutex_ is a recursive mutex, as now it
    can be called in its own thread and the new thread in SD
- There is no timeout on the waitfor.  The original implementation had a
  configurable timeout, however because timing out left us in an error
  state anyways, this timeout was removed (raised to
  numeric_limits<int>::max() = ~45 days, give or take.)
  • Loading branch information
kheaactua committed May 28, 2024
1 parent 6c0e9db commit 7522cf0
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 34 deletions.
19 changes: 19 additions & 0 deletions implementation/configuration/include/internal.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@

#define VSOMEIP_ROUTING_HOST_PORT_DEFAULT 31490

//
// Defines related to running SD asynchronously - this is not available upstream

// Env var that if exists will cause SD setup to be performed asynchronously and
// wait on network availability. This wait also impacts the routing_manager to
// also wait until a network interface is available before issuring OFFERs
#define VSOMEIP_ENV_USE_ASYNCHRONOUS_SD "VSOMEIP_USE_ASYNCHRONOUS_SD"

// Iff SD is running synchronously, the existance of this env var will cause the
// SD setup to still block on network availability. (mostly a testing scenario)
#define VSOMEIP_ENV_WAIT_FOR_INTERFACE "VSOMEIP_WAIT_FOR_INTERFACE"

// The current waiting mechanism is to block until a file (specified by this
// define)
#define VSOMEIP_NETWORK_INT_READY_FILE "@VSOMEIP_NETWORK_INT_READY_FILE@"

// /end of async change
//

#ifdef _WIN32
#define VSOMEIP_CFG_LIBRARY "vsomeip3-cfg.dll"
#else
Expand Down
2 changes: 1 addition & 1 deletion implementation/routing/include/routing_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ class routing_manager_impl: public routing_manager_base,
bool if_state_running_;
bool sd_route_set_;
bool routing_running_;
std::mutex pending_sd_offers_mutex_;
std::recursive_mutex pending_sd_offers_mutex_;
std::vector<std::pair<service_t, instance_t>> pending_sd_offers_;
#if defined(__linux__) || defined(ANDROID)
std::shared_ptr<netlink_connector> netlink_connector_;
Expand Down
40 changes: 22 additions & 18 deletions implementation/routing/src/routing_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void routing_manager_impl::start() {
netlink_connector_->start();
#else
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
start_ip_routing();
}
#endif
Expand Down Expand Up @@ -448,7 +448,7 @@ bool routing_manager_impl::offer_service(client_t _client,
}

{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
if (if_state_running_) {
init_service_info(_service, _instance, true);
} else {
Expand Down Expand Up @@ -542,7 +542,7 @@ void routing_manager_impl::stop_offer_service(client_t _client,
}
if (is_local) {
{
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
if (it->first == _service && it->second == _instance) {
it = pending_sd_offers_.erase(it);
Expand Down Expand Up @@ -3881,7 +3881,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
}

// start processing of SD messages (incoming remote offers should lead to new subscribe messages)
discovery_->start();
discovery_->start([]() { });

// Trigger initial offer phase for relevant services
for (const auto &its_service : get_offered_services()) {
Expand Down Expand Up @@ -3950,7 +3950,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {

void routing_manager_impl::on_net_interface_or_route_state_changed(
bool _is_interface, const std::string &_if, bool _available) {
std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
std::lock_guard its_lock(pending_sd_offers_mutex_);
auto log_change_message = [&_if, _available, _is_interface](bool _warning) {
std::stringstream ss;
ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if
Expand Down Expand Up @@ -4000,26 +4000,30 @@ void routing_manager_impl::on_net_interface_or_route_state_changed(
}

void routing_manager_impl::start_ip_routing() {
#if defined(_WIN32) || defined(__QNX__)
#if defined(_WIN32)
if_state_running_ = true;
#endif

if (routing_ready_handler_) {
routing_ready_handler_();
}
auto on_routing_started = [this]() -> void
{
std::lock_guard its_lock(pending_sd_offers_mutex_);

if_state_running_ = true;
for (auto const& its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
};

if (discovery_) {
discovery_->start();
discovery_->start(on_routing_started);
} else {
init_routing_info();
on_routing_started();
}

for (auto its_service : pending_sd_offers_) {
init_service_info(its_service.first, its_service.second, true);
}
pending_sd_offers_.clear();

routing_running_ = true;
VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
}

void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class service_discovery {
virtual boost::asio::io_context &get_io() = 0;

virtual void init() = 0;
virtual void start() = 0;
virtual void start(std::function<void(void)> on_routing_started) = 0;
virtual void stop() = 0;

virtual void request_service(service_t _service, instance_t _instance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <set>
#include <forward_list>
#include <atomic>
#include <thread>
#include <tuple>

#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -67,7 +68,12 @@ class service_discovery_impl: public service_discovery,
std::recursive_mutex& get_subscribed_mutex();

void init();
void start();
void start(std::function<void(void)> on_routing_started);

// Function that'll do the startup work of SD, either in a new thread or
// in the current thread depending on the user's choice
void do_start_sd(std::function<void(void)> on_complete, bool wait_for_if);

void stop();

void request_service(service_t _service, instance_t _instance,
Expand Down Expand Up @@ -367,7 +373,9 @@ class service_discovery_impl: public service_discovery,
boost::asio::ip::address unicast_;
uint16_t port_;
bool reliable_;
std::mutex endpoint_mutex_;
std::shared_ptr<endpoint> endpoint_;
std::thread endpoint_getter_thread_;

std::shared_ptr<serializer> serializer_;
std::shared_ptr<deserializer> deserializer_;
Expand Down
125 changes: 112 additions & 13 deletions implementation/service_discovery/src/service_discovery_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@

#include <vsomeip/constants.hpp>

#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
#include <pthread.h>
#endif

#include <chrono>
#include <iomanip>
#include <forward_list>
#include <random>
#include <thread>

#include <chrono>

#ifdef __QNX__
#include <string_view>
#include <libgen.h>
#endif

#include <vsomeip/internal/logger.hpp>

#include "../include/constants.hpp"
Expand Down Expand Up @@ -71,7 +82,7 @@ service_discovery_impl::service_discovery_impl(
find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
find_debounce_timer_(_host->get_io()),
main_phase_timer_(_host->get_io()),
is_suspended_(false),
is_suspended_(true), // Start suspended: this is different than upstream as we start before a network interface is available
is_diagnosis_(false),
last_msg_received_timer_(_host->get_io()),
last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY +
Expand All @@ -80,8 +91,7 @@ service_discovery_impl::service_discovery_impl(
next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24);
}

service_discovery_impl::~service_discovery_impl() {
}
service_discovery_impl::~service_discovery_impl() = default;

boost::asio::io_context &service_discovery_impl::get_io() {
return io_;
Expand Down Expand Up @@ -158,16 +168,63 @@ service_discovery_impl::init() {
+ (cyclic_offer_delay_ / 10);
}

void
service_discovery_impl::start() {
auto wait_for_interface() -> bool {
#ifdef __QNX__
static std::string_view constexpr path = VSOMEIP_NETWORK_INT_READY_FILE;
if (path.empty()) {
VSOMEIP_ERROR << "No network interface signal path defined, service discovery will effectively be disabled.";
return false;
}
// Indefinite delay. If the condition we're waiting for doesn't occur then
// we are in an error state and thus should not continue.
static auto constexpr delay_ms = std::numeric_limits<int>::max();
static int constexpr poll_ms = 50;

auto const start = std::chrono::steady_clock::now();
VSOMEIP_DEBUG << "Waiting (blocking) indefinitely on network interface (signal=" << path << ")";
auto const r = waitfor(path.data(), delay_ms, poll_ms);
auto const end = std::chrono::steady_clock::now();
auto diff = end - start;
if (0 == r)
{
VSOMEIP_DEBUG << "Waited (blocked) for network interface (signal=" << path << ") for " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms.";
return true;
} else {
VSOMEIP_ERROR << "Timedout waiting for network interface (signal=" << path << ") after " << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() << " ms: errno=" << errno << ", msg=" << strerror(errno);
return false;
}
#else
// Omitting a Linux implementation.
//
// Currently the Linux implementations use netlink in routing_manager_impl to
// receive network events, the QNX implementation could be modified to use pps
// (if available) to mirror this functionality.
//
// If however an implementation on linux wanted to use a pattern similar to
// this, inotify would likely be the best candidate to monitor the filesystem
// for this file.
return true;
#endif
}

void service_discovery_impl::do_start_sd(std::function<void(void)> on_complete, bool wait_for_if)
{
bool wait_for_result = false;
if (wait_for_if)
{
wait_for_result = wait_for_interface();
}

std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (!endpoint_) {
endpoint_ = host_->create_service_discovery_endpoint(
sd_multicast_, port_, reliable_);
if (!endpoint_) {
VSOMEIP_ERROR << "Couldn't start service discovery";
VSOMEIP_ERROR << "Couldn't start service discovery" << (wait_for_result ? "" : " - likely due to timeing out waiting for a network interface");
return;
}
}

{
std::lock_guard<std::mutex> its_lock(sessions_received_mutex_);
sessions_received_.clear();
Expand All @@ -185,20 +242,61 @@ service_discovery_impl::start() {
i.second->set_sent_counter(0);
}
}

// rejoin multicast group
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
if (its_server_endpoint)
its_server_endpoint->join(sd_multicast_);
// rejoin multicast group
dynamic_cast<udp_server_endpoint_impl*>(
endpoint_.get())->join(sd_multicast_);
}
}
is_suspended_ = false;
start_main_phase_timer();
start_offer_debounce_timer(true);
start_find_debounce_timer(true);
start_ttl_timer();

on_complete();
}

void
service_discovery_impl::start(std::function<void(void)> on_routing_started) {
#if defined(__QNX__)
auto* const use_async_sd = getenv(VSOMEIP_ENV_USE_ASYNCHRONOUS_SD);
#else
// Linux/Android uses netlink, so there's less need for this
// asynchroneous service discovery
const char* use_async_sd = nullptr;
#endif

if (use_async_sd)
{
// Perform the SD setup in a new thread, and use a wait in that block
// to wait for the network to be available
VSOMEIP_DEBUG << "Starting service discovery using separate thread";
endpoint_getter_thread_ = std::thread(&service_discovery_impl::do_start_sd, this, on_routing_started, true);
endpoint_getter_thread_.detach();
#if defined(__linux__) || defined(ANDROID) || defined(__QNX__)
{
auto err = pthread_setname_np(endpoint_getter_thread_.native_handle(), "sd_start");
if (err) {
VSOMEIP_ERROR << "Could not rename SD thread: " << errno << ":" << strerror(errno);
}
}
#endif
} else {
// Perform the SD setup in the current thread (synchronously), and
// if VSOMEIP_ENV_WAIT_FOR_INTERFACE exists use a wait in that block
// to wait for the network to be available. Note that
// VSOMEIP_ENV_WAIT_FOR_INTERFACE mostly exists for performance
// testing.

#if defined(__QNX__)
auto* const wait_for_interface_env = getenv(VSOMEIP_ENV_WAIT_FOR_INTERFACE);
#else
const char* wait_for_interface_env = nullptr;
#endif
VSOMEIP_DEBUG << "Starting service discovery using main thread" << (wait_for_interface_env ? ", will block for network interface" : ", will assume network interface pre-exists");
do_start_sd(on_routing_started, wait_for_interface_env != nullptr);
}
}

void
Expand Down Expand Up @@ -3508,6 +3606,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
std::dec << last_msg_received_timer_timeout_.count() << "ms.";

// Rejoin multicast group
std::lock_guard<std::mutex> its_lock(endpoint_mutex_);
if (endpoint_ && !reliable_) {
auto its_server_endpoint
= std::dynamic_pointer_cast<udp_server_endpoint_impl>(endpoint_);
Expand All @@ -3518,7 +3617,7 @@ service_discovery_impl::on_last_msg_received_timer_expired(
}
{
boost::system::error_code ec;
std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
std::lock_guard<std::mutex> its_lock_inner(last_msg_received_timer_mutex_);
last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec);
last_msg_received_timer_.async_wait(
std::bind(
Expand Down

0 comments on commit 7522cf0

Please sign in to comment.