Skip to content

Commit

Permalink
swim: split send/recv into phases
Browse files Browse the repository at this point in the history
At this moment swim_scheduler_on_output() is a relatively simple
function. It takes a task, builds its meta and flushes a result
into the network. But soon SWIM will be able to encrypt messages.

It means, that in addition to regular preprocessing like building
meta headers a new phase will appear - encryption. What is more -
conditional encryption, because a user may want to do not encrypt
messages.

All the same is about swim_scheduler_on_input() - if a SWIM
instance uses encryption, it should decrypt incoming messages
before forwarding them into the SWIM core logic.

The chosen strategy - lets reuse on_output/on_input virtuality
and create two version of on_input/on_output functions:

    swim_on_plain_input()  | swim_on_encrypted_input()
    swim_on_plain_output() | swim_on_encrypted_output()

One of these pairs is chosen depending on if the instance uses
encryption.

To make these 4 functions as simple and short as possible this
commit creates two sets of functions, doing all the logic except
encryption:

    swim_begin_send()
    swim_do_send()
    swim_complete_send()

    swim_begin_recv()
    swim_do_recv()
    swim_complete_recv()

These functions will be used by on_input/on_output functions with
different arguments.

Part of #3234
  • Loading branch information
Gerold103 committed May 21, 2019
1 parent 39dd852 commit f77f4b9
Showing 1 changed file with 104 additions and 25 deletions.
129 changes: 104 additions & 25 deletions src/lib/swim/swim_io.c
Expand Up @@ -338,66 +338,135 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
swim_scheduler_stop_input(scheduler);
}

static void
swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
/**
* Begin packet transmission. Prepare a next task in the queue to
* send its packet: build a meta header, pop the task from the
* queue.
* @param scheduler Scheduler to pop a task from.
* @param loop Event loop passed by libev.
* @param io Descriptor to send to.
* @param events Mask of happened events passed by libev.
* @param[out] dst Destination address to send the packet to. Can
* be different from task.dst, for example, if task.proxy
* is specified.
*
* @retval NULL The queue is empty. Input has been stopped.
* @retval not NULL A task ready to be sent.
*/
static struct swim_task *
swim_begin_send(struct swim_scheduler *scheduler, struct ev_loop *loop,
struct ev_io *io, int events, const struct sockaddr_in **dst)
{
assert((events & EV_WRITE) != 0);
(void) events;
struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
if (rlist_empty(&scheduler->queue_output)) {
/*
* Possible, if a member pushed a task and then
* was deleted together with it.
*/
swim_ev_io_stop(loop, io);
return;
return NULL;
}
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
const struct sockaddr_in *src = &scheduler->transport.addr;
const struct sockaddr_in *dst = &task->dst;
const char *dst_str = swim_inaddr_str(dst);
const char *dst_str = swim_inaddr_str(&task->dst);
if (! swim_inaddr_is_empty(&task->proxy)) {
dst = &task->proxy;
*dst = &task->proxy;
dst_str = tt_sprintf("%s via %s", dst_str,
swim_inaddr_str(dst));
swim_inaddr_str(*dst));
swim_packet_build_meta(&task->packet, src, src, &task->dst);
} else {
*dst = &task->dst;
swim_packet_build_meta(&task->packet, src, NULL, NULL);
}
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
task->desc, dst_str);
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
(const struct sockaddr *) dst,
sizeof(*dst));
if (rc < 0)
return task;
}

/** Send a packet into the network. */
static inline ssize_t
swim_do_send(struct swim_scheduler *scheduler, const char *buf, int size,
const struct sockaddr_in *dst)
{
return swim_transport_send(&scheduler->transport, buf, size,
(const struct sockaddr *) dst, sizeof(*dst));
}

/**
* Finalize packet transmission, call the completion callback.
* @param scheduler Scheduler owning @a task.
* @param task Sent (or failed to be sent) task.
* @param size Result of send().
*/
static inline void
swim_complete_send(struct swim_scheduler *scheduler, struct swim_task *task,
ssize_t size)
{
if (size < 0)
diag_log();
if (task->complete != NULL)
task->complete(task, scheduler, rc);
task->complete(task, scheduler, size);
}

static void
swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
{
struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
const struct sockaddr_in *dst;
struct swim_task *task = swim_begin_send(scheduler, loop, io, events,
&dst);
if (task == NULL)
return;
ssize_t size = swim_do_send(scheduler, task->packet.buf,
task->packet.pos - task->packet.buf, dst);
swim_complete_send(scheduler, task, size);
}

/**
* Begin packet receipt. Note, this function is no-op, and exists
* just for consistency with begin/do/complete_send() functions.
*/
static inline void
swim_begin_recv(struct swim_scheduler *scheduler, struct ev_loop *loop,
struct ev_io *io, int events)
{
assert((events & EV_READ) != 0);
(void) io;
(void) scheduler;
(void) events;
(void) loop;
struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
}

/** Receive a packet from the network. */
static ssize_t
swim_do_recv(struct swim_scheduler *scheduler, char *buf, int size)
{
struct sockaddr_in src;
socklen_t len = sizeof(src);
char buf[UDP_PACKET_SIZE];
ssize_t size = swim_transport_recv(&scheduler->transport, buf,
sizeof(buf),
(struct sockaddr *) &src, &len);
if (size <= 0) {
if (size < 0)
goto error;
return;
}
ssize_t rc = swim_transport_recv(&scheduler->transport, buf, size,
(struct sockaddr *) &src, &len);
if (rc <= 0)
return rc;
say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler),
swim_inaddr_str(&src));
return rc;
}

/**
* Finalize packet receipt, call the SWIM core callbacks, or
* forward the packet to a next node.
*/
static void
swim_complete_recv(struct swim_scheduler *scheduler, const char *buf,
ssize_t size)
{
if (size < 0)
goto error;
if (size == 0)
return;
struct swim_meta_def meta;
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
Expand Down Expand Up @@ -446,6 +515,16 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
diag_log();
}

static void
swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
{
struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
char buf[UDP_PACKET_SIZE];
swim_begin_recv(scheduler, loop, io, events);
ssize_t size = swim_do_recv(scheduler, buf, UDP_PACKET_SIZE);
swim_complete_recv(scheduler, buf, size);
}

const char *
swim_inaddr_str(const struct sockaddr_in *addr)
{
Expand Down

0 comments on commit f77f4b9

Please sign in to comment.