From 5910762719ac944e47e13441ddf5ce485f6670fb Mon Sep 17 00:00:00 2001 From: Lucio Rossi Date: Wed, 1 Oct 2025 15:32:56 +0200 Subject: [PATCH 1/3] impr: bridge, monitor thread protection of bool flags and initialization methods --- src/bridge.h | 48 ++++++++++++++++++++++++++++-------------------- src/monitor.h | 38 ++++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/src/bridge.h b/src/bridge.h index 0ae012f..4756e7c 100644 --- a/src/bridge.h +++ b/src/bridge.h @@ -106,6 +106,7 @@ class BridgeClass { struct k_mutex read_mutex{}; struct k_mutex write_mutex{}; + struct k_mutex bridge_mutex{}; k_tid_t upd_tid{}; k_thread_stack_t *upd_stack_area{}; @@ -119,17 +120,27 @@ class BridgeClass { serial_ptr = &serial; } - operator bool() const { - return started; + operator bool() { + return is_started(); + } + + bool is_started() { + k_mutex_lock(&bridge_mutex, K_FOREVER); + bool out = started; + k_mutex_unlock(&bridge_mutex); + return out; } // Initialize the bridge bool begin(unsigned long baud=DEFAULT_SERIAL_BAUD) { - serial_ptr->begin(baud); - transport = new SerialTransport(*serial_ptr); - k_mutex_init(&read_mutex); k_mutex_init(&write_mutex); + k_mutex_init(&bridge_mutex); + + if (is_started()) return true; + + serial_ptr->begin(baud); + transport = new SerialTransport(*serial_ptr); client = new RPCClient(*transport); server = new RPCServer(*transport); @@ -141,32 +152,29 @@ class BridgeClass { NULL, NULL, NULL, UPDATE_THREAD_PRIORITY, 0, K_NO_WAIT); - bool res; - call(RESET_METHOD).result(res); - if (res) { - started = true; - } + k_mutex_lock(&bridge_mutex, K_FOREVER); + bool res = false; + started = call(RESET_METHOD).result(res) && res; + k_mutex_unlock(&bridge_mutex); return res; } template bool provide(const MsgPack::str_t& name, F&& func) { + k_mutex_lock(&bridge_mutex, K_FOREVER); bool res; - if (!call(BIND_METHOD, name).result(res)) { - return false; - } - return server->bind(name, func); + bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func); + k_mutex_unlock(&bridge_mutex); + return out; } template bool provide_safe(const MsgPack::str_t& name, F&& func) { + k_mutex_lock(&bridge_mutex, K_FOREVER); bool res; - if (!call(BIND_METHOD, name).result(res)) { - return false; - } - - return server->bind(name, func, "__safe__"); - + bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func, "__safe__"); + k_mutex_unlock(&bridge_mutex); + return out; } void update() { diff --git a/src/monitor.h b/src/monitor.h index 0c7126c..68ca858 100644 --- a/src/monitor.h +++ b/src/monitor.h @@ -30,7 +30,7 @@ class BridgeMonitor: public Stream { BridgeClass* bridge; RingBufferN temp_buffer; struct k_mutex monitor_mutex{}; - bool is_connected = false; + bool _connected = false; public: explicit BridgeMonitor(BridgeClass& bridge): bridge(&bridge) {} @@ -40,15 +40,31 @@ class BridgeMonitor: public Stream { bool begin(unsigned long _legacy_baud=0, uint16_t _legacy_config=0) { k_mutex_init(&monitor_mutex); + if (is_connected()) return true; + bool bridge_started = (*bridge); if (!bridge_started) { bridge_started = bridge->begin(); } - return bridge_started && bridge->call(MON_CONNECTED_METHOD).result(is_connected); + + if (!bridge_started) return false; + + k_mutex_lock(&monitor_mutex, K_FOREVER); + bool out = false; + _connected = bridge->call(MON_CONNECTED_METHOD).result(out) && out; + k_mutex_unlock(&monitor_mutex); + return out; + } + + bool is_connected() { + k_mutex_lock(&monitor_mutex, K_FOREVER); + bool out = _connected; + k_mutex_unlock(&monitor_mutex); + return out; } - explicit operator bool() const { - return is_connected; + explicit operator bool() { + return is_connected(); } int read() override { @@ -109,11 +125,11 @@ class BridgeMonitor: public Stream { bool reset() { bool res; - bool ok = bridge->call(MON_RESET_METHOD).result(res); - if (ok && res) { - is_connected = false; - } - return (ok && res); + bool ok = bridge->call(MON_RESET_METHOD).result(res) && res; + k_mutex_lock(&monitor_mutex, K_FOREVER); + _connected = !ok; + k_mutex_unlock(&monitor_mutex); + return ok; } private: @@ -135,7 +151,9 @@ class BridgeMonitor: public Stream { } // if (async_rpc.error.code > NO_ERR) { - // is_connected = false; + // k_mutex_lock(&monitor_mutex, K_FOREVER); + // _connected = false; + // k_mutex_unlock(&monitor_mutex); // } } From 91d50b3be4f176f0b6d986bdf210d6966e44eab7 Mon Sep 17 00:00:00 2001 From: Lucio Rossi Date: Wed, 1 Oct 2025 18:03:12 +0200 Subject: [PATCH 2/3] impr: protecting tcp_server flags, initialization, write and getters fix: monitor.peek unsafe --- src/monitor.h | 3 +- src/tcp_server.h | 81 +++++++++++++++++++++++++++++------------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/src/monitor.h b/src/monitor.h index 68ca858..7c9d55b 100644 --- a/src/monitor.h +++ b/src/monitor.h @@ -95,8 +95,9 @@ class BridgeMonitor: public Stream { int peek() override { k_mutex_lock(&monitor_mutex, K_FOREVER); if (temp_buffer.available()) { + int c = temp_buffer.peek(); k_mutex_unlock(&monitor_mutex); - return temp_buffer.peek(); + return c; } k_mutex_unlock(&monitor_mutex); return -1; diff --git a/src/tcp_server.h b/src/tcp_server.h index f62e57d..ee630a9 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -50,39 +50,35 @@ class BridgeTCPServer final: public Server { } k_mutex_lock(&server_mutex, K_FOREVER); - - String hostname = _addr.toString(); - _listening = bridge->call(TCP_LISTEN_METHOD, hostname, _port).result(listener_id); - + if (!_listening){ + String hostname = _addr.toString(); + _listening = bridge->call(TCP_LISTEN_METHOD, hostname, _port).result(listener_id); + } k_mutex_unlock(&server_mutex); } BridgeTCPClient accept() { - if (!_listening) { + k_mutex_lock(&server_mutex, K_FOREVER); + + if (!_listening) { // Not listening -> return disconnected (invalid) client + k_mutex_unlock(&server_mutex); return BridgeTCPClient(*bridge, 0, false); } - if (_connected) { + if (_connected) { // Connection already established return a client copy + k_mutex_unlock(&server_mutex); return BridgeTCPClient(*bridge, connection_id); } - k_mutex_lock(&server_mutex, K_FOREVER); - + // Accept a connection const bool ret = bridge->call(TCP_ACCEPT_METHOD, listener_id).result(connection_id); + _connected = ret; k_mutex_unlock(&server_mutex); - - if (ret) { // connection_id 0 marks an invalid connection - _connected = true; - return BridgeTCPClient(*bridge, connection_id); - } else { - _connected = false; - // Return invalid client - return BridgeTCPClient(*bridge, 0, false); - } - + // If no connection established return a disconnected (invalid) client + return ret? BridgeTCPClient(*bridge, connection_id) : BridgeTCPClient(*bridge, 0, false); } size_t write(uint8_t c) override { @@ -92,24 +88,25 @@ class BridgeTCPServer final: public Server { size_t write(const uint8_t *buf, size_t size) override { BridgeTCPClient client = accept(); + if (!client) return 0; - if (client && _connected) { - return client.write(buf, size); + k_mutex_lock(&server_mutex, K_FOREVER); + if (_connected) { + size_t written = client.write(buf, size); + k_mutex_unlock(&server_mutex); + return written; } - + k_mutex_unlock(&server_mutex); return 0; } void close() { k_mutex_lock(&server_mutex, K_FOREVER); - String msg; - const bool ret = bridge->call(TCP_CLOSE_LISTENER_METHOD, listener_id).result(msg); - - if (ret) { - _listening = false; + if (_listening){ + _listening = !bridge->call(TCP_CLOSE_LISTENER_METHOD, listener_id).result(msg); + // Debug msg? } - k_mutex_unlock(&server_mutex); } @@ -120,16 +117,36 @@ class BridgeTCPServer final: public Server { k_mutex_unlock(&server_mutex); } - bool is_listening() const { - return _listening; + bool is_listening() { + k_mutex_lock(&server_mutex, K_FOREVER); + bool out = _listening; + k_mutex_unlock(&server_mutex); + return out; } - uint16_t getPort() const { - return _port; + bool is_connected() { + k_mutex_lock(&server_mutex, K_FOREVER); + bool out = _connected; + k_mutex_unlock(&server_mutex); + return out; + } + + uint16_t getPort() { + k_mutex_lock(&server_mutex, K_FOREVER); + uint16_t port = _port; + k_mutex_unlock(&server_mutex); + return port; + } + + String getAddr() { + k_mutex_lock(&server_mutex, K_FOREVER); + String hostname = _addr.toString(); + k_mutex_unlock(&server_mutex); + return hostname; } operator bool() const { - return _listening; + return is_listening(); } using Print::write; From 572fff5081a06f36d7896810fac54f36be20fac3 Mon Sep 17 00:00:00 2001 From: Lucio Rossi Date: Fri, 3 Oct 2025 10:07:21 +0200 Subject: [PATCH 3/3] impr: monitor checks if connected before _read impr: tcp_client protect flags and initialization methods minor readability/optimization --- src/monitor.h | 23 ++++++-------- src/tcp_client.h | 80 ++++++++++++++++++------------------------------ src/tcp_server.h | 7 ++--- 3 files changed, 42 insertions(+), 68 deletions(-) diff --git a/src/monitor.h b/src/monitor.h index 7c9d55b..02b9d16 100644 --- a/src/monitor.h +++ b/src/monitor.h @@ -94,13 +94,12 @@ class BridgeMonitor: public Stream { int peek() override { k_mutex_lock(&monitor_mutex, K_FOREVER); + int out = -1; if (temp_buffer.available()) { - int c = temp_buffer.peek(); - k_mutex_unlock(&monitor_mutex); - return c; + out = temp_buffer.peek(); } k_mutex_unlock(&monitor_mutex); - return -1; + return out; } size_t write(uint8_t c) override { @@ -117,11 +116,8 @@ class BridgeMonitor: public Stream { size_t written; const bool ret = bridge->call(MON_WRITE_METHOD, send_buffer).result(written); - if (ret) { - return written; - } - return 0; + return ret? written : 0; } bool reset() { @@ -138,24 +134,23 @@ class BridgeMonitor: public Stream { if (size == 0) return; + k_mutex_lock(&monitor_mutex, K_FOREVER); + MsgPack::arr_t message; RpcResult async_rpc = bridge->call(MON_READ_METHOD, size); - - const bool ret = async_rpc.result(message); + const bool ret = _connected && async_rpc.result(message); if (ret) { - k_mutex_lock(&monitor_mutex, K_FOREVER); for (size_t i = 0; i < message.size(); ++i) { temp_buffer.store_char(static_cast(message[i])); } - k_mutex_unlock(&monitor_mutex); } // if (async_rpc.error.code > NO_ERR) { - // k_mutex_lock(&monitor_mutex, K_FOREVER); // _connected = false; - // k_mutex_unlock(&monitor_mutex); // } + + k_mutex_unlock(&monitor_mutex); } }; diff --git a/src/tcp_client.h b/src/tcp_client.h index b1acbbc..5931827 100644 --- a/src/tcp_client.h +++ b/src/tcp_client.h @@ -55,73 +55,55 @@ class BridgeTCPClient : public Client { int connect(const char *host, uint16_t port) override { - if (_connected) return 0; - - String hostname = host; - k_mutex_lock(&client_mutex, K_FOREVER); - const bool resp = bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id); - - if (!resp) { - _connected = false; - k_mutex_unlock(&client_mutex); - return -1; - } - _connected = true; + String hostname = host; + const bool ok = _connected || bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id); + _connected = ok; k_mutex_unlock(&client_mutex); - return 0; + return ok? 0 : -1; } int connectSSL(const char *host, uint16_t port, const char *ca_cert) { - if (_connected) return 0; + k_mutex_lock(&client_mutex, K_FOREVER); String hostname = host; String ca_cert_str = ca_cert; - k_mutex_lock(&client_mutex, K_FOREVER); - - const bool resp = bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id); - - if (!resp) { - _connected = false; - k_mutex_unlock(&client_mutex); - return -1; - } - _connected = true; - + const bool ok = _connected || bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id); + _connected = ok; k_mutex_unlock(&client_mutex); - return 0; + + return ok? 0 : -1; } - uint32_t getId() const { - return connection_id; + uint32_t getId() { + k_mutex_lock(&client_mutex, K_FOREVER); + const uint32_t out = connection_id; + k_mutex_unlock(&client_mutex); + return out; } size_t write(uint8_t c) override { return write(&c, 1); } - size_t write(const uint8_t *buf, size_t size) override { + size_t write(const uint8_t *buffer, size_t size) override { - if (!_connected) return 0; + if (!connected()) return 0; MsgPack::arr_t payload; for (size_t i = 0; i < size; ++i) { - payload.push_back(buf[i]); + payload.push_back(buffer[i]); } size_t written; - const bool ret = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written); - if (ret) { - return written; - } - - return 0; + const bool ok = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written); + return ok? written : 0; } int available() override { @@ -151,12 +133,12 @@ class BridgeTCPClient : public Client { int peek() override { k_mutex_lock(&client_mutex, K_FOREVER); + int out = -1; if (temp_buffer.available()) { - k_mutex_unlock(&client_mutex); - return temp_buffer.peek(); + out = temp_buffer.peek(); } k_mutex_unlock(&client_mutex); - return -1; + return out; } void flush() override { @@ -170,37 +152,35 @@ class BridgeTCPClient : public Client { void stop() override { k_mutex_lock(&client_mutex, K_FOREVER); String msg; - const bool resp = bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg); - if (resp) { - _connected = false; + if (_connected) { + _connected = !bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg); } k_mutex_unlock(&client_mutex); } uint8_t connected() override { - if (_connected) return 1; - return 0; + k_mutex_lock(&client_mutex, K_FOREVER); + const uint8_t out = _connected? 1 : 0; + k_mutex_unlock(&client_mutex); + return out; } operator bool() override { return available() || connected(); } - //friend class BridgeTCPServer; - using Print::write; private: void _read(size_t size) { - if (size == 0 || !_connected) return; + if (size == 0) return; k_mutex_lock(&client_mutex, K_FOREVER); MsgPack::arr_t message; RpcResult async_rpc = bridge->call(TCP_READ_METHOD, connection_id, size); - - const bool ret = async_rpc.result(message); + const bool ret = _connected && async_rpc.result(message); if (ret) { for (size_t i = 0; i < message.size(); ++i) { diff --git a/src/tcp_server.h b/src/tcp_server.h index ee630a9..dc863a6 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -91,13 +91,12 @@ class BridgeTCPServer final: public Server { if (!client) return 0; k_mutex_lock(&server_mutex, K_FOREVER); + size_t written = 0; if (_connected) { - size_t written = client.write(buf, size); - k_mutex_unlock(&server_mutex); - return written; + written = client.write(buf, size); } k_mutex_unlock(&server_mutex); - return 0; + return written; } void close() {