Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions src/websockets/libwebsockets/LibWebsocketClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ LibWebsocketClient::LibWebsocketClient()
m_wsi(nullptr),
m_retry_policy(),
m_retry_count(0),
m_send_msgs()
m_send_msgs(),
m_fragmented_frame(nullptr),
m_fragmented_frame_size(0),
m_fragmented_frame_index(0)
{
}
/** @brief Destructor */
LibWebsocketClient::~LibWebsocketClient()
{
// To prevent keeping an open connection in background
disconnect();
releaseFragmentedFrame();
}

/** @copydoc bool IWebsocketClient::connect(const std::string&, const std::string&, const Credentials&,
Expand Down Expand Up @@ -264,6 +268,38 @@ void LibWebsocketClient::process()
lws_context_destroy(context);
}

/** @brief Prepare the buffer to store a new fragmented frame */
void LibWebsocketClient::beginFragmentedFrame(size_t frame_size)
{
// Release previously allocated data
releaseFragmentedFrame();

// Allocate new buffer
m_fragmented_frame = new uint8_t[frame_size];
m_fragmented_frame_size = frame_size;
}

/** @brief Append data to the fragmented frame */
void LibWebsocketClient::appendFragmentedData(const void* data, size_t size)
{
size_t copy_len = size;
if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size)
{
copy_len = m_fragmented_frame_size - m_fragmented_frame_index;
}
memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len);
m_fragmented_frame_index += copy_len;
}

/** @brief Release the memory associated with the fragmented frame */
void LibWebsocketClient::releaseFragmentedFrame()
{
delete[] m_fragmented_frame;
m_fragmented_frame = nullptr;
m_fragmented_frame_size = 0;
m_fragmented_frame_index = 0;
}

/** @brief libwebsockets connection callback */
void LibWebsocketClient::connectCallback(struct lws_sorted_usec_list* sul) noexcept
{
Expand Down Expand Up @@ -380,8 +416,40 @@ int LibWebsocketClient::eventCallback(struct lws* wsi, enum lws_callback_reasons
break;

case LWS_CALLBACK_CLIENT_RECEIVE:
client->m_listener->wsClientDataReceived(in, len);
break;
{
if (client->m_listener)
{
// Get frame info
bool is_first = (lws_is_first_fragment(wsi) == 1);
bool is_last = (lws_is_final_fragment(wsi) == 1);
size_t remaining_length = lws_remaining_packet_payload(wsi);
if (is_first && is_last)
{
// Notify client
client->m_listener->wsClientDataReceived(in, len);
}
else if (is_first)
{
// Prepare frame bufferization
client->beginFragmentedFrame(len + remaining_length);
client->appendFragmentedData(in, len);
}
else
{
// Bufferize data
client->appendFragmentedData(in, len);
if (is_last)
{
// Notify client
client->m_listener->wsClientDataReceived(client->m_fragmented_frame, client->m_fragmented_frame_size);

// Release resources
client->releaseFragmentedFrame();
}
}
}
}
break;

case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
{
Expand Down
14 changes: 14 additions & 0 deletions src/websockets/libwebsockets/LibWebsocketClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,23 @@ class LibWebsocketClient : public IWebsocketClient
/** @brief Queue of messages to send */
ocpp::helpers::Queue<SendMsg*> m_send_msgs;

/** @brief Buffer to store fragmented frames */
uint8_t* m_fragmented_frame;
/** @brief Size of the fragmented frame */
size_t m_fragmented_frame_size;
/** @brief Current index in the fragmented frame */
size_t m_fragmented_frame_index;

/** @brief Internal thread */
void process();

/** @brief Prepare the buffer to store a new fragmented frame */
void beginFragmentedFrame(size_t frame_size);
/** @brief Append data to the fragmented frame */
void appendFragmentedData(const void* data, size_t size);
/** @brief Release the memory associated with the fragmented frame */
void releaseFragmentedFrame();

/** @brief libwebsockets connection callback */
static void connectCallback(struct lws_sorted_usec_list* sul) noexcept;
/** @brief libwebsockets event callback */
Expand Down
73 changes: 69 additions & 4 deletions src/websockets/libwebsockets/LibWebsocketServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,36 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons
if (iter_client != server->m_clients.end())
{
Client* client = dynamic_cast<Client*>(iter_client->second.get());

// Notify client
if (client->m_listener)
{
client->m_listener->wsClientDataReceived(in, len);
// Get frame info
bool is_first = (lws_is_first_fragment(wsi) == 1);
bool is_last = (lws_is_final_fragment(wsi) == 1);
size_t remaining_length = lws_remaining_packet_payload(wsi);
if (is_first && is_last)
{
// Notify client
client->m_listener->wsClientDataReceived(in, len);
}
else if (is_first)
{
// Prepare frame bufferization
client->beginFragmentedFrame(len + remaining_length);
client->appendFragmentedData(in, len);
}
else
{
// Bufferize data
client->appendFragmentedData(in, len);
if (is_last)
{
// Notify client
client->m_listener->wsClientDataReceived(client->getFragmentedFrame(), client->getFragmentedFrameSize());

// Release resources
client->releaseFragmentedFrame();
}
}
}
}
}
Expand All @@ -530,13 +555,21 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons

/** @brief Constructor */
LibWebsocketServer::Client::Client(struct lws* wsi, const char* ip_address)
: m_wsi(wsi), m_ip_address(ip_address), m_connected(true), m_listener(nullptr), m_send_msgs()
: m_wsi(wsi),
m_ip_address(ip_address),
m_connected(true),
m_listener(nullptr),
m_send_msgs(),
m_fragmented_frame(nullptr),
m_fragmented_frame_size(0),
m_fragmented_frame_index(0)
{
}
/** @brief Destructor */
LibWebsocketServer::Client::~Client()
{
disconnect(true);
releaseFragmentedFrame();
}

/** @copydoc const std::string& IClient::ipAddress(bool) const */
Expand Down Expand Up @@ -606,5 +639,37 @@ void LibWebsocketServer::Client::registerListener(IClient::IListener& listener)
m_listener = &listener;
}

/** @brief Prepare the buffer to store a new fragmented frame */
void LibWebsocketServer::Client::beginFragmentedFrame(size_t frame_size)
{
// Release previously allocated data
releaseFragmentedFrame();

// Allocate new buffer
m_fragmented_frame = new uint8_t[frame_size];
m_fragmented_frame_size = frame_size;
}

/** @brief Append data to the fragmented frame */
void LibWebsocketServer::Client::appendFragmentedData(const void* data, size_t size)
{
size_t copy_len = size;
if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size)
{
copy_len = m_fragmented_frame_size - m_fragmented_frame_index;
}
memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len);
m_fragmented_frame_index += copy_len;
}

/** @brief Release the memory associated with the fragmented frame */
void LibWebsocketServer::Client::releaseFragmentedFrame()
{
delete[] m_fragmented_frame;
m_fragmented_frame = nullptr;
m_fragmented_frame_size = 0;
m_fragmented_frame_index = 0;
}

} // namespace websockets
} // namespace ocpp
34 changes: 34 additions & 0 deletions src/websockets/libwebsockets/LibWebsocketServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,34 @@ class LibWebsocketServer : public IWebsocketServer
/** @copydoc bool IClient::registerListener(IListener&) */
void registerListener(IClient::IListener& listener) override;

/**
* @brief Get the size in bytes of the fragmented frame
* @return Size in bytes of the fragmented frame
*/
size_t getFragmentedFrameSize() const { return m_fragmented_frame_size; }

/**
* @brief Get the fragmented frame
* @return Fragmented frame
*/
const void* getFragmentedFrame() const { return m_fragmented_frame; }

/**
* @brief Prepare the buffer to store a new fragmented frame
* @param frame_size Size of the fragmented frame in bytes
*/
void beginFragmentedFrame(size_t frame_size);

/**
* @brief Append data to the fragmented frame
* @param data Data to append
* @param size Size of the data in bytes
*/
void appendFragmentedData(const void* data, size_t size);

/** @brief Release the memory associated with the fragmented frame */
void releaseFragmentedFrame();

private:
/** @brief Client socket */
struct lws* m_wsi;
Expand All @@ -121,6 +149,12 @@ class LibWebsocketServer : public IWebsocketServer
IClient::IListener* m_listener;
/** @brief Queue of messages to send */
ocpp::helpers::Queue<SendMsg*> m_send_msgs;
/** @brief Buffer to store fragmented frames */
uint8_t* m_fragmented_frame;
/** @brief Size of the fragmented frame */
size_t m_fragmented_frame_size;
/** @brief Current index in the fragmented frame */
size_t m_fragmented_frame_index;
};

/** @brief Listener */
Expand Down