Skip to content

Commit

Permalink
clusterer: Make the data sync interface more robust
Browse files Browse the repository at this point in the history
This patch improves the data sync interface so that during a sync,
modules are no longer forced to micro-manage the data packets they are
receiving from the interface.  They can now freely abort the processing
of a sync chunk at any time, without disrupting the processing of the
entire sync packet (which is composed of many such data chunks).

Additionally, since the sync packet format has changed (an extra integer
is needed for each chunk in order to allow the "skip" mechanism), the
sync packet version is now bumped from 1 -> 2, in order to prevent any
compatibility issues with OpenSIPS nodes without this patch.

(cherry picked from commit 0b3ad43)
  • Loading branch information
liviuchircu committed Jul 30, 2019
1 parent 9e951c9 commit 31352d9
Showing 1 changed file with 61 additions and 22 deletions.
83 changes: 61 additions & 22 deletions modules/clusterer/sync.c
Expand Up @@ -29,6 +29,7 @@ int _sync_from_id = 0;

static bin_packet_t *sync_packet_snd;
static int sync_prev_buf_len;
static int *sync_last_chunk_sz;

int send_sync_req(str *capability, int cluster_id, int source_id)
{
Expand Down Expand Up @@ -174,6 +175,8 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id)

if (aloc_new_pkt) { /* next chunk will be in a new packet */
if (sync_packet_snd) {
*sync_last_chunk_sz = prev_chunk_size;

/* send and free the previous packet */
msg_add_trailer(sync_packet_snd, cluster_id, dst_id);

Expand All @@ -183,67 +186,98 @@ bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id)
bin_free_packet(sync_packet_snd);
pkg_free(sync_packet_snd);
sync_packet_snd = NULL;
sync_last_chunk_sz = NULL;
}

new_packet = pkg_malloc(sizeof *new_packet);
if (!new_packet) {
LM_ERR("No more pkg memory\n");
return NULL;
}
sync_packet_snd = new_packet;

if (bin_init(new_packet,&cl_extra_cap,CLUSTERER_SYNC,BIN_VERSION,0)<0) {
LM_ERR("Failed to init bin packet\n");
pkg_free(sync_packet_snd);
sync_packet_snd = NULL;
pkg_free(new_packet);
return NULL;
}

bin_push_str(new_packet, capability);
bin_push_int(new_packet, SYNC_CHUNK_START_MARKER);
sync_packet_snd = new_packet;
}

bin_get_buffer(new_packet, &bin_buffer);
sync_prev_buf_len = bin_buffer.len;
if (sync_last_chunk_sz)
*sync_last_chunk_sz = prev_chunk_size;

return new_packet;
} else { /* next chunk will be in the same packet */
bin_push_int(sync_packet_snd, SYNC_CHUNK_START_MARKER);
/* reserve and remember a holder for the upcoming data chunk size */
bin_get_buffer(sync_packet_snd, &bin_buffer);
bin_push_int(sync_packet_snd, 0);
sync_last_chunk_sz = (int *)(bin_buffer.s + bin_buffer.len);

bin_get_buffer(sync_packet_snd, &bin_buffer);
sync_prev_buf_len = bin_buffer.len;
bin_push_int(sync_packet_snd, SYNC_CHUNK_START_MARKER);

return sync_packet_snd;
}
bin_get_buffer(sync_packet_snd, &bin_buffer);
sync_prev_buf_len = bin_buffer.len;

return sync_packet_snd;
}

/* this mechanism allows modules to ignore all or part of a sync chunk
* without disrupting the sequencing / consuming of the remaining data */
char *next_data_chunk;

/*
 * Advance @packet to the next sync data chunk.
 *
 * Returns 1 when a chunk header (size + start marker) was read successfully,
 * and 0 when the packet is NULL, no further chunk is available, or the header
 * could not be read / validated.
 *
 * On success, the global next_data_chunk is set to the packet's read position
 * plus the chunk size that was just read, so that the following call can jump
 * there regardless of how much of the current chunk the caller consumed.
 *
 * NOTE(review): this span appears to be a rendered diff with the +/- markers
 * stripped -- pre-patch and post-patch lines are interleaved (start_marker is
 * declared twice, and two alternative branch chains follow the second
 * bin_pop_int() call), so it is not compilable exactly as shown. Confirm
 * against the real modules/clusterer/sync.c before relying on it.
 */
int cl_sync_chunk_iter(bin_packet_t *packet)
{
int start_marker;
str bin_buffer;
int next_chunk_sz, start_marker;
int rc;

if (!packet) {
LM_ERR("No sync packet\n");
return 0;
}

/* a previous iteration recorded where the next chunk begins: jump there,
 * but only if that position lies inside this packet's buffer */
if (next_data_chunk) {
bin_get_buffer(packet, &bin_buffer);
if (next_data_chunk < bin_buffer.s ||
next_data_chunk >= bin_buffer.s + bin_buffer.len) {
next_data_chunk = NULL; /* no more chunks */
return 0;
}

/* reposition the read cursor at the start of the next chunk */
packet->front_pointer = next_data_chunk;
}

/* first integer of a chunk header: the chunk size */
rc = bin_pop_int(packet, &next_chunk_sz);
if (rc < 0) {
LM_ERR("error retrieving next sync chunk size\n");
return 0;
} else if (rc > 0) {
/* no more chunks in this packet */
return 0;
}

/* second integer of a chunk header: must equal SYNC_CHUNK_START_MARKER */
rc = bin_pop_int(packet, &start_marker);
if (rc < 0) {
LM_ERR("Error retrieving sync chunk start marker\n");
return 0;
} else if (rc == 0) {
if (start_marker != SYNC_CHUNK_START_MARKER) {
LM_ERR("Bad sync chunk start marker\n");
return 0;
}
return 1;
} else /* no more chunks in this packet */
} else if (rc > 0) {
LM_ERR("no more data: failed to read sync chunk start marker\n");
return 0;
} else if (start_marker != SYNC_CHUNK_START_MARKER) {
LM_ERR("Bad sync chunk start marker\n");
return 0;
}

/* remember where the following chunk starts: current read position plus
 * the size read from this chunk's header */
next_data_chunk = packet->front_pointer + next_chunk_sz;
return 1;
}

int send_sync_repl(cluster_info_t *cluster, int node_id, str *cap_name)
{
bin_packet_t sync_end_pkt;
struct local_cap *cap;
int rc;
str bin_buffer;

for (cap = cluster->capabilities; cap; cap = cap->next)
if (!str_strcmp(cap_name, &cap->reg.name))
Expand All @@ -257,7 +291,10 @@ int send_sync_repl(cluster_info_t *cluster, int node_id, str *cap_name)
cap->reg.event_cb(SYNC_REQ_RCV, node_id);

if (sync_packet_snd) {
/* send and free the previously built packet */
bin_get_buffer(sync_packet_snd, &bin_buffer);
*sync_last_chunk_sz = bin_buffer.len - sync_prev_buf_len;

/* send and free the lastly built packet */
msg_add_trailer(sync_packet_snd, cluster->cluster_id, node_id);

if ((rc = clusterer_send_msg(sync_packet_snd, cluster->cluster_id, node_id))<0)
Expand All @@ -266,6 +303,7 @@ int send_sync_repl(cluster_info_t *cluster, int node_id, str *cap_name)
bin_free_packet(sync_packet_snd);
pkg_free(sync_packet_snd);
sync_packet_snd = NULL;
sync_last_chunk_sz = NULL;
}

/* send indication that all sync packets were sent */
Expand Down Expand Up @@ -352,6 +390,7 @@ void handle_sync_packet(bin_packet_t *packet, int packet_type,
packet->type = SYNC_PACKET_TYPE;
packet->src_id = source_id;

next_data_chunk = NULL;
cap->reg.packet_cb(packet);
} else { /* CLUSTERER_SYNC_END */
LM_DBG("Received all sync packets for capability: %.*s\n", cap_name.len,
Expand Down

0 comments on commit 31352d9

Please sign in to comment.