Skip to content

Commit

Permalink
[core] Changed priority rule to higher preferred. Set uint16_t as wei…
Browse files Browse the repository at this point in the history
…ght type. (#1550)
  • Loading branch information
ethouris committed Sep 17, 2020
1 parent 4100835 commit 6e7520b
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 24 deletions.
10 changes: 6 additions & 4 deletions docs/API-functions.md
Expand Up @@ -722,13 +722,15 @@ where:
* `token`: An integer value unique for every connection, or -1 if unused

The `srt_perpare_endpoint` sets these fields to default values. After that
you can change the value of `weight` and `config` fields. The `weight`
parameter's meaning is dependent on the group type:
you can change the value of `weight` and `config` and `token` fields. The
`weight` parameter's meaning is dependent on the group type:

* BROADCAST: not used
* BACKUP: positive value of link priority, 0 is the highest
* BACKUP: positive value of link priority (the greater, the more preferred)
* BALANCING: relative expected load on this link for fixed algorithm

In any case, the allowed value ranges for `weight` is between 0 and 32767.

The `config` parameter is used to provide options to be set separately
on a socket for a particular connection (see [`srt_create_config()`](#srt_create_config)).

Expand All @@ -750,7 +752,7 @@ typedef struct SRT_SocketGroupData_
SRTSOCKET id;
struct sockaddr_storage peeraddr;
SRT_SOCKSTATUS sockstate;
int weight;
uint16_t weight;
SRT_MEMBERSTATUS memberstate;
int result;
int token;
Expand Down
10 changes: 5 additions & 5 deletions docs/socket-groups.md
Expand Up @@ -90,7 +90,7 @@ sending for a short time. This state should last at most as long as it takes
for SRT to determie the link broken - either by getting the link broken by
itself, or by closing the link when it's remaining unstable too long time.

This mode allows also to set link priorities - the lower, the more preferred.
This mode allows also to set link priorities - the greater, the more preferred.
This priority decides mainly, which link is "best" and which is selected to
take over transmission over a broken link before others, as well as which
links should remain active should multiple links be stable at a time.
Expand Down Expand Up @@ -702,18 +702,18 @@ from a device that streams to this machine to port 5555):

At the caller side you can also use some group-member specific options.
Currently there exists only one option dedicated for the Backup group
type, which is priority parameter with a `pri` key. In the simplified
type, which is priority parameter with a `weight` key. In the simplified
syntax it should be attached to the member parameter:

```
./srt-test-live srt://*?type=backup alpha:5000?pri=1 beta:5000?pri=0 -g udp://239.255.133.10:5999
./srt-test-live srt://*?type=backup alpha:5000?weight=0 beta:5000?weight=1 -g udp://239.255.133.10:5999
```

Priorities in the Backup group type define which links should be preferred
over the others when deciding to silence links in a situation of multiple
stable links. Also at the moment when the group is connected, the link with
highest priority is preferred for activation, and if another link is connected
with higher priority it also takes over.
highest priority is preferred for activation (greatest weight value), and if
another link is connected with higher priority it also takes over.

Here the `beta` host has higher priority than `alpha`, so when both
links are established, it should use the host `beta` to send the data,
Expand Down
6 changes: 6 additions & 0 deletions srtcore/api.cpp
Expand Up @@ -1263,6 +1263,12 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

if (targets[tii].weight > CUDT::MAX_WEIGHT)
{
LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}
}


Expand Down
1 change: 1 addition & 0 deletions srtcore/core.h
Expand Up @@ -326,6 +326,7 @@ class CUDT
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;

static const int
DEF_MSS = 1500,
Expand Down
28 changes: 17 additions & 11 deletions srtcore/group.cpp
Expand Up @@ -2605,13 +2605,19 @@ void CUDTGroup::bstatsSocket(CBytePerfMon* perf, bool clear)

// For sorting group members by priority

struct FByWeight //: public std::binary_predicate<CUDTGroup::gli_t, CUDTGroup::gli_t>
struct FPriorityOrder
{
// returned true = "elements are in the right order"
static bool check(uint16_t preceding, uint16_t succeeding)
{
return preceding > succeeding;
}

typedef CUDTGroup::gli_t gli_t;
bool operator()(gli_t a, gli_t b)

bool operator()(gli_t preceding, gli_t succeeding)
{
// this should be operator <
return a->weight < b->weight;
return check(preceding->weight, succeeding->weight);
}
};

Expand Down Expand Up @@ -2798,7 +2804,7 @@ bool CUDTGroup::sendBackup_CheckSendStatus(gli_t
int32_t& w_curseq,
vector<gli_t>& w_parallel,
int& w_final_stat,
set<int>& w_sendable_pri,
set<uint16_t>& w_sendable_pri,
size_t& w_nsuccessful,
bool& w_is_nunstable)
{
Expand Down Expand Up @@ -3389,7 +3395,7 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
// the one with highest priority.
if (w_parallel.size() > 1)
{
sort(w_parallel.begin(), w_parallel.end(), FByWeight());
sort(w_parallel.begin(), w_parallel.end(), FPriorityOrder());
steady_clock::time_point currtime = steady_clock::now();

vector<gli_t>::iterator b = w_parallel.begin();
Expand Down Expand Up @@ -3555,7 +3561,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
}

// Sort the idle sockets by priority so the highest priority idle links are checked first.
sort(idlers.begin(), idlers.end(), FByWeight());
sort(idlers.begin(), idlers.end(), FPriorityOrder());

vector<Sendstate> sendstates;

Expand Down Expand Up @@ -3594,7 +3600,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// Collect priorities from sendable links, added only after sending is successful.
// This will be used to check if any of the idlers have higher priority
// and therefore need to be activated.
set<int> sendable_pri;
set<uint16_t> sendable_pri;

// We believe that we need to send the payload over every sendable link anyway.
for (vector<gli_t>::iterator snd = sendable.begin(); snd != sendable.end(); ++snd)
Expand Down Expand Up @@ -3745,7 +3751,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
// (those are collected in 'sendable_pri'). Check if there are any (if
// no sendable, a new link needs to be activated anyway), and if the
// priority has a lower number.
if (sendable_pri.empty() || (!idlers.empty() && idlers[0]->weight < *sendable_pri.begin()))
if (sendable_pri.empty() || (!idlers.empty() && FPriorityOrder::check(idlers[0]->weight, *sendable_pri.begin())))
{
need_activate = true;
#if ENABLE_HEAVY_LOGGING
Expand All @@ -3766,7 +3772,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
// Only now we are granted that both sendable_pri and idlers are nonempty
LOGC(gslog.Debug,
log << "grp/sendBackup: found link pri " << idlers[0]->weight << " < " << (*sendable_pri.begin())
log << "grp/sendBackup: found link pri " << idlers[0]->weight << " PREF OVER " << (*sendable_pri.begin())
<< " (highest from sendable) - will activate an idle link");
activate_reason = "found higher pri link";
}
Expand All @@ -3776,7 +3782,7 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: sendable_pri (" << sendable_pri.size() << "): " << Printable(sendable_pri)
<< " first idle pri: " << (idlers.size() > 0 ? idlers[0]->weight : -1)
<< " first idle pri: " << (idlers.size() > 0 ? int(idlers[0]->weight) : -1)
<< " - will NOT activate an idle link");
}
}
Expand Down
4 changes: 2 additions & 2 deletions srtcore/group.h
Expand Up @@ -74,7 +74,7 @@ class CUDTGroup
bool ready_error;

// Configuration
int weight;
uint16_t weight;
};

struct ConfigItem
Expand Down Expand Up @@ -230,7 +230,7 @@ class CUDTGroup
int32_t& w_curseq,
std::vector<gli_t>& w_parallel,
int& w_final_stat,
std::set<int>& w_sendable_pri,
std::set<uint16_t>& w_sendable_pri,
size_t& w_nsuccessful,
bool& w_is_unstable);
void sendBackup_Buffering(const char* buf, const int len, int32_t& curseq, SRT_MSGCTRL& w_mc);
Expand Down
4 changes: 2 additions & 2 deletions srtcore/srt.h
Expand Up @@ -787,7 +787,7 @@ typedef struct SRT_SocketGroupData_
SRTSOCKET id;
struct sockaddr_storage peeraddr; // Don't want to expose sockaddr_any to public API
SRT_SOCKSTATUS sockstate;
int weight;
uint16_t weight;
SRT_MEMBERSTATUS memberstate;
int result;
int token;
Expand All @@ -800,7 +800,7 @@ typedef struct SRT_GroupMemberConfig_
SRTSOCKET id;
struct sockaddr_storage srcaddr;
struct sockaddr_storage peeraddr; // Don't want to expose sockaddr_any to public API
int weight;
uint16_t weight;
SRT_SOCKOPT_CONFIG* config;
int errorcode;
int token;
Expand Down

0 comments on commit 6e7520b

Please sign in to comment.