Skip to content

Commit

Permalink
daemon: fast retransmit address selection
Browse files Browse the repository at this point in the history
instead of single I/O request per step, the daemon now retries
all addresses in the selection with 300ms timeout between tries.
there are len(list) + len(list)/2 tries

the idea is to reduce latency when UDP request doesn't punch through,
or some NSs are overwhelmed/faulty
  • Loading branch information
Marek Vavruša committed Nov 14, 2015
1 parent de701dc commit e25abc9
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 80 deletions.
231 changes: 157 additions & 74 deletions daemon/worker.c
Expand Up @@ -39,6 +39,52 @@ struct ioreq
} as;
};

/** @internal Number of request within timeout window. */
#define MAX_PENDING (KR_NSREP_MAXADDR + (KR_NSREP_MAXADDR / 2))

/** @internal Query resolution task. */
struct qr_task
{
struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *pktbuf;
uv_handle_t *pending[MAX_PENDING];
uint16_t pending_count;
uint16_t addrlist_count;
uint16_t addrlist_turn;
struct sockaddr *addrlist;
uv_timer_t retry, timeout;
worker_cb_t on_complete;
void *baton;
struct {
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr;
uv_handle_t *handle;
} source;
uint16_t iter_count;
uint16_t refs;
uint16_t bytes_remaining;
};

/* Convenience macros */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
(!uv_is_closing((checked)) || (task)->source.handle == (checked))

/* Forward decls */
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
return uv_default_loop()->data;
}

static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
{
struct ioreq *req = NULL;
Expand All @@ -62,6 +108,47 @@ static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
}
}

static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype)
{
if (task->pending_count >= MAX_PENDING) {
return NULL;
}
/* Create connection for iterative query */
uv_handle_t *req = (uv_handle_t *)ioreq_take(task->worker);
if (!req) {
return NULL;
}
io_create(task->worker->loop, req, socktype);
req->data = task;
/* Connect or issue query datagram */
task->pending[task->pending_count] = req;
task->pending_count += 1;
return req;
}

static void ioreq_on_close(uv_handle_t *handle)
{
struct worker_ctx *worker = get_worker();
ioreq_release(worker, (struct ioreq *)handle);
}

static void ioreq_kill(uv_handle_t *req)
{
assert(req);
if (!uv_is_closing(req)) {
io_stop_read(req);
uv_close(req, ioreq_on_close);
}
}

static void ioreq_killall(struct qr_task *task)
{
for (size_t i = 0; i < task->pending_count; ++i) {
ioreq_kill(task->pending[i]);
}
task->pending_count = 0;
}

static inline struct mempool *pool_take(struct worker_ctx *worker)
{
/* Recycle available mempool if possible */
Expand All @@ -88,45 +175,6 @@ static inline void pool_release(struct worker_ctx *worker, struct mempool *mp)
}
}

/** @internal Query resolution task. */
struct qr_task
{
struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *pktbuf;
uv_handle_t *iohandle;
uv_timer_t timeout;
worker_cb_t on_complete;
void *baton;
struct {
union {
struct sockaddr_in ip4;
struct sockaddr_in6 ip6;
} addr;
uv_handle_t *handle;
} source;
uint16_t iter_count;
uint16_t refs;
uint16_t bytes_remaining;
};

/* Convenience macros */
#define qr_task_ref(task) \
do { ++(task)->refs; } while(0)
#define qr_task_unref(task) \
do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0)
#define qr_valid_handle(task, checked) \
((task)->iohandle == (checked) || (task)->source.handle == (checked))

/* Forward decls */
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet);

/** @internal Get singleton worker. */
static inline struct worker_ctx *get_worker(void)
{
return uv_default_loop()->data;
}

static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
{
/* How much can client handle? */
Expand Down Expand Up @@ -165,13 +213,16 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
}
task->req.answer = answer;
task->pktbuf = pktbuf;
task->iohandle = NULL;
task->addrlist = NULL;
task->pending_count = 0;
task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->worker = worker;
task->source.handle = handle;
uv_timer_init(worker->loop, &task->retry);
uv_timer_init(worker->loop, &task->timeout);
task->retry.data = task;
task->timeout.data = task;
task->on_complete = NULL;
/* Remember query source addr */
Expand All @@ -192,6 +243,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
return task;
}

/* This is called when the task refcount is zero, free memory. */
static void qr_task_free(struct qr_task *task)
{
/* Return mempool to ring or free it if it's full */
Expand All @@ -209,10 +261,20 @@ static void qr_task_free(struct qr_task *task)
}
}

/* This is called when retry timer closes */
static void retransmit_close(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
qr_task_unref(task);
}

/* This is called when task completes and timeout timer is closed. */
static void qr_task_complete(uv_handle_t *handle)
{
struct qr_task *task = handle->data;
struct worker_ctx *worker = task->worker;
/* Kill pending I/O requests */
ioreq_killall(task);
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
Expand All @@ -229,33 +291,36 @@ static void qr_task_complete(uv_handle_t *handle)
worker->stats.concurrent -= 1;
}

static void qr_task_timeout(uv_timer_t *req)
/* This is called when I/O timeouts */
static void on_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing((uv_handle_t *)req)) {
qr_task_step(task, NULL, NULL);
}
}

/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
/* When NOOP, it means we sent the final answer to originator,
* there we start to close timers and finalize task. */
if (task->req.state != KNOT_STATE_NOOP) {
if (status == 0 && handle) {
io_start_read(handle); /* Start reading answer */
}
} else { /* Finalize task */
} else {
/* Close retry timer (borrows task) */
qr_task_ref(task);
uv_timer_stop(&task->retry);
uv_close((uv_handle_t *)&task->retry, retransmit_close);
/* Close timeout timer (finishes task) */
uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_complete);
}
return status;
}

static void on_close(uv_handle_t *handle)
{
struct worker_ctx *worker = get_worker();
ioreq_release(worker, (struct ioreq *)handle);
}

static void on_send(uv_udp_send_t *req, int status)
{
struct worker_ctx *worker = get_worker();
Expand Down Expand Up @@ -342,6 +407,24 @@ static void on_connect(uv_connect_t *req, int status)
ioreq_release(worker, (struct ioreq *)req);
}

static void on_retransmit(uv_timer_t *req)
{
struct qr_task *task = req->data;
/* Create connection for iterative query */
if (!uv_is_closing((uv_handle_t *)req) && task->addrlist) {
uv_handle_t *subreq = ioreq_spawn(task, SOCK_DGRAM);
if (subreq) {
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
if (qr_task_send(task, subreq, (struct sockaddr *)choice, task->pktbuf) == 0) {
task->addrlist_turn = (task->addrlist_turn + 1) % task->addrlist_count; /* Round robin */
return;
}
}
}
/* Not possible to spawn request, stop trying */
uv_timer_stop(req);
}

static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
Expand All @@ -353,21 +436,19 @@ static int qr_task_finalize(struct qr_task *task, int state)

static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
{
/* Close subrequest handle. */
/* Close pending I/O requests */
uv_timer_stop(&task->retry);
uv_timer_stop(&task->timeout);
if (task->iohandle && !uv_is_closing(task->iohandle)) {
io_stop_read(task->iohandle);
uv_close(task->iohandle, on_close);
task->iohandle = NULL;
}
ioreq_killall(task);

/* Consume input and produce next query */
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *pktbuf = task->pktbuf;
task->addrlist = NULL;
task->addrlist_count = 0;
task->addrlist_turn = 0;
int state = kr_resolve_consume(&task->req, packet_source, packet);
while (state == KNOT_STATE_PRODUCE) {
state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
state = kr_resolve_produce(&task->req, &task->addrlist, &sock_type, task->pktbuf);
if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
Expand All @@ -376,39 +457,41 @@ static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_sour
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
} else if (!addr || sock_type < 0) {
} else if (!task->addrlist || sock_type < 0) {
return qr_task_step(task, NULL, NULL);
}

/* Create connection for iterative query */
uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker);
if (!subreq) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
/* Count available address choices */
struct sockaddr_in6 *choice = (struct sockaddr_in6 *)task->addrlist;
for (size_t i = 0; i < KR_NSREP_MAXADDR && choice->sin6_family != AF_UNSPEC; ++i) {
task->addrlist_count += 1;
choice += 1;
}
io_create(task->worker->loop, subreq, sock_type);
subreq->data = task;

/* Connect or issue query datagram */
task->iohandle = subreq;
/* Start fast retransmit with UDP, otherwise connect. */
if (sock_type == SOCK_DGRAM) {
if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
uv_timer_start(&task->retry, on_retransmit, 0, KR_CONN_RETRY);
} else {
struct ioreq *conn = ioreq_take(task->worker);
if (!conn) {
return qr_task_step(task, NULL, NULL);
}
} else {
struct ioreq *conn_req = ioreq_take(task->worker);
if (!conn_req) {
uv_handle_t *client = ioreq_spawn(task, sock_type);
if (!client) {
ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
conn_req->as.connect.data = task;
if (uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
ioreq_release(task->worker, conn_req);
conn->as.connect.data = task;
if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
/* Connect request borrows task */
qr_task_ref(task);
}

/* Start next step with timeout */
uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
return kr_ok();
}

Expand Down
1 change: 1 addition & 0 deletions lib/defines.h
Expand Up @@ -36,6 +36,7 @@ static inline int __attribute__((__cold__)) kr_error(int x) {
* @cond internal
*/
#define KR_CONN_RTT_MAX 3000 /* Timeout for network activity */
#define KR_CONN_RETRY 300 /* Retry interval for network activity */
#define KR_ITER_LIMIT 50 /* Built-in iterator limit */

/*
Expand Down
6 changes: 0 additions & 6 deletions lib/resolve.c
Expand Up @@ -411,12 +411,6 @@ int kr_resolve_consume(struct kr_request *request, const struct sockaddr *src, k
struct kr_query *qry = TAIL(rplan->pending);
bool tried_tcp = (qry->flags & QUERY_TCP);
if (!packet || packet->size == 0) {
/* Network error, retry over TCP. */
if (!tried_tcp) {
DEBUG_MSG(qry, "=> NS unreachable, retrying over TCP\n");
qry->flags |= QUERY_TCP;
return KNOT_STATE_PRODUCE;
}
request->state = KNOT_STATE_FAIL;
} else {
/* Packet cleared, derandomize QNAME. */
Expand Down

0 comments on commit e25abc9

Please sign in to comment.