From 5a48d6bcef56a846cfbf0cbd95dae3f946eece48 Mon Sep 17 00:00:00 2001 From: c-jimenez <18682655+c-jimenez@users.noreply.github.com> Date: Thu, 26 Jan 2023 14:27:14 +0100 Subject: [PATCH] =?UTF-8?q?[websocket]=C2=A0Handle=20fragmented=20websocke?= =?UTF-8?q?t=20frames?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../libwebsockets/LibWebsocketClient.cpp | 74 ++++++++++++++++++- .../libwebsockets/LibWebsocketClient.h | 14 ++++ .../libwebsockets/LibWebsocketServer.cpp | 73 +++++++++++++++++- .../libwebsockets/LibWebsocketServer.h | 34 +++++++++ 4 files changed, 188 insertions(+), 7 deletions(-) diff --git a/src/websockets/libwebsockets/LibWebsocketClient.cpp b/src/websockets/libwebsockets/LibWebsocketClient.cpp index 091561ef..7a43403e 100644 --- a/src/websockets/libwebsockets/LibWebsocketClient.cpp +++ b/src/websockets/libwebsockets/LibWebsocketClient.cpp @@ -49,7 +49,10 @@ LibWebsocketClient::LibWebsocketClient() m_wsi(nullptr), m_retry_policy(), m_retry_count(0), - m_send_msgs() + m_send_msgs(), + m_fragmented_frame(nullptr), + m_fragmented_frame_size(0), + m_fragmented_frame_index(0) { } /** @brief Destructor */ @@ -57,6 +60,7 @@ LibWebsocketClient::~LibWebsocketClient() { // To prevent keeping an open connection in background disconnect(); + releaseFragmentedFrame(); } /** @copydoc bool IWebsocketClient::connect(const std::string&, const std::string&, const Credentials&, @@ -264,6 +268,38 @@ void LibWebsocketClient::process() lws_context_destroy(context); } +/** @brief Prepare the buffer to store a new fragmented frame */ +void LibWebsocketClient::beginFragmentedFrame(size_t frame_size) +{ + // Release previously allocated data + releaseFragmentedFrame(); + + // Allocate new buffer + m_fragmented_frame = new uint8_t[frame_size]; + m_fragmented_frame_size = frame_size; +} + +/** @brief Append data to the fragmented frame */ +void LibWebsocketClient::appendFragmentedData(const void* data, size_t size) +{ + size_t copy_len = size; + if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size) + { + copy_len = m_fragmented_frame_size - m_fragmented_frame_index; + } + memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len); + m_fragmented_frame_index += copy_len; +} + +/** @brief Release the memory associated with the fragmented frame */ +void LibWebsocketClient::releaseFragmentedFrame() +{ + delete[] m_fragmented_frame; + m_fragmented_frame = nullptr; + m_fragmented_frame_size = 0; + m_fragmented_frame_index = 0; +} + /** @brief libwebsockets connection callback */ void LibWebsocketClient::connectCallback(struct lws_sorted_usec_list* sul) noexcept { @@ -380,8 +416,40 @@ int LibWebsocketClient::eventCallback(struct lws* wsi, enum lws_callback_reasons break; case LWS_CALLBACK_CLIENT_RECEIVE: - client->m_listener->wsClientDataReceived(in, len); - break; + { + if (client->m_listener) + { + // Get frame info + bool is_first = (lws_is_first_fragment(wsi) == 1); + bool is_last = (lws_is_final_fragment(wsi) == 1); + size_t remaining_length = lws_remaining_packet_payload(wsi); + if (is_first && is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(in, len); + } + else if (is_first) + { + // Prepare frame bufferization + client->beginFragmentedFrame(len + remaining_length); + client->appendFragmentedData(in, len); + } + else + { + // Bufferize data + client->appendFragmentedData(in, len); + if (is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(client->m_fragmented_frame, client->m_fragmented_frame_size); + + // Release resources + client->releaseFragmentedFrame(); + } + } + } + } + break; case LWS_CALLBACK_EVENT_WAIT_CANCELLED: { diff --git a/src/websockets/libwebsockets/LibWebsocketClient.h b/src/websockets/libwebsockets/LibWebsocketClient.h index f413720f..5b8b9c3e 100644 --- a/src/websockets/libwebsockets/LibWebsocketClient.h +++ b/src/websockets/libwebsockets/LibWebsocketClient.h @@ -121,9 +121,23 @@ class LibWebsocketClient : public IWebsocketClient /** @brief Queue of messages to send */ ocpp::helpers::Queue m_send_msgs; + /** @brief Buffer to store fragmented frames */ + uint8_t* m_fragmented_frame; + /** @brief Size of the fragmented frame */ + size_t m_fragmented_frame_size; + /** @brief Current index in the fragmented frame */ + size_t m_fragmented_frame_index; + /** @brief Internal thread */ void process(); + /** @brief Prepare the buffer to store a new fragmented frame */ + void beginFragmentedFrame(size_t frame_size); + /** @brief Append data to the fragmented frame */ + void appendFragmentedData(const void* data, size_t size); + /** @brief Release the memory associated with the fragmented frame */ + void releaseFragmentedFrame(); + /** @brief libwebsockets connection callback */ static void connectCallback(struct lws_sorted_usec_list* sul) noexcept; /** @brief libwebsockets event callback */ diff --git a/src/websockets/libwebsockets/LibWebsocketServer.cpp b/src/websockets/libwebsockets/LibWebsocketServer.cpp index 23d98bdb..ed4e5b4e 100644 --- a/src/websockets/libwebsockets/LibWebsocketServer.cpp +++ b/src/websockets/libwebsockets/LibWebsocketServer.cpp @@ -511,11 +511,36 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons if (iter_client != server->m_clients.end()) { Client* client = dynamic_cast(iter_client->second.get()); - - // Notify client if (client->m_listener) { - client->m_listener->wsClientDataReceived(in, len); + // Get frame info + bool is_first = (lws_is_first_fragment(wsi) == 1); + bool is_last = (lws_is_final_fragment(wsi) == 1); + size_t remaining_length = lws_remaining_packet_payload(wsi); + if (is_first && is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(in, len); + } + else if (is_first) + { + // Prepare frame bufferization + client->beginFragmentedFrame(len + remaining_length); + client->appendFragmentedData(in, len); + } + else + { + // Bufferize data + client->appendFragmentedData(in, len); + if (is_last) + { + // Notify client + client->m_listener->wsClientDataReceived(client->getFragmentedFrame(), client->getFragmentedFrameSize()); + + // Release resources + client->releaseFragmentedFrame(); + } + } } } } @@ -530,13 +555,21 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons /** @brief Constructor */ LibWebsocketServer::Client::Client(struct lws* wsi, const char* ip_address) - : m_wsi(wsi), m_ip_address(ip_address), m_connected(true), m_listener(nullptr), m_send_msgs() + : m_wsi(wsi), + m_ip_address(ip_address), + m_connected(true), + m_listener(nullptr), + m_send_msgs(), + m_fragmented_frame(nullptr), + m_fragmented_frame_size(0), + m_fragmented_frame_index(0) { } /** @brief Destructor */ LibWebsocketServer::Client::~Client() { disconnect(true); + releaseFragmentedFrame(); } /** @copydoc const std::string& IClient::ipAddress(bool) const */ @@ -606,5 +639,37 @@ void LibWebsocketServer::Client::registerListener(IClient::IListener& listener) m_listener = &listener; } +/** @brief Prepare the buffer to store a new fragmented frame */ +void LibWebsocketServer::Client::beginFragmentedFrame(size_t frame_size) +{ + // Release previously allocated data + releaseFragmentedFrame(); + + // Allocate new buffer + m_fragmented_frame = new uint8_t[frame_size]; + m_fragmented_frame_size = frame_size; +} + +/** @brief Append data to the fragmented frame */ +void LibWebsocketServer::Client::appendFragmentedData(const void* data, size_t size) +{ + size_t copy_len = size; + if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size) + { + copy_len = m_fragmented_frame_size - m_fragmented_frame_index; + } + memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len); + m_fragmented_frame_index += copy_len; +} + +/** @brief Release the memory associated with the fragmented frame */ +void LibWebsocketServer::Client::releaseFragmentedFrame() +{ + delete[] m_fragmented_frame; + m_fragmented_frame = nullptr; + m_fragmented_frame_size = 0; + m_fragmented_frame_index = 0; +} + } // namespace websockets } // namespace ocpp diff --git a/src/websockets/libwebsockets/LibWebsocketServer.h b/src/websockets/libwebsockets/LibWebsocketServer.h index 4aeb3bab..8d3defeb 100644 --- a/src/websockets/libwebsockets/LibWebsocketServer.h +++ b/src/websockets/libwebsockets/LibWebsocketServer.h @@ -110,6 +110,34 @@ class LibWebsocketServer : public IWebsocketServer /** @copydoc bool IClient::registerListener(IListener&) */ void registerListener(IClient::IListener& listener) override; + /** + * @brief Get the size in bytes of the fragmented frame + * @return Size in bytes of the fragmented frame + */ + size_t getFragmentedFrameSize() const { return m_fragmented_frame_size; } + + /** + * @brief Get the fragmented frame + * @return Fragmented frame + */ + const void* getFragmentedFrame() const { return m_fragmented_frame; } + + /** + * @brief Prepare the buffer to store a new fragmented frame + * @param frame_size Size of the fragmented frame in bytes + */ + void beginFragmentedFrame(size_t frame_size); + + /** + * @brief Append data to the fragmented frame + * @param data Data to append + * @param size Size of the data in bytes + */ + void appendFragmentedData(const void* data, size_t size); + + /** @brief Release the memory associated with the fragmented frame */ + void releaseFragmentedFrame(); + private: /** @brief Client socket */ struct lws* m_wsi; @@ -121,6 +149,12 @@ class LibWebsocketServer : public IWebsocketServer IClient::IListener* m_listener; /** @brief Queue of messages to send */ ocpp::helpers::Queue m_send_msgs; + /** @brief Buffer to store fragmented frames */ + uint8_t* m_fragmented_frame; + /** @brief Size of the fragmented frame */ + size_t m_fragmented_frame_size; + /** @brief Current index in the fragmented frame */ + size_t m_fragmented_frame_index; }; /** @brief Listener */