Skip to content

Commit

Permalink
add subscribe_ok
Browse files Browse the repository at this point in the history
  • Loading branch information
suhasHere committed Jun 2, 2024
1 parent 5648e92 commit 6dfee82
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 34 deletions.
19 changes: 10 additions & 9 deletions include/quicr/moq_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using ReasonPhrase = quicr::bytes;
using GroupId = uintVar_t;
using ObjectId = uintVar_t;
using ObjectPriority = uintVar_t;
using SubscribeId = uintVar_t;
using SubscribeId = uint64_t;
using TrackAlias = uintVar_t;
using ParamType = uintVar_t;

Expand Down Expand Up @@ -125,10 +125,16 @@ struct MoqSubscribe {

struct MoqSubscribeOk {
SubscribeId subscribe_id;
uintVar_t expires;
uint64_t expires;
bool content_exists;
std::optional<GroupId> largest_group;
std::optional<ObjectId> largest_object;
uint64_t largest_group {0};
uint64_t largest_object {0};
friend bool operator>>(qtransport::StreamBuffer<uint8_t> &buffer, MoqSubscribeOk &msg);
friend qtransport::StreamBuffer<uint8_t>& operator<<(qtransport::StreamBuffer<uint8_t>& buffer,
const MoqSubscribeOk& msg);
private:
size_t current_pos {0};
const size_t MAX_FIELDS = 5;
};


Expand All @@ -153,19 +159,14 @@ struct MoqSubscribeDone
ObjectId final_object_id;
};

MessageBuffer& operator<<(MessageBuffer &buffer, const MoqSubscribe &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqSubscribe &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqUnsubscribe &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqUnsubscribe &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqSubscribeOk &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqSubscribeOk &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqSubscribeError &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqSubscribeError &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqSubscribeDone &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqSubscribeDone &msg);



//
// Announce
//
Expand Down
102 changes: 77 additions & 25 deletions src/moq_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,39 +247,91 @@ operator>>(MessageBuffer &buffer, MoqUnsubscribe &msg) {
return buffer;
}

MessageBuffer &
operator<<(MessageBuffer &buffer, const MoqSubscribeOk &msg) {
buffer << static_cast<uint8_t>(MESSAGE_TYPE_SUBSCRIBE_OK);
buffer << msg.subscribe_id;
buffer << msg.expires;
buffer << static_cast<uint8_t>(msg.content_exists);
if (msg.content_exists) {
buffer << msg.largest_group;
buffer << msg.largest_object;
return buffer;

/*
* SubscribeId subscribe_id;
uintVar_t expires;
bool content_exists;
std::optional<GroupId> largest_group;
std::optional<ObjectId> largest_object;
*/
qtransport::StreamBuffer<uint8_t>& operator<<(qtransport::StreamBuffer<uint8_t>& buffer,
const MoqSubscribeOk& msg){
buffer.push(qtransport::to_uintV(static_cast<uint64_t>(MESSAGE_TYPE_SUBSCRIBE_OK)));
buffer.push(qtransport::to_uintV(msg.subscribe_id));
buffer.push(qtransport::to_uintV(msg.expires));
msg.content_exists ? buffer.push(static_cast<uint8_t>(1)) : buffer.push(static_cast<uint8_t>(0));
if(msg.content_exists) {
buffer.push(qtransport::to_uintV(msg.largest_group));
buffer.push(qtransport::to_uintV(msg.largest_object));
}
return buffer;
}

MessageBuffer &
operator>>(MessageBuffer &buffer, MoqSubscribeOk &msg) {
buffer >> msg.subscribe_id;
buffer >> msg.expires;
uint8_t content_exists {0};
buffer >> content_exists;
if(content_exists > 1) {
throw std::runtime_error("Invalid Context Exists Value");
bool parse_uintV_field(qtransport::StreamBuffer<uint8_t> &buffer, uint64_t& field) {
auto val = buffer.decode_uintV();
if (!val) {
return false;
}
field = val.value();
return true;
}

if (content_exists == 1) {
msg.content_exists = true;
buffer >> msg.largest_group;
buffer >> msg.largest_object;
return buffer;
bool operator>>(qtransport::StreamBuffer<uint8_t> &buffer, MoqSubscribeOk &msg) {

switch (msg.current_pos) {
case 0:
{
if(!parse_uintV_field(buffer, msg.subscribe_id)) {
return false;
}
msg.current_pos += 1;
}
break;
case 1: {
if(!parse_uintV_field(buffer, msg.expires)) {
return false;
}
msg.current_pos += 1;
}
break;
case 2: {
const auto val = buffer.front();
if (!val) {
return false;
}
buffer.pop();
msg.content_exists = (val.value()) == 1;
msg.current_pos += 1;
if (!msg.content_exists) {
// nothing more to process.
return true;
}
}
break;
case 3: {
if(!parse_uintV_field(buffer, msg.largest_group)) {
return false;
}
msg.current_pos += 1;
}
break;
case 4: {
if(!parse_uintV_field(buffer, msg.largest_object)) {
return false;
}
msg.current_pos += 1;
}
break;
default:
throw std::runtime_error("Malformed Message (SubscribeOK)");
}

msg.content_exists = false;
return buffer;
if (msg.current_pos < msg.MAX_FIELDS) {
return false;
}
return true;
}

MessageBuffer &
Expand Down
45 changes: 45 additions & 0 deletions test/moq_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,49 @@ TEST_CASE("Subscribe (Params) Message encode/decode")
CHECK_EQ(subscribe.track_params[0].param_type, subscribe_out.track_params[0].param_type);
CHECK_EQ(subscribe.track_params[0].param_length, subscribe_out.track_params[0].param_length);
CHECK_EQ(subscribe.track_params[0].param_value, subscribe_out.track_params[0].param_value);
}


TEST_CASE("SubscribeOk Message encode/decode")
{
qtransport::StreamBuffer<uint8_t> buffer;

auto subscribe_ok = MoqSubscribeOk {};
subscribe_ok.subscribe_id = 0x1;
subscribe_ok.expires = 0x100;
subscribe_ok.content_exists = false;
buffer << subscribe_ok;

std::vector<uint8_t> net_data = buffer.front(buffer.size());

MoqSubscribeOk subscribe_ok_out;
CHECK(verify(net_data, static_cast<uint64_t>(MESSAGE_TYPE_SUBSCRIBE_OK), subscribe_ok_out));
CHECK_EQ(subscribe_ok.subscribe_id, subscribe_ok_out.subscribe_id);
CHECK_EQ(subscribe_ok.expires, subscribe_ok_out.expires);
CHECK_EQ(subscribe_ok.content_exists, subscribe_ok_out.content_exists);
}


TEST_CASE("SubscribeOk (content-exists) Message encode/decode")
{
qtransport::StreamBuffer<uint8_t> buffer;

auto subscribe_ok = MoqSubscribeOk {};
subscribe_ok.subscribe_id = 0x1;
subscribe_ok.expires = 0x100;
subscribe_ok.content_exists = true;
subscribe_ok.largest_group = 0x1000;
subscribe_ok.largest_object = 0xff;
buffer << subscribe_ok;

std::vector<uint8_t> net_data = buffer.front(buffer.size());

MoqSubscribeOk subscribe_ok_out;
CHECK(verify(net_data, static_cast<uint64_t>(MESSAGE_TYPE_SUBSCRIBE_OK), subscribe_ok_out));
CHECK_EQ(subscribe_ok.subscribe_id, subscribe_ok_out.subscribe_id);
CHECK_EQ(subscribe_ok.expires, subscribe_ok_out.expires);
CHECK_EQ(subscribe_ok.content_exists, subscribe_ok_out.content_exists);
CHECK_EQ(subscribe_ok.largest_group, subscribe_ok_out.largest_group);
CHECK_EQ(subscribe_ok.largest_object, subscribe_ok_out.largest_object);

}

0 comments on commit 6dfee82

Please sign in to comment.