Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINT] Added non-blocking option for bonding examples #1746

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 106 additions & 11 deletions examples/test-c-client-bonding.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,43 @@ struct

#define SIZE(array) (sizeof array/sizeof(array[0]))

// Note that in this example application there's a socket
// used first to connect to the service and then it will be
// used for writing. Therefore the same function will be used
// for waiting for the socket to be connected and then to wait
// for write-ready on the socket used for transmission. For a
// model of waiting for read-ready see test-c-server-bonding.c file.
int WaitForWriteReady(int eid, SRTSOCKET ss)
{
int ready_err[2];
int ready_err_len = 2;
int ready_out[2];
int ready_out_len = 2;

int st = srt_epoll_wait(eid, ready_err, &ready_err_len, ready_out, &ready_out_len, -1,
0, 0, 0, 0);

// Note: with indefinite wait time we can either have a connection reported
// or possibly error. Also srt_epoll_wait never returns 0 - at least the number
// of ready connections is reported or -1 is returned for error, including timeout.
if (st < 1)
{
fprintf(stderr, "srt_epoll_wait: %s\n", srt_getlasterror_str());
return 0;
}

// Check if this was reported as error-ready, in which case it doesn't
// matter if read-ready.
if (ready_err[0] == ss)
{
int reason = srt_getrejectreason(ss);
fprintf(stderr, "srt_epoll_wait: socket @%d reported error reason=%d: %s\n", ss, reason, srt_rejectreason_str(reason));
return 0;
}

return 1;
}

int main(int argc, char** argv)
{
int ss, st;
Expand All @@ -62,26 +99,42 @@ int main(int argc, char** argv)
break;
}

printf("srt startup\n");
srt_startup();

int is_nonblocking = 0;
size_t nmemb = argc - 2;
if (nmemb % 2)
if (nmemb < 2)
{
fprintf(stderr, "Usage error: after <type>, <host> <port> pairs are expected.\n");
fprintf(stderr, "Usage error: no members specified\n");
return 1;
}

if (nmemb % 2)
{
// Last argument is then optionset
--nmemb;
const char* opt = argv[argc-1];
if (strchr(opt, 'n'))
is_nonblocking = 1;
}

nmemb /= 2;

printf("srt startup\n");
srt_startup();

// Declare all variables before any destructive goto.
// In C++ such a code that jumps over initialization would be illegal,
// in C it causes an uninitialized value to be used.
int eid = -1;
int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
SRT_SOCKGROUPDATA* grpdata = NULL;
SRT_SOCKGROUPCONFIG* grpconfig = calloc(nmemb, sizeof (SRT_SOCKGROUPCONFIG));

printf("srt group\n");
ss = srt_create_group(gtype);
if (ss == SRT_ERROR)
{
fprintf(stderr, "srt_create_group: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

const int B = 2;
Expand All @@ -94,12 +147,21 @@ int main(int argc, char** argv)
sa.sin_port = htons(atoi(argv[B + 2*i + 1]));
if (inet_pton(AF_INET, argv[B + 2*i], &sa.sin_addr) != 1)
{
return 1;
fprintf(stderr, "inet_pton: can't resolve address: %s\n", argv[B + 2*i]);
goto end;
}

grpconfig[i] = srt_prepare_endpoint(NULL, (struct sockaddr*)&sa, sizeof sa);
}

if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_RCVSYN, &blockingmode, sizeof (blockingmode));
eid = srt_epoll_create();
srt_epoll_add_usock(eid, ss, &write_modes);
}

printf("srt connect (group)\n");

// Note: this function unblocks at the moment when at least one connection
Expand All @@ -109,7 +171,28 @@ int main(int argc, char** argv)
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_connect: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

// In non-blocking mode the srt_connect function returns immediately
// and displays only errors of the initial usage, not runtime errors.
// These could be reported by epoll.
if (is_nonblocking)
{
// WRITE-ready means connected

printf("srt wait for socket reporting connection success\n");
if (!WaitForWriteReady(eid, ss))
goto end;
}

// In non-blocking mode now is the time to possibly change the epoll.
// As this socket will be used for writing, it is in the right mode already.
// Just set the right flag, as for non-blocking connect it needs RCVSYN.
if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_SNDSYN, &blockingmode, sizeof (blockingmode));
}

// Important: Normally you need that at least one link is ready for
Expand All @@ -123,7 +206,7 @@ int main(int argc, char** argv)
printf("sleeping 1s to make it probable all links are established\n");
sleep(1);

SRT_SOCKGROUPDATA* grpdata = calloc(nmemb, sizeof (SRT_SOCKGROUPDATA));
grpdata = calloc(nmemb, sizeof (SRT_SOCKGROUPDATA));

for (i = 0; i < 100; i++)
{
Expand All @@ -133,11 +216,18 @@ int main(int argc, char** argv)
mc.grpdata = grpdata;
mc.grpdata_size = nmemb; // Set maximum known

if (is_nonblocking)
{
// Block in epoll as srt_recvmsg2 will not block.
if (!WaitForWriteReady(eid, ss))
goto end;
}

st = srt_sendmsg2(ss, message, sizeof message, &mc);
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_sendmsg: %s\n", srt_getlasterror_str());
return 1;
goto end;
}

// Perform the group check. This can be used to recognize broken connections
Expand All @@ -162,7 +252,11 @@ int main(int argc, char** argv)
usleep(1000); // 1 ms
}


end:
if (eid != -1)
{
srt_epoll_release(eid);
}
printf("srt close\n");
st = srt_close(ss);
if (st == SRT_ERROR)
Expand All @@ -174,6 +268,7 @@ int main(int argc, char** argv)
free(grpdata);
free(grpconfig);

//cleanup:
printf("srt cleanup\n");
srt_cleanup();
return 0;
Expand Down
127 changes: 123 additions & 4 deletions examples/test-c-server-bonding.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,49 @@

#include "srt.h"

// Note that in this example application there's a listening
// socket, off which then a transmission socket is accepted,
// then this socket will be used for reading. Therefore the same
// function will be used for waiting for the listener to get the
// accepted socket ready and then to wait for read-readiness on
// the transmission socket. For a model of waiting for write-ready
// see test-c-client-bonding.c file.
int WaitForReadReady(int eid, SRTSOCKET ss)
{
int ready_in[2];
int ready_in_len = 2;
int ready_err[2];
int ready_err_len = 2;

int st = srt_epoll_wait(eid, ready_in, &ready_in_len, ready_err, &ready_err_len, -1,
0, 0, 0, 0);

// Note: with indefinite wait time we can either have a connection reported
// or possibly error. Also srt_epoll_wait never returns 0 - at least the number
// of ready connections is reported or -1 is returned for error, including timeout.
if (st < 1)
{
fprintf(stderr, "srt_epoll_wait: %s\n", srt_getlasterror_str());
return 0;
}

// Check if this was reported as error-ready, in which case it doesn't
// matter if read-ready.
if (ready_err[0] == ss)
{
fprintf(stderr, "srt_epoll_wait: socket @%d reported error\n", ss);
return 0;
}

if (ready_in[0] != ss)
{
fprintf(stderr, "srt_epoll_wait: socket @%d not reported ready\n", ss);
return 0;
}

return 1;
}

int main(int argc, char** argv)
{
int globstatus = 0;
Expand All @@ -32,9 +75,10 @@ int main(int argc, char** argv)
struct sockaddr_storage their_addr;
SRT_SOCKGROUPDATA* grpdata = NULL;

if (argc != 3) {
fprintf(stderr, "Usage: %s <host> <port>\n", argv[0]);
return 1;
if (argc < 3 || argc > 4)
{
fprintf(stderr, "Usage: %s <host> <port> [options]\n", argv[0]);
return 1;
}

printf("srt startup\n");
Expand All @@ -51,6 +95,19 @@ int main(int argc, char** argv)
}
// Now that the socket is created, jump to 'end' on error.

// Check options
int is_nonblocking = 0;
SRTSOCKET their_fd = SRT_INVALID_SOCK; // declared early because of gotos
int eid = -1;
int lsn_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
int read_modes = lsn_modes;
if (argc > 3)
{
const char* opt = argv[3];
if (strchr(opt, 'n'))
is_nonblocking = 1;
}

printf("srt bind address\n");
if (0 == strcmp(argv[1], "0"))
{
Expand All @@ -77,6 +134,14 @@ int main(int argc, char** argv)
goto end;
}

if (is_nonblocking)
{
int blockingmode = 0;
srt_setsockflag(ss, SRTO_RCVSYN, &blockingmode, sizeof (blockingmode));
eid = srt_epoll_create();
srt_epoll_add_usock(eid, ss, &lsn_modes);
}

printf("srt listen\n");

// We set here 10, just for a case. Every unit in this number
Expand All @@ -91,22 +156,58 @@ int main(int argc, char** argv)
goto end;
}


// In this example, there will be prepared an array of 10 items.
// The listener, however, doesn't know how many member connections
// one bonded connection will contain, so a real application should be
// prepared for dynamically adjusting the array size.
const size_t N = 10;
grpdata = calloc(N, sizeof (SRT_SOCKGROUPDATA));

// In non-blocking mode you can't call srt_accept immediately.
// You must first wait for readiness on the listener socket.
if (is_nonblocking)
{
printf("srt wait for listener socket reporting in a new connection\n");
if (!WaitForReadReady(eid, ss))
goto end;
}

printf("srt accept\n");
int addr_size = sizeof their_addr;
SRTSOCKET their_fd = srt_accept(ss, (struct sockaddr *)&their_addr, &addr_size);
their_fd = srt_accept(ss, (struct sockaddr *)&their_addr, &addr_size);

if (their_fd == -1)
{
fprintf(stderr, "srt_accept: %s\n", srt_getlasterror_str());
goto end;
}

printf("accepted socket: @%d\n", their_fd);

// You never know if `srt_accept` is going to give you a socket or a group.
// You have to check it on your own. The SRTO_GROUPCONNECT flag doesn't disallow
// single socket connections.
int isgroup = their_fd & SRTGROUP_MASK;

if (!isgroup)
{
fprintf(stderr, "srt_accept: Accepted @%d is not a group???\n", their_fd);
goto end;
}

if (is_nonblocking)
{
// NOTE: The SRTO_RCVSYN = false flag will be derived from
// the listener socket and we are going to read, so it matches
// the need. In case when you'd like to write to the accepted
// socket, you'd have to set also SRTO_SNDSYN = false.
srt_epoll_add_usock(eid, their_fd, &read_modes);

// The listener socket is no longer important.
srt_epoll_remove_usock(eid, ss);
}

// Still, use the same procedure for receiving, no matter if
// this is a bonded or single connection.
int i;
Expand All @@ -117,6 +218,14 @@ int main(int argc, char** argv)
SRT_MSGCTRL mc = srt_msgctrl_default;
mc.grpdata = grpdata;
mc.grpdata_size = N;

if (is_nonblocking)
{
// Block in epoll as srt_recvmsg2 will not block.
if (!WaitForReadReady(eid, their_fd))
goto end;
}

st = srt_recvmsg2(their_fd, msg, sizeof msg, &mc);
if (st == SRT_ERROR)
{
Expand Down Expand Up @@ -146,8 +255,18 @@ int main(int argc, char** argv)
}

end:
if (eid != -1)
{
srt_epoll_release(eid);
}
free(grpdata);
printf("srt close\n");
st = srt_close(their_fd); // just for a case; broken socket should be wiped out anyway
if (st == SRT_ERROR)
{
fprintf(stderr, "srt_close: %s\n", srt_getlasterror_str());
// But not matter, we're finishing here.
}
st = srt_close(ss);
if (st == SRT_ERROR)
{
Expand Down