From 7522cf0c6b65d416b1f7072a2000119874424b89 Mon Sep 17 00:00:00 2001 From: Matthew Russell Date: Mon, 11 Dec 2023 16:01:18 -0500 Subject: [PATCH] Control SD sync/async behaviour with env var on QNX This is a change intended for systems like QNX which do not generally have a service like netlink monitoring the network status that can easily be tied into. This is not setup to be used/enabled on Linux/Android. SD will use the new asynchronous behaviour if the env var VSOMEIP_USE_ASYNCHRONOUS_SD is set. If not set, SD will use a synchronous behaviour and only wait for the interface if the env var VSOMEIP_WAIT_FOR_INTERFACE exists. The latter (without waiting) is very close to the upstream behaviour, the difference being that a callback is still sent to sd::start() from routing_manager_impl, rather than executing that code directly in routing_manager_impl. Notes: - mutexes - sd_impl::endpoint_ is now mutex protected - rm_impl::pending_sd_offers_mutex_ is a recursive mutex, as now it can be called in its own thread and the new thread in SD - There is no timeout on the waitfor. The original implementation had a configurable timeout, however because timing out left us in an error state anyways, this timeout was removed (raised to numeric_limits::max() = ~45 days, give or take.) --- .../configuration/include/internal.hpp.in | 19 +++ .../routing/include/routing_manager_impl.hpp | 2 +- .../routing/src/routing_manager_impl.cpp | 40 +++--- .../include/service_discovery.hpp | 2 +- .../include/service_discovery_impl.hpp | 10 +- .../src/service_discovery_impl.cpp | 125 ++++++++++++++++-- 6 files changed, 164 insertions(+), 34 deletions(-) diff --git a/implementation/configuration/include/internal.hpp.in b/implementation/configuration/include/internal.hpp.in index 1fcc6b41f..0b97df34c 100644 --- a/implementation/configuration/include/internal.hpp.in +++ b/implementation/configuration/include/internal.hpp.in @@ -33,6 +33,25 @@ #define VSOMEIP_ROUTING_HOST_PORT_DEFAULT 31490 +// +// Defines related to running SD asynchronously - this is not available upstream + +// Env var that if exists will cause SD setup to be performed asynchronously and +// wait on network availability. This wait also impacts the routing_manager to +// also wait until a network interface is available before issuring OFFERs +#define VSOMEIP_ENV_USE_ASYNCHRONOUS_SD "VSOMEIP_USE_ASYNCHRONOUS_SD" + +// Iff SD is running synchronously, the existance of this env var will cause the +// SD setup to still block on network availability. (mostly a testing scenario) +#define VSOMEIP_ENV_WAIT_FOR_INTERFACE "VSOMEIP_WAIT_FOR_INTERFACE" + +// The current waiting mechanism is to block until a file (specified by this +// define) +#define VSOMEIP_NETWORK_INT_READY_FILE "@VSOMEIP_NETWORK_INT_READY_FILE@" + +// /end of async change +// + #ifdef _WIN32 #define VSOMEIP_CFG_LIBRARY "vsomeip3-cfg.dll" #else diff --git a/implementation/routing/include/routing_manager_impl.hpp b/implementation/routing/include/routing_manager_impl.hpp index 987f9d3d2..b13315811 100644 --- a/implementation/routing/include/routing_manager_impl.hpp +++ b/implementation/routing/include/routing_manager_impl.hpp @@ -495,7 +495,7 @@ class routing_manager_impl: public routing_manager_base, bool if_state_running_; bool sd_route_set_; bool routing_running_; - std::mutex pending_sd_offers_mutex_; + std::recursive_mutex pending_sd_offers_mutex_; std::vector> pending_sd_offers_; #if defined(__linux__) || defined(ANDROID) std::shared_ptr netlink_connector_; diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index 79f408045..1d1e269a4 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -236,7 +236,7 @@ void routing_manager_impl::start() { netlink_connector_->start(); #else { - std::lock_guard its_lock(pending_sd_offers_mutex_); + std::lock_guard its_lock(pending_sd_offers_mutex_); start_ip_routing(); } #endif @@ -448,7 +448,7 @@ bool routing_manager_impl::offer_service(client_t _client, } { - std::lock_guard its_lock(pending_sd_offers_mutex_); + std::lock_guard its_lock(pending_sd_offers_mutex_); if (if_state_running_) { init_service_info(_service, _instance, true); } else { @@ -542,7 +542,7 @@ void routing_manager_impl::stop_offer_service(client_t _client, } if (is_local) { { - std::lock_guard its_lock(pending_sd_offers_mutex_); + std::lock_guard its_lock(pending_sd_offers_mutex_); for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) { if (it->first == _service && it->second == _instance) { it = pending_sd_offers_.erase(it); @@ -3881,7 +3881,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { } // start processing of SD messages (incoming remote offers should lead to new subscribe messages) - discovery_->start(); + discovery_->start([]() { }); // Trigger initial offer phase for relevant services for (const auto &its_service : get_offered_services()) { @@ -3950,7 +3950,7 @@ void routing_manager_impl::set_routing_state(routing_state_e _routing_state) { void routing_manager_impl::on_net_interface_or_route_state_changed( bool _is_interface, const std::string &_if, bool _available) { - std::lock_guard its_lock(pending_sd_offers_mutex_); + std::lock_guard its_lock(pending_sd_offers_mutex_); auto log_change_message = [&_if, _available, _is_interface](bool _warning) { std::stringstream ss; ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if @@ -4000,26 +4000,30 @@ void routing_manager_impl::on_net_interface_or_route_state_changed( } void routing_manager_impl::start_ip_routing() { -#if defined(_WIN32) || defined(__QNX__) +#if defined(_WIN32) if_state_running_ = true; #endif - if (routing_ready_handler_) { - routing_ready_handler_(); - } + auto on_routing_started = [this]() -> void + { + std::lock_guard its_lock(pending_sd_offers_mutex_); + + if_state_running_ = true; + for (auto const& its_service : pending_sd_offers_) { + init_service_info(its_service.first, its_service.second, true); + } + pending_sd_offers_.clear(); + + routing_running_ = true; + VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE; + }; + if (discovery_) { - discovery_->start(); + discovery_->start(on_routing_started); } else { init_routing_info(); + on_routing_started(); } - - for (auto its_service : pending_sd_offers_) { - init_service_info(its_service.first, its_service.second, true); - } - pending_sd_offers_.clear(); - - routing_running_ = true; - VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE; } void diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index d180960f8..73a0cba29 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -30,7 +30,7 @@ class service_discovery { virtual boost::asio::io_context &get_io() = 0; virtual void init() = 0; - virtual void start() = 0; + virtual void start(std::function on_routing_started) = 0; virtual void stop() = 0; virtual void request_service(service_t _service, instance_t _instance, diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 200627ccf..6143e698e 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -67,7 +68,12 @@ class service_discovery_impl: public service_discovery, std::recursive_mutex& get_subscribed_mutex(); void init(); - void start(); + void start(std::function on_routing_started); + + // Function that'll do the startup work of SD, either in a new thread or + // in the current thread depending on the user's choice + void do_start_sd(std::function on_complete, bool wait_for_if); + void stop(); void request_service(service_t _service, instance_t _instance, @@ -367,7 +373,9 @@ class service_discovery_impl: public service_discovery, boost::asio::ip::address unicast_; uint16_t port_; bool reliable_; + std::mutex endpoint_mutex_; std::shared_ptr endpoint_; + std::thread endpoint_getter_thread_; std::shared_ptr serializer_; std::shared_ptr deserializer_; diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index c38804575..0d5ce921b 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -5,12 +5,23 @@ #include +#if defined(__linux__) || defined(ANDROID) || defined(__QNX__) +#include +#endif + #include #include #include #include #include +#include + +#ifdef __QNX__ +#include +#include +#endif + #include #include "../include/constants.hpp" @@ -71,7 +82,7 @@ service_discovery_impl::service_discovery_impl( find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME), find_debounce_timer_(_host->get_io()), main_phase_timer_(_host->get_io()), - is_suspended_(false), + is_suspended_(true), // Start suspended: this is different than upstream as we start before a network interface is available is_diagnosis_(false), last_msg_received_timer_(_host->get_io()), last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY + @@ -80,8 +91,7 @@ service_discovery_impl::service_discovery_impl( next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24); } -service_discovery_impl::~service_discovery_impl() { -} +service_discovery_impl::~service_discovery_impl() = default; boost::asio::io_context &service_discovery_impl::get_io() { return io_; @@ -158,16 +168,63 @@ service_discovery_impl::init() { + (cyclic_offer_delay_ / 10); } -void -service_discovery_impl::start() { +auto wait_for_interface() -> bool { +#ifdef __QNX__ + static std::string_view constexpr path = VSOMEIP_NETWORK_INT_READY_FILE; + if (path.empty()) { + VSOMEIP_ERROR << "No network interface signal path defined, service discovery will effectively be disabled."; + return false; + } + // Indefinite delay. If the condition we're waiting for doesn't occur then + // we are in an error state and thus should not continue. + static auto constexpr delay_ms = std::numeric_limits::max(); + static int constexpr poll_ms = 50; + + auto const start = std::chrono::steady_clock::now(); + VSOMEIP_DEBUG << "Waiting (blocking) indefinitely on network interface (signal=" << path << ")"; + auto const r = waitfor(path.data(), delay_ms, poll_ms); + auto const end = std::chrono::steady_clock::now(); + auto diff = end - start; + if (0 == r) + { + VSOMEIP_DEBUG << "Waited (blocked) for network interface (signal=" << path << ") for " << std::chrono::duration_cast(diff).count() << " ms."; + return true; + } else { + VSOMEIP_ERROR << "Timedout waiting for network interface (signal=" << path << ") after " << std::chrono::duration_cast(diff).count() << " ms: errno=" << errno << ", msg=" << strerror(errno); + return false; + } +#else + // Omitting a Linux implementation. + // + // Currently the Linux implementations use netlink in routing_manager_impl to + // receive network events, the QNX implementation could be modified to use pps + // (if available) to mirror this functionality. + // + // If however an implementation on linux wanted to use a pattern similar to + // this, inotify would likely be the best candidate to monitor the filesystem + // for this file. + return true; +#endif +} + +void service_discovery_impl::do_start_sd(std::function on_complete, bool wait_for_if) +{ + bool wait_for_result = false; + if (wait_for_if) + { + wait_for_result = wait_for_interface(); + } + + std::lock_guard its_lock(endpoint_mutex_); if (!endpoint_) { endpoint_ = host_->create_service_discovery_endpoint( sd_multicast_, port_, reliable_); if (!endpoint_) { - VSOMEIP_ERROR << "Couldn't start service discovery"; + VSOMEIP_ERROR << "Couldn't start service discovery" << (wait_for_result ? "" : " - likely due to timeing out waiting for a network interface"); return; } } + { std::lock_guard its_lock(sessions_received_mutex_); sessions_received_.clear(); @@ -185,13 +242,10 @@ service_discovery_impl::start() { i.second->set_sent_counter(0); } } - - // rejoin multicast group if (endpoint_ && !reliable_) { - auto its_server_endpoint - = std::dynamic_pointer_cast(endpoint_); - if (its_server_endpoint) - its_server_endpoint->join(sd_multicast_); + // rejoin multicast group + dynamic_cast( + endpoint_.get())->join(sd_multicast_); } } is_suspended_ = false; @@ -199,6 +253,50 @@ service_discovery_impl::start() { start_offer_debounce_timer(true); start_find_debounce_timer(true); start_ttl_timer(); + + on_complete(); +} + +void +service_discovery_impl::start(std::function on_routing_started) { +#if defined(__QNX__) + auto* const use_async_sd = getenv(VSOMEIP_ENV_USE_ASYNCHRONOUS_SD); +#else + // Linux/Android uses netlink, so there's less need for this + // asynchroneous service discovery + const char* use_async_sd = nullptr; +#endif + + if (use_async_sd) + { + // Perform the SD setup in a new thread, and use a wait in that block + // to wait for the network to be available + VSOMEIP_DEBUG << "Starting service discovery using separate thread"; + endpoint_getter_thread_ = std::thread(&service_discovery_impl::do_start_sd, this, on_routing_started, true); + endpoint_getter_thread_.detach(); +#if defined(__linux__) || defined(ANDROID) || defined(__QNX__) + { + auto err = pthread_setname_np(endpoint_getter_thread_.native_handle(), "sd_start"); + if (err) { + VSOMEIP_ERROR << "Could not rename SD thread: " << errno << ":" << strerror(errno); + } + } +#endif + } else { + // Perform the SD setup in the current thread (synchronously), and + // if VSOMEIP_ENV_WAIT_FOR_INTERFACE exists use a wait in that block + // to wait for the network to be available. Note that + // VSOMEIP_ENV_WAIT_FOR_INTERFACE mostly exists for performance + // testing. + +#if defined(__QNX__) + auto* const wait_for_interface_env = getenv(VSOMEIP_ENV_WAIT_FOR_INTERFACE); +#else + const char* wait_for_interface_env = nullptr; +#endif + VSOMEIP_DEBUG << "Starting service discovery using main thread" << (wait_for_interface_env ? ", will block for network interface" : ", will assume network interface pre-exists"); + do_start_sd(on_routing_started, wait_for_interface_env != nullptr); + } } void @@ -3508,6 +3606,7 @@ service_discovery_impl::on_last_msg_received_timer_expired( std::dec << last_msg_received_timer_timeout_.count() << "ms."; // Rejoin multicast group + std::lock_guard its_lock(endpoint_mutex_); if (endpoint_ && !reliable_) { auto its_server_endpoint = std::dynamic_pointer_cast(endpoint_); @@ -3518,7 +3617,7 @@ service_discovery_impl::on_last_msg_received_timer_expired( } { boost::system::error_code ec; - std::lock_guard its_lock(last_msg_received_timer_mutex_); + std::lock_guard its_lock_inner(last_msg_received_timer_mutex_); last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec); last_msg_received_timer_.async_wait( std::bind(