Skip to content

Commit

Permalink
Merge remote-tracking branch 'my/tokudb_optimistic_parallel_replicati…
Browse files Browse the repository at this point in the history
…on' into 10.1
  • Loading branch information
knielsen committed Nov 28, 2016
2 parents a68d135 + 3bec0b3 commit e493c6b
Show file tree
Hide file tree
Showing 30 changed files with 1,756 additions and 142 deletions.
7 changes: 5 additions & 2 deletions storage/tokudb/PerconaFT/buildheader/make_tdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
"int (*set_lock_timeout) (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
"int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
"int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
"int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback)",
"int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
"int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
"DB* (*get_db_for_directory) (DB_ENV*)",
Expand All @@ -425,6 +426,7 @@ static void print_db_env_struct (void) {
"bool (*set_dir_per_db)(DB_ENV *, bool new_val)",
"bool (*get_dir_per_db)(DB_ENV *)",
"const char *(*get_data_dir)(DB_ENV *env)",
"void (*kill_waiter)(DB_ENV *, void *extra)",
NULL};

sort_and_dump_fields("db_env", true, extra);
Expand Down Expand Up @@ -545,8 +547,8 @@ static void print_db_txn_struct (void) {
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
"uint64_t (*id64) (DB_TXN*)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id)",
"uint64_t (*get_client_id)(DB_TXN *)",
"void (*set_client_id)(DB_TXN *, uint64_t client_id, void *client_extra)",
"void (*get_client_id)(DB_TXN *, uint64_t *client_id, void **client_extra)",
"bool (*is_prepared)(DB_TXN *)",
"DB_TXN *(*get_child)(DB_TXN *)",
"uint64_t (*get_start_time)(DB_TXN *)",
Expand Down Expand Up @@ -750,6 +752,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);

printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
Expand Down
9 changes: 6 additions & 3 deletions storage/tokudb/PerconaFT/ft/txn/txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ static txn_child_manager tcm;
.state = TOKUTXN_LIVE,
.num_pin = 0,
.client_id = 0,
.client_extra = nullptr,
.start_time = time(NULL),
};

Expand Down Expand Up @@ -705,12 +706,14 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
return txn_has_spilled_rollback_logs(txn);
}

uint64_t toku_txn_get_client_id(TOKUTXN txn) {
return txn->client_id;
void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
*client_id = txn->client_id;
*client_extra = txn->client_extra;
}

void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
txn->client_id = client_id;
txn->client_extra = client_extra;
}

time_t toku_txn_get_start_time(struct tokutxn *txn) {
Expand Down
5 changes: 3 additions & 2 deletions storage/tokudb/PerconaFT/ft/txn/txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ struct tokutxn {
uint32_t num_pin; // number of threads (all hot indexes) that want this
// txn to not transition to commit or abort
uint64_t client_id;
void *client_extra;
time_t start_time;
};
typedef struct tokutxn *TOKUTXN;
Expand Down Expand Up @@ -293,8 +294,8 @@ void toku_txn_unpin_live_txn(struct tokutxn *txn);

bool toku_txn_has_spilled_rollback(struct tokutxn *txn);

uint64_t toku_txn_get_client_id(struct tokutxn *txn);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id);
void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra);
void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra);

time_t toku_txn_get_start_time(struct tokutxn *txn);

Expand Down
12 changes: 12 additions & 0 deletions storage/tokudb/PerconaFT/ftcxx/db_env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ namespace ftcxx {
typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
get_lock_wait_time_cb_func _get_lock_wait_time_cb;
lock_timeout_callback _lock_timeout_callback;
lock_wait_callback _lock_wait_needed_callback;
uint64_t (*_loader_memory_size_callback)(void);

uint32_t _cachesize_gbytes;
Expand Down Expand Up @@ -231,6 +232,7 @@ namespace ftcxx {
_lock_wait_time_msec(0),
_get_lock_wait_time_cb(nullptr),
_lock_timeout_callback(nullptr),
_lock_wait_needed_callback(nullptr),
_loader_memory_size_callback(nullptr),
_cachesize_gbytes(0),
_cachesize_bytes(0),
Expand Down Expand Up @@ -296,6 +298,11 @@ namespace ftcxx {
handle_ft_retval(r);
}

if (_lock_wait_needed_callback) {
r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
handle_ft_retval(r);
}

if (_loader_memory_size_callback) {
env->set_loader_memory_size(env, _loader_memory_size_callback);
}
Expand Down Expand Up @@ -419,6 +426,11 @@ namespace ftcxx {
return *this;
}

DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
_lock_wait_needed_callback = callback;
return *this;
}

DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
_loader_memory_size_callback = callback;
return *this;
Expand Down
169 changes: 121 additions & 48 deletions storage/tokudb/PerconaFT/locktree/lock_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ void lock_request::create(void) {
toku_cond_init(&m_wait_cond, nullptr);

m_start_test_callback = nullptr;
m_start_before_pending_test_callback = nullptr;
m_retry_test_callback = nullptr;
}

Expand All @@ -79,7 +80,7 @@ void lock_request::destroy(void) {
}

// set the lock request parameters. this API allows a lock request to be reused.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
invariant(m_state != state::PENDING);
m_lt = lt;
m_txnid = txnid;
Expand All @@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT
m_state = state::INITIALIZED;
m_info = lt ? lt->get_lock_request_info() : nullptr;
m_big_txn = big_txn;
m_extra = extra;
}

// get rid of any stored left and right key copies and
Expand Down Expand Up @@ -173,6 +175,7 @@ int lock_request::start(void) {
m_state = state::PENDING;
m_start_time = toku_current_time_microsec() / 1000;
m_conflicting_txnid = conflicts.get(0);
if (m_start_before_pending_test_callback) m_start_before_pending_test_callback();
toku_mutex_lock(&m_info->mutex);
insert_into_lock_requests();
if (deadlock_exists(conflicts)) {
Expand All @@ -196,14 +199,32 @@ int lock_request::wait(uint64_t wait_time_ms) {
return wait(wait_time_ms, 0, nullptr);
}

int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
void (*lock_wait_callback)(void *, TXNID, TXNID)) {
uint64_t t_now = toku_current_time_microsec();
uint64_t t_start = t_now;
uint64_t t_end = t_start + wait_time_ms * 1000;

toku_mutex_lock(&m_info->mutex);

// check again, this time locking out other retry calls
if (m_state == state::PENDING) {
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();
retry(&conflicts_collector);
if (m_state == state::PENDING) {
report_waits(&conflicts_collector, lock_wait_callback);
}
conflicts_collector.deinit();
}

while (m_state == state::PENDING) {
// check if this thread is killed
if (killed_callback && killed_callback()) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
continue;
}

// compute next wait time
uint64_t t_wait;
Expand All @@ -221,7 +242,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
invariant(r == 0 || r == ETIMEDOUT);

t_now = toku_current_time_microsec();
if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
if (m_state == state::PENDING && t_now >= t_end) {
m_info->counters.timeout_count += 1;

// if we're still pending and we timed out, then remove our
Expand Down Expand Up @@ -273,14 +294,16 @@ TXNID lock_request::get_conflicting_txnid(void) const {
return m_conflicting_txnid;
}

int lock_request::retry(void) {
int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
invariant(m_state == state::PENDING);
int r;

invariant(m_state == state::PENDING);
txnid_set conflicts;
conflicts.create();
if (m_type == type::WRITE) {
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
} else {
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
}

// if the acquisition succeeded then remove ourselves from the
Expand All @@ -290,59 +313,105 @@ int lock_request::retry(void) {
complete(r);
if (m_retry_test_callback) m_retry_test_callback(); // test callback
toku_cond_broadcast(&m_wait_cond);
} else {
m_conflicting_txnid = conflicts.get(0);
add_conflicts_to_waits(&conflicts, conflicts_collector);
}
conflicts.destroy();

return r;
}

void lock_request::retry_all_lock_requests(locktree *lt) {
void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
lt_lock_request_info *info = lt->get_lock_request_info();

// if a thread reads this bit to be true, then it should go ahead and
// take the locktree mutex and retry lock requests. we use this bit
// to prevent every single thread from waiting on the locktree mutex
// in order to retry requests, especially when no requests actually exist.
//
// it is important to note that this bit only provides an optimization.
// it is not problematic for it to be true when it should be false,
// but it can be problematic for it to be false when it should be true.
// therefore, the lock request code must ensures that when lock requests
// are added to this locktree, the bit is set.
// see lock_request::insert_into_lock_requests()
if (!info->should_retry_lock_requests) {
info->retry_want++;

// if there are no pending lock requests than there is nothing to do
// the unlocked data race on pending_is_empty is OK since lock requests
// are retried after added to the pending set.
if (info->pending_is_empty)
return;
}

toku_mutex_lock(&info->mutex);

// let other threads know that they need not retry lock requests at this time.
//
// the motivation here is that if a bunch of threads have already released
// their locks in the rangetree, then its probably okay for only one thread
// to iterate over the list of requests and retry them. otherwise, at high
// thread counts and a large number of pending lock requests, you could
// end up wasting a lot of cycles.
info->should_retry_lock_requests = false;

size_t i = 0;
while (i < info->pending_lock_requests.size()) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
invariant_zero(r);

// retry the lock request. if it didn't succeed,
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry();
if (r != 0) {
i++;
GrowableArray<TXNID> conflicts_collector;
conflicts_collector.init();

// here is the group retry algorithm.
// get the latest retry_want count and use it as the generation number of this retry operation.
// if this retry generation is > the last retry generation, then do the lock retries. otherwise,
// no lock retries are needed.
unsigned long long retry_gen = info->retry_want.load();
if (retry_gen > info->retry_done) {

// retry all of the pending lock requests.
for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
invariant_zero(r);

// retry this lock request. if it didn't succeed,
// move on to the next lock request. otherwise
// the request is gone from the list so we may
// read the i'th entry for the next one.
r = request->retry(&conflicts_collector);
if (r != 0) {
i++;
}
}
if (after_retry_all_test_callback) after_retry_all_test_callback();
info->retry_done = retry_gen;
}

toku_mutex_unlock(&info->mutex);

report_waits(&conflicts_collector, lock_wait_callback);
conflicts_collector.deinit();
}

void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
GrowableArray<TXNID> *wait_conflicts) {
size_t num_conflicts = conflicts->size();
for (size_t i = 0; i < num_conflicts; i++) {
wait_conflicts->push(m_txnid);
wait_conflicts->push(conflicts->get(i));
}
}

void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
void (*lock_wait_callback)(void *, TXNID, TXNID)) {
if (!lock_wait_callback)
return;
size_t num_conflicts = wait_conflicts->get_size();
for (size_t i = 0; i < num_conflicts; i += 2) {
TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
(*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
}
}

void *lock_request::get_extra(void) const {
return m_extra;
}

// future threads should only retry lock requests if some still exist
info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
void lock_request::kill_waiter(void) {
remove_from_lock_requests();
complete(DB_LOCK_NOTGRANTED);
toku_cond_broadcast(&m_wait_cond);
}

void lock_request::kill_waiter(locktree *lt, void *extra) {
lt_lock_request_info *info = lt->get_lock_request_info();
toku_mutex_lock(&info->mutex);
for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
lock_request *request;
int r = info->pending_lock_requests.fetch(i, &request);
if (r == 0 && request->get_extra() == extra) {
request->kill_waiter();
break;
}
}
toku_mutex_unlock(&info->mutex);
}

Expand All @@ -364,9 +433,7 @@ void lock_request::insert_into_lock_requests(void) {
invariant(r == DB_NOTFOUND);
r = m_info->pending_lock_requests.insert_at(this, idx);
invariant_zero(r);

// ensure that this bit is true, now that at least one lock request is in the set
m_info->should_retry_lock_requests = true;
m_info->pending_is_empty = false;
}

// remove this lock request from the locktree's set. must hold the mutex.
Expand All @@ -378,6 +445,8 @@ void lock_request::remove_from_lock_requests(void) {
invariant(request == this);
r = m_info->pending_lock_requests.delete_at(idx);
invariant_zero(r);
if (m_info->pending_lock_requests.size() == 0)
m_info->pending_is_empty = true;
}

int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
Expand All @@ -395,6 +464,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) {
m_start_test_callback = f;
}

void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
m_start_before_pending_test_callback = f;
}

void lock_request::set_retry_test_callback(void (*f)(void)) {
m_retry_test_callback = f;
}
Expand Down
Loading

0 comments on commit e493c6b

Please sign in to comment.