Skip to content

Commit

Permalink
Default schema change saving to 7.0 format to allow schema changes to…
Browse files Browse the repository at this point in the history
… race with server upgrade.

To run the testcase you need to setup a mixed cluster by hand and run the script like
./runit dbname oneofthenodes

Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
  • Loading branch information
dorinhogea committed May 28, 2024
1 parent 4dda14e commit a50724b
Show file tree
Hide file tree
Showing 12 changed files with 512 additions and 110 deletions.
50 changes: 0 additions & 50 deletions bdb/llmeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -11061,53 +11061,3 @@ int bdb_del_view(tran_type *t, const char *view_name)
}
return rc;
}

#include "schemachange.h"

/*
DRQS-170879936:
In version R8, some backwards incompatible changes got introduced into
the schema change object that broke the object's original deserializer
function (buf_get_schemachange()). As a result, reading an sc status object
created by R7 would fail if read by R8 (via comdb2_sc_status).
The fix was to keep the both the versions of the deserializer functions and
invoke them appropriately.
The current (potential hackish) method to pick the right version on the
deserializer function is based on the content of the first 4 bytes of the
LLMETA_SCHEMACHANGE_STATUS payload, where it is assumed that the valid
values of s->kind (between SC_INVALID and SC_LAST, exclusive) will not
coincide with the first 4 bytes of the rqid (fastseed) stored as the first
member in old (7.0's) LLMETA_SCHEMACHANGE_STATUS payload.
*/
static int buf_get_schemachange_key_type(void *p_buf, void *p_buf_end)
{
int first = 0;

if (p_buf >= p_buf_end) return -1;

buf_get(&first, sizeof(first), p_buf, p_buf_end);

if (first > SC_INVALID && first < SC_LAST) {
return LLMETA_SCHEMACHANGE_STATUS_V2;
}
return LLMETA_SCHEMACHANGE_STATUS;
}

void *buf_get_schemachange(struct schema_change_type *s, void *p_buf,
void *p_buf_end)
{
int sc_key_type = buf_get_schemachange_key_type(p_buf, p_buf_end);

switch (sc_key_type) {
case LLMETA_SCHEMACHANGE_STATUS:
return buf_get_schemachange_v1(s, (void *)p_buf, (void *)p_buf_end);
case LLMETA_SCHEMACHANGE_STATUS_V2:
return buf_get_schemachange_v2(s, (void *)p_buf, (void *)p_buf_end);
default:
break;
}
return NULL;
}
1 change: 1 addition & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ extern int gbl_timer_warn_interval;
int gbl_incoherent_clnt_wait = 0;
int gbl_new_leader_duration = 3;
extern int gbl_transaction_grace_period;
extern int gbl_sc_7format;
extern int gbl_dohsql_joins;
extern int gbl_sc_history_max_rows;
extern int gbl_sc_status_max_rows;
Expand Down
4 changes: 4 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,10 @@ REGISTER_TUNABLE("transaction_grace_period",
"Time to wait for connections with pending transactions to go away on exit. (Default: 60)",
TUNABLE_INTEGER, &gbl_transaction_grace_period, 0, NULL, NULL, NULL, NULL);

REGISTER_TUNABLE("sc_7format",
"Save schema change object in 7.0 format. (Default: off)",
TUNABLE_BOOLEAN, &gbl_sc_7format, INTERNAL, NULL, NULL, NULL, NULL);

REGISTER_TUNABLE("dohsql_joins",
"Enable to support joins in parallel sql execution (default: on)",
TUNABLE_BOOLEAN, &gbl_dohsql_joins, 0, NULL, NULL, NULL, NULL);
Expand Down
38 changes: 10 additions & 28 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -7759,10 +7759,7 @@ int osql_send_schemachange(osql_target_t *target, unsigned long long rqid,
{

schemachange_packed_size(sc);
size_t osql_rpl_size =
((rqid == OSQL_RQID_USE_UUID) ? OSQLCOMM_UUID_RPL_TYPE_LEN
: OSQLCOMM_RPL_TYPE_LEN) +
sc->packed_len;
size_t osql_rpl_size = OSQLCOMM_UUID_RPL_TYPE_LEN + sc->packed_len;
uint8_t *buf = alloca(osql_rpl_size);
uint8_t *p_buf = buf;
uint8_t *p_buf_end = p_buf + osql_rpl_size;
Expand All @@ -7773,32 +7770,17 @@ int osql_send_schemachange(osql_target_t *target, unsigned long long rqid,

strcpy(sc->original_master_node, target->host);

if (rqid == OSQL_RQID_USE_UUID) {
osql_uuid_rpl_t hd_uuid = {0};
osql_uuid_rpl_t hd_uuid = {0};

hd_uuid.type = OSQL_SCHEMACHANGE;
comdb2uuidcpy(hd_uuid.uuid, uuid);
if (!(p_buf = osqlcomm_schemachange_uuid_rpl_type_put(
&hd_uuid, sc, p_buf, p_buf_end))) {
logmsg(LOGMSG_ERROR, "%s:%s returns NULL\n", __func__,
"osqlcomm_schemachange_uuid_rpl_type_put");
return -1;
}

type = osql_net_type_to_net_uuid_type(NET_OSQL_SOCK_RPL);
} else {
osql_rpl_t hd = {0};

hd.type = OSQL_SCHEMACHANGE;
hd.sid = rqid;

if (!(p_buf = osqlcomm_schemachange_rpl_type_put(&hd, sc, p_buf,
p_buf_end))) {
logmsg(LOGMSG_ERROR, "%s:%s returns NULL\n", __func__,
"osqlcomm_schemachange_rpl_type_put");
return -1;
}
hd_uuid.type = OSQL_SCHEMACHANGE;
comdb2uuidcpy(hd_uuid.uuid, uuid);
if (!(p_buf = osqlcomm_schemachange_uuid_rpl_type_put(
&hd_uuid, sc, p_buf, p_buf_end))) {
logmsg(LOGMSG_ERROR, "%s:%s returns NULL\n", __func__,
"osqlcomm_schemachange_uuid_rpl_type_put");
return -1;
}
type = osql_net_type_to_net_uuid_type(NET_OSQL_SOCK_RPL);

if (gbl_enable_osql_logging) {
logmsg(LOGMSG_DEBUG, "[%llu %s] send OSQL_SCHEMACHANGE %s\n", rqid,
Expand Down
Loading

0 comments on commit a50724b

Please sign in to comment.