diff --git a/examples/decoder_tests/decoder_tester.h b/examples/decoder_tests/decoder_tester.h index 266064f..61c00b0 100644 --- a/examples/decoder_tests/decoder_tester.h +++ b/examples/decoder_tests/decoder_tester.h @@ -21,10 +21,40 @@ class DecoderTester { DecoderTester(RpcDecoder<>& _d): decoder(_d){} + void first_response_info() { + if (!decoder.response_queued()) { + Serial.println("No response queued"); + return; + } + Serial.println("-- First response info --"); + Serial.print("RESP OFFSET: "); + Serial.println(static_cast(decoder._response_offset)); + Serial.print("RESP SIZE: "); + Serial.println(static_cast(decoder._response_size)); + } + + size_t get_response_size() { + return decoder._response_size; + } + + size_t get_response_offset() { + return decoder._response_offset; + } + + template + bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { + return decoder.get_response(msg_id, result, error); + } + void crop_bytes(size_t size, size_t offset){ decoder.consume(size, offset); } + void pop_first() { + uint8_t temp_buffer[512]; + decoder.pop_packet(temp_buffer, 512); + } + void print_raw_buf(){ Serial.print("Decoder raw buffer content: "); diff --git a/examples/decoder_tests/decoder_tests.ino b/examples/decoder_tests/decoder_tests.ino index b823bf9..46e92c0 100644 --- a/examples/decoder_tests/decoder_tests.ino +++ b/examples/decoder_tests/decoder_tests.ino @@ -48,7 +48,7 @@ void runDecoderTest(const char* label) { Serial.println("-- Done --\n"); } -void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { +void runDecoderConsumeTest(const char* label, size_t expected_2nd_pack_size) { Serial.println(label); print_buf(); @@ -63,20 +63,184 @@ void runDecoderConsumeTest(const char* label, size_t second_packet_sz) { delay(50); } + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + size_t pack_size = decoder.get_packet_size(); Serial.print("1st Packet size: "); Serial.println(pack_size); + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Consuming 2nd packet of given size: "); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + +void runDecoderPopFirstTest(const char* label, size_t expected_2nd_pack_size) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + size_t pack_size = decoder.get_packet_size(); + Serial.print("Consuming 1st Packet of size: "); + Serial.println(pack_size); + dt.pop_first(); + dt.print_raw_buf(); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR moving 1st response\n"); + return; + } + Serial.print("Consuming 2nd packet of given size: "); - Serial.println(second_packet_sz); + Serial.println(dt.get_response_size()); + + dt.crop_bytes(dt.get_response_size(), dt.get_response_offset()); + + dt.print_raw_buf(); + dt.first_response_info(); + + Serial.println("-- Done --\n"); +} + +void runDecoderGetResponseTest(const char* label, size_t expected_2nd_pack_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } + + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=pack_size)||(dt.get_response_size()!=expected_2nd_pack_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response (2nd packet) size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); + + dt.print_raw_buf(); + + Serial.println("-- Done --\n"); +} + + +void runDecoderGetTopResponseTest(const char* label, size_t expected_size, int _id) { + Serial.println(label); + + print_buf(); + DummyTransport dummy_transport(packer.data(), packer.size()); + RpcDecoder<> decoder(dummy_transport); + + DecoderTester dt(decoder); + + while (!decoder.packet_incoming()) { + Serial.println("Packet not ready"); + decoder.decode(); + delay(50); + } - dt.crop_bytes(second_packet_sz, pack_size); + dt.first_response_info(); + + while (!decoder.response_queued()) { + Serial.println("1st response not ready"); + decoder.decode(); + delay(50); + } + + size_t pack_size = decoder.get_packet_size(); + Serial.print("1st Packet size: "); + Serial.println(pack_size); + + dt.first_response_info(); + + if ((dt.get_response_offset()!=0)||(dt.get_response_size()!=expected_size)) { + Serial.println("ERROR parsing 1st response\n"); + return; + } + + Serial.print("Getting response size: "); + Serial.println(dt.get_response_size()); + + int res; + RpcError _err; + dt.get_response(_id, res, _err); + + Serial.print("Result: "); + Serial.println(res); dt.print_raw_buf(); + dt.first_response_info(); + Serial.println("-- Done --\n"); } + void testNestedArrayRequest() { packer.clear(); MsgPack::arr_size_t outer_arr(3); @@ -166,6 +330,63 @@ void testMultipleRpcPackets() { } +// Multiple RPCs in one buffer. Pop the 1st request and then the 2nd response +void testPopRpcPackets() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 42); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderPopFirstTest("== Test: Pop-first packet ==", 5); + +} + +// Multiple RPCs in one buffer. Get the response in the buffer +void testGetResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 10, 20); + // 2nd response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 3rd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + + runDecoderGetResponseTest("== Test: Get response packet ==", 5, 1); + +} + +// Multiple RPCs in one buffer. The response is top of the buffer +void testGetTopResponsePacket() { + packer.clear(); + MsgPack::arr_size_t req_sz(4); + MsgPack::arr_size_t par_sz(2); + MsgPack::arr_size_t resp_sz(4); + MsgPack::object::nil_t nil; + + // 1st response + packer.serialize(resp_sz, 1, 1, nil, 101); + // 2nd request + packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true); + // 3rd request + packer.serialize(req_sz, 0, 1, "sum", par_sz, 30, 30); + + runDecoderGetTopResponseTest("== Test: Get top response packet ==", 5, 1); + +} + // Binary parameter (e.g., binary blob) void testBinaryParam() { packer.clear(); @@ -225,6 +446,9 @@ void setup() { testDeepNestedStructure(); testArrayOfMapsResponse(); testMultipleRpcPackets(); + testPopRpcPackets(); + testGetResponsePacket(); + testGetTopResponsePacket(); testBinaryParam(); testExtensionParam(); testCombinedComplexBuffer(); diff --git a/src/decoder.h b/src/decoder.h index 883abeb..5ffd946 100644 --- a/src/decoder.h +++ b/src/decoder.h @@ -57,12 +57,12 @@ class RpcDecoder { template bool get_response(const uint32_t msg_id, RType& result, RpcError& error) { - if (!packet_incoming() || _packet_type!=RESP_MSG) return false; + if (!response_queued()) return false; MsgPack::Unpacker unpacker; unpacker.clear(); - if (!unpacker.feed(_raw_buffer, _packet_size)) return false; + if (!unpacker.feed(_raw_buffer + _response_offset, _response_size)) return false; MsgPack::arr_size_t resp_size; int resp_type; @@ -107,8 +107,11 @@ class RpcDecoder { } } - consume(_packet_size); - reset_packet(); + consume(_response_size, _response_offset); + if (_response_offset==0) { // the response was in the first position + reset_packet(); + } + reset_response(); return true; } @@ -188,17 +191,24 @@ class RpcDecoder { void parse_packet(){ - if (packet_incoming()){return;} + size_t offset = 0; + + if (packet_incoming()) { + if (response_queued()) { + return; + } + offset = _response_offset; + } size_t bytes_checked = 0; size_t container_size; int type; MsgPack::Unpacker unpacker; - while (bytes_checked < _bytes_stored){ + while (bytes_checked + offset < _bytes_stored){ bytes_checked++; unpacker.clear(); - if (!unpacker.feed(_raw_buffer, bytes_checked)) continue; + if (!unpacker.feed(_raw_buffer + offset, bytes_checked)) continue; if (unpackTypedArray(unpacker, container_size, type)) { @@ -214,11 +224,24 @@ class RpcDecoder { break; // Not a valid RPC format } - _packet_type = type; - _packet_size = bytes_checked; + if (offset == 0) { // that's the first packet + _packet_type = type; + _packet_size = bytes_checked; + if (type == RESP_MSG) { // and it is for a client + _response_offset = 0; + _response_size = bytes_checked; + } else { + _response_offset = bytes_checked; + } + } else { // we have a response packet in the queue + if (type == RESP_MSG) { + _response_size = bytes_checked; + } else { + _response_offset = offset + bytes_checked; + } + } + break; - } else { - continue; } } @@ -227,6 +250,10 @@ class RpcDecoder { bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; } + bool response_queued() const { + return (_response_offset < _bytes_stored) && (_response_size > 0); + } + int packet_type() const { return _packet_type; } size_t get_packet_size() const { return _packet_size;} @@ -243,6 +270,8 @@ class RpcDecoder { size_t _bytes_stored = 0; int _packet_type = NO_MSG; size_t _packet_size = 0; + size_t _response_offset = 0; + size_t _response_size = 0; uint32_t _msg_id = 0; uint32_t _discarded_packets = 0; @@ -275,6 +304,9 @@ class RpcDecoder { } reset_packet(); + if (_response_offset >= packet_size) { + _response_offset -= packet_size; + } return consume(packet_size); } @@ -289,18 +321,23 @@ class RpcDecoder { _packet_size = 0; } -size_t consume(size_t size, size_t offset = 0) { - // Boundary checks - if (offset + size > _bytes_stored || size == 0) return 0; - - size_t remaining_bytes = _bytes_stored - size; - for (size_t i=offset; i _bytes_stored || size == 0) return 0; + + size_t remaining_bytes = _bytes_stored - size; + for (size_t i=offset; i