Skip to content

Commit

Permalink
[RAW] swim: introduce a dissemination component
Browse files Browse the repository at this point in the history
Dissemination components broadcasts events about member status
updates.

Public API:

swim.cfg({server = <uri>, members = <array of uris>,
          heartbeat = <seconds>})

    Configures the SWIM module.

    @server - URI of UDP server to which other cluster
        members will send SWIM messages. It should
        have the format "ip:port".

    @members - array of URIs explicitly defined by a
        user. These members are never deleted from
        members table until they are removed from the
        configuration explicitly. SWIM downloads from
        them their members tables, merges with its
        own and repeats.

    @Heartbeat - how often send a part of members
        table to another member. Note, that it is not
        how ofter send the whole table, nor how ofter
        to send the table to all members. It is only
        one step of the protocol.

swim.stop()

    Stops the SWIM module: shuts down the server,
    closes socket, destroys queues, frees memory.
    Note that after it swim.cfg can be called again.

swim.info()

    Show info about each known member in the format:
    {
        ["ip:port"] = {
            status = <alive/dead>,
            incarnation = <growing unsigned number>
        }
    }

Closes #3234
  • Loading branch information
Gerold103 committed Nov 22, 2018
1 parent 72d93e0 commit 62f3713
Showing 1 changed file with 228 additions and 6 deletions.
234 changes: 228 additions & 6 deletions src/lib/swim/swim.c
Expand Up @@ -48,7 +48,6 @@
* - indirect ping.
* - increment own incarnation on each round.
* - attach dst incarnation to ping.
* - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
*/

/**
Expand Down Expand Up @@ -224,6 +223,26 @@ struct swim_member {
struct swim_io_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
/**
*
* Dissemination component
*
* Dissemination component sends events. Event is a
* notification about member status update. So formally,
* this structure already has all the needed attributes.
* But also an event somehow should be sent to all members
* at least once according to SWIM, so it requires
* something like TTL, which decrements on each send. And
* a member can not be removed from the global table until
* it gets dead and its dissemination TTL is 0, so as to
* allow other members learn its dead status.
*/
int dissemination_ttl;
/**
* Events are put into a queue sorted by event occurrence
* time.
*/
struct rlist in_queue_events;
};

/**
Expand All @@ -240,6 +259,7 @@ static struct swim_member *self = NULL;
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
SWIM_DISSEMINATION,
};

/** {{{ Failure detection component */
Expand Down Expand Up @@ -438,6 +458,92 @@ static struct swim_io_task round_step_task = {

/** }}} Anti-entropy component */

/** {{{ Dissemination component */

/** SWIM dissemination MsgPack template. */
struct PACKED swim_diss_header_bin {
/** mp_encode_uint(SWIM_DISSEMINATION) */
uint8_t k_header;
/** mp_encode_array() */
uint8_t m_header;
uint32_t v_header;
};

static inline void
swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
{
header->k_header = SWIM_DISSEMINATION;
header->m_header = 0xdd;
header->v_header = mp_bswap_u32(batch_size);
}

/** SWIM event MsgPack template. */
struct PACKED swim_event_bin {
/** mp_encode_map(4) */
uint8_t m_header;

/** mp_encode_uint(SWIM_MEMBER_STATUS) */
uint8_t k_status;
/** mp_encode_uint(enum member_status) */
uint8_t v_status;

/** mp_encode_uint(SWIM_MEMBER_ADDR) */
uint8_t k_addr;
/** mp_encode_uint(addr.sin_addr.s_addr) */
uint8_t m_addr;
uint32_t v_addr;

/** mp_encode_uint(SWIM_MEMBER_PORT) */
uint8_t k_port;
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;

/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
uint8_t k_incarnation;
/** mp_encode_uint(64bit incarnation) */
uint8_t m_incarnation;
uint64_t v_incarnation;
};

static inline void
swim_event_bin_create(struct swim_event_bin *header)
{
header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDR;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
header->k_incarnation = SWIM_MEMBER_INCARNATION;
header->m_incarnation = 0xcf;
}

static inline void
swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
{
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
header->v_incarnation = mp_bswap_u64(member->incarnation);
}

/** Queue of events sorted by occurrence time. */
static RLIST_HEAD(queue_events);
static int event_count = 0;

static inline void
swim_schedule_event(struct swim_member *member)
{
if (rlist_empty(&member->in_queue_events)) {
rlist_add_tail_entry(&queue_events, member, in_queue_events);
event_count++;
}
member->dissemination_ttl = mh_size(members);
}

/** }}} Dissemination component */

/**
* SWIM message structure:
* {
Expand All @@ -448,6 +554,18 @@ static struct swim_io_task round_step_task = {
*
* OR/AND
*
* SWIM_DISSEMINATION: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDR: uint, ip,
* SWIM_MEMBER_PORT: uint, port,
* SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
*
* OR/AND
*
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
Expand Down Expand Up @@ -531,6 +649,16 @@ swim_send_ack(struct swim_io_task *task);
static void
swim_send_ping(struct swim_io_task *task);

/**
* Make all needed actions to process a member's update like a
* change of its status, or incarnation, or both.
*/
static void
swim_member_is_updated(struct swim_member *member)
{
swim_schedule_event(member);
}

/**
* Update status of the member if needed. Statuses are compared as
* a compound key: {incarnation, status}. So @a new_status can
Expand All @@ -548,11 +676,14 @@ swim_member_update_status(struct swim_member *member,
{
assert(member != self);
if (member->incarnation == incarnation) {
if (member->status < new_status)
if (member->status < new_status) {
member->status = new_status;
swim_member_is_updated(member);
}
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
swim_member_is_updated(member);
}
}

Expand Down Expand Up @@ -589,6 +720,8 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
swim_io_task_create(&member->ping_task, swim_send_ping);
rlist_add_entry(&queue_round, member, in_queue_round);
rlist_create(&member->in_queue_wait_ack);
rlist_create(&member->in_queue_events);
swim_schedule_event(member);
return member;
}

Expand Down Expand Up @@ -617,6 +750,7 @@ swim_member_delete(struct swim_member *member)
swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
rlist_del_entry(member, in_queue_wait_ack);
assert(rlist_empty(&member->in_queue_events));
free(member);
}

Expand Down Expand Up @@ -696,19 +830,53 @@ swim_encode_round_msg(char *buffer, int size)
assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
size -= sizeof(struct swim_fd_header_bin) + 1;

int diss_batch_size = calculate_bin_batch_size(
sizeof(struct swim_diss_header_bin),
sizeof(struct swim_event_bin), size);
if (diss_batch_size > event_count)
diss_batch_size = event_count;
size -= sizeof(struct swim_diss_header_bin) -
diss_batch_size * sizeof(struct swim_event_bin);

int ae_batch_size = calculate_bin_batch_size(
sizeof(struct swim_anti_entropy_header_bin),
sizeof(struct swim_member_bin), size);
if (ae_batch_size > shuffled_members_size)
ae_batch_size = shuffled_members_size;

buffer = mp_encode_map(buffer, 2);
buffer = mp_encode_map(buffer, 1 + (diss_batch_size > 0) +
(ae_batch_size > 0));

struct swim_fd_header_bin fd_header_bin;
swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
buffer += sizeof(fd_header_bin);

if (diss_batch_size > 0) {
struct swim_diss_header_bin diss_header_bin;
swim_diss_header_bin_create(&diss_header_bin, diss_batch_size);
memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
buffer += sizeof(diss_header_bin);

int i = 0;
struct swim_member *member, *tmp;
struct swim_event_bin event_bin;
swim_event_bin_create(&event_bin);
rlist_foreach_entry_safe(member, &queue_events, in_queue_events,
tmp) {
swim_event_bin_reset(&event_bin, member);
memcpy(buffer, &event_bin, sizeof(event_bin));
buffer += sizeof(event_bin);
rlist_del_entry(member, in_queue_events);
--member->dissemination_ttl;
if (++i >= diss_batch_size)
break;
}
event_count -= diss_batch_size;
}

if (ae_batch_size == 0)
return buffer - start;
struct swim_anti_entropy_header_bin ae_header_bin;
swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
Expand Down Expand Up @@ -835,12 +1003,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
if (!m->is_pinned)
if (!m->is_pinned && m->dissemination_ttl == 0)
swim_member_delete(m);
continue;
}
if (m->failed_pings >= NO_ACKS_TO_DEAD)
if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_member_is_updated(m);
}
swim_io_task_push(&m->ping_task);
rlist_del_entry(m, in_queue_wait_ack);
}
Expand Down Expand Up @@ -1089,6 +1259,50 @@ swim_process_failure_detection(const char **pos, const char *end,
return 0;
}

static int
swim_process_dissemination(const char **pos, const char *end)
{
const char *msg_pref = "Invald SWIM dissemination message:";
if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
say_error("%s message should be an array", msg_pref);
return -1;
}
uint64_t size = mp_decode_array(pos);
for (uint64_t i = 0; i < size; ++i) {
if (mp_typeof(**pos) != MP_MAP ||
mp_check_map(*pos, end) > 0) {
say_error("%s event should be map", msg_pref);
return -1;
}
uint64_t map_size = mp_decode_map(pos);
struct swim_member_def def;
swim_member_def_create(&def);
for (uint64_t j = 0; j < map_size; ++j) {
if (mp_typeof(**pos) != MP_UINT ||
mp_check_uint(*pos, end) > 0) {
say_error("%s event key should be uint",
msg_pref);
return -1;
}
uint64_t key = mp_decode_uint(pos);
if (key >= swim_member_key_MAX) {
say_error("%s unknown event key", msg_pref);
return -1;
}
if (swim_process_member_key(key, pos, end, msg_pref,
&def) != 0)
return -1;
}
if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
say_error("%s member address should be specified",
msg_pref);
return -1;
}
swim_process_member_update(&def);
}
return 0;
}

/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
Expand Down Expand Up @@ -1134,6 +1348,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
&addr) != 0)
return;
break;
case SWIM_DISSEMINATION:
say_verbose("SWIM: process dissemination");
if (swim_process_dissemination(&pos, end) != 0)
return;
break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
Expand Down Expand Up @@ -1332,7 +1551,8 @@ swim_stop()
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(members, node)->val;
mh_i64ptr_del(members, node, NULL);
free(m);
rlist_del_entry(m, in_queue_events);
swim_member_delete(m);
node = mh_first(members);
}
mh_i64ptr_delete(members);
Expand All @@ -1344,7 +1564,9 @@ swim_stop()
cfg_size = 0;
shuffled_members = NULL;
shuffled_members_size = 0;
event_count = 0;
rlist_create(&queue_wait_ack);
rlist_create(&queue_output);
rlist_create(&queue_round);
rlist_create(&queue_events);
}

0 comments on commit 62f3713

Please sign in to comment.