Skip to content

Commit

Permalink
Merge branch 'master' into contrib-embed-libknot
Browse files Browse the repository at this point in the history
  • Loading branch information
Marek Vavruša committed May 5, 2015
2 parents dea0ebe + 5bccaa9 commit d9a5992
Show file tree
Hide file tree
Showing 31 changed files with 892 additions and 426 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -8,6 +8,7 @@ notifications:
email:
on_success: change
on_failure: change
slack: knot-resolver:Pq2K9Q2DZ76Vf2jHvDKR4eoQ
matrix:
fast_finish: true
allow_failures:
Expand Down
31 changes: 27 additions & 4 deletions daemon/README.rst
Expand Up @@ -335,7 +335,21 @@ The cache in Knot DNS Resolver is persistent with LMDB backend, this means that
the cached data on restart or crash to avoid cold-starts. The cache may be reused between cache
daemons or manipulated from other processes, making for example synchronised load-balanced recursors possible.

.. function:: cache.open(max_size)
.. function:: cache.backends()

:return: map of backends

The cache supports runtime-changeable backends, using the optional :rfc:`3986` URI, where the scheme
represents backend protocol and the rest of the URI backend-specific configuration. By default, it
is a ``lmdb`` backend in working directory, i.e. ``lmdb://``.

Example output:

.. code-block:: lua
[lmdb://] => true
.. function:: cache.open(max_size[, config_uri])

:param number max_size: Maximum cache size in bytes.
:return: boolean
Expand All @@ -345,6 +359,13 @@ daemons or manipulated from other processes, making for example synchronised loa

.. tip:: Use ``kB, MB, GB`` constants as a multiplier, e.g. ``100*MB``.

The cache supports runtime-changeable backends, see :func:`cache.backends()` for mor information and
default. Refer to specific documentation of specific backends for configuration string syntax.

- ``lmdb://``

As of now it only allows you to change the cache directory, e.g. ``lmdb:///tmp/cachedir``.

.. function:: cache.count()

:return: Number of entries in the cache.
Expand All @@ -355,13 +376,15 @@ daemons or manipulated from other processes, making for example synchronised loa

Close the cache.

.. note:: This may or may not clear the cache, depending on the used backend. See :func:`cachectl.clear()`.

Timers and events
^^^^^^^^^^^^^^^^^

The timer represents exactly the thing described in the examples - it allows you to execute closures after specified time,
or event recurrent events. Time is always described in miliseconds, but there are convenient variables that you can use -
``sec, minute, hour``. For example, ``5 * hour`` represents five hours, or 5*60*60*100 milliseconds.
The timer represents exactly the thing described in the examples - it allows you to execute closures
after specified time, or event recurrent events. Time is always described in milliseconds,
but there are convenient variables that you can use - ``sec, minute, hour``.
For example, ``5 * hour`` represents five hours, or 5*60*60*100 milliseconds.

.. function:: event.after(time, function)

Expand Down
56 changes: 51 additions & 5 deletions daemon/bindings.c
Expand Up @@ -285,6 +285,21 @@ int lib_net(lua_State *L)
return 1;
}

/** Return available cached backends. */
static int cache_backends(lua_State *L)
{
struct engine *engine = engine_luaget(L);
storage_registry_t *registry = &engine->storage_registry;

lua_newtable(L);
for (unsigned i = 0; i < registry->len; ++i) {
struct storage_api *storage = &registry->at[i];
lua_pushboolean(L, storage->api() == kr_cache_storage());
lua_setfield(L, -2, storage->prefix);
}
return 1;
}

/** Return number of cached records. */
static int cache_count(lua_State *L)
{
Expand All @@ -304,6 +319,26 @@ static int cache_count(lua_State *L)
return 1;
}

static struct storage_api *cache_select_storage(struct engine *engine, const char **conf)
{
/* Return default backend */
storage_registry_t *registry = &engine->storage_registry;
if (!*conf || !strstr(*conf, "://")) {
return &registry->at[0];
}

/* Find storage backend from config prefix */
for (unsigned i = 0; i < registry->len; ++i) {
struct storage_api *storage = &registry->at[i];
if (strncmp(*conf, storage->prefix, strlen(storage->prefix)) == 0) {
*conf += strlen(storage->prefix);
return storage;
}
}

return NULL;
}

/** Open cache */
static int cache_open(lua_State *L)
{
Expand All @@ -314,16 +349,26 @@ static int cache_open(lua_State *L)
lua_error(L);
}

/* Close if already open */
/* Select cache storage backend */
struct engine *engine = engine_luaget(L);
const char *conf = n > 1 ? lua_tostring(L, 2) : NULL;
struct storage_api *storage = cache_select_storage(engine, &conf);
if (!storage) {
format_error(L, "unsupported cache backend");
lua_error(L);
}
kr_cache_storage_set(storage->api);

/* Close if already open */
if (engine->resolver.cache != NULL) {
kr_cache_close(engine->resolver.cache);
}

/* Open resolution context cache */
engine->resolver.cache = kr_cache_open(".", engine->pool, lua_tointeger(L, 1));
/* Reopen cache */
void *storage_opts = storage->opts_create(conf, lua_tointeger(L, 1));
engine->resolver.cache = kr_cache_open(storage_opts, engine->pool);
free(storage_opts);
if (engine->resolver.cache == NULL) {
format_error(L, "can't open cache in rundir");
format_error(L, "can't open cache");
lua_error(L);
}

Expand All @@ -347,6 +392,7 @@ static int cache_close(lua_State *L)
int lib_cache(lua_State *L)
{
static const luaL_Reg lib[] = {
{ "backends", cache_backends },
{ "count", cache_count },
{ "open", cache_open },
{ "close", cache_close },
Expand Down
25 changes: 23 additions & 2 deletions daemon/engine.c
Expand Up @@ -17,6 +17,8 @@
#include <uv.h>
#include <unistd.h>
#include <libknot/internal/mempattern.h>
/* #include <libknot/internal/namedb/namedb_trie.h> @todo Not supported (doesn't keep value copy) */
#include <libknot/internal/namedb/namedb_lmdb.h>

#include "daemon/engine.h"
#include "daemon/bindings.h"
Expand Down Expand Up @@ -97,16 +99,34 @@ static int l_trampoline(lua_State *L)
* Engine API.
*/

/** @internal Make lmdb options. */
void *namedb_lmdb_mkopts(const char *conf, size_t maxsize)
{
struct namedb_lmdb_opts *opts = malloc(sizeof(*opts));
if (opts) {
memset(opts, 0, sizeof(*opts));
opts->path = conf ? conf : ".";
opts->mapsize = maxsize;
}
return opts;
}

static int init_resolver(struct engine *engine)
{
/* Open resolution context */
engine->resolver.modules = &engine->modules;

/* Load basic modules */
engine_register(engine, "iterate");
engine_register(engine, "itercache");
engine_register(engine, "rrcache");
engine_register(engine, "pktcache");

return kr_ok();
/* Initialize storage backends */
struct storage_api lmdb = {
"lmdb://", namedb_lmdb_api, namedb_lmdb_mkopts
};

return array_push(engine->storage_registry, lmdb);
}

static int init_state(struct engine *engine)
Expand Down Expand Up @@ -168,6 +188,7 @@ void engine_deinit(struct engine *engine)
kr_module_unload(&engine->modules.at[i]);
}
array_clear(engine->modules);
array_clear(engine->storage_registry);

if (engine->L) {
lua_close(engine->L);
Expand Down
11 changes: 11 additions & 0 deletions daemon/engine.h
Expand Up @@ -24,10 +24,21 @@ struct lua_State;
#include "lib/resolve.h"
#include "daemon/network.h"

/** Cache storage backend. */
struct storage_api {
const char *prefix; /**< Storage prefix, e.g. 'lmdb://' */
const namedb_api_t *(*api)(void); /**< Storage API implementation */
void *(*opts_create)(const char *, size_t); /**< Storage options factory */
};

/** @internal Array of cache backend options. */
typedef array_t(struct storage_api) storage_registry_t;

struct engine {
struct kr_context resolver;
struct network net;
module_array_t modules;
storage_registry_t storage_registry;
mm_ctx_t *pool;
struct lua_State *L;
};
Expand Down
27 changes: 10 additions & 17 deletions daemon/io.c
Expand Up @@ -53,18 +53,10 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;

/* UDP requests are oneshot, always close afterwards */
if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
io_close((uv_handle_t *)handle);
}

/* Check the incoming wire length. */
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
query->max_size = sizeof(worker->bufs.wire);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}

int udp_bind(struct endpoint *ep, struct sockaddr *addr)
Expand Down Expand Up @@ -94,24 +86,25 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;

/* Check for connection close */
if (nread <= 0) {
/* Check for originator connection close */
if (nread <= 0 && handle->data == 0) {
io_close((uv_handle_t *)handle);
return;
} else if (nread < 2) {
/* Not enough bytes to read length */
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}

/* Set packet size */
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);

/* Check if there's enough data and execute */
if (nbytes + 2 < nread) {
worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}

knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
query->max_size = sizeof(worker->bufs.wire);
worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
}
Expand Down
31 changes: 13 additions & 18 deletions daemon/worker.c
Expand Up @@ -85,7 +85,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
}

/* Create buffers */
knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_WIRE_MIN_PKTSIZE, &task->req.pool);
knot_pkt_t *next_query = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, &task->req.pool);
knot_pkt_t *answer = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool);
if (!next_query || !answer) {
mp_delete(pool.ctx);
Expand All @@ -110,9 +110,8 @@ static void qr_task_free(uv_handle_t *handle)
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
if (!uv_is_closing(task->next_handle)) {
if (task->next_handle) {
io_stop_read(task->next_handle);
uv_close(task->next_handle, (uv_close_cb) free);
qr_task_step(task, NULL);
}
}
Expand All @@ -127,6 +126,7 @@ static void qr_task_on_send(uv_req_t* req, int status)
io_start_read(task->next_handle);
}
} else { /* Finalize task */
uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_free);
}
}
Expand Down Expand Up @@ -171,9 +171,12 @@ static int qr_task_finalize(struct qr_task *task, int state)

static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
/* Cancel timeout if active */
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
/* Cancel timeout if active, close handle. */
if (task->next_handle) {
uv_close(task->next_handle, (uv_close_cb) free);
uv_timer_stop(&task->timeout);
task->next_handle = NULL;
}

/* Consume input and produce next query */
int sock_type = -1;
Expand All @@ -187,10 +190,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
}

/* Iteration limit */
if (++task->iter_count > KR_ITER_LIMIT) {
} else if (++task->iter_count > KR_ITER_LIMIT) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}

Expand All @@ -206,20 +206,17 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}

/* Start next timeout */
/* Start next step with timeout */
uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);

return kr_ok();
}

Expand All @@ -231,15 +228,13 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer

/* Parse query */
int ret = parse_query(query);
if (ret != 0) {
return ret;
}

/* Start new task on master sockets, or resume existing */
struct qr_task *task = handle->data;
bool is_master_socket = (!task);
if (is_master_socket) {
if (knot_wire_get_qr(query->wire)) {
/* Ignore badly formed queries or responses. */
if (ret != 0 || knot_wire_get_qr(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
task = qr_task_create(worker, handle, addr);
Expand Down
2 changes: 1 addition & 1 deletion lib/README.rst
Expand Up @@ -33,7 +33,7 @@ Writing layers

The resolver :ref:`library <lib_index>` leverages the `processing API`_ from the libknot to separate packet processing code
into layers. In order to keep the core library sane and coverable, there are only two built-in layers:
the :c:func:`iterate_layer`, and the :c:func:`itercache_layer`.
the :c:func:`iterate_layer`, and the :c:func:`rrcache_layer`.

*Note* |---| This is only crash-course in the library internals, see the resolver :ref:`library <lib_index>` documentation for the complete overview of the services.

Expand Down

0 comments on commit d9a5992

Please sign in to comment.