Skip to content

Commit

Permalink
tcp: add ipv6 async check
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghanchao committed Dec 29, 2019
1 parent e88d564 commit 8e56bf0
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 14 deletions.
162 changes: 155 additions & 7 deletions libavformat/tcp.c
Expand Up @@ -62,8 +62,16 @@ typedef struct TCPContext {
int dash_audio_tcp;
int dash_video_tcp;
int enable_ipv6;
pthread_t ipv6_check_thread;
int ipv6_check_timeout;
struct addrinfo *ipv6_check_ai;
} TCPContext;

#define IPV6_UNKNOWN 0
#define IPV6_CHECKING 1
#define IPV6_FAILED 2
#define IPV6_SUCCESSED 3

#define FAST_OPEN_FLAG 0x20000000

#define OFFSET(x) offsetof(TCPContext, x)
Expand All @@ -87,6 +95,7 @@ static const AVOption options[] = {
{ "dash_audio_tcp", "dash audio tcp", OFFSET(dash_audio_tcp), AV_OPT_TYPE_INT, { .i64 = 0}, 0, 1, .flags = D|E },
{ "dash_video_tcp", "dash video tcp", OFFSET(dash_video_tcp), AV_OPT_TYPE_INT, { .i64 = 0}, 0, 1, .flags = D|E },
{ "enable_ipv6", "priority of use ipv6", OFFSET(enable_ipv6), AV_OPT_TYPE_INT, { .i64 = 1}, 0, 1, .flags = D|E },
{ "ipv6_check_timeout", "set ipv6 check connect timeout (in microseconds) of socket", OFFSET(ipv6_check_timeout), AV_OPT_TYPE_INT, { .i64 = 2000000 }, -1, INT_MAX, .flags = D|E },
{ NULL }
};

Expand All @@ -97,6 +106,13 @@ static const AVClass tcp_class = {
.version = LIBAVUTIL_VERSION_INT,
};

static int gs_ipv6_check_timeout = -1;
static int gs_ipv6_state = IPV6_UNKNOWN;
static pthread_mutex_t gs_check_ipv6_mutex;
static pthread_once_t gs_key_once = PTHREAD_ONCE_INIT;
static int gs_init_mutex_ret = -1;
static int gs_need_tcp_ipv6_check_report = 1;

int ijk_tcp_getaddrinfo_nonblock(const char *hostname, const char *servname,
const struct addrinfo *hints, struct addrinfo **res,
int64_t timeout,
Expand Down Expand Up @@ -345,6 +361,88 @@ int ijk_tcp_getaddrinfo_nonblock(const char *hostname, const char *servname,
}
#endif

static void private_mutex_init(void){
gs_init_mutex_ret = pthread_mutex_init(&gs_check_ipv6_mutex, NULL);
}

static void once_check_ipv6_l(void *arg)
{
URLContext *h = (URLContext *)arg;
TCPContext *s = h->priv_data;
int ret = 0;

if (!s->ipv6_check_ai) {
gs_ipv6_state = IPV6_UNKNOWN;
return;
}
struct addrinfo *cur_ai = s->ipv6_check_ai;

int fd = ff_socket(cur_ai->ai_family,
cur_ai->ai_socktype,
cur_ai->ai_protocol);
if (fd < 0) {
ret = -1;
goto fail;
}

if (s->tcp_nodelay > 0) {
setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &s->tcp_nodelay, sizeof (s->tcp_nodelay));
}

if ((ret = ff_listen_connect(fd, cur_ai->ai_addr, cur_ai->ai_addrlen,
s->ipv6_check_timeout / 1000, h, !!cur_ai->ai_next)) < 0) {
if (ret == AVERROR_EXIT) {
gs_ipv6_state = IPV6_UNKNOWN;
return;
} else {
goto fail;
}
} else {
ret = 0;
}


fail:
if (!ret) {
gs_ipv6_state = IPV6_FAILED;
} else {
gs_ipv6_state = IPV6_SUCCESSED;
}
return;
}

static void *once_check_ipv6(URLContext *h,struct addrinfo *cur_v6_ai)
{
TCPContext *s = h->priv_data;
if (cur_v6_ai->ai_family != AF_INET6) {
return;
}

pthread_mutex_lock(&gs_check_ipv6_mutex);
if (gs_ipv6_state != IPV6_UNKNOWN) {
return NULL;
} else {
gs_ipv6_state = IPV6_CHECKING;
}
struct addrinfo *cur_ai_bak = (struct addrinfo *) av_mallocz(sizeof(struct addrinfo));
memcpy(cur_ai_bak, cur_v6_ai, sizeof(struct addrinfo));
cur_ai_bak->ai_addr = (struct sockaddr_in6 *) av_mallocz(sizeof(struct sockaddr_in6));
if (!cur_ai_bak->ai_addr) {
av_freep(&cur_ai_bak);
gs_ipv6_state = IPV6_UNKNOWN;
} else {
memcpy(cur_ai_bak->ai_addr, cur_v6_ai->ai_addr, sizeof(struct sockaddr_in6));
gs_ipv6_check_timeout = s->ipv6_check_timeout;
s->ipv6_check_ai = cur_ai_bak;
int ret = pthread_create(&s->ipv6_check_thread, NULL, once_check_ipv6_l, h);
if (ret) {
gs_ipv6_state = IPV6_UNKNOWN;
}
}
pthread_mutex_unlock(&gs_check_ipv6_mutex);
return NULL;
}

/* return non zero if error */
static int tcp_open(URLContext *h, const char *uri, int flags)
{
Expand All @@ -365,6 +463,7 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
char ipbuf[MAX_IP_LEN];
struct sockaddr_in *ipaddr;
char *c_ipaddr = NULL;
int orig_ipv6_enable = s->enable_ipv6;

if (s->open_timeout < 0) {
s->open_timeout = 15000000;
Expand Down Expand Up @@ -464,7 +563,13 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
}
dns_time = (av_gettime() - dns_time) / 1000;

while (cur_ai->ai_next && cur_ai->ai_next->ai_addr) {
if (gs_ipv6_state != IPV6_SUCCESSED) {
s->enable_ipv6 = 0;
}

av_log(NULL, AV_LOG_INFO, "s->enable_ipv6 = %d, orig_ipv6_enable = %d\n", s->enable_ipv6, orig_ipv6_enable);

while (!dns_entry && cur_ai->ai_next && cur_ai->ai_next->ai_addr) {
if (cur_ai->ai_family == AF_INET && cur_v4_ai == NULL) {
ipaddr = (struct sockaddr_in *)cur_ai->ai_addr;
c_ipaddr = (char *)inet_ntop(AF_INET, &ipaddr->sin_addr, ipbuf, MAX_IP_LEN);
Expand All @@ -482,6 +587,30 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
cur_ai = cur_ai->ai_next;
}

if (dns_entry) {
if (cur_ai->ai_family == AF_INET && cur_v4_ai == NULL) {
cur_v4_ai = cur_ai;
if (cur_ai->ai_next && cur_ai->ai_next->ai_family == AF_INET6 && cur_v6_ai == NULL) {
cur_v6_ai = cur_ai->ai_next;
}
} else if (cur_ai->ai_family == AF_INET6 && cur_v6_ai == NULL) {
cur_v6_ai = cur_ai;
if (cur_ai->ai_next && cur_ai->ai_next->ai_family == AF_INET && cur_v4_ai == NULL) {
cur_v4_ai = cur_ai->ai_next;
}
}
}


if (orig_ipv6_enable && cur_v6_ai != NULL && gs_ipv6_state == IPV6_UNKNOWN) {
if (gs_init_mutex_ret) {
pthread_once(&gs_key_once, private_mutex_init);
}
if (gs_ipv6_state == IPV6_UNKNOWN && !gs_init_mutex_ret) {
once_check_ipv6(s, cur_v6_ai);
}
}

if ((s->enable_ipv6 || cur_v4_ai == NULL) && cur_v6_ai != NULL) {
cur_ai = cur_v6_ai;
} else if (cur_v4_ai != NULL) {
Expand Down Expand Up @@ -562,24 +691,36 @@ static int tcp_open(URLContext *h, const char *uri, int flags)
goto fail1;
}
tcp_time = av_gettime();
int last_check_ipv6_state = gs_ipv6_state;
if (gs_need_tcp_ipv6_check_report && (last_check_ipv6_state == IPV6_FAILED || last_check_ipv6_state == IPV6_SUCCESSED)) {
gs_need_tcp_ipv6_check_report = 0;
} else {
last_check_ipv6_state = IPV6_UNKNOWN;
}
if ((ret = ff_listen_connect(fd, cur_ai->ai_addr, cur_ai->ai_addrlen,
s->open_timeout / 1000, h, !!cur_ai->ai_next)) < 0) {
if (ret == AVERROR(ETIMEDOUT)) {
ret = AVERROR_TCP_CONNECT_TIMEOUT;
}
if (av_application_on_tcp_did_open(s->app_ctx, ret, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, (av_gettime() - tcp_time) / 1000))
if (av_application_on_tcp_did_open(s->app_ctx, ret, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, last_check_ipv6_state, (av_gettime() - tcp_time) / 1000))
goto fail1;
if (ret == AVERROR_EXIT)
goto fail1;
else
goto fail;
} else {
ret = av_application_on_tcp_did_open(s->app_ctx, 0, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, (av_gettime() - tcp_time) / 1000);
ret = av_application_on_tcp_did_open(s->app_ctx, 0, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, last_check_ipv6_state, (av_gettime() - tcp_time) / 1000);
if (ret) {
av_log(NULL, AV_LOG_WARNING, "terminated by application in AVAPP_CTRL_DID_TCP_OPEN");
goto fail1;
} else if (!dns_entry && !strstr(uri, control.ip) && s->dns_cache_timeout > 0) {
add_dns_cache_entry(uri, cur_ai, s->dns_cache_timeout);
if (cur_ai == cur_v4_ai && cur_v6_ai) {
add_dns_cache_entry(uri, cur_ai, cur_v6_ai, s->dns_cache_timeout);
} else if (cur_ai == cur_v6_ai && cur_v4_ai) {
add_dns_cache_entry(uri, cur_ai, cur_v4_ai, s->dns_cache_timeout);
} else {
add_dns_cache_entry(uri, cur_ai, NULL, s->dns_cache_timeout);
}
av_log(NULL, AV_LOG_INFO, "add dns cache uri = %s, ip = %s port = %d\n", uri , control.ip, control.port);
}
av_log(NULL, AV_LOG_INFO, "tcp did open uri = %s, ip = %s port = %d\n", uri , control.ip, control.port);
Expand Down Expand Up @@ -751,7 +892,7 @@ static int tcp_fast_open(URLContext *h, const char *http_request, const char *ur
if ((ret = ff_sendto(fd, http_request, strlen(http_request), FAST_OPEN_FLAG,
cur_ai->ai_addr, cur_ai->ai_addrlen, s->open_timeout / 1000, h, !!cur_ai->ai_next)) < 0) {
s->fastopen_success = 0;
if (av_application_on_tcp_did_open(s->app_ctx, ret, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, 0))
if (av_application_on_tcp_did_open(s->app_ctx, ret, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, IPV6_UNKNOWN, 0))
goto fail1;
if (ret == AVERROR_EXIT)
goto fail1;
Expand All @@ -763,12 +904,12 @@ static int tcp_fast_open(URLContext *h, const char *http_request, const char *ur
} else {
s->fastopen_success = 1;
}
ret = av_application_on_tcp_did_open(s->app_ctx, 0, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, 0);
ret = av_application_on_tcp_did_open(s->app_ctx, 0, fd, &control, s->dash_audio_tcp, cur_ai->ai_family, IPV6_UNKNOWN, 0);
if (ret) {
av_log(NULL, AV_LOG_WARNING, "terminated by application in AVAPP_CTRL_DID_TCP_OPEN");
goto fail1;
} else if (!dns_entry && !strstr(uri, control.ip) && s->dns_cache_timeout > 0) {
add_dns_cache_entry(uri, cur_ai, s->dns_cache_timeout);
add_dns_cache_entry(uri, cur_ai, NULL, s->dns_cache_timeout);
av_log(NULL, AV_LOG_INFO, "add dns cache uri = %s, ip = %s\n", uri , control.ip);
}
av_log(NULL, AV_LOG_INFO, "tcp did open uri = %s, ip = %s\n", uri , control.ip);
Expand Down Expand Up @@ -932,6 +1073,13 @@ static int tcp_shutdown(URLContext *h, int flags)
static int tcp_close(URLContext *h)
{
TCPContext *s = h->priv_data;
pthread_join(s->ipv6_check_thread, NULL);
if (s->ipv6_check_ai) {
if (s->ipv6_check_ai->ai_addr) {
av_freep(&s->ipv6_check_ai->ai_addr);
}
av_freep(&s->ipv6_check_ai);
}
closesocket(s->fd);
return 0;
}
Expand Down
3 changes: 2 additions & 1 deletion libavutil/application.c
Expand Up @@ -171,7 +171,7 @@ int av_application_on_tcp_will_open(AVApplicationContext *h, int ai_family)
}

// only callback returns error
int av_application_on_tcp_did_open(AVApplicationContext *h, int error, int fd, AVAppTcpIOControl *control, int is_audio, int ai_family, int64_t duration)
int av_application_on_tcp_did_open(AVApplicationContext *h, int error, int fd, AVAppTcpIOControl *control, int is_audio, int ai_family, int ipv6_state, int64_t duration)
{
struct sockaddr_storage so_stg;
int ret = 0;
Expand All @@ -189,6 +189,7 @@ int av_application_on_tcp_did_open(AVApplicationContext *h, int error, int fd, A
} else if (ai_family == AF_INET6) {
control->family = WRAP_INET6_FAMILY;
}
control->ipv6_state = ipv6_state;

if (fd <= 0) {
control->error = error;
Expand Down
3 changes: 2 additions & 1 deletion libavutil/application.h
Expand Up @@ -104,6 +104,7 @@ typedef struct AVAppTcpIOControl {
int fd;
int is_audio;
int64_t duration;
int ipv6_state;
} AVAppTcpIOControl;

typedef struct AVAppAsyncStatistic {
Expand Down Expand Up @@ -255,7 +256,7 @@ void av_application_did_io_tcp_read(AVApplicationContext *h, void *obj, int byte
int av_application_on_io_control(AVApplicationContext *h, int event_type, AVAppIOControl *control);

int av_application_on_tcp_will_open(AVApplicationContext *h, int ai_family);
int av_application_on_tcp_did_open(AVApplicationContext *h, int error, int fd, AVAppTcpIOControl *control, int is_audio, int ai_family, int64_t duration);
int av_application_on_tcp_did_open(AVApplicationContext *h, int error, int fd, AVAppTcpIOControl *control, int is_audio, int ai_family, int ipv6_state, int64_t duration);

void av_application_on_async_statistic(AVApplicationContext *h, AVAppAsyncStatistic *statistic);
void av_application_on_async_read_speed(AVApplicationContext *h, AVAppAsyncReadSpeed *speed);
Expand Down
37 changes: 33 additions & 4 deletions libavutil/dns_cache.c
Expand Up @@ -48,6 +48,14 @@ static void inner_init(void) {

static void free_private_addrinfo(struct addrinfo **p_ai) {
struct addrinfo *ai = *p_ai;
struct addrinfo *next_ai = ai->ai_next;

if(next_ai) {
if (next_ai->ai_addr) {
av_freep(&next_ai->ai_addr);
}
av_freep(&next_ai);
}

if (ai) {
if (ai->ai_addr) {
Expand All @@ -71,7 +79,7 @@ static int inner_remove_dns_cache(const char *uri, DnsCacheEntry *dns_cache_entr
return 0;
}

static DnsCacheEntry *new_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, int64_t timeout) {
static DnsCacheEntry *new_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, struct addrinfo *next_ai, int64_t timeout) {
DnsCacheEntry *new_entry = NULL;
int64_t cur_time = av_gettime_relative();

Expand Down Expand Up @@ -109,8 +117,29 @@ static DnsCacheEntry *new_dns_cache_entry(const char *uri, struct addrinfo *cur_
memcpy(new_entry->res->ai_addr, cur_ai->ai_addr, sizeof(struct sockaddr));
}

if (next_ai) {
struct addrinfo *new_ai_next = (struct addrinfo *) av_mallocz(sizeof(struct addrinfo));
if (new_ai_next) {
memcpy(new_ai_next, next_ai, sizeof(struct addrinfo));
if (new_ai_next->ai_family == AF_INET6) {
new_ai_next->ai_addr = (struct sockaddr_in6 *) av_mallocz(sizeof(struct sockaddr_in6));
} else {
new_ai_next->ai_addr = (struct sockaddr *) av_mallocz(sizeof(struct sockaddr));
}
if (!new_ai_next->ai_addr) {
av_freep(&new_ai_next);
} else {
if (new_ai_next->ai_family == AF_INET6) {
memcpy(new_ai_next->ai_addr, next_ai->ai_addr, sizeof(struct sockaddr_in6));
} else {
memcpy(new_ai_next->ai_addr, next_ai->ai_addr, sizeof(struct sockaddr));
}
new_entry->res->ai_next = new_ai_next;
}
}
}

new_entry->res->ai_canonname = NULL;
new_entry->res->ai_next = NULL;
new_entry->ref_count = 0;
new_entry->delete_flag = 0;
new_entry->expired_time = cur_time + timeout * 1000;
Expand Down Expand Up @@ -193,7 +222,7 @@ int remove_dns_cache_entry(const char *uri) {
return 0;
}

int add_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, int64_t timeout) {
int add_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, struct addrinfo *next_ai, int64_t timeout) {
DnsCacheEntry *new_entry = NULL;
DnsCacheEntry *old_entry = NULL;
AVDictionaryEntry *elem = NULL;
Expand All @@ -216,7 +245,7 @@ int add_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, int64_t timeou
goto fail;
}
}
new_entry = new_dns_cache_entry(uri, cur_ai, timeout);
new_entry = new_dns_cache_entry(uri, cur_ai, next_ai, timeout);
if (new_entry) {
av_dict_set_int(&context->dns_dictionary, uri, (int64_t) (intptr_t) new_entry, 0);
}
Expand Down
2 changes: 1 addition & 1 deletion libavutil/dns_cache.h
Expand Up @@ -33,7 +33,7 @@ typedef struct DnsCacheEntry {
DnsCacheEntry *get_dns_cache_reference(const char *uri);
int release_dns_cache_reference(const char *uri, DnsCacheEntry **p_entry);
int remove_dns_cache_entry(const char *uri);
int add_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, int64_t timeout);
int add_dns_cache_entry(const char *uri, struct addrinfo *cur_ai, struct addrinfo *next_ai, int64_t timeout);
int remove_all_dns_cache_entry(void);


Expand Down

0 comments on commit 8e56bf0

Please sign in to comment.