Skip to content

Commit

Permalink
Merge pull request #165 from Quicr/moq-streambuf
Browse files Browse the repository at this point in the history
Integrate transport steram buffer API changes
  • Loading branch information
TimEvens committed May 29, 2024
2 parents b1c5d1d + d655ee5 commit b6b1424
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 172 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CLANG_FORMAT=clang-format -i
.PHONY: all clean cclean format tidy

all: ${BUILD_DIR}
cmake --build ${BUILD_DIR}
cmake --build ${BUILD_DIR} --parallel 8

${BUILD_DIR}: CMakeLists.txt cmd/CMakeLists.txt
cmake -B${BUILD_DIR} -DBUILD_TESTING=TRUE -DQUICR_BUILD_TESTS=ON -DCMAKE_BUILD_TYPE=Debug .
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ openssl req -nodes -x509 -newkey rsa:2048 -days 365 \
-keyout server-key.pem -out server-cert.pem
```

#### MbedTLS cert

```
openssl req -nodes -x509 -newkey ec:<(openssl ecparam -name secp256r1) -days 365 \
-subj "/C=US/ST=CA/L=San Jose/O=Cisco/CN=test.quicr.ctgpoc.com" \
-keyout server-key.pem -out server-cert.pem
```

Run:

```
Expand Down
133 changes: 104 additions & 29 deletions src/encode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,23 @@ operator>>(MessageBuffer& msg, std::string& val)
MessageBuffer&
operator<<(MessageBuffer& buffer, const Connect& msg)
{
buffer << static_cast<uint32_t>(0); // Length of message

buffer << static_cast<uint8_t>(MessageType::Connect);
buffer << msg.endpoint_id;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, Connect& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::Connect)) {
Expand All @@ -242,15 +250,23 @@ operator>>(MessageBuffer& buffer, Connect& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const ConnectResponse& msg)
{
buffer << static_cast<uint32_t>(0); // Length of message

buffer << static_cast<uint8_t>(MessageType::ConnectResponse);
buffer << msg.relay_id;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, ConnectResponse& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::ConnectResponse)) {
Expand All @@ -269,19 +285,27 @@ operator>>(MessageBuffer& buffer, ConnectResponse& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const Subscribe& msg)
{
buffer << static_cast<uint32_t>(0); // Length of message

buffer << static_cast<uint8_t>(MessageType::Subscribe);
buffer << msg.transaction_id;
buffer << msg.quicr_namespace;
buffer << static_cast<uint8_t>(msg.intent);
buffer << static_cast<uint8_t>(msg.transport_mode);
buffer << msg.remote_data_ctx_id;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size();

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, Subscribe& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::Subscribe)) {
Expand All @@ -305,15 +329,23 @@ operator>>(MessageBuffer& buffer, Subscribe& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const Unsubscribe& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(MessageType::Unsubscribe);
buffer << msg.quicr_namespace;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, Unsubscribe& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::Unsubscribe)) {
Expand All @@ -328,17 +360,25 @@ operator>>(MessageBuffer& buffer, Unsubscribe& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const SubscribeResponse& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(MessageType::SubscribeResponse);
buffer << static_cast<uint8_t>(msg.response);
buffer << msg.transaction_id;
buffer << msg.quicr_namespace;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, SubscribeResponse& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::SubscribeResponse)) {
Expand All @@ -358,16 +398,23 @@ operator>>(MessageBuffer& buffer, SubscribeResponse& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const SubscribeEnd& msg)
{
buffer << static_cast<uint32_t>(0);
buffer << static_cast<uint8_t>(MessageType::SubscribeEnd);
buffer << static_cast<uint8_t>(msg.reason);
buffer << msg.quicr_namespace;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, SubscribeEnd& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::SubscribeEnd)) {
Expand All @@ -390,6 +437,8 @@ operator>>(MessageBuffer& buffer, SubscribeEnd& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const PublishIntent& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(msg.message_type);
buffer << msg.transaction_id;
buffer << msg.quicr_namespace;
Expand All @@ -398,25 +447,37 @@ operator<<(MessageBuffer& buffer, const PublishIntent& msg)
buffer << msg.datagram_capable;
buffer << static_cast<uint8_t>(msg.transport_mode);

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, PublishIntent&& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(msg.message_type);
buffer << msg.transaction_id;
buffer << msg.quicr_namespace;
buffer << std::move(msg.payload);
buffer << msg.media_id;
buffer << msg.datagram_capable;
buffer << static_cast<uint8_t>(msg.transport_mode);

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, PublishIntent& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
msg.message_type = static_cast<MessageType>(msg_type);
Expand All @@ -437,18 +498,26 @@ operator>>(MessageBuffer& buffer, PublishIntent& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const PublishIntentResponse& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(msg.message_type);
buffer << msg.quicr_namespace;
buffer << static_cast<uint8_t>(msg.response);
buffer << msg.transaction_id;
buffer << msg.remote_data_ctx_id;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, PublishIntentResponse& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
msg.message_type = static_cast<MessageType>(msg_type);
Expand Down Expand Up @@ -494,30 +563,43 @@ operator>>(MessageBuffer& buffer, Header& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const PublishDatagram& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(MessageType::Publish);
buffer << msg.header;
buffer << static_cast<uint8_t>(msg.media_type);
buffer << msg.media_data_length;
buffer << msg.media_data;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, PublishDatagram&& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(MessageType::Publish);
buffer << msg.header;
buffer << static_cast<uint8_t>(msg.media_type);
buffer << msg.media_data_length;
buffer << std::move(msg.media_data);

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, PublishDatagram& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::Publish)) {
Expand All @@ -541,58 +623,42 @@ operator>>(MessageBuffer& buffer, PublishDatagram& msg)
return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, const PublishStream& msg)
{
buffer << msg.media_data_length;
buffer << msg.media_data;
return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, PublishStream&& msg)
{
buffer << msg.media_data_length;
buffer << std::move(msg.media_data);
return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, PublishStream& msg)
{
buffer >> msg.media_data_length;
buffer >> msg.media_data;
if (msg.media_data.size() != static_cast<size_t>(msg.media_data_length)) {
throw MessageBuffer::LengthException(msg.media_data.size(),
msg.media_data_length);
}

return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, const PublishIntentEnd& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(msg.message_type);
buffer << msg.quicr_namespace;
buffer << msg.payload;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator<<(MessageBuffer& buffer, PublishIntentEnd&& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(msg.message_type);
buffer << msg.quicr_namespace;
buffer << std::move(msg.payload);

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, PublishIntentEnd& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
msg.message_type = static_cast<MessageType>(msg_type);
Expand All @@ -610,15 +676,24 @@ operator>>(MessageBuffer& buffer, PublishIntentEnd& msg)
MessageBuffer&
operator<<(MessageBuffer& buffer, const Fetch& msg)
{
buffer << static_cast<uint32_t>(0);

buffer << static_cast<uint8_t>(MessageType::Fetch);
buffer << msg.transaction_id;
buffer << msg.name;

uint32_t* len = reinterpret_cast<uint32_t*>(buffer.data());
*len = buffer.size(); // Update the message length

return buffer;
}

MessageBuffer&
operator>>(MessageBuffer& buffer, Fetch& msg)
{
uint32_t len;
buffer >> len;

auto msg_type = uint8_t(0);
buffer >> msg_type;
if (msg_type != static_cast<uint8_t>(MessageType::Fetch)) {
Expand Down
Loading

0 comments on commit b6b1424

Please sign in to comment.