Skip to content

Commit

Permalink
add subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
suhasHere committed Jun 1, 2024
1 parent 2cc7171 commit 5648e92
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 101 deletions.
39 changes: 19 additions & 20 deletions include/quicr/moq_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ enum struct ParameterType : uint8_t {
};

struct MoqParameter{
std::optional<ParamType> param_type;
std::optional<uint64_t> param_type;
uintVar_t param_length;
bytes param_value;
};
Expand Down Expand Up @@ -96,28 +96,31 @@ MessageBuffer& operator>>(MessageBuffer &buffer, MoqServerSetup &msg);
//
// Subscribe
//
enum struct LocationMode: uint8_t {
enum struct FilterType: uint64_t {
None = 0x0,
Absolute,
RelativePrevious,
RelativeNext
};

struct Location {
LocationMode mode;
std::optional<uintVar_t> value;
LatestGroup,
LatestObject,
AbsoluteStart,
AbsoluteRange
};

struct MoqSubscribe {
SubscribeId subscribe_id;
TrackAlias track_alias;
std::optional<uint64_t> subscribe_id;
std::optional<uint64_t> track_alias;
TrackNamespace track_namespace;
TrackName track_name;
Location start_group;
Location start_object;
Location end_group;
Location end_object;
FilterType filter_type {FilterType::None};
std::optional<uintVar_t> start_group;
std::optional<uintVar_t> end_group;
std::optional<uintVar_t> start_object;
std::optional<uintVar_t> end_object;
uint64_t num_params {0};
std::vector<MoqParameter> track_params;
friend bool operator>>(qtransport::StreamBuffer<uint8_t> &buffer, MoqSubscribe &msg);
friend qtransport::StreamBuffer<uint8_t>& operator<<(qtransport::StreamBuffer<uint8_t>& buffer,
const MoqSubscribe& msg);
private:
MoqParameter current_param{};
};

struct MoqSubscribeOk {
Expand Down Expand Up @@ -150,8 +153,6 @@ struct MoqSubscribeDone
ObjectId final_object_id;
};

MessageBuffer& operator<<(MessageBuffer &buffer, const Location &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, Location &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqSubscribe &msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqSubscribe &msg);
MessageBuffer& operator<<(MessageBuffer &buffer, const MoqUnsubscribe &msg);
Expand Down Expand Up @@ -283,8 +284,6 @@ MessageBuffer& operator>>(MessageBuffer &buffer, MoqStreamHeaderGroup &msg);
MessageBuffer& operator<<(MessageBuffer& buffer, const MoqStreamGroupObject& msg);
MessageBuffer& operator>>(MessageBuffer &buffer, MoqStreamGroupObject &msg);

// utility
std::tuple<Location, Location, Location, Location> to_locations(const SubscribeIntent& intent);
MessageBuffer& operator<<(MessageBuffer& buffer, const std::vector<uintVar_t>& val);
MessageBuffer& operator>>(MessageBuffer& msg, std::vector<uintVar_t>& val);
MessageBuffer& operator>>(MessageBuffer& msg, std::vector<uintVar_t>& val);
Expand Down
250 changes: 169 additions & 81 deletions src/moq_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,71 +53,185 @@ MessageBuffer& operator>>(MessageBuffer &buffer, MoqParameter &param) {
// Subscribe
//

qtransport::StreamBuffer<uint8_t>& operator<<(qtransport::StreamBuffer<uint8_t>& buffer,
const MoqSubscribe& msg){
buffer.push(qtransport::to_uintV(static_cast<uint64_t>(MESSAGE_TYPE_SUBSCRIBE)));
buffer.push(qtransport::to_uintV(msg.subscribe_id.value()));
buffer.push(qtransport::to_uintV(msg.track_alias.value()));
buffer.push_lv(msg.track_namespace);
buffer.push_lv(msg.track_name);
buffer.push(qtransport::to_uintV(static_cast<uint64_t>(msg.filter_type)));

MessageBuffer& operator<<(MessageBuffer &buffer, const Location &msg) {
if (msg.mode == LocationMode::None) {
buffer << static_cast<uint8_t >(msg.mode);
return buffer;
switch (msg.filter_type) {
case FilterType::None:
case FilterType::LatestGroup:
case FilterType::LatestObject:
break;
case FilterType::AbsoluteStart: {
buffer.push(qtransport::to_uintV(msg.start_group.value()));
buffer.push(qtransport::to_uintV(msg.start_object.value()));
}
break;
case FilterType::AbsoluteRange:
buffer.push(qtransport::to_uintV(msg.start_group.value()));
buffer.push(qtransport::to_uintV(msg.start_object.value()));
buffer.push(qtransport::to_uintV(msg.end_group.value()));
buffer.push(qtransport::to_uintV(msg.end_object.value()));
break;
}
buffer << static_cast<uint8_t >(msg.mode);
buffer << msg.value.value();
return buffer;
}

MessageBuffer& operator>>(MessageBuffer &buffer, Location &msg) {
uint8_t mode {0};
buffer >> mode;
msg.mode = static_cast<LocationMode>(mode);
if (static_cast<LocationMode>(mode) != LocationMode::None) {
uintVar_t loc_val{0};
buffer >> loc_val;
msg.value = loc_val;
buffer.push(qtransport::to_uintV(msg.num_params));
for (const auto& param: msg.track_params) {
buffer.push(qtransport::to_uintV(static_cast<uint64_t>(param.param_type.value())));
buffer.push(qtransport::to_uintV(param.param_length));
buffer.push(param.param_value);
}

return buffer;
}

MessageBuffer &
operator<<(MessageBuffer &buffer, const MoqSubscribe &msg) {
buffer << static_cast<uintVar_t>(MESSAGE_TYPE_SUBSCRIBE);
buffer << msg.subscribe_id;
buffer << msg.track_alias;
buffer << msg.track_namespace;
buffer << msg.track_name;
buffer << msg.start_group;
buffer << msg.start_object;
buffer << msg.end_group;
buffer << msg.end_object;
buffer << static_cast<uintVar_t>(msg.track_params.size());
for (const auto& param: msg.track_params) {
buffer << param.param_type;
buffer << param.param_length;
buffer << param.param_value;
bool operator>>(qtransport::StreamBuffer<uint8_t> &buffer, MoqSubscribe &msg) {

if (!msg.subscribe_id.has_value()) {
auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.subscribe_id = val.value();
}
return buffer;
}

MessageBuffer &
operator>>(MessageBuffer &buffer, MoqSubscribe &msg) {
buffer >> msg.subscribe_id;
buffer >> msg.track_alias;
buffer >> msg.track_namespace;
buffer >> msg.track_name;
buffer >> msg.start_group;
buffer >> msg.start_object;
buffer >> msg.end_group;
buffer >> msg.end_object;
uintVar_t num_params {0};
buffer >> num_params;
auto track_params = std::vector<MoqParameter>{};
while(static_cast<uint64_t>(num_params) > 0) {
auto param = MoqParameter{};
buffer >> param.param_type;
buffer >> param.param_length;
buffer >> param.param_value;
track_params.push_back(std::move(param));
num_params = num_params - 1;
if (!msg.track_alias.has_value()) {
auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.track_alias = val.value();
}
return buffer;

if (msg.track_namespace.empty())
{
const auto val = buffer.decode_bytes();
if (!val) {
return false;
}
msg.track_namespace = val.value();
}

if (msg.track_name.empty())
{
const auto val = buffer.decode_bytes();
if (!val) {
return false;
}
msg.track_name = val.value();
}

if (msg.filter_type == FilterType::None)
{
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}

auto filter = val.value();
msg.filter_type = static_cast<FilterType>(filter);
}

switch (msg.filter_type) {
case FilterType::None:
throw std::runtime_error("Malformed Filter Type");
case FilterType::LatestGroup:
case FilterType::LatestObject:
break;
case FilterType::AbsoluteStart:
{
if (!msg.start_group.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.start_group = val.value();
}

if (!msg.start_object.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.start_object = val.value();
}
}
break;
case FilterType::AbsoluteRange:
{
if (!msg.start_group.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.start_group = val.value();
}

if (!msg.start_object.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.start_object = val.value();
}

if (!msg.end_group.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.end_group = val.value();
}

if (!msg.end_object.has_value()) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.end_object = val.value();
}
}
break;
}

if (!msg.num_params) {
const auto val = buffer.decode_uintV();
if (!val) {
return false;
}
msg.num_params = val.value();
}

// parse each param
while (msg.num_params > 0) {
if (!msg.current_param.param_type) {
auto val = buffer.front();
if (!val) {
return false;
}
msg.current_param.param_type = *val;
buffer.pop();
}

// decode param_len:<bytes>
auto param = buffer.decode_bytes();
if (!param) {
return false;
}

msg.current_param.param_length = param->size();
msg.current_param.param_value = param.value();
msg.track_params.push_back(msg.current_param);
msg.current_param = {};
msg.num_params -= 1;
}

return true;
}

MessageBuffer &
Expand Down Expand Up @@ -522,32 +636,6 @@ operator>>(MessageBuffer &buffer, MoqStreamGroupObject &msg) {
return buffer;
}

std::tuple<Location, Location, Location, Location> to_locations(const SubscribeIntent& intent) {
auto none_location = Location {.mode = LocationMode::None, .value = std::nullopt};

/*
* Sequence: 0 1 2 3 4 [5] [6] ...
^
Largest Sequence
RelativePrevious Value: 4 3 2 1 0
RelativeNext Value: 0 1 ...
*/
switch (intent) {
case SubscribeIntent::immediate:
return std::make_tuple(Location{.mode=LocationMode::RelativePrevious, .value=0}, //StartGroup
none_location, // EndGroup
Location{.mode=LocationMode::RelativePrevious, .value=0}, //StartObject
none_location); // EndObject
case SubscribeIntent::sync_up:
throw std::runtime_error("Intent Unsupported for Subscribe");
case SubscribeIntent::wait_up:
throw std::runtime_error("Intent Unsupported for Subscribe");
default:
throw std::runtime_error("Bad Intent for Subscribe");
}

}


// Setup

Expand Down
Loading

0 comments on commit 5648e92

Please sign in to comment.