Skip to content

Commit

Permalink
replication: allow applier to subscribe with a list of UUIDs
Browse files Browse the repository at this point in the history
Extend IPROTO_SUBSCRIBE with a list of UUIDs to fetch changes from.
Implement src_black_list in relay to filter out rows originating
from replicas that are not in this list.

This is required for #3294
  • Loading branch information
Konstantin Belyavskiy committed Apr 26, 2018
1 parent e6cf699 commit 676990a
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/box/applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ applier_subscribe(struct applier *applier)
struct vclock remote_vclock_at_subscribe;

xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&replicaset.vclock);
&replicaset.vclock, applier->feeder_uuids,
applier->nfeeder_uuids);
coio_write_xrow(coio, &row);

if (applier->state == APPLIER_READY) {
Expand Down
4 changes: 4 additions & 0 deletions src/box/applier.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ struct applier {
struct xstream *join_stream;
/** xstream to process rows during final JOIN and SUBSCRIBE */
struct xstream *subscribe_stream;
/** List of other replica UUIDs to fetch changes from */
struct tt_uuid feeder_uuids[VCLOCK_MAX];
/** Number of entries of feeder_uuids sent in SUBSCRIBE. TODO: remove. */
uint32_t nfeeder_uuids;
};

/**
Expand Down
8 changes: 7 additions & 1 deletion src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,11 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
struct vclock replica_clock;
uint32_t replica_version_id;
vclock_create(&replica_clock);
struct tt_uuid feeder_uuids[VCLOCK_MAX];
uint32_t nuuids = 0;
xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid,
&replica_clock, &replica_version_id);
&replica_clock, &replica_version_id,
feeder_uuids, &nuuids);

/* Forbid connection to itself */
if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
Expand Down Expand Up @@ -1438,6 +1441,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
tt_uuid_str(&replica_uuid),
tt_uuid_str(&REPLICASET_UUID));
}
/* Set up relay filter, if specific UUIDs were in SUBSCRIBE request */
if (nuuids)
relay_set_black_list(replica->relay, feeder_uuids, nuuids);

/* Forbid replication with disabled WAL */
if (wal_mode() == WAL_NONE) {
Expand Down
1 change: 1 addition & 0 deletions src/box/iproto_constants.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
/* 0x26 */ MP_MAP, /* IPROTO_VCLOCK */
/* 0x27 */ MP_STR, /* IPROTO_EXPR */
/* 0x28 */ MP_ARRAY, /* IPROTO_OPS */
/* 0x29 */ MP_MAP, /* IPROTO_SUBSCRIBE_UUIDS */
/* }}} */
};

Expand Down
1 change: 1 addition & 0 deletions src/box/iproto_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ enum iproto_key {
IPROTO_VCLOCK = 0x26,
IPROTO_EXPR = 0x27, /* EVAL */
IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */
IPROTO_SUBSCRIBE_UUIDS = 0x29, /* UUIDs to fetch changes from */
/* Leave a gap between request keys and response keys */
IPROTO_DATA = 0x30,
IPROTO_ERROR = 0x31,
Expand Down
25 changes: 22 additions & 3 deletions src/box/relay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ struct relay {
struct stailq pending_gc;
/** Time when last row was sent to peer. */
double last_row_tm;
/**
 * Per-replica-id flags: a nonzero entry means rows originating
 * from the replica with that id are not sent to the peer.
 */
int src_black_list[VCLOCK_MAX];

struct {
/* Align to prevent false-sharing with tx thread */
Expand All @@ -146,6 +148,18 @@ relay_vclock(const struct relay *relay)
return &relay->tx.vclock;
}

void
relay_set_black_list(struct relay *relay, const struct tt_uuid *feeder_uuids,
int nuuids) {
assert(relay != NULL); // TODO: always true?
for (int i = 0; i < VCLOCK_MAX; i++)
relay->src_black_list[i] = 1;
for (int i = 0; i < nuuids; i++) {
int id = replica_by_uuid(&feeder_uuids[i])->id;
relay->src_black_list[id] = 0;
}
}

static void
relay_send(struct relay *relay, struct xrow_header *packet);
static void
Expand Down Expand Up @@ -517,6 +531,7 @@ void
relay_subscribe(int fd, uint64_t sync, struct replica *replica,
struct vclock *replica_clock, uint32_t replica_version_id)
{
say_info("relay_subscribe: set up new relay");
assert(replica->id != REPLICA_ID_NIL);
/* Don't allow multiple relays for the same replica */
if (replica->relay != NULL) {
Expand Down Expand Up @@ -594,11 +609,15 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
* data was not written to WAL, so remote master can't recover
* it). In the latter case packet's LSN is less than or equal to
* local master's LSN at the moment it received 'SUBSCRIBE' request.
* In addition, src_black_list lets the applier subscribe with a
* list of replica ids it wants to get changes from; rows from any
* other origin are not sent (by default rows from everyone are sent).
*/
if (relay->replica == NULL ||
packet->replica_id != relay->replica->id ||
packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
packet->replica_id)) {
((packet->replica_id != relay->replica->id ||
packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
packet->replica_id)) &&
relay->src_black_list[packet->replica_id] == 0)) {
relay_send(relay, packet);
}
}
7 changes: 7 additions & 0 deletions src/box/relay.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ void
relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
struct vclock *stop_vclock);

/**
 * Set up relay black list.
 *
 * Marks every replica id as blocked for @a relay and then unblocks
 * the ids of the replicas named in @a feeder_uuids, so that only rows
 * originating from those replicas are sent to the peer.
 *
 * @param relay         relay to configure; must not be NULL.
 * @param feeder_uuids  array of instance UUIDs to keep receiving
 *                      changes from.
 * @param nuuids        number of entries in @a feeder_uuids.
 */
void
relay_set_black_list(struct relay *relay, const struct tt_uuid *feeder_uuids,
int nuuids);

/**
* Subscribe a replica to updates.
*
Expand Down
35 changes: 32 additions & 3 deletions src/box/xrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -803,19 +803,23 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock)
const struct vclock *vclock,
const struct tt_uuid *subscr_uuids,
uint32_t nuuids)
{
memset(row, 0, sizeof(*row));
uint32_t replicaset_size = vclock_size(vclock);
// TODO: what is replicaset_size in this case?
size_t size = XROW_BODY_LEN_MAX + replicaset_size *
(mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX));
say_info("xrow_encode_subscribe: size = %llu", size); // TODO: remove
char *buf = (char *) region_alloc(&fiber()->gc, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
return -1;
}
char *data = buf;
data = mp_encode_map(data, 4);
data = mp_encode_map(data, 5);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
Expand All @@ -828,6 +832,12 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_uint(data, replica.id);
data = mp_encode_uint(data, replica.lsn);
}
data = mp_encode_uint(data, IPROTO_SUBSCRIBE_UUIDS);
data = mp_encode_map(data, nuuids);
for (uint32_t i = 0; i < nuuids; ++i) {
data = mp_encode_uint(data, i);
data = xrow_encode_uuid(data, subscr_uuids + i);
}
data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
data = mp_encode_uint(data, tarantool_version_id());
assert(data <= buf + size);
Expand All @@ -841,7 +851,8 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id, bool *read_only)
uint32_t *version_id, bool *read_only,
struct tt_uuid *feeder_uuids, uint32_t *nuuids)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
Expand Down Expand Up @@ -913,6 +924,24 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*read_only = mp_decode_bool(&d);
break;
case IPROTO_SUBSCRIBE_UUIDS:
if (feeder_uuids == NULL)
goto skip;
if (mp_typeof(*d) != MP_MAP) {
uuid_map_error:
diag_set(ClientError, ER_INVALID_MSGPACK,
"invalid SUBSCRIBE_UUIDS");
return -1;
}
*nuuids = mp_decode_map(&d);
for (uint32_t i = 0; i < *nuuids; i++) {
if (mp_typeof(*d) != MP_UINT)
goto uuid_map_error;
uint32_t id = mp_decode_uint(&d);
if (xrow_decode_uuid(&d, feeder_uuids + id) != 0)
goto uuid_map_error;
}
break;
default: skip:
mp_next(&d); /* value */
}
Expand Down
26 changes: 17 additions & 9 deletions src/box/xrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock);
const struct vclock *vclock,
const struct tt_uuid *subscr_uuids,
uint32_t nuuids);

/**
* Decode SUBSCRIBE command.
Expand All @@ -260,7 +262,8 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id, bool *read_only);
uint32_t *version_id, bool *read_only,
struct tt_uuid *feeder_uuids, uint32_t *nuuids);

/**
* Encode JOIN command.
Expand All @@ -285,7 +288,7 @@ static inline int
xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
{
return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL,
NULL);
NULL, NULL, NULL);
}

/**
Expand All @@ -310,7 +313,8 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
static inline int
xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
{
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL,
NULL, NULL);
}

/**
Expand All @@ -326,7 +330,8 @@ static inline int
xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock,
bool *read_only)
{
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only);
return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only,
NULL, NULL);
}

/**
Expand Down Expand Up @@ -577,10 +582,11 @@ static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock)
const struct vclock *vclock,
const struct tt_uuid *subscr_uuids, uint32_t nuuids)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
vclock) != 0)
vclock, subscr_uuids, nuuids) != 0)
diag_raise();
}

Expand All @@ -589,10 +595,12 @@ static inline void
xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *replica_version_id)
uint32_t *replica_version_id,
struct tt_uuid *feeder_uuids, uint32_t *nuuids)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
vclock, replica_version_id, NULL) != 0)
vclock, replica_version_id, NULL,
feeder_uuids, nuuids) != 0)
diag_raise();
}

Expand Down

0 comments on commit 676990a

Please sign in to comment.