Skip to content

Commit

Permalink
Three fixes in one commit (sorry): a) Take care of the tcpbuf if it e…
Browse files Browse the repository at this point in the history
…nds while queued for transmission, note broken servers and close them in the main loop, and store TCP socket generation number in order not to send the same query twice over the same socket.
  • Loading branch information
sesse committed Sep 28, 2007
1 parent a6a159d commit b669e17
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 16 deletions.
6 changes: 6 additions & 0 deletions ares__close_sockets.c
Expand Up @@ -35,6 +35,8 @@ void ares__close_sockets(ares_channel channel, struct server_state *server)
/* Advance server->qhead; pull out query as we go. */
sendreq = server->qhead;
server->qhead = sendreq->next;
if (sendreq->data_storage != NULL)
free(sendreq->data_storage);
free(sendreq);
}
server->qtail = NULL;
Expand All @@ -45,12 +47,16 @@ void ares__close_sockets(ares_channel channel, struct server_state *server)
server->tcp_buffer = NULL;
server->tcp_lenbuf_pos = 0;

/* Reset brokenness */
server->is_broken = 0;

/* Close the TCP and UDP sockets. */
if (server->tcp_socket != ARES_SOCKET_BAD)
{
SOCK_STATE_CALLBACK(channel, server->tcp_socket, 0, 0);
closesocket(server->tcp_socket);
server->tcp_socket = ARES_SOCKET_BAD;
server->tcp_connection_generation = ++channel->tcp_connection_generation;
}
if (server->udp_socket != ARES_SOCKET_BAD)
{
Expand Down
2 changes: 1 addition & 1 deletion ares_cancel.c
Expand Up @@ -33,7 +33,7 @@ void ares_cancel(ares_channel channel)
next = query->next;
query->callback(query->arg, ARES_ETIMEOUT, NULL, 0);
free(query->tcpbuf);
free(query->skip_server);
free(query->server_info);
free(query);
}
channel->queries = NULL;
Expand Down
4 changes: 2 additions & 2 deletions ares_destroy.c
Expand Up @@ -65,8 +65,8 @@ void ares_destroy(ares_channel channel)
query->callback(query->arg, ARES_EDESTRUCTION, NULL, 0);
if (query->tcpbuf)
free(query->tcpbuf);
if (query->skip_server)
free(query->skip_server);
if (query->server_info)
free(query->server_info);
free(query);
}

Expand Down
3 changes: 3 additions & 0 deletions ares_init.c
Expand Up @@ -136,6 +136,7 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
channel->nservers = -1;
channel->ndomains = -1;
channel->nsort = -1;
channel->tcp_connection_generation = 0;
channel->lookups = NULL;
channel->queries = NULL;
channel->domains = NULL;
Expand Down Expand Up @@ -201,10 +202,12 @@ int ares_init_options(ares_channel *channelptr, struct ares_options *options,
server = &channel->servers[i];
server->udp_socket = ARES_SOCKET_BAD;
server->tcp_socket = ARES_SOCKET_BAD;
server->tcp_connection_generation = ++channel->tcp_connection_generation;
server->tcp_lenbuf_pos = 0;
server->tcp_buffer = NULL;
server->qhead = NULL;
server->qtail = NULL;
server->is_broken = 0;
}

init_id_key(&channel->id_key, ARES_ID_KEY_LEN);
Expand Down
28 changes: 26 additions & 2 deletions ares_private.h
Expand Up @@ -89,6 +89,11 @@ struct send_request {
const unsigned char *data;
size_t len;

/* The query for which we're sending this data */
struct query* owner_query;
/* The buffer we're using, if we have our own copy of the packet */
unsigned char *data_storage;

/* Next request in queue */
struct send_request *next;
};
Expand All @@ -110,6 +115,17 @@ struct server_state {
/* TCP output queue */
struct send_request *qhead;
struct send_request *qtail;

/* Which incarnation of this connection is this? We don't want to
* retransmit requests into the very same socket, but if the server
* closes on us and we re-open the connection, then we do want to
* re-send. */
int tcp_connection_generation;

/* Is this server broken? We mark connections as broken when a
* request that is queued for sending times out.
*/
int is_broken;
};

struct query {
Expand All @@ -130,14 +146,20 @@ struct query {
/* Query status */
int try;
int server;
int *skip_server;
struct query_server_info *server_info; /* per-server state */
int using_tcp;
int error_status;

/* Next query in chain */
struct query *next;
};

/* Per-server state for a query */
struct query_server_info {
int skip_server; /* should we skip server, due to errors, etc? */
int tcp_connection_generation; /* into which TCP connection did we send? */
};

/* An IP address pattern; matches an IP address X if X & mask == addr */
#define PATTERN_MASK 0x1
#define PATTERN_CIDR 0x2
Expand Down Expand Up @@ -188,6 +210,9 @@ struct ares_channeldata {
/* key to use when generating new ids */
rc4_key id_key;

/* Generation number to use for the next TCP socket open/close */
int tcp_connection_generation;

/* Active queries */
struct query *queries;

Expand Down Expand Up @@ -220,4 +245,3 @@ short ares__generate_new_id(rc4_key* key);
#endif

#endif /* __ARES_PRIVATE_H */

113 changes: 105 additions & 8 deletions ares_process.c
Expand Up @@ -62,6 +62,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds,
static void read_udp_packets(ares_channel channel, fd_set *read_fds,
ares_socket_t read_fd, time_t now);
static void process_timeouts(ares_channel channel, time_t now);
static void process_broken_connections(ares_channel channel, time_t now);
static void process_answer(ares_channel channel, unsigned char *abuf,
int alen, int whichserver, int tcp, time_t now);
static void handle_error(ares_channel channel, int whichserver, time_t now);
Expand All @@ -87,6 +88,7 @@ void ares_process(ares_channel channel, fd_set *read_fds, fd_set *write_fds)
read_tcp_data(channel, read_fds, ARES_SOCKET_BAD, now);
read_udp_packets(channel, read_fds, ARES_SOCKET_BAD, now);
process_timeouts(channel, now);
process_broken_connections(channel, now);
}

/* Something interesting happened on the wire, or there was a timeout.
Expand Down Expand Up @@ -157,7 +159,7 @@ static void write_tcp_data(ares_channel channel,
/* Make sure server has data to send and is selected in write_fds or
write_fd. */
server = &channel->servers[i];
if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD)
if (!server->qhead || server->tcp_socket == ARES_SOCKET_BAD || server->is_broken)
continue;

if(write_fds) {
Expand Down Expand Up @@ -216,6 +218,8 @@ static void write_tcp_data(ares_channel channel,
SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0);
server->qtail = NULL;
}
if (sendreq->data_storage != NULL)
free(sendreq->data_storage);
free(sendreq);
}
else
Expand Down Expand Up @@ -248,6 +252,8 @@ static void write_tcp_data(ares_channel channel,
SOCK_STATE_CALLBACK(channel, server->tcp_socket, 1, 0);
server->qtail = NULL;
}
if (sendreq->data_storage != NULL)
free(sendreq->data_storage);
free(sendreq);
}
else
Expand Down Expand Up @@ -278,7 +284,7 @@ static void read_tcp_data(ares_channel channel, fd_set *read_fds,
{
/* Make sure the server has a socket and is selected in read_fds. */
server = &channel->servers[i];
if (server->tcp_socket == ARES_SOCKET_BAD)
if (server->tcp_socket == ARES_SOCKET_BAD || server->is_broken)
continue;

if(read_fds) {
Expand Down Expand Up @@ -376,7 +382,7 @@ static void read_udp_packets(ares_channel channel, fd_set *read_fds,
/* Make sure the server has a socket and is selected in read_fds. */
server = &channel->servers[i];

if (server->udp_socket == ARES_SOCKET_BAD)
if (server->udp_socket == ARES_SOCKET_BAD || server->is_broken)
continue;

if(read_fds) {
Expand Down Expand Up @@ -492,6 +498,20 @@ static void process_answer(ares_channel channel, unsigned char *abuf,
end_query(channel, query, ARES_SUCCESS, abuf, alen);
}

/* Close all the connections that are no longer usable. */
static void process_broken_connections(ares_channel channel, time_t now)
{
int i;
for (i = 0; i < channel->nservers; i++)
{
struct server_state *server = &channel->servers[i];
if (server->is_broken)
{
handle_error(channel, i, now);
}
}
}

static void handle_error(ares_channel channel, int whichserver, time_t now)
{
struct query *query, *next;
Expand Down Expand Up @@ -526,7 +546,7 @@ static void skip_server(ares_channel channel, struct query *query,
*/
if (channel->nservers > 1)
{
query->skip_server[whichserver] = 1;
query->server_info[whichserver].skip_server = 1;
}
}

Expand All @@ -538,10 +558,21 @@ static struct query *next_server(ares_channel channel, struct query *query, time
{
for (; query->server < channel->nservers; query->server++)
{
if (!query->skip_server[query->server])
struct server_state *server = &channel->servers[query->server];
/* We don't want to use this server if (1) we decided this
* connection is broken, and thus about to be closed, (2)
* we've decided to skip this server because of earlier
* errors we encountered, or (3) we already sent this query
* over this exact connection.
*/
if (!server->is_broken &&
!query->server_info[query->server].skip_server &&
!(query->using_tcp &&
(query->server_info[query->server].tcp_connection_generation ==
server->tcp_connection_generation)))
{
ares__send_query(channel, query, now);
return (query->next);
ares__send_query(channel, query, now);
return (query->next);
}
}
query->server = 0;
Expand Down Expand Up @@ -582,8 +613,16 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now)
end_query(channel, query, ARES_ENOMEM, NULL, 0);
return;
}
/* To make the common case fast, we avoid copies by using the
* query's tcpbuf for as long as the query is alive. In the rare
* case where the query ends while it's queued for transmission,
* then we give the sendreq its own copy of the request packet
* and put it in sendreq->data_storage.
*/
sendreq->data_storage = NULL;
sendreq->data = query->tcpbuf;
sendreq->len = query->tcplen;
sendreq->owner_query = query;
sendreq->next = NULL;
if (server->qtail)
server->qtail->next = sendreq;
Expand All @@ -594,6 +633,8 @@ void ares__send_query(ares_channel channel, struct query *query, time_t now)
}
server->qtail = sendreq;
query->timeout = 0;
query->server_info[query->server].tcp_connection_generation =
server->tcp_connection_generation;
}
else
{
Expand Down Expand Up @@ -721,6 +762,7 @@ static int open_tcp_socket(ares_channel channel, struct server_state *server)
SOCK_STATE_CALLBACK(channel, s, 1, 0);
server->tcp_buffer_pos = 0;
server->tcp_socket = s;
server->tcp_connection_generation = ++channel->tcp_connection_generation;
return 0;
}

Expand Down Expand Up @@ -839,6 +881,61 @@ static struct query *end_query (ares_channel channel, struct query *query, int s
struct query **q, *next;
int i;

/* First we check to see if this query ended while one of our send
* queues still has pointers to it.
*/
for (i = 0; i < channel->nservers; i++)
{
struct server_state *server = &channel->servers[i];
struct send_request *sendreq;
for (sendreq = server->qhead; sendreq; sendreq = sendreq->next)
if (sendreq->owner_query == query)
{
sendreq->owner_query = NULL;
assert(sendreq->data_storage == NULL);
if (status == ARES_SUCCESS)
{
/* We got a reply for this query, but this queued
* sendreq points into this soon-to-be-gone query's
* tcpbuf. Probably this means we timed out and queued
* the query for retransmission, then received a
* response before actually retransmitting. This is
* perfectly fine, so we want to keep the connection
* running smoothly if we can. But in the worst case
* we may have sent only some prefix of the query,
* with some suffix of the query left to send. Also,
* the buffer may be queued on multiple queues. To
* prevent dangling pointers to the query's tcpbuf and
* handle these cases, we just give such sendreqs
* their own copy of the query packet.
*/
sendreq->data_storage = malloc(sendreq->len);
if (sendreq->data_storage != NULL)
{
memcpy(sendreq->data_storage, sendreq->data, sendreq->len);
sendreq->data = sendreq->data_storage;
}
}
if ((status != ARES_SUCCESS) || (sendreq->data_storage == NULL))
{
/* We encountered an error (probably a timeout,
* suggesting the DNS server we're talking to is
* probably unreachable, wedged, or severely
* overloaded) or we couldn't copy the request, so
* mark the connection as broken. When we get to
* process_broken_connections() we'll close the
* connection and try to re-send requests to another
* server.
*/
server->is_broken = 1;
/* Just to be paranoid, zero out this sendreq... */
sendreq->data = NULL;
sendreq->len = 0;
}
}
}

/* Invoke the callback */
query->callback(query->arg, status, abuf, alen);
for (q = &channel->queries; *q; q = &(*q)->next)
{
Expand All @@ -851,7 +948,7 @@ static struct query *end_query (ares_channel channel, struct query *query, int s
else
next = NULL;
free(query->tcpbuf);
free(query->skip_server);
free(query->server_info);
free(query);

/* Simple cleanup policy: if no queries are remaining, close all
Expand Down
10 changes: 7 additions & 3 deletions ares_send.c
Expand Up @@ -62,8 +62,9 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen,
callback(arg, ARES_ENOMEM, NULL, 0);
return;
}
query->skip_server = malloc(channel->nservers * sizeof(int));
if (!query->skip_server)
query->server_info = malloc(channel->nservers *
sizeof(query->server_info[0]));
if (!query->server_info)
{
free(query->tcpbuf);
free(query);
Expand Down Expand Up @@ -93,7 +94,10 @@ void ares_send(ares_channel channel, const unsigned char *qbuf, int qlen,
query->try = 0;
query->server = 0;
for (i = 0; i < channel->nservers; i++)
query->skip_server[i] = 0;
{
query->server_info[i].skip_server = 0;
query->server_info[i].tcp_connection_generation = 0;
}
query->using_tcp = (channel->flags & ARES_FLAG_USEVC) || qlen > PACKETSZ;
query->error_status = ARES_ECONNREFUSED;

Expand Down

0 comments on commit b669e17

Please sign in to comment.