diff --git a/libsrc/flatbufserver/FlatBufferClient.cpp b/libsrc/flatbufserver/FlatBufferClient.cpp index 5ca102a72..8c50db9af 100644 --- a/libsrc/flatbufserver/FlatBufferClient.cpp +++ b/libsrc/flatbufserver/FlatBufferClient.cpp @@ -1,5 +1,6 @@ #include "FlatBufferClient.h" #include +#include // qt #include @@ -28,7 +29,7 @@ FlatBufferClient::FlatBufferClient(QTcpSocket* socket, int timeout, QObject *par , _processingMessage(false) { _imageResampler.setPixelDecimation(1); - + // timer setup _timeoutTimer.reset(new QTimer(this)); _timeoutTimer->setSingleShot(true); @@ -57,59 +58,71 @@ void FlatBufferClient::readyRead() } } -void FlatBufferClient::processNextMessage() +bool FlatBufferClient::processNextMessageInline() { - if (_processingMessage) { return; } // Avoid re-entrancy + if (_processingMessage) { return false; } // Avoid re-entrancy // Wait for at least 4 bytes to read the message size if (_receiveBuffer.size() < 4) { - return; + return false; } _processingMessage = true; - uint32_t messageSize; - memcpy(&messageSize, _receiveBuffer.constData(), sizeof(uint32_t)); - messageSize = qFromBigEndian(messageSize); + // Directly read message size (no memcpy) + const uint8_t* raw = reinterpret_cast(_receiveBuffer.constData()); + uint32_t messageSize = (raw[0] << 24) | (raw[1] << 16) | (raw[2] << 8) | raw[3]; // Validate message size if (messageSize == 0 || messageSize > FLATBUFFER_MAX_MSG_LENGTH) { Warning(_log, "Invalid message size: %d - dropping received data", messageSize); - _receiveBuffer.clear(); _processingMessage = false; - return; + return true; } // Wait for full message if (_receiveBuffer.size() < static_cast(messageSize + 4)) { _processingMessage = false; - return; + return false; } - // Extract the message and remove it from the buffer - _lastMessage = _receiveBuffer.mid(4, messageSize); - _receiveBuffer.remove(0, messageSize + 4); - - const uint8_t* msgData = reinterpret_cast(_lastMessage.constData()); + // Extract the message and remove it from the buffer (no copying) + const uint8_t* msgData = reinterpret_cast(_receiveBuffer.constData() + 4); flatbuffers::Verifier verifier(msgData, messageSize); if (!hyperionnet::VerifyRequestBuffer(verifier)) { Error(_log, "Invalid FlatBuffer message received"); sendErrorReply("Invalid FlatBuffer message received"); _processingMessage = false; - QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection); - return; + + // Clear the buffer in case of an invalid message + _receiveBuffer.clear(); + return true; } // Invoke message handling - QMetaObject::invokeMethod(this, [this]() { - const auto* msgData = reinterpret_cast(_lastMessage.constData()); + QMetaObject::invokeMethod(this, [this, msgData, messageSize]() { handleMessage(hyperionnet::GetRequest(msgData)); _processingMessage = false; - QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection); + + // Remove the processed message from the buffer (header + body) + _receiveBuffer.remove(0, messageSize + 4); // Clear the processed message + header + + // Continue processing the next message + processNextMessage(); }); + + return true; +} + +void FlatBufferClient::processNextMessage() +{ + // Run the message processing inline until the buffer is empty or we can't process further + while (processNextMessageInline()) { + // Keep processing as long as we can + } } void FlatBufferClient::noDataReceived() @@ -201,55 +214,80 @@ void FlatBufferClient::handleRegisterCommand(const hyperionnet::Register *regReq void FlatBufferClient::handleImageCommand(const hyperionnet::Image *image) { - Image imageRGB; - // extract parameters int const duration = image->duration(); + if (image->data_as_RawImage() != nullptr) { const auto* img = static_cast(image->data_as_RawImage()); - hyperionnet::RawImageT rawImageNative; - img->UnPackTo(&rawImageNative); - - const int width = rawImageNative.width; - const int height = rawImageNative.height; + // Read image properties directly from FlatBuffer + const int width = img->width(); + const int height = img->height(); + const auto* data = img->data(); - if (width <= 0 || height <= 0 || rawImageNative.data.empty()) + if (width <= 0 || height <= 0 || data == nullptr || data->size() == 0) { sendErrorReply("Invalid width and/or height or no raw image data provided"); return; } - // check consistency of the size of the received data - int const bytesPerPixel = rawImageNative.data.size() / (width * height); + // Check consistency of image data size + const int dataSize = data->size(); + const int bytesPerPixel = dataSize / (width * height); if (bytesPerPixel != 3 && bytesPerPixel != 4) { sendErrorReply("Size of image data does not match with the width and height"); return; } - imageRGB.resize(width, height); - processRawImage(rawImageNative, bytesPerPixel, _imageResampler, imageRGB); + // Only resize if needed (reuse memory) + if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height) + { + _imageOutputBuffer.resize(width, height); + } + + processRawImage(data->data(), width, height, bytesPerPixel, _imageResampler, _imageOutputBuffer); } else if (image->data_as_NV12Image() != nullptr) { const auto* img = static_cast(image->data_as_NV12Image()); - hyperionnet::NV12ImageT nv12ImageNative; - img->UnPackTo(&nv12ImageNative); - - const int width = nv12ImageNative.width; - const int height = nv12ImageNative.height; - - if (width <= 0 || height <= 0 || nv12ImageNative.data_y.empty() || nv12ImageNative.data_uv.empty()) - { - sendErrorReply("Invalid width and/or height or no complete NV12 image data provided"); - return; - } - - imageRGB.resize(width, height); - processNV12Image(nv12ImageNative, _imageResampler, imageRGB); + const int width = img->width(); + const int height = img->height(); + const auto* data_y = img->data_y(); + const auto* data_uv = img->data_uv(); + + if (width <= 0 || height <= 0 || data_y == nullptr || data_uv == nullptr || + data_y->size() == 0 || data_uv->size() == 0) + { + sendErrorReply("Invalid width and/or height or no complete NV12 image data provided"); + return; + } + + // Combine Y and UV into one contiguous buffer (reuse class member buffer) + const size_t y_size = data_y->size(); + const size_t uv_size = data_uv->size(); + + size_t required_size = y_size + uv_size; + if (_combinedNv12Buffer.capacity() < required_size) + { + _combinedNv12Buffer.reserve(required_size); + } + std::memcpy(_combinedNv12Buffer.data(), data_y->data(), y_size); + std::memcpy(_combinedNv12Buffer.data() + y_size, data_uv->data(), uv_size); + + // Determine stride for Y + const int stride_y = img->stride_y() > 0 ? img->stride_y() : width; + + // Resize only when needed + if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height) + { + _imageOutputBuffer.resize(width, height); + } + + // Process image + processNV12Image(_combinedNv12Buffer.data(), width, height, stride_y, _imageResampler, _imageOutputBuffer); } else { @@ -257,8 +295,8 @@ void FlatBufferClient::handleImageCommand(const hyperionnet::Image *image) return; } - emit setGlobalInputImage(_priority, imageRGB, duration); - emit setBufferImage("FlatBuffer", imageRGB); + emit setGlobalInputImage(_priority, _imageOutputBuffer, duration); + emit setBufferImage("FlatBuffer", _imageOutputBuffer); // send reply sendSuccessReply(); @@ -319,49 +357,41 @@ void FlatBufferClient::sendErrorReply(const QString& error) sendMessage(_builder.GetBufferPointer(), _builder.GetSize()); } -inline void FlatBufferClient::processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image& outputImage) { - - int const width = raw_image.width; - int const height = raw_image.height; - +inline void FlatBufferClient::processRawImage(const uint8_t* buffer, + int width, + int height, + int bytesPerPixel, + ImageResampler& resampler, + Image& outputImage) +{ int const lineLength = width * bytesPerPixel; PixelFormat const pixelFormat = (bytesPerPixel == 4) ? PixelFormat::RGB32 : PixelFormat::RGB24; - // Process the image resampler.processImage( - raw_image.data.data(), // Raw RGB/RGBA buffer - width, // Image width - height, // Image height - lineLength, // Line length - pixelFormat, // Pixel format (RGB24/RGB32) - outputImage // Output image - ); + buffer, // Raw buffer + width, + height, + lineLength, + pixelFormat, + outputImage + ); } -inline void FlatBufferClient::processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image& outputImage) { - // Combine data_y and data_uv into a single buffer - int const width = nv12_image.width; - int const height = nv12_image.height; - - size_t const y_size = nv12_image.data_y.size(); - size_t const uv_size = nv12_image.data_uv.size(); - std::vector combined_buffer(y_size + uv_size); - - std::memcpy(combined_buffer.data(), nv12_image.data_y.data(), y_size); - std::memcpy(combined_buffer.data() + y_size, nv12_image.data_uv.data(), uv_size); - - // Determine line length (stride_y) - int const lineLength = nv12_image.stride_y > 0 ? nv12_image.stride_y : width; - - PixelFormat const pixelFormat = PixelFormat::NV12; +inline void FlatBufferClient::processNV12Image(const uint8_t* nv12_data, + int width, + int height, + int stride_y, + ImageResampler& resampler, + Image& outputImage) +{ + PixelFormat pixelFormat = PixelFormat::NV12; - // Process the image resampler.processImage( - combined_buffer.data(), // Combined NV12 buffer - width, // Image width - height, // Image height - lineLength, // Line length for Y plane - pixelFormat, // Pixel format (NV12) - outputImage // Output image - ); + nv12_data, // Combined NV12 buffer + width, + height, + stride_y, + pixelFormat, + outputImage + ); } diff --git a/libsrc/flatbufserver/FlatBufferClient.h b/libsrc/flatbufserver/FlatBufferClient.h index d6064349e..c4271459c 100644 --- a/libsrc/flatbufserver/FlatBufferClient.h +++ b/libsrc/flatbufserver/FlatBufferClient.h @@ -81,7 +81,6 @@ private slots: /// @brief Is called whenever the socket got new data to read /// void readyRead(); - void processNextMessage(); /// /// @brief Is called when the socket closed the connection, also requests thread exit @@ -123,6 +122,9 @@ private slots: /// void handleNotImplemented(); + void processNextMessage(); + bool processNextMessageInline(); + /// /// Send a message to the connected client /// @param data to be send @@ -142,8 +144,8 @@ private slots: /// void sendErrorReply(const QString& error); - void processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image& outputImage); - void processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image& outputImage); + void processRawImage(const uint8_t* buffer, int width, int height, int bytesPerPixel, ImageResampler& resampler, Image& outputImage); + void processNV12Image(const uint8_t* nv12_data, int width, int height, int stride_y, ImageResampler& resampler, Image& outputImage); private: Logger * _log; @@ -156,12 +158,13 @@ private slots: QByteArray _receiveBuffer; + Image _imageOutputBuffer; ImageResampler _imageResampler; + std::vector _combinedNv12Buffer; // Flatbuffers builder flatbuffers::FlatBufferBuilder _builder; bool _processingMessage; - QByteArray _lastMessage; }; #endif // FLATBUFFERCLIENT_H diff --git a/libsrc/flatbufserver/FlatBufferConnection.cpp b/libsrc/flatbufserver/FlatBufferConnection.cpp index fcba8ffaa..fb6d3ef62 100644 --- a/libsrc/flatbufserver/FlatBufferConnection.cpp +++ b/libsrc/flatbufserver/FlatBufferConnection.cpp @@ -30,16 +30,21 @@ FlatBufferConnection::FlatBufferConnection(const QString& origin, const QHostAdd // init connect connectToRemoteHost(); - // start the connection timer _timer.setInterval(5000); - connect(&_timer, &QTimer::timeout, this, &FlatBufferConnection::connectToRemoteHost); + + //Trigger the retry timer when connection dropped + connect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast(&QTimer::start)); _timer.start(); } FlatBufferConnection::~FlatBufferConnection() { _timer.stop(); + + //Stop retrying on disconnect + disconnect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast(&QTimer::start)); + Debug(_log, "Closing connection with host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port); _socket.close(); } @@ -58,7 +63,6 @@ void FlatBufferConnection::onDisconnected() _isRegistered = false, Info(_log, "Disconnected from target host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port); emit isDisconnected(); - _timer.start(); } @@ -225,8 +229,8 @@ bool FlatBufferConnection::parseReply(const hyperionnet::Reply *reply) } else { - _timer.stop(); _isRegistered = true; + _timer.stop(); Debug(_log,"Client \"%s\" registered successfully with target host: %s, port [%u]", QSTRING_CSTR(_origin), QSTRING_CSTR(_host.toString()), _port); emit isReadyToSend(); }