diff --git a/examples/quick_start_cs_lc_hybrid/main.cpp b/examples/quick_start_cs_lc_hybrid/main.cpp index 207d79fc..2165735d 100644 --- a/examples/quick_start_cs_lc_hybrid/main.cpp +++ b/examples/quick_start_cs_lc_hybrid/main.cpp @@ -25,6 +25,7 @@ SOFTWARE. #include "HybridCentralSystemEventsHandler.h" #include "ICentralSystem.h" #include "LocalControllerDemoConfig.h" +#include "WebsocketFactory.h" #include #include @@ -95,6 +96,10 @@ int main(int argc, char* argv[]) path /= "quick_start_cs_lc_hybrid.ini"; LocalControllerDemoConfig config(path.string()); + // Configure websocket pools => mandatory for local controller + ocpp::websockets::WebsocketFactory::setClientPoolCount(2u); + ocpp::websockets::WebsocketFactory::startClientPools(); + // Event handler HybridCentralSystemEventsHandler event_handler(config.stackConfig()); diff --git a/examples/quick_start_localcontroller/main.cpp b/examples/quick_start_localcontroller/main.cpp index c9265cd8..b112a008 100644 --- a/examples/quick_start_localcontroller/main.cpp +++ b/examples/quick_start_localcontroller/main.cpp @@ -25,6 +25,7 @@ SOFTWARE. #include "DefaultLocalControllerEventsHandler.h" #include "ILocalController.h" #include "LocalControllerDemoConfig.h" +#include "WebsocketFactory.h" #include #include @@ -98,6 +99,10 @@ int main(int argc, char* argv[]) // Event handler DefaultLocalControllerEventsHandler event_handler(config.stackConfig()); + // Configure websocket pools => mandatory for local controller + ocpp::websockets::WebsocketFactory::setClientPoolCount(2u); + ocpp::websockets::WebsocketFactory::startClientPools(); + // Instanciate local controller std::unique_ptr local_controller = ILocalController::create(config.stackConfig(), event_handler); if (reset_all) diff --git a/src/chargepoint/metervalues/MeterValuesManager.cpp b/src/chargepoint/metervalues/MeterValuesManager.cpp index be9157cb..2cc76861 100644 --- a/src/chargepoint/metervalues/MeterValuesManager.cpp +++ b/src/chargepoint/metervalues/MeterValuesManager.cpp @@ -412,7 +412,7 @@ void MeterValuesManager::processTriggered(unsigned int connector_id) auto measurands = computeMeasurandList(meter_values, measurands_max_size); if (!measurands.empty()) { - LOG_INFO << "Triggered mater values : " << meter_values; + LOG_INFO << "Triggered meter values : " << meter_values; // Get connector Connector* connector = m_connectors.getConnector(connector_id); diff --git a/src/localcontroller/centralsystem/CentralSystemProxy.cpp b/src/localcontroller/centralsystem/CentralSystemProxy.cpp index 63816e48..fd7abf7e 100644 --- a/src/localcontroller/centralsystem/CentralSystemProxy.cpp +++ b/src/localcontroller/centralsystem/CentralSystemProxy.cpp @@ -37,7 +37,7 @@ CentralSystemProxy::CentralSystemProxy(const std::string& const ocpp::config::ILocalControllerConfig& stack_config) : m_identifier(identifier), m_stack_config(stack_config), - m_websocket(ocpp::websockets::WebsocketFactory::newClient()), + m_websocket(ocpp::websockets::WebsocketFactory::newClientFromPool()), m_rpc(*m_websocket, "ocpp1.6"), m_messages_converter(messages_converter), m_msg_dispatcher(messages_validator), diff --git a/src/messages/GetCompositeSchedule.cpp b/src/messages/GetCompositeSchedule.cpp index 36d1c839..8365c3ec 100644 --- a/src/messages/GetCompositeSchedule.cpp +++ b/src/messages/GetCompositeSchedule.cpp @@ -78,7 +78,7 @@ bool GetCompositeScheduleConfConverter::fromJson(const rapidjson::Value& json, if (json.HasMember("chargingSchedule")) { ChargingScheduleConverter charging_schedule_converter; - ret = charging_schedule_converter.fromJson(json[""], data.chargingSchedule.value(), error_code, error_message); + ret = charging_schedule_converter.fromJson(json["chargingSchedule"], data.chargingSchedule.value(), error_code, error_message); } if (!ret && error_code.empty()) { diff --git a/src/tools/helpers/Queue.h b/src/tools/helpers/Queue.h index 45907427..a6048aee 100644 --- a/src/tools/helpers/Queue.h +++ b/src/tools/helpers/Queue.h @@ -160,7 +160,7 @@ class Queue // Retrieve item if (m_enabled) { - item = m_queue.front(); + item = std::move(m_queue.front()); m_queue.pop(); ret = true; } diff --git a/src/websockets/CMakeLists.txt b/src/websockets/CMakeLists.txt index 06112a6c..0aaab291 100644 --- a/src/websockets/CMakeLists.txt +++ b/src/websockets/CMakeLists.txt @@ -9,6 +9,7 @@ add_library(ws OBJECT Url.cpp WebsocketFactory.cpp libwebsockets/LibWebsocketClient.cpp + libwebsockets/LibWebsocketClientPool.cpp libwebsockets/LibWebsocketServer.cpp ) diff --git a/src/websockets/WebsocketFactory.cpp b/src/websockets/WebsocketFactory.cpp index b4ee9f3b..6858f654 100644 --- a/src/websockets/WebsocketFactory.cpp +++ b/src/websockets/WebsocketFactory.cpp @@ -18,17 +18,57 @@ along with OpenOCPP. If not, see . #include "WebsocketFactory.h" #include "LibWebsocketClient.h" +#include "LibWebsocketClientPool.h" #include "LibWebsocketServer.h" +#include +#include +#include + namespace ocpp { namespace websockets { +/** @brief Mutex to protect the access to the client pools */ +static std::mutex s_client_pools_mutex; +/** @brief Client pools */ +static std::vector> s_client_pools; +/** @brief Indicate if the standard clients must be allocated from the pools */ +static bool s_force_clients_from_pool = false; + /** @brief Instanciate a client websocket */ IWebsocketClient* WebsocketFactory::newClient() { - return new LibWebsocketClient(); + if (s_force_clients_from_pool) + { + return newClientFromPool(); + } + else + { + return new LibWebsocketClient(); + } +} + +/** @brief Instanciate a client websocket from the pool (the pool must be started first) */ +IWebsocketClient* WebsocketFactory::newClientFromPool() +{ + IWebsocketClient* ret = nullptr; + std::lock_guard lock(s_client_pools_mutex); + if (!s_client_pools.empty()) + { + // Look for the pool with the less associated clients + auto* selected_pool = &s_client_pools[0]; + for (auto& pool : s_client_pools) + { + if (pool->getClientsCount() < (*selected_pool)->getClientsCount()) + { + selected_pool = &pool; + } + } + ret = (*selected_pool)->newClient(); + } + return ret; } /** @brief Instanciate a server websocket */ @@ -37,5 +77,63 @@ IWebsocketServer* WebsocketFactory::newServer() return new LibWebsocketServer(); } +/** @brief Set the number of client pools (can only be done once) */ +bool WebsocketFactory::setClientPoolCount(size_t count) +{ + bool ret = false; + + std::lock_guard lock(s_client_pools_mutex); + if (s_client_pools.empty() && (count > 0)) + { + for (size_t i = 0; i < count; i++) + { + s_client_pools.emplace_back(std::make_unique()); + } + ret = true; + } + + return ret; +} + +/** @brief Start the client pools */ +bool WebsocketFactory::startClientPools() +{ + bool ret = false; + + std::lock_guard lock(s_client_pools_mutex); + if (!s_client_pools.empty()) + { + for (auto& pool : s_client_pools) + { + ret = pool->start() && ret; + } + } + + return ret; +} + +/** @brief Stop the client pools */ +bool WebsocketFactory::stopClientPools() +{ + bool ret = false; + + std::lock_guard lock(s_client_pools_mutex); + if (!s_client_pools.empty()) + { + for (auto& pool : s_client_pools) + { + ret = pool->stop() && ret; + } + } + + return ret; +} + +/** @brief Indicate to use the client pools even for new clients instanciated with the newClient API */ +void WebsocketFactory::forceClientPoolsUsage() +{ + s_force_clients_from_pool = true; +} + } // namespace websockets } // namespace ocpp diff --git a/src/websockets/WebsocketFactory.h b/src/websockets/WebsocketFactory.h index c55efff5..d9bd01bd 100644 --- a/src/websockets/WebsocketFactory.h +++ b/src/websockets/WebsocketFactory.h @@ -33,8 +33,19 @@ class WebsocketFactory public: /** @brief Instanciate a client websocket */ static IWebsocketClient* newClient(); + /** @brief Instanciate a client websocket from the pool (the pool must be started first) */ + static IWebsocketClient* newClientFromPool(); /** @brief Instanciate a server websocket */ static IWebsocketServer* newServer(); + + /** @brief Set the number of client pools (can only be done once) */ + static bool setClientPoolCount(size_t count); + /** @brief Start the client pools */ + static bool startClientPools(); + /** @brief Stop the client pools (all client communications must be terminated first)*/ + static bool stopClientPools(); + /** @brief Indicate to use the client pools even for new clients instanciated with the newClient API */ + static void forceClientPoolsUsage(); }; } // namespace websockets diff --git a/src/websockets/libwebsockets/LibWebsocketClient.cpp b/src/websockets/libwebsockets/LibWebsocketClient.cpp index ba66c598..b7273c42 100644 --- a/src/websockets/libwebsockets/LibWebsocketClient.cpp +++ b/src/websockets/libwebsockets/LibWebsocketClient.cpp @@ -45,6 +45,7 @@ LibWebsocketClient::LibWebsocketClient() m_credentials(), m_connected(false), m_context(nullptr), + m_logs_context(), m_sched_list(), m_wsi(nullptr), m_retry_policy(), @@ -85,6 +86,11 @@ bool LibWebsocketClient::connect(const std::string& url, static const struct lws_protocols protocols[] = { {"LibWebsocketClient", &LibWebsocketClient::eventCallback, 0, 0, 0, nullptr, 0}, {nullptr, nullptr, 0, 0, 0, nullptr, 0}}; + // Initialize log context + memset(&m_logs_context, 0, sizeof(m_logs_context)); + m_logs_context.u.emit = LIBWEBSOCKET_LOG_OUTPUT_FN; + m_logs_context.lll_flags = LIBWEBSOCKET_LOG_FLAGS; + // Fill context information struct lws_context_creation_info info; memset(&info, 0, sizeof info); @@ -92,6 +98,7 @@ bool LibWebsocketClient::connect(const std::string& url, info.port = CONTEXT_PORT_NO_LISTEN; info.protocols = protocols; info.timeout_secs = static_cast(std::chrono::duration_cast(connect_timeout).count()); + info.log_cx = &m_logs_context; m_credentials = credentials; if (m_url.protocol() == "wss") { diff --git a/src/websockets/libwebsockets/LibWebsocketClient.h b/src/websockets/libwebsockets/LibWebsocketClient.h index d2a4d9b8..8ee2e1cb 100644 --- a/src/websockets/libwebsockets/LibWebsocketClient.h +++ b/src/websockets/libwebsockets/LibWebsocketClient.h @@ -109,6 +109,8 @@ class LibWebsocketClient : public IWebsocketClient /** @brief Websocket context */ struct lws_context* m_context; + /** @brief Websocket log context */ + lws_log_cx_t m_logs_context; /** @brief Schedule list */ lws_sorted_usec_list_t m_sched_list; /** @brief Related wsi */ diff --git a/src/websockets/libwebsockets/LibWebsocketClientPool.cpp b/src/websockets/libwebsockets/LibWebsocketClientPool.cpp new file mode 100644 index 00000000..aa6c9160 --- /dev/null +++ b/src/websockets/libwebsockets/LibWebsocketClientPool.cpp @@ -0,0 +1,710 @@ +/* +Copyright (c) 2020 Cedric Jimenez +This file is part of OpenOCPP. + +OpenOCPP is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 2.1 of the License, or +(at your option) any later version. + +OpenOCPP is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with OpenOCPP. If not, see . +*/ + +#include "LibWebsocketClientPool.h" + +#include +#include +#include + +namespace ocpp +{ +namespace websockets +{ + +/** @brief Thread local client instance used when callbacks doesn't provide user data */ +thread_local LibWebsocketClientPool* pool; + +/** @brief Constructor */ +LibWebsocketClientPool::LibWebsocketClientPool() + : m_context(nullptr), + m_logs_context(), + m_thread(nullptr), + m_end(false), + m_clients_count(0), + m_clients_count_mutex(), + m_waiting_connect_queue(), + m_waiting_disconnect_queue(), + m_waiting_send_queue() +{ +} + +/** @brief Destructor */ +LibWebsocketClientPool::~LibWebsocketClientPool() +{ + stop(); +} + +/** @brief Start the pool */ +bool LibWebsocketClientPool::start() +{ + bool ret = false; + + // Check if thread is alive and if a listener has been registered + if (!m_thread) + { + // Fill context information + struct lws_context_creation_info info; + memset(&info, 0, sizeof(info)); + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_EXPLICIT_VHOSTS; + info.user = this; + info.log_cx = &m_logs_context; + + memset(&m_logs_context, 0, sizeof(m_logs_context)); + m_logs_context.u.emit = LIBWEBSOCKET_LOG_OUTPUT_FN; + m_logs_context.lll_flags = LIBWEBSOCKET_LOG_FLAGS; + + // Create context + m_context = lws_create_context(&info); + if (m_context) + { + // Start process thread + m_end = false; + m_thread = new std::thread(std::bind(&LibWebsocketClientPool::process, this)); + ret = true; + } + } + + return ret; +} + +/** @brief Stop the pool */ +bool LibWebsocketClientPool::stop() +{ + bool ret = false; + + // Check if thread is alive + if (m_thread) + { + // Stop thread + m_end = true; + lws_cancel_service(m_context); + if (std::this_thread::get_id() != m_thread->get_id()) + { + m_thread->join(); + } + else + { + m_thread->detach(); + } + + delete m_thread; + m_thread = nullptr; + ret = true; + } + + return ret; +} + +/** @brief Get the number of clients */ +size_t LibWebsocketClientPool::getClientsCount() const +{ + std::lock_guard lock(m_clients_count_mutex); + return m_clients_count; +} + +/** @brief Instanciate a client websocket */ +IWebsocketClient* LibWebsocketClientPool::newClient() +{ + return new Client(*this); +} + +/** @brief Internal thread */ +void LibWebsocketClientPool::process() +{ + // Save this pointer for further callbacks + pool = this; + + // Mask SIG_PIPE signal +#ifndef _MSC_VER + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &set, NULL); +#endif // _MSC_VER + + // Need to ensure that the context is still valid when a user callback + // has called stop() function + lws_context* context = m_context; + + // Dummy vhost to handle context related events + struct lws_protocols protocols[] = {{"LibWebsocketClientPool", &LibWebsocketClientPool::eventCallback, 0, 0, 0, this, 0}, + LWS_PROTOCOL_LIST_TERM}; + struct lws_context_creation_info vhost_info; + memset(&vhost_info, 0, sizeof(vhost_info)); + vhost_info.protocols = protocols; + vhost_info.log_cx = &m_logs_context; + lws_vhost* vhost = lws_create_vhost(m_context, &vhost_info); + + // Event loop + int ret = 0; + while (!m_end && (ret >= 0)) + { + ret = lws_service(context, 0); + } + if (!m_end) + { + stop(); + } + + // Destroy context + std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Ensure stop caller is joining + lws_vhost_destroy(vhost); + lws_context_destroy(context); +} + +/** @brief libwebsockets event callback */ +int LibWebsocketClientPool::eventCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) noexcept +{ + int ret = 0; + + (void)wsi; + (void)in; + (void)len; + (void)user; + + // Handle event + switch (reason) + { + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + { + // Schedule first connections + Client* waiting_client = nullptr; + while (pool->m_waiting_connect_queue.pop(waiting_client, 0)) + { + lws_sul_schedule( + pool->m_context, 0, &waiting_client->m_schedule_data.sched_list, LibWebsocketClientPool::Client::connectCallback, 1); + } + + // Handle requested sends + while (pool->m_waiting_send_queue.pop(waiting_client, 0)) + { + if (waiting_client->m_connected) + { + lws_callback_on_writable(waiting_client->m_wsi); + } + } + + // Handle requested disconnections + while (pool->m_waiting_disconnect_queue.pop(waiting_client, 0)) + { + lws_sul_cancel(&waiting_client->m_schedule_data.sched_list); + if (waiting_client->m_connected) + { + lws_set_timeout(waiting_client->m_wsi, static_cast(1), LWS_TO_KILL_SYNC); + } + lws_vhost_destroy(waiting_client->m_vhost); + waiting_client->m_vhost = nullptr; + waiting_client->m_connected = false; + waiting_client->m_disconnect_process_done = true; + waiting_client->m_disconnect_cond_var.notify_all(); + } + } + break; + + default: + break; + } + + return ret; +} + +/** @brief Constructor */ +LibWebsocketClientPool::Client::Client(LibWebsocketClientPool& pool) + : IWebsocketClient(), + m_pool(pool), + m_listener(nullptr), + m_connect_timeout(0), + m_retry_interval(0), + m_ping_interval(0), + m_connection_error_notified(false), + m_url(), + m_protocol(""), + m_credentials(), + m_connected(false), + m_disconnect_cond_var(), + m_disconnect_mutex(), + m_disconnect_process_done(false), + m_context(m_pool.m_context), + m_vhost(nullptr), + m_wsi(nullptr), + m_retry_policy(), + m_retry_count(0), + m_schedule_data(), + m_send_msgs(), + m_fragmented_frame(nullptr), + m_fragmented_frame_size(0), + m_fragmented_frame_index(0) +{ + // Increase client count in the associated pool + std::lock_guard lock(m_pool.m_clients_count_mutex); + m_pool.m_clients_count++; +} + +/** @brief Destructor */ +LibWebsocketClientPool::Client::~Client() +{ + // To prevent keeping an open connection in background + disconnect(); + releaseFragmentedFrame(); + + // Decrease client count in the associated pool + std::lock_guard lock(m_pool.m_clients_count_mutex); + m_pool.m_clients_count--; +} + +/** @copydoc bool IWebsocketClient::connect(const std::string&, const std::string&, const Credentials&, + * std::chrono::milliseconds, std::chrono::milliseconds, std::chrono::milliseconds) */ +bool LibWebsocketClientPool::Client::connect(const std::string& url, + const std::string& protocol, + const Credentials& credentials, + std::chrono::milliseconds connect_timeout, + std::chrono::milliseconds retry_interval, + std::chrono::milliseconds ping_interval) +{ + bool ret = false; + + // Check if thread is alive and if a listener has been registered + if (!m_vhost && m_listener) + { + // Check URL + m_url = url; + if (m_url.isValid() && ((m_url.protocol() == "ws") || (m_url.protocol() == "wss"))) + { + // Save connection parameters + m_protocol = protocol; + m_credentials = credentials; + m_connect_timeout = static_cast(std::chrono::duration_cast(connect_timeout).count()); + m_retry_interval = static_cast(retry_interval.count()); + m_ping_interval = static_cast(std::chrono::duration_cast(ping_interval).count()); + + // Configure retry policy +#ifdef _MSC_VER + m_retry_policy = {&m_retry_interval, 1, 1, m_ping_interval, static_cast(2u * m_ping_interval), 20}; +#else + m_retry_policy = { + .retry_ms_table = &m_retry_interval, + .retry_ms_table_count = 1, + .conceal_count = 1, + + .secs_since_valid_ping = m_ping_interval, /* force PINGs after secs idle */ + .secs_since_valid_hangup = static_cast(2u * m_ping_interval), /* hangup after secs idle */ + + .jitter_percent = 20, + }; +#endif // _MSC_VER + + // Initialize schedule data + memset(&m_schedule_data, 0, sizeof(m_schedule_data)); + m_schedule_data.client = this; + + // Start connection process + m_connection_error_notified = false; + m_connected = false; + m_pool.m_waiting_connect_queue.push(this); + lws_cancel_service(m_context); + + ret = true; + } + } + + return ret; +} + +/** @copydoc bool IWebsocketClient::disconnect() */ +bool LibWebsocketClientPool::Client::disconnect() +{ + bool ret = false; + + // Check if connected + if (m_vhost) + { + // Schedule disconnection + m_retry_interval = 0; + m_disconnect_process_done = false; + m_pool.m_waiting_disconnect_queue.push(this); + lws_cancel_service(m_context); + + // Wait actual disconnection + if (std::this_thread::get_id() != m_pool.m_thread->get_id()) + { + std::unique_lock lock(m_disconnect_mutex); + m_disconnect_cond_var.wait(lock, [&] { return m_disconnect_process_done; }); + } + } + + // Clear message queue + SendMsg* msg; + while (m_send_msgs.pop(msg, 0)) + { + delete msg; + } + + ret = true; + + return ret; +} + +/** @copydoc bool IWebsocketClient::isConnected() */ +bool LibWebsocketClientPool::Client::isConnected() +{ + return m_connected; +} + +/** @copydoc bool IWebsocketClient::send(const void*, size_t) */ +bool LibWebsocketClientPool::Client::send(const void* data, size_t size) +{ + bool ret = false; + + // Check if connected + if (m_connected) + { + // Prepare data to send + SendMsg* msg = new SendMsg(data, size); + ret = m_send_msgs.push(msg); + + // Schedule a send + m_pool.m_waiting_send_queue.push(this); + lws_cancel_service(m_context); + } + + return ret; +} + +/** @copydoc void IWebsocketClient::registerListener(IListener&) */ +void LibWebsocketClientPool::Client::registerListener(IListener& listener) +{ + m_listener = &listener; +} + +/** @brief Prepare the buffer to store a new fragmented frame */ +void LibWebsocketClientPool::Client::beginFragmentedFrame(size_t frame_size) +{ + // Release previously allocated data + releaseFragmentedFrame(); + + // Allocate new buffer + m_fragmented_frame = new uint8_t[frame_size]; + m_fragmented_frame_size = frame_size; +} + +/** @brief Append data to the fragmented frame */ +void LibWebsocketClientPool::Client::appendFragmentedData(const void* data, size_t size) +{ + size_t copy_len = size; + if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size) + { + copy_len = m_fragmented_frame_size - m_fragmented_frame_index; + } + memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len); + m_fragmented_frame_index += copy_len; +} + +/** @brief Release the memory associated with the fragmented frame */ +void LibWebsocketClientPool::Client::releaseFragmentedFrame() +{ + delete[] m_fragmented_frame; + m_fragmented_frame = nullptr; + m_fragmented_frame_size = 0; + m_fragmented_frame_index = 0; +} + +/** @brief libwebsockets connection callback */ +void LibWebsocketClientPool::Client::connectCallback(struct lws_sorted_usec_list* sul) noexcept +{ + // Get next client to connect + ScheduleData* schedule_data = lws_container_of(sul, ScheduleData, sched_list); + if (schedule_data) + { + // Check if vhost has been created + Client* client = schedule_data->client; + if (!client->m_vhost) + { + // Define callback + struct lws_protocols protocols[] = { + {"LibWebsocketClientPoolClient", &LibWebsocketClientPool::Client::eventCallback, 0, 0, 0, client, 0}, + LWS_PROTOCOL_LIST_TERM}; + + // Fill vhost information + struct lws_context_creation_info vhost_info; + memset(&vhost_info, 0, sizeof(vhost_info)); + vhost_info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + vhost_info.port = CONTEXT_PORT_NO_LISTEN; + vhost_info.timeout_secs = client->m_connect_timeout; + vhost_info.protocols = protocols; + vhost_info.log_cx = &pool->m_logs_context; + if (client->m_url.protocol() == "wss") + { + if (!client->m_credentials.tls12_cipher_list.empty()) + { + vhost_info.client_ssl_cipher_list = client->m_credentials.tls12_cipher_list.c_str(); + } + if (!client->m_credentials.tls13_cipher_list.empty()) + { + vhost_info.client_tls_1_3_plus_cipher_list = client->m_credentials.tls13_cipher_list.c_str(); + } + if (client->m_credentials.encoded_pem_certificates) + { + // Use PEM encoded data + if (!client->m_credentials.server_certificate_ca.empty()) + { + vhost_info.client_ssl_ca_mem = client->m_credentials.server_certificate_ca.c_str(); + vhost_info.client_ssl_ca_mem_len = static_cast(client->m_credentials.server_certificate_ca.size()); + } + if (!client->m_credentials.client_certificate.empty()) + { + vhost_info.client_ssl_cert_mem = client->m_credentials.client_certificate.c_str(); + vhost_info.client_ssl_cert_mem_len = static_cast(client->m_credentials.client_certificate.size()); + } + if (!client->m_credentials.client_certificate_private_key.empty()) + { + vhost_info.client_ssl_key_mem = client->m_credentials.client_certificate_private_key.c_str(); + vhost_info.client_ssl_key_mem_len = + static_cast(client->m_credentials.client_certificate_private_key.size()); + } + } + else + { + // Load PEM files from filesystem + if (!client->m_credentials.server_certificate_ca.empty()) + { + vhost_info.client_ssl_ca_filepath = client->m_credentials.server_certificate_ca.c_str(); + } + if (!client->m_credentials.client_certificate.empty()) + { + vhost_info.client_ssl_cert_filepath = client->m_credentials.client_certificate.c_str(); + } + if (!client->m_credentials.client_certificate_private_key.empty()) + { + vhost_info.client_ssl_private_key_filepath = client->m_credentials.client_certificate_private_key.c_str(); + } + } + if (!client->m_credentials.client_certificate_private_key_passphrase.empty()) + { + vhost_info.client_ssl_private_key_password = client->m_credentials.client_certificate_private_key_passphrase.c_str(); + } + } + + // Create vhost + client->m_vhost = lws_create_vhost(client->m_context, &vhost_info); + } + if (client->m_vhost) + { + // Connexion parameters + struct lws_client_connect_info connect_info; + memset(&connect_info, 0, sizeof(connect_info)); + connect_info.context = client->m_context; + connect_info.vhost = client->m_vhost; + connect_info.address = client->m_url.address().c_str(); + connect_info.path = client->m_url.path().c_str(); + connect_info.host = connect_info.address; + connect_info.origin = connect_info.address; + if (client->m_url.protocol() == "wss") + { + connect_info.ssl_connection = LCCSCF_USE_SSL; + if (client->m_credentials.allow_selfsigned_certificates) + { + connect_info.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; + } + if (client->m_credentials.allow_expired_certificates) + { + connect_info.ssl_connection |= LCCSCF_ALLOW_EXPIRED; + } + if (client->m_credentials.accept_untrusted_certificates) + { + connect_info.ssl_connection |= LCCSCF_ALLOW_INSECURE; + } + if (client->m_credentials.skip_server_name_check) + { + connect_info.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; + } + connect_info.port = 443; + } + else + { + connect_info.port = 80; + } + if (client->m_url.port()) + { + connect_info.port = static_cast(client->m_url.port()); + } + connect_info.protocol = client->m_protocol.c_str(); + connect_info.local_protocol_name = "LibWebsocketClientPoolClient"; + connect_info.pwsi = &client->m_wsi; + connect_info.retry_and_idle_policy = &client->m_retry_policy; + connect_info.userdata = client; + + // Start connection + if (!lws_client_connect_via_info(&connect_info)) + { + // Schedule a retry + client->m_retry_count = 0; + lws_retry_sul_schedule(pool->m_context, + 0, + sul, + &client->m_retry_policy, + &LibWebsocketClientPool::Client::connectCallback, + &client->m_retry_count); + } + } + } +} + +/** @brief libwebsockets event callback */ +int LibWebsocketClientPool::Client::eventCallback( + struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) noexcept +{ + int ret = 0; + bool retry = false; + + // Get corresponding client + Client* client = reinterpret_cast(user); + + // Handle event + switch (reason) + { + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + if (!client->m_connection_error_notified) + { + client->m_connection_error_notified = true; + client->m_listener->wsClientFailed(); + } + if (client->m_retry_interval != 0) + { + retry = true; + } + break; + + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + { + unsigned char **p = (unsigned char**)in, *end = (*p) + len; + char b[128]; + + if (client->m_credentials.user.empty()) + break; + + if (lws_http_basic_auth_gen(client->m_credentials.user.c_str(), client->m_credentials.password.c_str(), b, sizeof(b))) + break; + if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (unsigned char*)b, (int)strlen(b), p, end)) + return -1; + + break; + } + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + { + client->m_connected = true; + client->m_listener->wsClientConnected(); + break; + } + + case LWS_CALLBACK_CLIENT_RECEIVE: + { + if (client->m_listener) + { + // Get frame info + bool is_first = (lws_is_first_fragment(wsi) == 1); + bool is_last = (lws_is_final_fragment(wsi) == 1); + size_t remaining_length = lws_remaining_packet_payload(wsi); + if (is_first && is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(in, len); + } + else if (is_first) + { + // Prepare frame bufferization + client->beginFragmentedFrame(len + remaining_length); + client->appendFragmentedData(in, len); + } + else + { + // Bufferize data + client->appendFragmentedData(in, len); + if (is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(client->m_fragmented_frame, client->m_fragmented_frame_size); + + // Release resources + client->releaseFragmentedFrame(); + } + } + } + } + break; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + { + // Send data if any ready + bool error = false; + SendMsg* msg = nullptr; + while (client->m_send_msgs.pop(msg, 0) && !error) + { + if (lws_write(wsi, msg->payload, msg->size, LWS_WRITE_TEXT) < static_cast(msg->size)) + { + // Error, close the socket + error = true; + } + + // Free message memory + delete msg; + } + if (error) + { + return -1; + } + } + break; + + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + if (client->m_retry_interval != 0) + { + retry = true; + } + break; + + case LWS_CALLBACK_CLIENT_CLOSED: + client->m_connected = false; + client->m_listener->wsClientDisconnected(); + if (client->m_retry_interval != 0) + { + retry = true; + } + break; + + default: + break; + } + if (retry) + { + // Schedule a retry + client->m_retry_count = 0; + lws_retry_sul_schedule_retry_wsi( + wsi, &client->m_schedule_data.sched_list, &LibWebsocketClientPool::Client::connectCallback, &client->m_retry_count); + } + else + { + ret = lws_callback_http_dummy(wsi, reason, user, in, len); + } + + return ret; +} + +} // namespace websockets +} // namespace ocpp diff --git a/src/websockets/libwebsockets/LibWebsocketClientPool.h b/src/websockets/libwebsockets/LibWebsocketClientPool.h new file mode 100644 index 00000000..703a3c0f --- /dev/null +++ b/src/websockets/libwebsockets/LibWebsocketClientPool.h @@ -0,0 +1,217 @@ +/* +Copyright (c) 2020 Cedric Jimenez +This file is part of OpenOCPP. + +OpenOCPP is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 2.1 of the License, or +(at your option) any later version. + +OpenOCPP is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with OpenOCPP. If not, see . +*/ + +#ifndef OPENOCPP_LIBWEBSOCKETCLIENTPOOL_H +#define OPENOCPP_LIBWEBSOCKETCLIENTPOOL_H + +#include "IWebsocketClient.h" +#include "Queue.h" +#include "Url.h" +#include "websockets.h" + +#include +#include +#include + +namespace ocpp +{ +namespace websockets +{ + +/** @brief Pool of websocket clients */ +class LibWebsocketClientPool +{ + // Forward declaration + class Client; + friend class Client; + + public: + /** @brief Constructor */ + LibWebsocketClientPool(); + /** @brief Destructor */ + virtual ~LibWebsocketClientPool(); + + /** @brief Start the pool */ + bool start(); + /** @brief Stop the pool */ + bool stop(); + /** @brief Get the number of clients */ + size_t getClientsCount() const; + + /** @brief Instanciate a client websocket */ + IWebsocketClient* newClient(); + + private: + /** @brief Websocket context */ + struct lws_context* m_context; + /** @brief Websocket log context */ + lws_log_cx_t m_logs_context; + /** @brief Internal thread */ + std::thread* m_thread; + /** @brief Indicate the end of processing to the thread */ + bool m_end; + /** @brief Number of clients */ + size_t m_clients_count; + /** @brief Mutex for the number of clients */ + mutable std::mutex m_clients_count_mutex; + + /** @brief Clients queued for connection */ + ocpp::helpers::Queue m_waiting_connect_queue; + /** @brief Clients queued for disconnection */ + ocpp::helpers::Queue m_waiting_disconnect_queue; + /** @brief Clients queued for send */ + ocpp::helpers::Queue m_waiting_send_queue; + + /** @brief Internal thread */ + void process(); + + /** @brief libwebsockets event callback */ + static int eventCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) noexcept; + + /** @brief Websocket client implementation using libwebsockets */ + class Client : public IWebsocketClient + { + friend class LibWebsocketClientPool; + + public: + /** @brief Constructor */ + Client(LibWebsocketClientPool& pool); + /** @brief Destructor */ + virtual ~Client(); + + /** @copydoc bool IWebsocketClient::connect(const std::string&, const std::string&, const Credentials&, + * std::chrono::milliseconds, std::chrono::milliseconds, std::chrono::milliseconds) */ + bool connect(const std::string& url, + const std::string& protocol, + const Credentials& credentials, + std::chrono::milliseconds connect_timeout = std::chrono::seconds(5), + std::chrono::milliseconds retry_interval = std::chrono::seconds(5), + std::chrono::milliseconds ping_interval = std::chrono::seconds(5)) override; + + /** @copydoc bool IWebsocketClient::disconnect() */ + bool disconnect() override; + + /** @copydoc bool IWebsocketClient::isConnected() */ + bool isConnected() override; + + /** @copydoc bool IWebsocketClient::send(const void*, size_t) */ + bool send(const void* data, size_t size) override; + + /** @copydoc void IWebsocketClient::registerListener(IListener&) */ + void registerListener(IListener& listener) override; + + private: + /** @brief Message to send */ + struct SendMsg + { + /** @brief Constructor */ + SendMsg(const void* _data, size_t _size) + { + data = new unsigned char[LWS_PRE + _size]; + size = _size; + payload = &data[LWS_PRE]; + memcpy(payload, _data, size); + } + /** @brief Destructor */ + virtual ~SendMsg() { delete[] data; } + + /** @brief Data buffer */ + unsigned char* data; + /** @brief Payload start */ + unsigned char* payload; + /** @brief Size in bytes */ + size_t size; + }; + + /** @brief Schedule data */ + struct ScheduleData + { + /** @brief Associated client */ + Client* client; + /** @brief Schedule list */ + lws_sorted_usec_list_t sched_list; + }; + + /** @brief Associated client pool */ + LibWebsocketClientPool& m_pool; + /** @brief Listener */ + IListener* m_listener; + /** @brief Connection timeout */ + unsigned int m_connect_timeout; + /** @brief Retry interval in ms */ + uint32_t m_retry_interval; + /** @brief PING interval in s */ + uint16_t m_ping_interval; + /** @brief Indicate if the connection error has been notified at least once */ + bool m_connection_error_notified; + /** @brief Connection URL */ + Url m_url; + /** @brief Name of the protocol to use */ + std::string m_protocol; + /** @brief Credentials */ + Credentials m_credentials; + /** @brief Indicate the connection state */ + bool m_connected; + /** @brief Disconnect condition variable */ + std::condition_variable m_disconnect_cond_var; + /** @brief Disconnect mutex */ + std::mutex m_disconnect_mutex; + /** @brief Indicate that the disconnect process is done */ + bool m_disconnect_process_done; + + /** @brief Websocket context */ + struct lws_context* m_context; + /** @brief Websocket vhost */ + struct lws_vhost* m_vhost; + /** @brief Related wsi */ + struct lws* m_wsi; + /** @brief Retry policy */ + lws_retry_bo_t m_retry_policy; + /** @brief Consecutive retries */ + uint16_t m_retry_count; + /** @brief Schedule data */ + struct ScheduleData m_schedule_data; + + /** @brief Queue of messages to send */ + ocpp::helpers::Queue m_send_msgs; + + /** @brief Buffer to store fragmented frames */ + uint8_t* m_fragmented_frame; + /** @brief Size of the fragmented frame */ + size_t m_fragmented_frame_size; + /** @brief Current index in the fragmented frame */ + size_t m_fragmented_frame_index; + + /** @brief Prepare the buffer to store a new fragmented frame */ + void beginFragmentedFrame(size_t frame_size); + /** @brief Append data to the fragmented frame */ + void appendFragmentedData(const void* data, size_t size); + /** @brief Release the memory associated with the fragmented frame */ + void releaseFragmentedFrame(); + + /** @brief libwebsockets connection callback */ + static void connectCallback(struct lws_sorted_usec_list* sul) noexcept; + /** @brief libwebsockets event callback */ + static int eventCallback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) noexcept; + }; +}; + +} // namespace websockets +} // namespace ocpp + +#endif // OPENOCPP_LIBWEBSOCKETCLIENTPOOL_H diff --git a/src/websockets/libwebsockets/LibWebsocketServer.cpp b/src/websockets/libwebsockets/LibWebsocketServer.cpp index bc58802d..5729d61c 100644 --- a/src/websockets/libwebsockets/LibWebsocketServer.cpp +++ b/src/websockets/libwebsockets/LibWebsocketServer.cpp @@ -41,6 +41,7 @@ LibWebsocketServer::LibWebsocketServer() m_protocol(""), m_credentials(), m_context(nullptr), + m_logs_context(), m_wsi(nullptr), m_retry_policy(), m_protocols(), @@ -90,11 +91,17 @@ bool LibWebsocketServer::start(const std::string& url, .jitter_percent = 0}; #endif // _MSC_VER + // Initialize log context + memset(&m_logs_context, 0, sizeof(m_logs_context)); + m_logs_context.u.emit = LIBWEBSOCKET_LOG_OUTPUT_FN; + m_logs_context.lll_flags = LIBWEBSOCKET_LOG_FLAGS; + // Fill context information struct lws_context_creation_info info; memset(&info, 0, sizeof info); info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME | LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE; + info.log_cx = &m_logs_context; if (m_url.port() != 0) { info.port = static_cast(m_url.port()); @@ -305,11 +312,14 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons // BEGIN SNIPPET // Did he send auth? - int ml = lws_hdr_total_length(wsi, WSI_TOKEN_HTTP_AUTHORIZATION); + bool authorized = false; + int ml = lws_hdr_total_length(wsi, WSI_TOKEN_HTTP_AUTHORIZATION); if (!ml) { - lwsl_err("missing basic authent header\n"); - ret = -1; + lwsl_warn("missing basic authent header\n"); + + // Notify connection without credentials + authorized = server->m_listener->wsCheckCredentials(uri, "", ""); } else { @@ -330,7 +340,9 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons #endif // _MSC_VER { lwsl_err("auth missing basic: %s\n", b64); - ret = -1; + + // Notify connection without credentials + authorized = server->m_listener->wsCheckCredentials(uri, "", ""); } else { @@ -356,38 +368,37 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons // Check credentials std::string username(plain, static_cast(pcolon - plain)); std::string password(pcolon + 1u); - if (!server->m_listener->wsCheckCredentials(uri, username, password)) - { - // The following code snippet is from the lws_unauthorised_basic_auth() function in server.c file of libwebsockets - // BEGIN SNIPPET - unsigned char frame_buffer[LWS_PRE + 1024u]; - unsigned char* start = &frame_buffer[LWS_PRE]; - unsigned char* p = start; - unsigned char* end = &frame_buffer[sizeof(frame_buffer) - 1u]; - char buf[64]; - if (!lws_add_http_header_status(wsi, HTTP_STATUS_UNAUTHORIZED, &p, end)) - { - int n = lws_snprintf(buf, sizeof(buf), "Basic realm=\"Open OCPP\""); - n = lws_add_http_header_by_token( - wsi, WSI_TOKEN_HTTP_WWW_AUTHENTICATE, (unsigned char*)buf, n, &p, end); - n += lws_add_http_header_content_length(wsi, 0, &p, end); - n += lws_finalize_http_header(wsi, &p, end); - n += lws_write( - wsi, - start, - lws_ptr_diff_size_t(p, start), - static_cast(LWS_WRITE_HTTP_HEADERS | LWS_WRITE_H2_STREAM_END)); - n += lws_http_transaction_completed(wsi); - } - // END SNIPPET - ret = -1; - } + authorized = server->m_listener->wsCheckCredentials(uri, username, password); } } } } } // END SNIPPET + if (!authorized) + { + // The following code snippet is from the lws_unauthorised_basic_auth() function in server.c file of libwebsockets + // BEGIN SNIPPET + unsigned char frame_buffer[LWS_PRE + 1024u]; + unsigned char* start = &frame_buffer[LWS_PRE]; + unsigned char* p = start; + unsigned char* end = &frame_buffer[sizeof(frame_buffer) - 1u]; + char buf[64]; + if (!lws_add_http_header_status(wsi, HTTP_STATUS_UNAUTHORIZED, &p, end)) + { + int n = lws_snprintf(buf, sizeof(buf), "Basic realm=\"Open OCPP\""); + n = lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_WWW_AUTHENTICATE, (unsigned char*)buf, n, &p, end); + n += lws_add_http_header_content_length(wsi, 0, &p, end); + n += lws_finalize_http_header(wsi, &p, end); + n += lws_write(wsi, + start, + lws_ptr_diff_size_t(p, start), + static_cast(LWS_WRITE_HTTP_HEADERS | LWS_WRITE_H2_STREAM_END)); + n += lws_http_transaction_completed(wsi); + } + // END SNIPPET + ret = -1; + } } } else diff --git a/src/websockets/libwebsockets/LibWebsocketServer.h b/src/websockets/libwebsockets/LibWebsocketServer.h index b62f85b1..d91dc812 100644 --- a/src/websockets/libwebsockets/LibWebsocketServer.h +++ b/src/websockets/libwebsockets/LibWebsocketServer.h @@ -172,6 +172,8 @@ class LibWebsocketServer : public IWebsocketServer /** @brief Websocket context */ struct lws_context* m_context; + /** @brief Websocket log context */ + lws_log_cx_t m_logs_context; /** @brief Related wsi */ struct lws* m_wsi; /** @brief Retry policy */ diff --git a/src/websockets/libwebsockets/websockets.h b/src/websockets/libwebsockets/websockets.h index 0a453894..37c66501 100644 --- a/src/websockets/libwebsockets/websockets.h +++ b/src/websockets/libwebsockets/websockets.h @@ -35,4 +35,10 @@ along with OpenOCPP. If not, see . #pragma warning(pop) #endif // _MSC_VER +/** @brief Log selection for the websocket library */ +#define LIBWEBSOCKET_LOG_FLAGS (LLL_ERR | LLL_WARN | LLL_NOTICE) + +/** @brief Log ouput function for the websocket library */ +#define LIBWEBSOCKET_LOG_OUTPUT_FN lwsl_emit_stderr + #endif // OPENOCPP_WEBSOCKETS_H diff --git a/tests/websockets/CMakeLists.txt b/tests/websockets/CMakeLists.txt index 464f3c15..a54c5dc5 100644 --- a/tests/websockets/CMakeLists.txt +++ b/tests/websockets/CMakeLists.txt @@ -9,3 +9,11 @@ add_test( NAME test_websockets_url COMMAND test_websockets_url ) + +# Uncomment for debug purpose only +# add_executable(test_websockets test_websockets.cpp) +# target_link_libraries(test_websockets ws ${OPENOCPP_COMMON_TEST_LIBS}) +# add_test( +# NAME test_websockets +# COMMAND test_websockets +# ) diff --git a/tests/websockets/test_websockets.cpp b/tests/websockets/test_websockets.cpp new file mode 100644 index 00000000..d164660e --- /dev/null +++ b/tests/websockets/test_websockets.cpp @@ -0,0 +1,103 @@ +/* +Copyright (c) 2020 Cedric Jimenez +This file is part of OpenOCPP. + +OpenOCPP is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 2.1 of the License, or +(at your option) any later version. + +OpenOCPP is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with OpenOCPP. If not, see . +*/ + +#include "WebsocketFactory.h" + +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include "doctest_wrapper.h" + +#include +#include + +using namespace ocpp::websockets; +using namespace std::chrono_literals; +using namespace std::string_literals; + +class WsListener : public IWebsocketClient::IListener +{ + public: + /** @brief Constructor */ + WsListener(const std::string& name, IWebsocketClient& client) : m_name(name), m_client(client) { } + + /** @brief Called when connection is successfull */ + void wsClientConnected() override { std::cout << "[" << m_name << "] - Connected!" << std::endl; } + + /** @brief Called when connection failed */ + void wsClientFailed() override { std::cout << "[" << m_name << "] - Connection failed!" << std::endl; } + + /** @brief Called when connection is lost */ + void wsClientDisconnected() override { std::cout << "[" << m_name << "] - Disconnected!" << std::endl; } + + /** @brief Called when a critical error occured */ + void wsClientError() override { std::cout << "[" << m_name << "] - Error!" << std::endl; } + + /** @brief Call when data has been received */ + void wsClientDataReceived(const void* data, size_t size) override + { + (void)data; + std::cout << "[" << m_name << "] - Data received (" << size << "bytes)!" << std::endl; + m_client.send(data, size); + m_client.disconnect(); + } + + private: + const std::string m_name; + IWebsocketClient& m_client; +}; + +TEST_SUITE("Nominal") +{ + TEST_CASE("Pool") + { + std::vector client_threads; + + WebsocketFactory::setClientPoolCount(4u); + WebsocketFactory::startClientPools(); + + for (size_t i = 0; i < 200u; i++) + { + client_threads.push_back(new std::thread( + [i] + { + std::string name = "client_" + std::to_string(i); + IWebsocketClient* client = WebsocketFactory::newClientFromPool(); + WsListener client_listener(name, *client); + client->registerListener(client_listener); + + IWebsocketClient::Credentials credentials; + credentials.allow_selfsigned_certificates = true; + credentials.allow_expired_certificates = true; + credentials.accept_untrusted_certificates = true; + credentials.skip_server_name_check = true; + client->connect("wss://127.0.0.1:8080/openocpp/"s + name, "ocpp1.6"s, credentials); + + std::this_thread::sleep_for(30s); + + delete client; + })); + } + + for (auto& thread : client_threads) + { + thread->join(); + delete thread; + } + + WebsocketFactory::stopClientPools(); + } +}