
Fix optimistic parallel replication for TokuDB.
Make TokuDB report row lock waits with thd_rpl_deadlock_check(). This allows
parallel replication to properly detect conflicts, and kill and retry the
offending transaction.
knielsen committed Nov 23, 2016
1 parent d145d1b commit 660a292
Showing 9 changed files with 163 additions and 20 deletions.
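
The handlerton-side consumer of the new hook is not part of the hunks shown below. A minimal sketch of such a lock-wait callback, assuming MariaDB's thd_rpl_deadlock_check() service and a hypothetical tokudb_txnid_to_thd() lookup helper (the real mapping from PerconaFT transaction ids to THDs lives in the TokuDB handlerton and is not reproduced here):

// Sketch only. tokudb_txnid_to_thd() is a hypothetical helper; the
// thd_rpl_deadlock_check() signature (waiter first, then the transaction it
// waits on) is assumed from the MariaDB plugin service API.
static void tokudb_lock_wait_needed_callback(void *arg, uint64_t requesting_txnid,
                                             uint64_t blocking_txnid) {
  (void) arg;  // report_waits() passes nullptr here
  THD *requesting_thd = tokudb_txnid_to_thd(requesting_txnid);
  THD *blocking_thd = tokudb_txnid_to_thd(blocking_txnid);
  if (requesting_thd && blocking_thd) {
    // Tell the server that requesting_thd is about to wait on blocking_thd so
    // optimistic parallel replication can detect the conflict and, if needed,
    // kill and retry the waiter.
    thd_rpl_deadlock_check(requesting_thd, blocking_thd);
  }
}

Registering a function like this through the new DB_ENV set_lock_wait_callback() method (see ydb.cc below) is what lets the locktree report wait edges back up to the server.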
2 changes: 2 additions & 0 deletions storage/tokudb/PerconaFT/buildheader/make_tdb.cc
@@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
"int (*set_lock_timeout) (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
"int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
"int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
"int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback)",
"int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
"DB* (*get_db_for_directory) (DB_ENV*)",
@@ -751,6 +752,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);

printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
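make_tdb.cc emits the public db.h header, so the two added printf lines translate, roughly, into the following declarations in the generated file (a sketch; the surrounding members produced by print_db_env_struct() are elided):

/* Generated db.h (sketch) */
typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);

struct __toku_db_env {
    /* ... existing members ... */
    int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback);
    int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback);
    /* ... */
};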
12 changes: 12 additions & 0 deletions storage/tokudb/PerconaFT/ftcxx/db_env.hpp
@@ -202,6 +202,7 @@ namespace ftcxx {
typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
get_lock_wait_time_cb_func _get_lock_wait_time_cb;
lock_timeout_callback _lock_timeout_callback;
lock_wait_callback _lock_wait_needed_callback;
uint64_t (*_loader_memory_size_callback)(void);

uint32_t _cachesize_gbytes;
@@ -231,6 +232,7 @@ namespace ftcxx {
_lock_wait_time_msec(0),
_get_lock_wait_time_cb(nullptr),
_lock_timeout_callback(nullptr),
_lock_wait_needed_callback(nullptr),
_loader_memory_size_callback(nullptr),
_cachesize_gbytes(0),
_cachesize_bytes(0),
@@ -296,6 +298,11 @@ namespace ftcxx {
handle_ft_retval(r);
}

if (_lock_wait_needed_callback) {
r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
handle_ft_retval(r);
}

if (_loader_memory_size_callback) {
env->set_loader_memory_size(env, _loader_memory_size_callback);
}
@@ -419,6 +426,11 @@ namespace ftcxx {
return *this;
}

DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
_lock_wait_needed_callback = callback;
return *this;
}

DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
_loader_memory_size_callback = callback;
return *this;
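A usage sketch for the new builder hook. The callback type comes from the generated db.h; the include path and the open() arguments below are assumptions, so check db_env.hpp for the exact signature:

#include <cstdio>
#include "db_env.hpp"  // ftcxx header; include path assumed

// Matches the lock_wait_callback typedef from the generated db.h.
static void note_wait(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid) {
    (void) arg;
    std::printf("txn %llu waits for txn %llu\n",
                (unsigned long long) requesting_txnid,
                (unsigned long long) blocking_txnid);
}

ftcxx::DBEnv open_env(const char *dir) {
    return ftcxx::DBEnvBuilder()
        .set_lock_wait_callback(note_wait)          // new in this commit
        .open(dir, DB_CREATE | DB_PRIVATE, 0755);   // open() arguments assumed
}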
45 changes: 40 additions & 5 deletions storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -199,7 +199,8 @@ int lock_request::wait(uint64_t wait_time_ms) {
return wait(wait_time_ms, 0, nullptr);
}

int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID)) {
uint64_t t_now = toku_current_time_microsec();
uint64_t t_start = t_now;
uint64_t t_end = t_start + wait_time_ms * 1000;
@@ -208,7 +209,13 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil

// check again, this time locking out other retry calls
if (m_state == state::PENDING) {
retry();
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
retry(&conflicts_collector);
if (m_state == state::PENDING) {
report_waits(&conflicts_collector, lock_wait_callback);
}
conflicts_collector.deinit();
}

while (m_state == state::PENDING) {
@@ -287,7 +294,7 @@ TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}

int lock_request::retry(void) {
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
invariant(m_state == state::PENDING);
int r;

@@ -308,13 +315,14 @@ int lock_request::retry(void) {
toku_cond_broadcast(&m_wait_cond);
} else {
m_conflicting_txnid = conflicts.get(0);
add_conflicts_to_waits(&conflicts, conflicts_collector);
}
conflicts.destroy();

return r;
}

void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void)) {
void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();

info->retry_want++;
@@ -327,6 +335,9 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_

toku_mutex_lock(&info->mutex);

GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();

// here is the group retry algorithm.
// get the latest retry_want count and use it as the generation number of this retry operation.
// if this retry generation is > the last retry generation, then do the lock retries. otherwise,
@@ -344,7 +355,7 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry();
r = request->retry(&conflicts_collector);
if (r != 0) {
i++;
}
Expand All @@ -354,6 +365,30 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
}

toku_mutex_unlock(&info->mutex);

report_waits(&conflicts_collector, lock_wait_callback);
conflicts_collector.deinit();
}

void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
GrowableArray<TXNID> *wait_conflicts) {
size_t num_conflicts = conflicts->size();
for (size_t i = 0; i < num_conflicts; i++) {
wait_conflicts->push(m_txnid);
wait_conflicts->push(conflicts->get(i));
}
}

void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID, TXNID)) {
if (!lock_wait_callback)
return;
size_t num_conflicts = wait_conflicts->get_size();
for (size_t i = 0; i < num_conflicts; i += 2) {
TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
(*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
}
}

void *lock_request::get_extra(void) const {
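The collector threads everything through a flat GrowableArray<TXNID>: add_conflicts_to_waits() pushes (blocked, blocking) pairs, and report_waits() reads the array back two entries at a time. A self-contained sketch of the same pairing convention using std::vector:

#include <cstdint>
#include <cstdio>
#include <vector>

typedef uint64_t TXNID;

// Collect conflicts as a flat array of pairs: [blocked0, blocking0, blocked1, blocking1, ...]
static void add_conflicts(std::vector<TXNID> *waits, TXNID blocked,
                          const std::vector<TXNID> &conflicts) {
    for (TXNID blocking : conflicts) {
        waits->push_back(blocked);
        waits->push_back(blocking);
    }
}

// Report each (blocked, blocking) pair to the callback, if one is registered.
static void report_waits(const std::vector<TXNID> &waits,
                         void (*cb)(void *, TXNID, TXNID)) {
    if (!cb)
        return;
    for (size_t i = 0; i + 1 < waits.size(); i += 2)
        cb(nullptr, waits[i], waits[i + 1]);
}

static void print_wait(void *, TXNID blocked, TXNID blocking) {
    std::printf("txn %llu blocked by txn %llu\n",
                (unsigned long long) blocked, (unsigned long long) blocking);
}

int main() {
    std::vector<TXNID> waits;
    add_conflicts(&waits, /*blocked=*/42, /*conflicts=*/{7, 9});
    report_waits(waits, print_wait);  // prints two wait edges
    return 0;
}

In retry_all_lock_requests() the report happens only after info->mutex has been released, so the registered callback, which may call back into the server, runs outside the locktree's critical section.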
12 changes: 9 additions & 3 deletions storage/tokudb/PerconaFT/locktree/lock_request.h
@@ -89,7 +89,8 @@ class lock_request {
// returns: The return code of locktree::acquire_[write,read]_lock()
// or simply DB_LOCK_NOTGRANTED if the wait time expired.
int wait(uint64_t wait_time_ms);
int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void));
int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr);

// return: left end-point of the lock range
const DBT *get_left_key(void) const;
@@ -109,7 +110,7 @@ class lock_request {
// effect: Retries all of the lock requests for the given locktree.
// Any lock requests successfully restarted is completed and woken up.
// The rest remain pending.
static void retry_all_lock_requests(locktree *lt, void (*after_retry_test_callback)(void) = nullptr);
static void retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, void (*after_retry_test_callback)(void) = nullptr);

void set_start_test_callback(void (*f)(void));
void set_start_before_pending_test_callback(void (*f)(void));
@@ -162,7 +163,7 @@ class lock_request {

// effect: tries again to acquire the lock described by this lock request
// returns: 0 if retrying the request succeeded and is now complete
int retry(void);
int retry(GrowableArray<TXNID> *conflict_collector);

void complete(int complete_r);

@@ -194,6 +195,11 @@ class lock_request {

static int find_by_txnid(lock_request * const &request, const TXNID &txnid);

// Report list of conflicts to lock wait callback.
static void report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID, TXNID));
void add_conflicts_to_waits(txnid_set *conflicts, GrowableArray<TXNID> *wait_conflicts);

void (*m_start_test_callback)(void);
void (*m_start_before_pending_test_callback)(void);
void (*m_retry_test_callback)(void);
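The new lock_wait_callback parameter is inserted before the existing test callback and both default to nullptr, so the call forms seen in this commit are:

// Old-style callers still compile: both callbacks default to nullptr.
toku::lock_request::retry_all_lock_requests(lt);

// ydb_row_lock.cc: report lock waits, no test hook.
toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);

// Locktree test below: keeps the retry-test hook, so the new first callback
// now has to be spelled out explicitly as nullptr.
toku::lock_request::retry_all_lock_requests(lt, nullptr, after_retry_all);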
@@ -87,7 +87,7 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrie
buffer.destroy();

// retry pending lock requests
lock_request::retry_all_lock_requests(lt, after_retry_all);
lock_request::retry_all_lock_requests(lt, nullptr, after_retry_all);
}

request.destroy();
1 change: 1 addition & 0 deletions storage/tokudb/PerconaFT/src/ydb-internal.h
@@ -105,6 +105,7 @@ struct __toku_db_env_internal {
TOKULOGGER logger;
toku::locktree_manager ltm;
lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
lock_wait_callback lock_wait_needed_callback; // Called when a lock request requires a wait.

DB *directory; // Maps dnames to inames
DB *persistent_environment; // Stores environment settings, can be used for upgrade
7 changes: 7 additions & 0 deletions storage/tokudb/PerconaFT/src/ydb.cc
@@ -1804,6 +1804,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
return 0;
}

static int
env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
env->i->lock_wait_needed_callback = callback;
return 0;
}

static void
format_time(const time_t *timer, char *buf) {
ctime_r(timer, buf);
@@ -2704,6 +2710,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
USENV(get_lock_timeout);
USENV(set_lock_timeout);
USENV(set_lock_timeout_callback);
USENV(set_lock_wait_callback);
USENV(set_redzone);
USENV(log_flush);
USENV(log_archive);
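env_set_lock_wait_callback() simply stores the function pointer in the env's internal struct, and the USENV() entry exposes it through the DB_ENV method table, so registration mirrors the existing timeout callback. A minimal sketch, assuming an already-created env handle:

/* my_lock_wait_cb is any function matching the lock_wait_callback typedef
 * from the generated db.h. */
static void my_lock_wait_cb(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid) {
    (void) arg;  /* report_waits() currently passes nullptr here */
    /* forward the wait edge (requesting_txnid -> blocking_txnid) to whoever cares */
}

static int register_lock_wait_hook(DB_ENV *env) {
    /* Stored into env->i->lock_wait_needed_callback; ydb_row_lock.cc later hands
     * it to lock_request::wait() and retry_all_lock_requests(). */
    return env->set_lock_wait_callback(env, my_lock_wait_cb);
}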
15 changes: 11 additions & 4 deletions storage/tokudb/PerconaFT/src/ydb_row_lock.cc
@@ -193,7 +193,10 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
toku::lock_request::type lock_type, toku::lock_request *request) {
DB_TXN *txn_anc = txn_oldest_ancester(txn);
TXNID txn_anc_id = txn_anc->id64(txn_anc);
request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc));
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc), client_extra);

const int r = request->start();
if (r == 0) {
@@ -221,7 +224,8 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
uint64_t killed_time_msec = env->i->default_killed_time_msec;
if (env->i->get_killed_time_callback)
killed_time_msec = env->i->get_killed_time_callback(killed_time_msec);
const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback);
const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback,
env->i->lock_wait_needed_callback);
if (r == 0) {
db_txn_note_row_lock(db, txn_anc, left_key, right_key);
} else if (r == DB_LOCK_NOTGRANTED) {
@@ -248,7 +252,10 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) {
// This lock request must succeed, so we do not want to wait
toku::lock_request request;
request.create();
request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc));
uint64_t client_id;
void *client_extra;
txn->get_client_id(txn, &client_id, &client_extra);
request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc), client_extra);
int r = request.start();
invariant_zero(r);
db_txn_note_row_lock(db, txn_anc, key, key);
@@ -268,7 +275,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {

// all of our locks have been released, so first try to wake up
// pending lock requests, then release our reference on the lt
toku::lock_request::retry_all_lock_requests(lt);
toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);

// Release our reference on this locktree
toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
