Skip to content

Commit

Permalink
[apps] Added non-blocking option for bonding examples (#1746)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris committed Jan 21, 2021
1 parent b665e35 commit 5066569
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 15 deletions.
117 changes: 106 additions & 11 deletions examples/test-c-client-bonding.c
Expand Up @@ -40,6 +40,43 @@ struct

#define SIZE(array) (sizeof array/sizeof(array[0]))

// Note that in this example application there's a socket
// used first to connect to the service and then it will be
// used for writing. Therefore the same function will be used
// for waiting for the socket to be connected and then to wait
// for write-ready on the socket used for transmission. For a
// model of waiting for read-ready see test-c-server-bonding.c file.
int WaitForWriteReady(int eid, SRTSOCKET ss)
{
int ready_err[2];
int ready_err_len = 2;
int ready_out[2];
int ready_out_len = 2;

int st = srt_epoll_wait(eid, ready_err, &ready_err_len, ready_out, &ready_out_len, -1,
0, 0, 0, 0);

// Note: with indefinite wait time we can either have a connection reported
// or possibly error. Also srt_epoll_wait never returns 0 - at least the number
// of ready connections is reported or -1 is returned for error, including timeout.
if (st < 1)
{
fprintf(stderr, "srt_epoll_wait: %s\n", srt_getlasterror_str());
return 0;
}

// Check if this was reported as error-ready, in which case it doesn't
// matter if read-ready.
if (ready_err[0] == ss)
{
int reason = srt_getrejectreason(ss);
fprintf(stderr, "srt_epoll_wait: socket @%d reported error reason=%d: %s\n", ss, reason, srt_rejectreason_str(reason));
return 0;
}

return 1;
}

int main(int argc, char** argv)
{
int ss, st;
Expand All @@ -62,26 +99,42 @@ int main(int argc, char** argv)
break;
}

printf("srt startup\n");
srt_startup();

int is_nonblocking = 0;
size_t nmemb = argc - 2;
if (nmemb % 2)
if (nmemb < 2)
{
fprintf(stderr, "Usage error: after <type>, <host> <port> pairs are expected.\n");
fprintf(stderr, "Usage error: no members specified\n");
return 1;
}

if (nmemb % 2)
{
// Last argument is then optionset
--nmemb;
const char* opt = argv[argc-1];
if (strchr(opt, 'n'))
is_nonblocking = 1;
}

nmemb /= 2;

printf("srt startup\n");
srt_startup();

// Declare all variables before any destructive goto.
// In C++ such a code that jumps over initialization would be illegal,
// in C it causes an uninitialized value to be used.
int eid = -1;
int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
SRT_SOCKGROUPDATA* grpdata = NULL;
SRT_SOCKGROUPCONFIG* grpconfig = calloc(nmemb, sizeof (SRT_SOCKGROUPCONFIG));

printf("srt group\n");
ss = srt_create_group(gtype);
if (ss == SRT_ERROR)
{
fprintf(stderr, "srt_create_group: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

const int B = 2;
Expand All @@ -94,12 +147,21 @@ int main(int argc, char** argv)
sa.sin_port = htons(atoi(argv[B + 2*i + 1]));
if (inet_pton(AF_INET, argv[B + 2*i], &sa.sin_addr) != 1)
{
return 1;
fprintf(stderr, "inet_pton: can't resolve address: %s\n", argv[B + 2*i]);
goto end;
}

grpconfig[i] = srt_prepare_endpoint(NULL, (struct sockaddr*)&sa, sizeof sa);
}

if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_RCVSYN, &blockingmode, sizeof (blockingmode));
eid = srt_epoll_create();
srt_epoll_add_usock(eid, ss, &write_modes);
}

printf("srt connect (group)\n");

// Note: this function unblocks at the moment when at least one connection
Expand All @@ -109,7 +171,28 @@ int main(int argc, char** argv)
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_connect: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

// In non-blocking mode the srt_connect function returns immediately
// and displays only errors of the initial usage, not runtime errors.
// These could be reported by epoll.
if (is_nonblocking)
{
// WRITE-ready means connected

printf("srt wait for socket reporting connection success\n");
if (!WaitForWriteReady(eid, ss))
goto end;
}

// In non-blocking mode now is the time to possibly change the epoll.
// As this socket will be used for writing, it is in the right mode already.
// Just set the right flag, as for non-blocking connect it needs RCVSYN.
if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_SNDSYN, &blockingmode, sizeof (blockingmode));
}

// Important: Normally you need that at least one link is ready for
Expand All @@ -123,7 +206,7 @@ int main(int argc, char** argv)
printf("sleeping 1s to make it probable all links are established\n");
sleep(1);

SRT_SOCKGROUPDATA* grpdata = calloc(nmemb, sizeof (SRT_SOCKGROUPDATA));
grpdata = calloc(nmemb, sizeof (SRT_SOCKGROUPDATA));

for (i = 0; i < 100; i++)
{
Expand All @@ -133,11 +216,18 @@ int main(int argc, char** argv)
mc.grpdata = grpdata;
mc.grpdata_size = nmemb; // Set maximum known

if (is_nonblocking)
{
// Block in epoll as srt_recvmsg2 will not block.
if (!WaitForWriteReady(eid, ss))
goto end;
}

st = srt_sendmsg2(ss, message, sizeof message, &mc);
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_sendmsg: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

// Perform the group check. This can be used to recognize broken connections
Expand All @@ -162,7 +252,11 @@ int main(int argc, char** argv)
usleep(1000); // 1 ms
}


end:
if (eid != -1)
{
srt_epoll_release(eid);
}
printf("srt close\n");
st = srt_close(ss);
if (st == SRT_ERROR)
Expand All @@ -174,6 +268,7 @@ int main(int argc, char** argv)
free(grpdata);
free(grpconfig);

//cleanup:
printf("srt cleanup\n");
srt_cleanup();
return 0;
Expand Down
127 changes: 123 additions & 4 deletions examples/test-c-server-bonding.c
Expand Up @@ -22,6 +22,49 @@

#include "srt.h"

// Note that in this example application there's a listening
// socket, off which then a transmission socket is accepted,
// then this socket will be used for reading. Therefore the same
// function will be used for waiting for the listener to get the
// accepted socket ready and then to wait for read-readiness on
// the transmission socket. For a model of waiting for write-ready
// see test-c-client-bonding.c file.
int WaitForReadReady(int eid, SRTSOCKET ss)
{
int ready_in[2];
int ready_in_len = 2;
int ready_err[2];
int ready_err_len = 2;

int st = srt_epoll_wait(eid, ready_in, &ready_in_len, ready_err, &ready_err_len, -1,
0, 0, 0, 0);

// Note: with indefinite wait time we can either have a connection reported
// or possibly error. Also srt_epoll_wait never returns 0 - at least the number
// of ready connections is reported or -1 is returned for error, including timeout.
if (st < 1)
{
fprintf(stderr, "srt_epoll_wait: %s\n", srt_getlasterror_str());
return 0;
}

// Check if this was reported as error-ready, in which case it doesn't
// matter if read-ready.
if (ready_err[0] == ss)
{
fprintf(stderr, "srt_epoll_wait: socket @%d reported error\n", ss);
return 0;
}

if (ready_in[0] != ss)
{
fprintf(stderr, "srt_epoll_wait: socket @%d not reported ready\n", ss);
return 0;
}

return 1;
}

int main(int argc, char** argv)
{
int globstatus = 0;
Expand All @@ -32,9 +75,10 @@ int main(int argc, char** argv)
struct sockaddr_storage their_addr;
SRT_SOCKGROUPDATA* grpdata = NULL;

if (argc != 3) {
fprintf(stderr, "Usage: %s <host> <port>\n", argv[0]);
return 1;
if (argc < 3 || argc > 4)
{
fprintf(stderr, "Usage: %s <host> <port> [options]\n", argv[0]);
return 1;
}

printf("srt startup\n");
Expand All @@ -51,6 +95,19 @@ int main(int argc, char** argv)
}
// Now that the socket is created, jump to 'end' on error.

// Check options
int is_nonblocking = 0;
SRTSOCKET their_fd = SRT_INVALID_SOCK; // declared early because of gotos
int eid = -1;
int lsn_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
int read_modes = lsn_modes;
if (argc > 3)
{
const char* opt = argv[3];
if (strchr(opt, 'n'))
is_nonblocking = 1;
}

printf("srt bind address\n");
if (0 == strcmp(argv[1], "0"))
{
Expand All @@ -77,6 +134,14 @@ int main(int argc, char** argv)
goto end;
}

if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_RCVSYN, &blockingmode, sizeof (blockingmode));
eid = srt_epoll_create();
srt_epoll_add_usock(eid, ss, &lsn_modes);
}

printf("srt listen\n");

// We set here 10, just for a case. Every unit in this number
Expand All @@ -91,22 +156,58 @@ int main(int argc, char** argv)
goto end;
}


// In this example, there will be prepared an array of 10 items.
// The listener, however, doesn't know how many member connections
// one bonded connection will contain, so a real application should be
// prepared for dynamically adjusting the array size.
const size_t N = 10;
grpdata = calloc(N, sizeof (SRT_SOCKGROUPDATA));

// In non-blocking mode you can't call srt_accept immediately.
// You must first wait for readiness on the listener socket.
if (is_nonblocking)
{
printf("srt wait for listener socket reporting in a new connection\n");
if (!WaitForReadReady(eid, ss))
goto end;
}

printf("srt accept\n");
int addr_size = sizeof their_addr;
SRTSOCKET their_fd = srt_accept(ss, (struct sockaddr *)&their_addr, &addr_size);
their_fd = srt_accept(ss, (struct sockaddr *)&their_addr, &addr_size);

if (their_fd == -1)
{
fprintf(stderr, "srt_accept: %s\n", srt_getlasterror_str());
goto end;
}

printf("accepted socket: @%d\n", their_fd);

// You never know if `srt_accept` is going to give you a socket or a group.
// You have to check it on your own. The SRTO_GROUPCONNECT flag doesn't disallow
// single socket connections.
int isgroup = their_fd & SRTGROUP_MASK;

if (!isgroup)
{
fprintf(stderr, "srt_accept: Accepted @%d is not a group???\n", their_fd);
goto end;
}

if (is_nonblocking)
{
// NOTE: The SRTO_RCVSYN = false flag will be derived from
// the listener socket and we are going to read, so it matches
// the need. In case when you'd like to write to the accepted
// socket, you'd have to set also SRTO_SNDSYN = false.
srt_epoll_add_usock(eid, their_fd, &read_modes);

// The listener socket is no longer important.
srt_epoll_remove_usock(eid, ss);
}

// Still, use the same procedure for receiving, no matter if
// this is a bonded or single connection.
int i;
Expand All @@ -117,6 +218,14 @@ int main(int argc, char** argv)
SRT_MSGCTRL mc = srt_msgctrl_default;
mc.grpdata = grpdata;
mc.grpdata_size = N;

if (is_nonblocking)
{
// Block in epoll as srt_recvmsg2 will not block.
if (!WaitForReadReady(eid, their_fd))
goto end;
}

st = srt_recvmsg2(their_fd, msg, sizeof msg, &mc);
if (st == SRT_ERROR)
{
Expand Down Expand Up @@ -146,8 +255,18 @@ int main(int argc, char** argv)
}

end:
if (eid != -1)
{
srt_epoll_release(eid);
}
free(grpdata);
printf("srt close\n");
st = srt_close(their_fd); // just for a case; broken socket should be wiped out anyway
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_close: %s\n", srt_getlasterror_str());
// But not matter, we're finishing here.
}
st = srt_close(ss);
if (st == SRT_ERROR)
{
Expand Down

0 comments on commit 5066569

Please sign in to comment.