Skip to content

Commit

Permalink
lib-http: client: Link peers to queue earlier: during connection atte…
Browse files Browse the repository at this point in the history
…mpts.

This makes sure that queues keep track of which peers are doing stuff on its behalf.
This is important to be able to manage the active peers when a new host name lookup was performed; if a peer is no longer listed in the returned IPs it should be dropped.
  • Loading branch information
stephanbosch authored and sirainen committed Nov 9, 2016
1 parent 3db76db commit 6d7bf6d
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 32 deletions.
3 changes: 3 additions & 0 deletions src/lib-http/http-client-private.h
Expand Up @@ -236,6 +236,9 @@ struct http_client_queue {
this can be more than one when soft connect timeouts are enabled */
ARRAY_TYPE(http_client_peer) pending_peers;

/* currently active peer */
struct http_client_peer *cur_peer;

/* all requests associated to this queue
(ordered by earliest timeout first) */
ARRAY_TYPE(http_client_request) requests;
Expand Down
156 changes: 124 additions & 32 deletions src/lib-http/http-client-queue.c
Expand Up @@ -122,18 +122,41 @@ http_client_queue_create(struct http_client_host *host,

void http_client_queue_free(struct http_client_queue *queue)
{
struct http_client_peer *const *peer_idx;
ARRAY_TYPE(http_client_peer) peers;

http_client_queue_debug(queue, "Destroy");

/* unlink all peers */
if (queue->cur_peer != NULL) {
struct http_client_peer *peer = queue->cur_peer;
queue->cur_peer = NULL;
http_client_peer_unlink_queue(peer, queue);
}
if (array_is_created(&queue->pending_peers)) {
t_array_init(&peers, array_count(&queue->pending_peers));
array_copy(&peers.arr, 0, &queue->pending_peers.arr, 0,
array_count(&queue->pending_peers));
array_foreach(&peers, peer_idx)
http_client_peer_unlink_queue(*peer_idx, queue);
array_free(&queue->pending_peers);
}

/* abort all requests */
http_client_queue_fail
(queue, HTTP_CLIENT_REQUEST_ERROR_ABORTED, "Aborted");
if (array_is_created(&queue->pending_peers))
array_free(&queue->pending_peers);
array_free(&queue->requests);
array_free(&queue->queued_requests);
array_free(&queue->queued_urgent_requests);
array_free(&queue->delayed_requests);

/* cancel timeouts */
if (queue->to_connect != NULL)
timeout_remove(&queue->to_connect);
if (queue->to_delayed != NULL)
timeout_remove(&queue->to_delayed);

/* free */
i_free(queue->addr_name);
i_free(queue->name);
i_free(queue);
Expand Down Expand Up @@ -226,18 +249,19 @@ http_client_queue_soft_connect_timeout(struct http_client_queue *queue)
http_client_queue_connection_setup(queue);
}

void http_client_queue_connection_setup(struct http_client_queue *queue)
static struct http_client_peer *
http_client_queue_connection_attempt(struct http_client_queue *queue)
{
struct http_client_host *host = queue->host;
struct http_client_peer *peer = NULL;
const struct http_client_peer_addr *addr = &queue->addr;
struct http_client_peer *peer;
struct http_client_peer_addr *addr = &queue->addr;
unsigned int num_requests =
array_count(&queue->queued_requests) +
array_count(&queue->queued_urgent_requests);
const char *ssl = "";

if (num_requests == 0)
return;
return NULL;

/* update our peer address */
if (queue->addr.type != HTTP_CLIENT_PEER_ADDR_UNIX) {
Expand All @@ -247,31 +271,77 @@ void http_client_queue_connection_setup(struct http_client_queue *queue)
ssl = (ssl == NULL ? "" : t_strdup_printf(" (SSL=%s)", ssl));
}

/* already got a peer? */
peer = NULL;
if (queue->cur_peer != NULL) {
i_assert(!array_is_created(&queue->pending_peers) ||
array_count(&queue->pending_peers) == 0);

/* is it still the one we want? */
if (http_client_peer_addr_cmp(addr, &queue->cur_peer->addr) == 0) {
/* is it still connected? */
if (http_client_peer_is_connected(queue->cur_peer)) {
/* yes */
http_client_queue_debug(queue,
"Using existing connection to %s%s "
"(%u requests pending)",
http_client_peer_addr2str(addr), ssl, num_requests);

/* handle requests; */
http_client_peer_trigger_request_handler(queue->cur_peer);
return queue->cur_peer;
}
/* no */
peer = queue->cur_peer;
} else {
/* peer is not relevant to this queue anymore */
http_client_peer_unlink_queue(queue->cur_peer, queue);
}

queue->cur_peer = NULL;
}

if (peer == NULL)
peer = http_client_peer_get(queue->client, addr);

http_client_queue_debug(queue, "Setting up connection to %s%s "
"(%u requests pending)", http_client_peer_addr2str(addr), ssl,
num_requests);


/* create/get peer */
peer = http_client_peer_get(queue->client, addr);
/* create provisional link between queue and peer */
http_client_peer_link_queue(peer, queue);

/* handle requests; creates new connections when needed/possible */
http_client_peer_trigger_request_handler(peer);

if (!http_client_peer_is_connected(peer)) {
if (http_client_peer_is_connected(peer)) {
/* drop any pending peers */
if (array_is_created(&queue->pending_peers) &&
array_count(&queue->pending_peers) > 0) {
struct http_client_peer *const *peer_idx;

array_foreach(&queue->pending_peers, peer_idx) {
i_assert(http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) != 0);
http_client_peer_unlink_queue(*peer_idx, queue);
}
array_clear(&queue->pending_peers);
}
queue->cur_peer = peer;

} else {
unsigned int msecs;
bool new_peer = TRUE;

/* not already connected, wait for connections */
if (!array_is_created(&queue->pending_peers))
if (!array_is_created(&queue->pending_peers)) {
i_array_init(&queue->pending_peers, 8);
else {
} else {
struct http_client_peer *const *peer_idx;

/* we may be waiting for this peer already */
array_foreach(&queue->pending_peers, peer_idx) {
if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
i_assert(*peer_idx == peer);
new_peer = FALSE;
break;
}
Expand All @@ -296,6 +366,13 @@ void http_client_queue_connection_setup(struct http_client_queue *queue)
}
}
}

return peer;
}

void http_client_queue_connection_setup(struct http_client_queue *queue)
{
(void)http_client_queue_connection_attempt(queue);
}

void
Expand Down Expand Up @@ -328,6 +405,8 @@ http_client_queue_connection_success(struct http_client_queue *queue,
connected peer, even if some of the connections
are pending. they may be intended for urgent
requests. */
i_assert(queue->cur_peer == NULL);
queue->cur_peer = *peer_idx;
continue;
}
/* unlink this queue from the peer; if this was the last/only queue, the
Expand All @@ -336,6 +415,7 @@ http_client_queue_connection_success(struct http_client_queue *queue,
http_client_peer_unlink_queue(*peer_idx, queue);
}
array_clear(&queue->pending_peers);
i_assert(queue->cur_peer != NULL);
}
}

Expand All @@ -347,6 +427,12 @@ http_client_queue_connection_failure(struct http_client_queue *queue,
&queue->client->set;
const char *https_name = http_client_peer_addr_get_https_name(addr);
struct http_client_host *host = queue->host;
struct http_client_peer *failed_peer;
struct http_client_peer *const *peer_idx;

i_assert(queue->cur_peer == NULL);
i_assert(array_is_created(&queue->pending_peers) &&
array_count(&queue->pending_peers) > 0);

http_client_queue_debug(queue,
"Failed to set up connection to %s%s: %s "
Expand All @@ -358,27 +444,26 @@ http_client_queue_connection_failure(struct http_client_queue *queue,
array_count(&queue->pending_peers): 0),
array_count(&queue->requests));

if (array_is_created(&queue->pending_peers) &&
array_count(&queue->pending_peers) > 0) {
struct http_client_peer *const *peer_idx;

/* we're still doing the initial connections to this hport. if
we're also doing parallel connections with soft timeouts
(pending_peer_count>1), wait for them to finish
first. */
array_foreach(&queue->pending_peers, peer_idx) {
if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
array_delete(&queue->pending_peers,
array_foreach_idx(&queue->pending_peers, peer_idx), 1);
break;
}
}
if (array_count(&queue->pending_peers) > 0) {
http_client_queue_debug(queue,
"Waiting for remaining pending peers.");
return;
/* we're still doing the initial connections to this hport. if
we're also doing parallel connections with soft timeouts
(pending_peer_count>1), wait for them to finish
first. */
failed_peer = NULL;
array_foreach(&queue->pending_peers, peer_idx) {
if (http_client_peer_addr_cmp(&(*peer_idx)->addr, addr) == 0) {
failed_peer = *peer_idx;
array_delete(&queue->pending_peers,
array_foreach_idx(&queue->pending_peers, peer_idx), 1);
break;
}
}
i_assert(failed_peer != NULL);
if (array_count(&queue->pending_peers) > 0) {
http_client_queue_debug(queue,
"Waiting for remaining pending peers.");
http_client_peer_unlink_queue(failed_peer, queue);
return;
}

/* one of the connections failed. if we're not using soft timeouts,
we need to try to connect to the next IP. if we are using soft
Expand Down Expand Up @@ -410,6 +495,7 @@ http_client_queue_connection_failure(struct http_client_queue *queue,
total_msecs/1000, total_msecs%1000);
}
queue->connect_attempts = 0;
http_client_peer_unlink_queue(failed_peer, queue);
http_client_queue_fail(queue,
HTTP_CLIENT_REQUEST_ERROR_CONNECT_FAILED, reason);
return;
Expand All @@ -418,7 +504,8 @@ http_client_queue_connection_failure(struct http_client_queue *queue,
queue->ips_connect_idx = (queue->ips_connect_idx + 1) % host->ips_count;
}

http_client_queue_connection_setup(queue);
if (http_client_queue_connection_attempt(queue) != failed_peer)
http_client_peer_unlink_queue(failed_peer, queue);
return;
}

Expand All @@ -428,6 +515,11 @@ http_client_queue_peer_disconnected(struct http_client_queue *queue,
{
struct http_client_peer *const *peer_idx;

if (queue->cur_peer == peer) {
queue->cur_peer = NULL;
return;
}

if (!array_is_created(&queue->pending_peers))
return;

Expand Down

0 comments on commit 6d7bf6d

Please sign in to comment.