Skip to content

Commit

Permalink
References: lp:1299430 - initial support for tokudb replication in ma…
Browse files Browse the repository at this point in the history
…ster-slave model
  • Loading branch information
Seppo Jaakola committed Mar 29, 2014
1 parent 8fb80a5 commit f4defb0
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 1 deletion.
3 changes: 2 additions & 1 deletion sql/wsrep_hton.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ void wsrep_register_hton(THD* thd, bool all)
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
{
if (i->ht()->db_type == DB_TYPE_INNODB)
if ((i->ht()->db_type == DB_TYPE_INNODB) ||
(i->ht()->db_type == DB_TYPE_TOKUDB))
{
trans_register_ha(thd, all, wsrep_hton);

Expand Down
298 changes: 298 additions & 0 deletions storage/tokudb/ha_tokudb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,38 @@ static const char *ha_tokudb_exts[] = {
NullS
};

#ifdef WITH_WSREP
#include <wsrep_mysqld.h>
#include <my_md5.h>
#include <openssl/md5.h>

extern my_bool wsrep_certify_nonPK;
class binlog_trx_data;
extern handlerton *binlog_hton;
extern "C" int thd_binlog_format(const MYSQL_THD thd);

extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
size_t cache_key_len,
const uchar* row_id,
size_t row_id_len,
wsrep_buf_t* key,
size_t* key_len);

extern handlerton * wsrep_hton;

static inline wsrep_ws_handle_t*
//wsrep_ws_handle_t*
//ha_tokudb::wsrep_ws_handle_t*
wsrep_ws_handle(THD* thd) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
DB_TXN* txn = (trx->all) ? trx->all : trx->stmt;
assert(txn);
WSREP_DEBUG("txn->id: %lu", txn->id64(txn));
return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
(wsrep_trx_id_t)txn->id64(txn));
}

#endif /* WITH_WSREP */
//
// This offset is calculated starting from AFTER the NULL bytes
//
Expand Down Expand Up @@ -4170,6 +4202,19 @@ int ha_tokudb::write_row(uchar * record) {
added_rows++;
trx->stmt_progress.inserted++;
track_progress(thd);
#ifdef WITH_WSREP
if (wsrep_thd_exec_mode(thd) == LOCAL_STATE &&
WSREP(thd) && !wsrep_consistency_check(thd) &&
(thd_sql_command(thd) != SQLCOM_LOAD ||
thd_binlog_format(thd) == BINLOG_FORMAT_ROW)) {

if (wsrep_append_keys(thd, false, record, NULL)) {
DBUG_PRINT("wsrep", ("row key failed"));
error = HA_ERR_INTERNAL_ERROR;
}
}

#endif
}
cleanup:
if (!num_DBs_locked_in_bulk) {
Expand Down Expand Up @@ -8400,3 +8445,256 @@ namespace tokudb {
template size_t vlq_encode_ui(uint64_t n, void *p, size_t s);
template size_t vlq_decode_ui(uint64_t *np, void *p, size_t s);
};
#ifdef WITH_WSREP
static
int
wsrep_calc_row_hash(
/*================*/
uchar* digest, /*!< in/out: md5 sum */
const uchar* row, /*!< in: row in MySQL format */
TABLE* table, /*!< in: table in MySQL data
dictionary */
THD* thd) /*!< in: user thread */
{
Field* field;
enum_field_types field_mysql_type;
uint n_fields;
unsigned long int len;
const uchar* ptr;
unsigned long int col_type;
uint i;

*digest = rand();
#ifdef REMOVED
MD5_CTX ctx;
MD5_Init (&ctx);

n_fields = table->s->fields;

for (i = 0; i < n_fields; i++) {
uchar null_byte=0;
uchar true_byte=1;
ptr = (const byte*) row + get_field_offset(table, field);

field = table->field[i];
ptr = (const uchar*) row + get_field_offset(table, field);
len = field->pack_length();

field_mysql_type = field->type();

col_type = table->cols[i].mtype;

switch (col_type) {

case DATA_BLOB:
ptr = row_mysql_read_blob_ref(&len, ptr, len);

break;

case DATA_VARCHAR:
case DATA_BINARY:
case DATA_VARMYSQL:
if (field_mysql_type == MYSQL_TYPE_VARCHAR) {
/* This is a >= 5.0.3 type true VARCHAR where
the real payload data length is stored in
1 or 2 bytes */

ptr = row_mysql_read_true_varchar(
&len, ptr,
(ulint)
(((Field_varstring*)field)->length_bytes));

}

break;
default:
;
}
/*
if (field->null_ptr &&
field_in_record_is_null(table, field, (char*) row)) {
*/

if( field->real_maybe_null() && field->is_null_in_record(row)) {
MD5_Update (&ctx, &null_byte, 1);
} else {
MD5_Update (&ctx, &true_byte, 1);
MD5_Update (&ctx, ptr, len);
}
}

MD5_Final (digest, &ctx);
#endif
return(0);
}
#endif /* WITH_WSREP */
#ifdef WITH_WSREP
int
wsrep_append_key(
/*==================*/
THD *thd,
TABLE_SHARE *table_share,
TABLE *table,
const char* key,
uint16_t key_len,
bool shared
)
{
DBUG_ENTER("wsrep_append_key");
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
bool const copy = true;
#ifdef WSREP_DEBUG_PRINT
fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
(shared) ? "Shared" : "Exclusive",
wsrep_thd_thread_id(thd), trx->id, key_len,
table_share->table_name.str);
for (int i=0; i<key_len; i++) {
fprintf(stderr, "%hhX, ", key[i]);
}
fprintf(stderr, "\n");
#endif
wsrep_buf_t wkey_part[3];
wsrep_key_t wkey = {wkey_part, 3};
if (!wsrep_prepare_key_for_innodb(
(const uchar*)table_share->table_cache_key.str,
table_share->table_cache_key.length,
(const uchar*)key, key_len,
wkey_part,
(size_t*)&wkey.key_parts_num)) {
WSREP_WARN("key prepare failed for: %s",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}

int rcode = (int)wsrep->append_key(
wsrep,
wsrep_ws_handle(thd),
&wkey,
1,
shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
copy);
if (rcode) {
DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
WSREP_WARN("Appending row key failed: %s, %d",
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void", rcode);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
DBUG_RETURN(0);
}


int
ha_tokudb::wsrep_append_keys(
/*==================*/
THD *thd,
bool shared,
const uchar* record0, /* in: row in MySQL format */
const uchar* record1) /* in: row in MySQL format */
{
int rcode;
DBUG_ENTER("wsrep_append_keys");

bool key_appended = false;
#ifdef REMOVED
trx_t *trx = thd_to_trx(thd);

if (table_share && table_share->tmp_table != NO_TMP_TABLE) {
WSREP_DEBUG("skipping tmp table DML: THD: %lu tmp: %d SQL: %s",
wsrep_thd_thread_id(thd),
table_share->tmp_table,
(wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void");
DBUG_RETURN(0);
}


ut_a(table->s->keys <= 256);
uint i;
for (i=0; i<table->s->keys; ++i) {
uint len;
char keyval0[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
char keyval1[WSREP_MAX_SUPPORTED_KEY_LENGTH+1] = {'\0'};
char* key0 = &keyval0[1];
char* key1 = &keyval1[1];
KEY* key_info = table->key_info + i;
ibool is_null;

dict_index_t* idx = innobase_get_index(i);
dict_table_t* tab = (idx) ? idx->table : NULL;

keyval0[0] = (char)i;
keyval1[0] = (char)i;

if (!tab) {
WSREP_WARN("MySQL-InnoDB key mismatch %s %s",
table->s->table_name.str,
key_info->name);
}
if (key_info->flags & HA_NOSAME ||
((tab &&
dict_table_get_referenced_constraint(tab, idx)) ||
(!tab && referenced_by_foreign_key()))) {

if (key_info->flags & HA_NOSAME || shared)
key_appended = true;

len = wsrep_store_key_val_for_row(
table, i, key0, key_info->key_length,
record0, &is_null);
if (!is_null) {
rcode = wsrep_append_key(
thd, trx, table_share, table,
keyval0, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
else
{
WSREP_DEBUG("NULL key skipped: %s",
wsrep_thd_query(thd));
}
if (record1) {
len = wsrep_store_key_val_for_row(
table, i, key1, key_info->key_length,
record1, &is_null);
if (!is_null && memcmp(key0, key1, len)) {
rcode = wsrep_append_key(
thd, trx, table_share,
table,
keyval1, len+1, shared);
if (rcode) DBUG_RETURN(rcode);
}
}
}
}

#endif
/* if no PK, calculate hash of full row, to be the key value */
if (!key_appended && wsrep_certify_nonPK) {
uchar digest[16];
int rcode;

wsrep_calc_row_hash(digest, record0, table, thd);
if ((rcode = wsrep_append_key(thd, table_share, table,
(const char*) digest, 16,
shared))) {
DBUG_RETURN(rcode);
}

if (record1) {
wsrep_calc_row_hash(
digest, record1, table, thd);
if ((rcode = wsrep_append_key(thd, table_share,
table,
(const char*) digest,
16, shared))) {
DBUG_RETURN(rcode);
}
}
DBUG_RETURN(0);
}

DBUG_RETURN(0);
}
#endif
4 changes: 4 additions & 0 deletions storage/tokudb/ha_tokudb.h
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,10 @@ class ha_tokudb : public handler {
bool check_upsert(THD *thd, List<Item> &update_fields, List<Item> &update_values);
int send_upsert_message(THD *thd, List<Item> &update_fields, List<Item> &update_values, DB_TXN *txn);
#endif
#ifdef WITH_WSREP
int wsrep_append_keys(THD *thd, bool shared,
const uchar* record0, const uchar* record1);
#endif
};

static inline bool key_is_clustering(const KEY *key) {
Expand Down

0 comments on commit f4defb0

Please sign in to comment.