diff --git a/.gitignore b/.gitignore index e6c53270..8a6c8120 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ rundb runcl 3TS 3TS-DAI +contrib/deneva/obj/deps diff --git a/contrib/deneva/Makefile b/contrib/deneva/Makefile index 4d6db35c..90b32c27 100644 --- a/contrib/deneva/Makefile +++ b/contrib/deneva/Makefile @@ -1,4 +1,4 @@ -CC=/usr/local/bin/gcc10/bin/g++ +CC=g++ CFLAGS=-Wall -Werror -std=c++17 -static-libstdc++ -g3 -ggdb -O0 -fno-strict-aliasing -fno-omit-frame-pointer -D_GLIBCXX_USE_CXX11_ABI=0 -static-libasan -fsanitize=address #CFLAGS += -fsanitize=address -fno-stack-protector -fno-omit-frame-pointer NNMSG=./nanomsg-0.5-beta diff --git a/contrib/deneva/client/client_main.cpp b/contrib/deneva/client/client_main.cpp index 719b9d99..41a1f7df 100644 --- a/contrib/deneva/client/client_main.cpp +++ b/contrib/deneva/client/client_main.cpp @@ -165,7 +165,9 @@ int main(int argc, char *argv[]) { // spawn and run txns again. starttime = get_server_clock(); simulation->run_starttime = starttime; +#if WORKLOAD == DA simulation->last_da_query_time = starttime; +#endif uint64_t id = 0; for (uint64_t i = 0; i < thd_cnt; i++) { #if SET_AFFINITY diff --git a/contrib/deneva/concurrency_control/dli.cpp b/contrib/deneva/concurrency_control/dli.cpp deleted file mode 100644 index 8ed04c0f..00000000 --- a/contrib/deneva/concurrency_control/dli.cpp +++ /dev/null @@ -1,269 +0,0 @@ -#include "dli.h" - -#include "../system/helper.h" -#include "../system/manager.h" -#include "../system/global.h" -#include "dta.h" -#include "row_dli_based.h" -#include "txn.h" -#if CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || \ - CC_ALG == DLI_MVCC -void dli_merge_set(Dli::RWSet& r_tc, Dli::RWSet& r_tl) { - r_tc.insert(r_tl.begin(), r_tl.end()); -} - -void dli_compare_set(const Dli::RWSet& r_tc, uint32_t& newer_tc, - const Dli::RWSet& r_tl, uint32_t& newer_tl, bool rw, - const uint32_t offset) { - for (const auto& i : r_tl) { - if (r_tc.count(i.first)) { - newer_tl |= (i.second > r_tc.at(i.first)) << offset; - if (!rw) { - newer_tc |= (i.second < r_tc.at(i.first)) << offset; - } else { - newer_tc |= (i.second <= r_tc.at(i.first)) << offset; - } - } - } -} - -#define IDENTIFY_ANOMALY(tc_off, tl_off, ano_name) \ - do { \ - if ((newer_tc & (1 << (tc_off))) && (newer_tl & (1 << (tl_off)))) { \ - INC_STATS(txn->get_thd_id(), ano_name, 1); \ - return true; \ - } \ - } while (0) - -bool dli_has_conflict(TxnManager* txn, const uint32_t newer_tc, const uint32_t newer_tl) { - if (!newer_tc || !newer_tl) return false; - IDENTIFY_ANOMALY(3, 3, ano_4_trans_read_skew); - IDENTIFY_ANOMALY(3, 1, ano_3_trans_read_skew_1); - IDENTIFY_ANOMALY(2, 3, ano_3_trans_write_skew_1); - IDENTIFY_ANOMALY(2, 1, ano_2_trans_write_skew_1); - IDENTIFY_ANOMALY(1, 3, ano_3_trans_read_skew_2); - IDENTIFY_ANOMALY(1, 1, ano_2_trans_read_skew); - IDENTIFY_ANOMALY(0, 3, ano_3_trans_write_skew_2); - IDENTIFY_ANOMALY(0, 1, ano_2_trans_write_skew_2); - INC_STATS(txn->get_thd_id(), ano_unknown, 1); - return true; -} - -#undef IDENTIFY_ANOMALY -#if CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_MVCC -static bool check_conflict(TxnManager* txn, Dli::RWSet& r_tc, Dli::RWSet& w_tc, Dli::RWSet& r_tl, - Dli::RWSet& w_tl) { - //(txn, rset, wset, r_tl, w_tl); - //r_tc:rset w_tc:wset - // - uint32_t newer_tl = 0, newer_tc = 0; - dli_compare_set(r_tc, newer_tc, r_tl, newer_tl, false, 3); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - dli_compare_set(w_tc, newer_tc, w_tl, newer_tl, false, 2); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - dli_compare_set(r_tc, newer_tc, w_tl, newer_tl, true, 1); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - dli_compare_set(r_tl, newer_tl, w_tc, newer_tc, true, 0); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - - dli_merge_set(r_tc, r_tl); - dli_merge_set(w_tc, w_tl); - - return false; -} - -#endif - -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 -static bool check_conflict(TxnManager* txn, Dli::RWSet& r_tc, Dli::RWSet& w_tc, Dli::RWSet& r_tl, - Dli::RWSet& w_tl, uint64_t& lower, uint64_t& upper, uint64_t lower_tl, - uint64_t upper_tl) { - uint32_t newer_tl = 0, newer_tc = 0; - - dli_compare_set(r_tc, newer_tc, w_tl, newer_tl, true, 1); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - dli_compare_set(r_tl, newer_tl, w_tc, newer_tc, true, 0); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; - if (newer_tc) { - lower = std::max(lower, upper_tl); - } else if (newer_tl) { - upper = std::min(lower_tl, upper); - } - if (lower >= upper) return true; - dli_compare_set(r_tc, newer_tc, r_tl, newer_tl, false, 3); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; -#if CC_ALG != DLI_DTA3 - dli_compare_set(w_tc, newer_tc, w_tl, newer_tl, false, 2); - if (dli_has_conflict(txn, newer_tc, newer_tl)) return true; -#endif - -#if CC_ALG == DLI_DTA - dli_merge_set(r_tc, r_tl); - dli_merge_set(w_tc, w_tl); -#endif - return false; -} -#endif -#endif -static RC validate_main(TxnManager* txn, Dli* dli, const bool final_validate) { - RC rc = RCOK; -#if CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || \ - CC_ALG == DLI_MVCC - uint64_t start_time = get_sys_clock(); - uint64_t expect = 0; - Dli::RWSet rset, wset; - std::unordered_set cur_trans; - Dli::get_rw_set(txn, rset, wset); - Dli::RWSet rset_bak = rset, wset_bak = wset; - ts_t ts = txn->get_start_timestamp(); - -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - uint64_t tid = txn->get_thd_id(); - txnid_t txnid = txn->get_txn_id(); - uint64_t lower = dta_time_table.get_lower(tid, txnid); - uint64_t upper = dta_time_table.get_upper(tid, txnid); - if (lower >= upper) rc = Abort; -#endif - - uint64_t timespan = get_sys_clock() - start_time; - txn->txn_stats.cc_time += timespan; - txn->txn_stats.cc_time_short += timespan; - start_time += timespan; - - timespan = get_sys_clock() - start_time; - txn->txn_stats.cc_block_time += timespan; - txn->txn_stats.cc_block_time_short += timespan; - start_time += timespan; - - if (rc == RCOK) { - for (auto& i : wset) { - if (i.first->manager->w_trans != ts && - !i.first->manager->w_trans.compare_exchange_weak(expect, ts)) { - rc = Abort; - break; - } - } - } - - if (rc == RCOK && !wset.empty()) { - for (auto& i : wset) { - i.second = UINT64_MAX; -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - row_t* cur_wrow = i.first; - if (lower <= cur_wrow->manager->timestamp_last_write) { - lower = cur_wrow->manager->timestamp_last_write + 1; - } -#endif - } - bool res = false; - for (TSNode* tl = dli->validated_txns_.load(); - tl != nullptr && tl != txn->history_dli_txn_head; - tl = tl->next_) {//cur_trans is active tran table - if (tl->is_abort_.load()) continue; -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - res = check_conflict(txn, rset, wset, tl->rset_, tl->wset_, lower, upper, - tl->lower_, tl->upper_); - // dli->release(dli->commit_trans_lowup_mutex_); -#else - res = check_conflict(txn, rset, wset, tl->rset_, tl->wset_); -#endif - // dli->release(dli->commit_trans_rset_mutex_); - if (res) { - rc = Abort; - break; - } - } - } -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - if (lower >= upper) rc = Abort; -#endif - if (rc == RCOK && final_validate) { -#if CC_ALG != DLI_DTA && CC_ALG != DLI_DTA2 && CC_ALG != DLI_DTA3 - txn->dli_txn = TSNode::push_front(dli->validated_txns_, std::move(rset), std::move(wset), ts); -#else - txn->dli_txn = TSNode::push_front(dli->validated_txns_, std::move(rset), std::move(wset), ts, lower, upper); - dta_time_table.set_state(tid, txnid, DTA_VALIDATED); - dta_time_table.set_lower(tid, txnid, lower); - dta_time_table.set_upper(tid, txnid, upper); - } else { - dta_time_table.set_state(tid, txnid, DTA_ABORTED); -#endif - } - timespan = get_sys_clock() - start_time; - txn->txn_stats.cc_time += timespan; - txn->txn_stats.cc_time_short += timespan; - start_time += timespan; - -#endif - return rc; -} - -void Dli::get_rw_set(TxnManager* txn, Dli::RWSet& rset, Dli::RWSet& wset) { - uint64_t len = txn->get_access_cnt(); - for (uint64_t i = 0; i < len; i++) { - if (txn->get_access_type(i) == WR) { - wset.emplace(txn->get_access_original_row(i), txn->get_access_version(i)); - } else { - rset.emplace(txn->get_access_original_row(i), txn->get_access_version(i)); - } - } -#if WORKLOAD == TPCC - for (const auto& i : wset) rset[i.first] = i.second; -#endif -} - -void Dli::init() { - // pthread_mutex_init(&validated_trans_mutex_, NULL); - // pthread_mutex_init(&commit_trans_rset_mutex_, NULL); - // pthread_mutex_init(&commit_trans_wset_mutex_, NULL); - // pthread_mutex_init(&commit_trans_lowup_mutex_, NULL); - pthread_mutex_init(&mtx_, NULL); -} - -RC Dli::validate(TxnManager* txn, const bool final_validate) { - RC rc = RCOK; - uint64_t starttime = get_sys_clock(); - txnid_t tid = txn->get_thd_id(); - rc = validate_main(txn, this, final_validate); - INC_STATS(tid, dli_mvcc_occ_validate_time, get_sys_clock() - starttime); - if (rc == RCOK) { - INC_STATS(tid, dli_mvcc_occ_check_cnt, 1); - } else { - INC_STATS(tid, dli_mvcc_occ_abort_check_cnt, 1); - } - return rc; -} - -void Dli::finish_trans(RC rc, TxnManager* txn) { -#if CC_ALG == DLI_BASE || CC_ALG == DLI_MVCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - if (rc != RCOK && txn->dli_txn != nullptr) { - txn->dli_txn->is_abort_ = true; - } else if (rc == RCOK) { -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - // transaction commit successfully and the timestamp of the transaction is certained - txn->dli_txn->upper_ = txn->dli_txn->lower_.load() + 1; - txn->dli_txn->commit_ts_ = txn->commit_timestamp; -#endif - } -#endif -} -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 -RC Dli::find_bound(TxnManager* txn) { - RC rc = RCOK; - txnid_t tid = txn->get_thd_id(); - txnid_t txnid = txn->get_txn_id(); - uint64_t lower = dta_time_table.get_lower(tid, txnid); - uint64_t upper = dta_time_table.get_upper(tid, txnid); - if (lower >= upper) { - dta_time_table.set_state(tid, txnid, DTA_VALIDATED); - rc = Abort; - } else { - dta_time_table.set_state(tid, txnid, DTA_COMMITTED); - txn->commit_timestamp = lower + 1; - } - DEBUG("DTA Bound %ld: %d [%lu,%lu] %lu\n", tid, rc, lower, upper, txn->commit_timestamp); - return rc; -} -#endif -void Dli::latch() { pthread_mutex_lock(&mtx_); } -void Dli::release() { pthread_mutex_unlock(&mtx_); } diff --git a/contrib/deneva/concurrency_control/dli.h b/contrib/deneva/concurrency_control/dli.h deleted file mode 100644 index e407327b..00000000 --- a/contrib/deneva/concurrency_control/dli.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef SI_H -#define SI_H -#include -#include - -#include "../storage/row.h" -#include "../system/global.h" -#include "semaphore.h" - -template class TSNode; - -struct DliValidatedTxn; - -class Dli { - public: - using RWSet = std::map; - - void init(); - RC validate(TxnManager* txn, const bool lock_rows = true); - void finish_trans(RC rc, TxnManager* txn); - RC find_bound(TxnManager* txn); - static void get_rw_set(TxnManager* txn, RWSet& rset, - RWSet& wset); - void latch(); - void release(); - - std::atomic*> validated_txns_; - - // pthread_mutex_t validated_trans_mutex_; - // pthread_mutex_t commit_trans_rset_mutex_; - // pthread_mutex_t commit_trans_wset_mutex_; - // pthread_mutex_t commit_trans_lowup_mutex_; - pthread_mutex_t mtx_; -}; - -struct DliValidatedTxn { -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - DliValidatedTxn(const Dli::RWSet& rset, const Dli::RWSet& wset, const ts_t start_ts, const uint64_t lower, const uint64_t upper) : is_abort_(false), rset_(rset), wset_(wset), start_ts_(start_ts), lower_(lower), upper_(upper) {} -#else - DliValidatedTxn(const Dli::RWSet& rset, const Dli::RWSet& wset, const ts_t start_ts) : is_abort_(false), rset_(rset), wset_(wset), start_ts_(start_ts) {} -#endif - DliValidatedTxn(DliValidatedTxn&&) = default; - std::atomic is_abort_; - Dli::RWSet rset_; - Dli::RWSet wset_; - const ts_t start_ts_; - ts_t commit_ts_; -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - std::atomic lower_; - std::atomic upper_; -#endif -}; - -#endif diff --git a/contrib/deneva/concurrency_control/dta.cpp b/contrib/deneva/concurrency_control/dta.cpp index 27ad2a13..e5c78d67 100644 --- a/contrib/deneva/concurrency_control/dta.cpp +++ b/contrib/deneva/concurrency_control/dta.cpp @@ -21,7 +21,6 @@ #include "../system/manager.h" #include "../system/mem_alloc.h" #include "../system/txn.h" -#include "dli.h" #include "row_dta.h" void get_rw_set(TxnManager* txn, std::list& rset, std::list& wset) { diff --git a/contrib/deneva/concurrency_control/row_dli_based.cpp b/contrib/deneva/concurrency_control/row_dli_based.cpp deleted file mode 100644 index d53a471e..00000000 --- a/contrib/deneva/concurrency_control/row_dli_based.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - Copyright 2016 Massachusetts Institute of Technology - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#include "row_dli_based.h" - -#include "mem_alloc.h" -#include "row.h" -#include "txn.h" - -void Row_dli_base::init(row_t *row) { - _row = row; - _latch = (pthread_mutex_t *)mem_allocator.alloc(sizeof(pthread_mutex_t)); - pthread_mutex_init(_latch, NULL); - sem_init(&_semaphore, 0, 1); - _cur_version = 0; - _rw_transs = new std::vector(); - _rw_transs->emplace_back(0); - w_trans = 0; -} - -RC Row_dli_base::access(TxnManager *txn, TsType type, uint64_t &version) { - RC rc = RCOK; - sem_wait(&_semaphore); - if (type == R_REQ) { - txn->cur_row->copy(_row); - version = _cur_version; - _rw_transs->back().r_trans_ts.insert(txn->get_start_timestamp()); - } else if (type == P_REQ) { - txn->cur_row->copy(_row); - version = UINT64_MAX; - } else - assert(false); - sem_post(&_semaphore); - return rc; -} - -uint64_t Row_dli_base::write(row_t *data, TxnManager* txn, const access_t type) { - uint64_t res = 0; - if (type == WR) { - sem_wait(&_semaphore); - res = _rw_transs->size(); - //assert(txn->get_commit_timestamp() >= _rw_transs->at(res - 1).w_ts); - _rw_transs->emplace_back(txn->get_commit_timestamp()); - sem_post(&_semaphore); - } - if (w_trans == txn->get_start_timestamp()) w_trans = 0; - return res; -} - diff --git a/contrib/deneva/concurrency_control/row_dli_based.h b/contrib/deneva/concurrency_control/row_dli_based.h deleted file mode 100644 index 033e8746..00000000 --- a/contrib/deneva/concurrency_control/row_dli_based.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - Copyright 2016 Massachusetts Institute of Technology - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#ifndef ROW_DLI1_H -#define ROW_DLI1_H -#include - -#include "../storage/row.h" - -class table_t; -class Catalog; -class TxnManager; -struct TsReqEntry; - -class Row_dli_base { - public: - struct Entry { - Entry(const ts_t w_ts) : w_ts(w_ts), r_trans_ts() {} - ts_t w_ts; - std::set r_trans_ts; - }; - void init(row_t* row); - RC access(TxnManager* txn, TsType type, uint64_t& version); - uint64_t write(row_t* data, TxnManager* txn, const access_t type); - bool has_version(uint64_t version) const { return version < _rw_transs->size(); } - Entry* get_version(uint64_t version) { - assert(has_version(version)); - return &(_rw_transs->at(version)); - } - - std::atomic w_trans; - - private: - pthread_mutex_t* _latch; - sem_t _semaphore; - row_t* _row; - uint64_t _cur_version; - std::vector* _rw_transs; -}; - -#endif diff --git a/contrib/deneva/concurrency_control/row_dta.cpp b/contrib/deneva/concurrency_control/row_dta.cpp index 347b6c02..928d908c 100644 --- a/contrib/deneva/concurrency_control/row_dta.cpp +++ b/contrib/deneva/concurrency_control/row_dta.cpp @@ -63,7 +63,7 @@ RC Row_dta::access(TsType type, TxnManager* txn, row_t* row, uint64_t& version) } RC Row_dta::read_and_write(TsType type, TxnManager* txn, row_t* row, uint64_t& version) { - assert(CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3); + assert(CC_ALG == DTA); RC rc = RCOK; uint64_t mtx_wait_starttime = get_sys_clock(); @@ -171,7 +171,7 @@ RC Row_dta::read_and_write(TsType type, TxnManager* txn, row_t* row, uint64_t& v } RC Row_dta::prewrite(TxnManager* txn) { - assert(CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3); + assert(CC_ALG == DTA); RC rc = RCOK; uint64_t mtx_wait_starttime = get_sys_clock(); diff --git a/contrib/deneva/concurrency_control/row_si.cpp b/contrib/deneva/concurrency_control/row_si.cpp index 84464cd9..9f77cd8d 100644 --- a/contrib/deneva/concurrency_control/row_si.cpp +++ b/contrib/deneva/concurrency_control/row_si.cpp @@ -4,7 +4,6 @@ #include "system/manager.h" #include "system/mem_alloc.h" #include "system/txn.h" -#include "dli.h" inline bool TupleSatisfiesMVCC(const SIEntry& tuple, const ts_t start_ts) { return tuple.commit_ts < start_ts; diff --git a/contrib/deneva/concurrency_control/row_unified.h b/contrib/deneva/concurrency_control/row_unified.h index 18f42763..140664bd 100644 --- a/contrib/deneva/concurrency_control/row_unified.h +++ b/contrib/deneva/concurrency_control/row_unified.h @@ -1,3 +1,13 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #ifndef _ROW_UNIFIED_H_ #define _ROW_UNIFIED_H_ diff --git a/contrib/deneva/concurrency_control/unified_util.h b/contrib/deneva/concurrency_control/unified_util.h index 41ac9cbb..6551861d 100644 --- a/contrib/deneva/concurrency_control/unified_util.h +++ b/contrib/deneva/concurrency_control/unified_util.h @@ -1,17 +1,27 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #ifndef _ROW_UNIFIED_UTIL_H_ #define _ROW_UNIFIED_UTIL_H_ #include "../unified_concurrency_control/row_prece.h" #include "../unified_concurrency_control/alg_dli_identify_cycle.h" -#include "../unified_concurrency_control/alg_dli_identify_merge.h" +#include "../unified_concurrency_control/alg_dli_identify_chain.h" #include "../unified_concurrency_control/txn_dli_identify.h" class row_t; template constexpr const ttts::UniAlgs uni_alg; -template <> constexpr const ttts::UniAlgs uni_alg = ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE; -template <> constexpr const ttts::UniAlgs uni_alg = ttts::UniAlgs::UNI_DLI_IDENTIFY_MERGE; +template <> constexpr const ttts::UniAlgs uni_alg = ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE; +template <> constexpr const ttts::UniAlgs uni_alg = ttts::UniAlgs::UNI_DLI_IDENTIFY_CHAIN; template using UniRowManager = ttts::RowManager, row_t*>; template using UniTxnManager = ttts::TxnManager, row_t*>; diff --git a/contrib/deneva/config.h b/contrib/deneva/config.h index 6f434357..a8d8f1b8 100644 --- a/contrib/deneva/config.h +++ b/contrib/deneva/config.h @@ -92,7 +92,7 @@ // # of transactions to run for warmup #define WARMUP 0 // YCSB or TPCC or PPS or DA -#define WORKLOAD DA +#define WORKLOAD TPCC // print the transaction latency distribution #define PRT_LAT_DISTR false #define STATS_ENABLE true @@ -152,7 +152,7 @@ // WAIT_DIE, NO_WAIT, TIMESTAMP, MVCC, CALVIN, MAAT, SUNDIAL, SILO, BOCC, FOCC, SSI, WSI #define ISOLATION_LEVEL SERIALIZABLE -#define CC_ALG DLI_IDENTIFY +#define CC_ALG DLI_IDENTIFY_CHAIN #define YCSB_ABORT_MODE false #define QUEUE_CAPACITY_NEW 1000000 // all transactions acquire tuples according to the primary key order. @@ -388,21 +388,14 @@ enum PPSTxnType { #define SSI 17 #define WSI 18 -#define DLI_BASE 19 -#define DLI_OCC 20 -#define DLI_MVCC_OCC 21 -#define DLI_DTA 22 -#define DLI_MVCC 23 -#define DLI_DTA2 24 -#define DLI_DTA3 25 #define DTA 26 #define SILO 27 #define CNULL 28 -#define DLI_IDENTIFY 29 -#define DLI_IDENTIFY_2 30 +#define DLI_IDENTIFY_CYCLE 29 +#define DLI_IDENTIFY_CHAIN 30 -#define IS_GENERIC_ALG (CC_ALG == DLI_IDENTIFY || CC_ALG == DLI_IDENTIFY_2) +#define IS_GENERIC_ALG (CC_ALG == DLI_IDENTIFY_CYCLE || CC_ALG == DLI_IDENTIFY_CHAIN) // TIMESTAMP allocation method. #define TS_MUTEX 1 diff --git a/contrib/deneva/storage/row.cpp b/contrib/deneva/storage/row.cpp index bc143748..588208b1 100644 --- a/contrib/deneva/storage/row.cpp +++ b/contrib/deneva/storage/row.cpp @@ -43,9 +43,7 @@ #include "row_silo.h" #include "row_si.h" #include "row_dta.h" -#include "row_dli_based.h" #include "row_unified.h" -#include "dli.h" #include "mem_alloc.h" #include "manager.h" @@ -96,10 +94,6 @@ void row_t::init_manager(row_t * row) { manager = (Row_null *) mem_allocator.align_alloc(sizeof(Row_null)); #elif CC_ALG == SILO manager = (Row_silo *) mem_allocator.align_alloc(sizeof(Row_silo)); -#elif CC_ALG == DLI_BASE || CC_ALG == DLI_OCC - manager = (Row_dli_base *)mem_allocator.align_alloc(sizeof(Row_dli_base)); -#elif CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC - manager = (Row_si *)mem_allocator.align_alloc(sizeof(Row_si)); #elif IS_GENERIC_ALG void* const p = mem_allocator.align_alloc(sizeof(Row_unified)); manager = new(p) Row_unified(); @@ -334,34 +328,6 @@ RC row_t::get_row(access_t type, TxnManager *txn, Access *access) { rc = this->manager->access(txn, R_REQ); access->data = txn->cur_row; goto end; -#elif CC_ALG == DLI_BASE || CC_ALG == DLI_OCC - // DLI always make a local copy regardless of read or write - DEBUG_M("row_t::get_row DLI alloc \n"); - txn->cur_row = (row_t *)mem_allocator.alloc(sizeof(row_t)); - txn->cur_row->init(get_table(), get_part_id()); - rc = this->manager->access(txn, type == WR ? P_REQ : R_REQ, access->version); - access->data = txn->cur_row; -#if CC_ALG == DLI_BASE - if (rc == RCOK) { - rc = dli_man.validate(txn, false); - } -#endif - goto end; -#elif CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC - rc = this->manager->access(txn, R_REQ, access->version); - if (type == WR) { - DEBUG_M("row_t::get_row SI alloc \n"); - row_t *newer = (row_t *)mem_allocator.alloc(sizeof(row_t)); - newer->init(get_table(), get_part_id()); - newer->copy(txn->cur_row); - txn->cur_row = newer; - } - - access->data = txn->cur_row; -#if CC_ALG == DLI_MVCC - rc = dli_man.validate(txn, false); -#endif - goto end; #elif CC_ALG == SILO // like OCC, sundial also makes a local copy for each read/write DEBUG_M("row_t::get_row SILO alloc \n"); @@ -427,7 +393,7 @@ RC row_t::get_row_post_wait(access_t type, TxnManager * txn, row_t *& row) { //ts_t endtime = get_sys_clock(); row = this; -#elif CC_ALG == MVCC || CC_ALG == TIMESTAMP || CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 +#elif CC_ALG == MVCC || CC_ALG == TIMESTAMP || CC_ALG == DTA assert(txn->ts_ready); //INC_STATS(thd_id, time_wait, t2 - t1); row = txn->cur_row; @@ -436,7 +402,7 @@ RC row_t::get_row_post_wait(access_t type, TxnManager * txn, row_t *& row) { assert(row->get_table() != NULL); assert(row->get_schema() == this->get_schema()); assert(row->get_table_name() != NULL); - if (( CC_ALG == MVCC || CC_ALG == SUNDIAL || CC_ALG == SSI || CC_ALG == WSI || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3) && type == WR) { + if (( CC_ALG == MVCC || CC_ALG == SUNDIAL || CC_ALG == SSI || CC_ALG == WSI) && type == WR) { DEBUG_M("row_t::get_row_post_wait MVCC alloc \n"); row_t * newr = (row_t *) mem_allocator.alloc(sizeof(row_t)); newr->init(this->get_table(), get_part_id()); @@ -500,25 +466,6 @@ uint64_t row_t::return_row(RC rc, access_t type, TxnManager *txn, row_t *row) { mem_allocator.free(row, sizeof(row_t)); manager->release(); return 0; -#elif CC_ALG == DLI_BASE || CC_ALG == DLI_OCC - assert(row != NULL); - uint64_t version = 0; - version = manager->write(row, txn, type); - row->free_row(); - DEBUG_M("row_t::return_row DLT1 free \n"); - mem_allocator.free(row, sizeof(row_t)); - return version; -#elif CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC - assert(row != NULL); - uint64_t version = 0; - if (type == WR) { - version = manager->write(row, txn, type); - } else if (type == XP) { - manager->write(row, txn, type); - row->free_row(); - mem_allocator.free(row, sizeof(row_t)); - } - return version; #elif CC_ALG == CNULL assert (row != NULL); if (rc == Abort) { diff --git a/contrib/deneva/storage/row.h b/contrib/deneva/storage/row.h index 119e241a..00a2f924 100644 --- a/contrib/deneva/storage/row.h +++ b/contrib/deneva/storage/row.h @@ -60,7 +60,6 @@ class Row_sundial; class Row_si; class Row_null; class Row_silo; -class Row_dli_base; template class Row_unified; class row_t { @@ -140,10 +139,6 @@ class row_t { Row_null * manager; #elif CC_ALG == SILO Row_silo * manager; - #elif CC_ALG == DLI_BASE || CC_ALG == DLI_OCC - Row_dli_base *manager; - #elif CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC - Row_si *manager; #elif IS_GENERIC_ALG Row_unified *manager; #endif diff --git a/contrib/deneva/system/client_thread.cpp b/contrib/deneva/system/client_thread.cpp index 1a502eb6..10271b78 100644 --- a/contrib/deneva/system/client_thread.cpp +++ b/contrib/deneva/system/client_thread.cpp @@ -82,7 +82,9 @@ RC ClientThread::run() { INC_STATS(get_thd_id(),cl_send_intv,get_sys_clock() - last_send_time); } last_send_time = get_sys_clock(); + #if WORKLOAD == DA simulation->last_da_query_time = get_sys_clock(); + #endif #elif LOAD_METHOD == LOAD_RATE if ((inf_cnt = client_man.inc_inflight(next_node)) < 0) continue; diff --git a/contrib/deneva/system/extend_enum.h b/contrib/deneva/system/extend_enum.h deleted file mode 100644 index 92846ead..00000000 --- a/contrib/deneva/system/extend_enum.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifdef ENUM_FILE - -template >> inline constexpr uint32_t Count(); -template >> inline const char* ToString(const EnumType e); -template >> inline std::ostream& operator<<(std::ostream& os, const EnumType e) { return os << ToString(e); } -template >> inline const std::array()>& Members(); - -#define ENUM_BEGIN(name) enum class name : uint32_t { -#define ENUM_MEMBER(_, member) member, -#define ENUM_END(name) name##_MAX }; template <> constexpr uint32_t Count() { return static_cast(name::name##_MAX); } - -#include ENUM_FILE - -#undef ENUM_BEGIN -#undef ENUM_MEMBER -#undef ENUM_END - -#define ENUM_BEGIN(name)\ -template <> inline const char* ToString(const name e)\ -{\ - static std::array()> strings { -#define ENUM_MEMBER(_, member) #member, -#define ENUM_END(name)\ - };\ - return strings.at(static_cast(e));\ -}\ - -#include ENUM_FILE - -#undef ENUM_BEGIN -#undef ENUM_MEMBER -#undef ENUM_END - -#define ENUM_BEGIN(name)\ -template <> inline const std::array()>& Members()\ -{\ - static const std::array()> members { -#define ENUM_MEMBER(name, member) name::member, -#define ENUM_END(_)\ - };\ - return members;\ -} - -#include ENUM_FILE - -#undef ENUM_BEGIN -#undef ENUM_MEMBER -#undef ENUM_END - -#undef ENUM_FILE -#endif diff --git a/contrib/deneva/system/global.cpp b/contrib/deneva/system/global.cpp index 7b9a9747..0861114a 100644 --- a/contrib/deneva/system/global.cpp +++ b/contrib/deneva/system/global.cpp @@ -27,7 +27,6 @@ #include "ssi.h" #include "wsi.h" #include "dta.h" -#include "dli.h" #include "transport.h" #include "work_queue.h" #include "abort_queue.h" @@ -72,7 +71,6 @@ ssi ssi_man; wsi wsi_man; Sundial sundial_man; Dta dta_man; -Dli dli_man; #if IS_GENERIC_ALG UniAlgManager uni_alg_man; #endif diff --git a/contrib/deneva/system/global.h b/contrib/deneva/system/global.h index 51184a2b..2eadf079 100644 --- a/contrib/deneva/system/global.h +++ b/contrib/deneva/system/global.h @@ -68,7 +68,6 @@ class wsi; class Maat; class Sundial; class Dta; -class Dli; class Transport; class Remote_query; class TxnManPool; @@ -124,7 +123,6 @@ extern wsi wsi_man; extern Maat maat_man; extern Sundial sundial_man; extern Dta dta_man; -extern Dli dli_man; #if IS_GENERIC_ALG extern UniAlgManager uni_alg_man; #endif diff --git a/contrib/deneva/system/io_thread.cpp b/contrib/deneva/system/io_thread.cpp index 26ee1de9..30edcc63 100644 --- a/contrib/deneva/system/io_thread.cpp +++ b/contrib/deneva/system/io_thread.cpp @@ -35,12 +35,6 @@ static std::atomic g_one_io_thread_exit(false); -static void try_close_socket() { - if (g_one_io_thread_exit.exchange(true)) { - tport_man.destroy(); - } -} - void InputThread::setup() { std::vector * msgs; @@ -94,7 +88,6 @@ RC InputThread::run() { } else { server_recv_loop(); } - try_close_socket(); return FINISH; } @@ -268,7 +261,6 @@ RC OutputThread::run() { heartbeat(); messager->run(); } - try_close_socket(); printf("OutputThread FINISH %ld:%ld\n", _node_id, _thd_id); fflush(stdout); return FINISH; diff --git a/contrib/deneva/system/main.cpp b/contrib/deneva/system/main.cpp index c5e8446f..debde27b 100644 --- a/contrib/deneva/system/main.cpp +++ b/contrib/deneva/system/main.cpp @@ -53,7 +53,6 @@ #include "focc.h" #include "bocc.h" #include "dta.h" -#include "dli.h" #include "client_query.h" #include "sundial.h" #include "http.h" @@ -307,7 +306,7 @@ int main(int argc, char *argv[]) { printf("Done\n"); #endif -#if CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 +#if CC_ALG == DTA printf("Initializing DTA Time Table... "); fflush(stdout); dta_time_table.init(); diff --git a/contrib/deneva/system/sim_manager.cpp b/contrib/deneva/system/sim_manager.cpp index 9fd74704..3eb17fcd 100644 --- a/contrib/deneva/system/sim_manager.cpp +++ b/contrib/deneva/system/sim_manager.cpp @@ -48,8 +48,10 @@ void SimManager::init() { void SimManager::set_starttime(uint64_t starttime) { if(ATOM_CAS(start_set, false, true)) { run_starttime = starttime; +#if WORKLOAD == DA last_da_query_time = starttime; last_da_recv_query_time = starttime; +#endif last_worker_epoch_time = starttime; sim_done = false; printf("Starttime set to %ld\n",run_starttime); @@ -84,9 +86,11 @@ bool SimManager::timeout() { #endif } +#if WORKLOAD == DA bool SimManager::da_server_iothread_timeout() { return da_has_recved_query && da_timeout_(last_da_recv_query_time); } +#endif bool SimManager::is_done() { bool done = sim_done || timeout(); diff --git a/contrib/deneva/system/txn.cpp b/contrib/deneva/system/txn.cpp index 9e1b7618..ca26c967 100644 --- a/contrib/deneva/system/txn.cpp +++ b/contrib/deneva/system/txn.cpp @@ -34,7 +34,6 @@ #include "focc.h" #include "bocc.h" #include "row_occ.h" -#include "dli.h" #include "dta.h" #include "table.h" #include "catalog.h" @@ -301,7 +300,6 @@ void Transaction::release_inserts(uint64_t thd_id) { for(uint64_t i = 0; i < insert_rows.size(); i++) { row_t * row = insert_rows[i]; #if CC_ALG != MAAT && CC_ALG != OCC && CC_ALG != SUNDIAL && CC_ALG != BOCC && CC_ALG != FOCC -// TODO: does DLI need free row->manager? DEBUG_M("TxnManager::cleanup row->manager free\n"); mem_allocator.free(row->manager, 0); #endif @@ -374,10 +372,6 @@ void TxnManager::init(uint64_t thd_id, Workload * h_wl) { // write_set = (int *) mem_allocator.alloc(sizeof(int) * 100); #endif -#if CC_ALG == DLI_BASE || CC_ALG == DLI_MVCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - history_dli_txn_head = dli_man.validated_txns_.load(); -#endif - registed_ = false; txn_ready = true; twopl_wait_start = 0; @@ -511,7 +505,7 @@ RC TxnManager::abort() { //assert(time_table.get_state(get_txn_id()) == MAAT_ABORTED); time_table.release(get_thd_id(),get_txn_id()); #endif -#if CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 +#if CC_ALG == DTA dta_time_table.release(get_thd_id(), get_txn_id()); #endif uint64_t timespan = get_sys_clock() - txn_stats.restart_starttime; @@ -578,7 +572,7 @@ RC TxnManager::start_commit() { send_prepare_messages(); rc = WAIT_REM; } - } else if (!query->readonly() || CC_ALG == OCC || CC_ALG == MAAT || CC_ALG == SILO || CC_ALG == BOCC || CC_ALG == SSI || CC_ALG == DLI_BASE || CC_ALG == DLI_OCC) { + } else if (!query->readonly() || CC_ALG == OCC || CC_ALG == MAAT || CC_ALG == SILO || CC_ALG == BOCC || CC_ALG == SSI) { // send prepare messages #if CC_ALG == FOCC rc = focc_man.start_critical_section(this); @@ -854,16 +848,6 @@ void TxnManager::cleanup_row(RC rc, uint64_t rid) { } #endif -#if CC_ALG == DLI_BASE || CC_ALG == DLI_MVCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - Dli::RWSet::iterator it; - if (type == WR && dli_txn && (it = dli_txn->wset_.find(orig_r)) != dli_txn->wset_.end()) { - // We need not lock wset_ because once dli_txn can be visiable to other transactions, the size - // of wset will not be changed. We only update the version of each row has been written. - // TODO: the value of wset should be atomical - it->second = version; - } -#endif - #endif if (type == WR) txn->accesses[rid]->version = version; #if CC_ALG == SUNDIAL @@ -892,9 +876,6 @@ void TxnManager::cleanup(RC rc) { #if (CC_ALG == WSI) && MODE == NORMAL_MODE wsi_man.finish(rc,this); #endif -#if (CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC_BASE) && MODE == NORMAL_MODE - dli_man.finish_trans(rc, this); -#endif #if IS_GENERIC_ALG && MODE == NORMAL_MODE if (rc == RCOK) { uni_alg_man.Commit(*uni_txn_man_); @@ -1156,16 +1137,6 @@ RC TxnManager::validate() { } } else if (CC_ALG == SUNDIAL) { rc = sundial_man.validate(this); - } else if (CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_MVCC_OCC || - CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC) { - rc = dli_man.validate(this); - if (IS_LOCAL(get_txn_id()) && rc == RCOK) { -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - rc = dli_man.find_bound(this); -#else - set_commit_timestamp(glob_manager.get_ts(get_thd_id())); -#endif - } } else if (CC_ALG == DTA) { rc = dta_man.validate(this); // Note: home node must be last to validate diff --git a/contrib/deneva/system/txn.h b/contrib/deneva/system/txn.h index 3277355f..1f083bb4 100644 --- a/contrib/deneva/system/txn.h +++ b/contrib/deneva/system/txn.h @@ -300,11 +300,6 @@ class TxnManager { int last_txn_id; Message* last_msg; -#if CC_ALG == DLI_BASE || CC_ALG == DLI_MVCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_OCC || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - DliValidatedTxn* dli_txn = nullptr; - DliValidatedTxn* history_dli_txn_head = nullptr; -#endif - #if IS_GENERIC_ALG std::unique_ptr, row_t*>> uni_txn_man_; #endif diff --git a/contrib/deneva/system/worker_thread.cpp b/contrib/deneva/system/worker_thread.cpp index 4f227e00..15da5718 100644 --- a/contrib/deneva/system/worker_thread.cpp +++ b/contrib/deneva/system/worker_thread.cpp @@ -46,7 +46,6 @@ #include "ssi.h" #include "focc.h" #include "bocc.h" -#include "dli.h" #include "dta.h" #include "da.h" @@ -291,7 +290,8 @@ void WorkerThread::abort() { // current data and will not be initialized. release_txn_man(); #else - abort_queue.enqueue(get_thd_id(), txn_man->get_txn_id(), txn_man->get_abort_cnt()); + uint64_t penalty = + abort_queue.enqueue(get_thd_id(), txn_man->get_txn_id(), txn_man->get_abort_cnt()); txn_man->txn_stats.total_abort_time += penalty; #endif } @@ -393,7 +393,9 @@ RC WorkerThread::run() { if (idle_starttime == 0) idle_starttime = get_sys_clock(); continue; } +#if WORKLOAD == DA simulation->last_da_query_time = get_sys_clock(); +#endif #if WORKLOAD == DA && DA_PRINT_LOG == true printf("thd_id:%lu stxn_id:%lu batch_id:%lu seq_id:%lu type:%c rtype:%d trans_id:%lu item:%c laststate:%lu state:%lu next_state:%lu\n", this->_thd_id, @@ -534,7 +536,7 @@ RC WorkerThread::process_rack_prep(Message * msg) { time_table.set_state(get_thd_id(),msg->get_txn_id(),MAAT_ABORTED); } #endif -#if CC_ALG == DTA || CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 +#if CC_ALG == DTA // Integrate bounds uint64_t lower = ((AckMessage*)msg)->lower; uint64_t upper = ((AckMessage*)msg)->upper; @@ -656,10 +658,6 @@ RC WorkerThread::process_rqry(Message * msg) { #if CC_ALG == DTA txn_table.update_min_ts(get_thd_id(), txn_man->get_txn_id(), 0, txn_man->get_timestamp()); dta_time_table.init(get_thd_id(), txn_man->get_txn_id(), txn_man->get_timestamp()); -#endif -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - txn_table.update_min_ts(get_thd_id(), txn_man->get_txn_id(), 0, txn_man->get_start_timestamp()); - dta_time_table.init(get_thd_id(), txn_man->get_txn_id(), txn_man->get_start_timestamp()); #endif rc = txn_man->run_txn(); @@ -819,9 +817,7 @@ RC WorkerThread::process_rtxn(Message * msg) { #if CC_ALG == WSI || CC_ALG == SSI txn_table.update_min_ts(get_thd_id(),txn_man->get_txn_id(),0,txn_man->get_start_timestamp()); #endif -#if CC_ALG == OCC || CC_ALG == FOCC || CC_ALG == BOCC || CC_ALG == SSI || CC_ALG == WSI ||\ - CC_ALG == DLI_BASE || CC_ALG == DLI_OCC || CC_ALG == DLI_MVCC_OCC || CC_ALG == DLI_DTA ||\ - CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 || CC_ALG == DLI_MVCC +#if CC_ALG == OCC || CC_ALG == FOCC || CC_ALG == BOCC || CC_ALG == SSI || CC_ALG == WSI #if WORKLOAD==DA if(da_start_stamp_tab.count(txn_man->get_txn_id())==0) { @@ -857,10 +853,6 @@ RC WorkerThread::process_rtxn(Message * msg) { // assert(dta_time_table.get_lower(get_thd_id(),txn_man->get_txn_id()) == 0); assert(dta_time_table.get_upper(get_thd_id(), txn_man->get_txn_id()) == UINT64_MAX); assert(dta_time_table.get_state(get_thd_id(), txn_man->get_txn_id()) == DTA_RUNNING); -#endif -#if CC_ALG == DLI_DTA || CC_ALG == DLI_DTA2 || CC_ALG == DLI_DTA3 - txn_table.update_min_ts(get_thd_id(), txn_man->get_txn_id(), 0, txn_man->get_start_timestamp()); - dta_time_table.init(get_thd_id(), txn_man->get_txn_id(), txn_man->get_start_timestamp()); #endif rc = init_phase(); if (rc != RCOK) return rc; diff --git a/contrib/deneva/transport/message.cpp b/contrib/deneva/transport/message.cpp index 7e293540..e1fdf2b8 100644 --- a/contrib/deneva/transport/message.cpp +++ b/contrib/deneva/transport/message.cpp @@ -494,7 +494,7 @@ void YCSBClientQueryMessage::copy_from_buf(char * buf) { DEBUG_M("YCSBClientQueryMessage::copy ycsb_request alloc\n"); ycsb_request * req = (ycsb_request*)mem_allocator.alloc(sizeof(ycsb_request)); *req = *reinterpret_cast(buf + ptr); - + ptr += sizeof(ycsb_request); assert(req->key < g_synth_table_size); requests.add(req); } @@ -1412,6 +1412,7 @@ void YCSBQueryMessage::copy_from_buf(char * buf) { DEBUG_M("YCSBQueryMessage::copy ycsb_request alloc\n"); ycsb_request * req = (ycsb_request*)mem_allocator.alloc(sizeof(ycsb_request)); *req = *reinterpret_cast(buf + ptr); + ptr += sizeof(ycsb_request); ASSERT(req->key < g_synth_table_size); requests.add(req); } diff --git a/contrib/deneva/transport/transport.cpp b/contrib/deneva/transport/transport.cpp index 2eaf6040..3ceb8039 100644 --- a/contrib/deneva/transport/transport.cpp +++ b/contrib/deneva/transport/transport.cpp @@ -240,20 +240,6 @@ void Transport::init() { fflush(stdout); } -void Transport::destroy() { - for (auto& sock : recv_sockets) { - sock->~Socket(); - mem_allocator.free(sock, sizeof(Socket)); - } - recv_sockets.clear(); - for (auto& sock_pair : send_sockets) { - sock_pair.second->~Socket(); - mem_allocator.free(sock_pair.second, sizeof(Socket)); - } - send_sockets.clear(); - printf("Tport Destroy %d: %ld\n",g_node_id,_sock_cnt); -} - // rename sid to send thread id void Transport::send_msg(uint64_t send_thread_id, uint64_t dest_node_id, void * sbuf,int size) { uint64_t starttime = get_sys_clock(); diff --git a/contrib/deneva/transport/transport.h b/contrib/deneva/transport/transport.h index 197ac524..b4d6341c 100644 --- a/contrib/deneva/transport/transport.h +++ b/contrib/deneva/transport/transport.h @@ -49,7 +49,6 @@ class Transport { void read_ifconfig(const char * ifaddr_file); void init(); void shutdown(); - void destroy(); uint64_t get_socket_count(); string get_path(); Socket * get_socket(); diff --git a/contrib/deneva/unified_concurrency_control/alg_dli_identify_chain.h b/contrib/deneva/unified_concurrency_control/alg_dli_identify_chain.h new file mode 100644 index 00000000..88221486 --- /dev/null +++ b/contrib/deneva/unified_concurrency_control/alg_dli_identify_chain.h @@ -0,0 +1,140 @@ +//#pragma once +#ifndef TTTS_DENEVA_ALG_DLI_IDENTIFY_CHAIN_H_ +#define TTTS_DENEVA_ALG_DLI_IDENTIFY_CHAIN_H_ + +#include "dli_identify_util.h" +#include "row_prece.h" +#include +#include + +namespace ttts { + +template +class AlgManager> +{ + public: + using Txn = TxnManager; + + bool Validate(Txn& txn) + { + txn.node_->state() = TxnNode::State::PREPARING; + + { + std::scoped_lock l(m_); + cc_txns_.emplace_back(txn.node_); + } + + Path cycle_part = DirtyCycle(*txn.node_); + + if (!cycle_part.Passable()) { + cycle_part = CyclePart_(txn); + } + + if (!cycle_part.Passable()) { + return true; + } else { + txn.cycle_ = std::make_unique(std::move(cycle_part)); + return false; + } + } + + void Commit(Txn& txn) + { + txn.node_->Commit(); + } + + void Abort(Txn& txn) + { + if (!txn.cycle_) { + // we can only identify the dirty write/read anomaly rather than avoiding it + Path cycle = DirtyCycle(*txn.node_); + if (cycle.Passable()) { + txn.cycle_ = std::make_unique(std::move(cycle)); + } + } + if (const std::unique_ptr& cycle = txn.cycle_) { + txn.node_->Abort(true /*clear_to_txns*/); // break cycles to prevent memory leak + } else { + txn.node_->Abort(false /*clear_to_txns*/); + } + } + + void CheckConcurrencyTxnEmpty() { + std::scoped_lock l(m_); + bool is_empty = true; + for (const auto& weak_txn : cc_txns_) { + if (const auto txn = weak_txn.lock()) { + std::cerr << "** Txn Leak ** " << *txn; + is_empty = false; + } + } + // assert failed here means there is actually a cycle but we miss it + assert(is_empty); + cc_txns_.clear(); + } + + static AnomalyType IdentifyAnomaly(const std::vector& preces) + { + assert(preces.size() == 2); + const auto& p1 = preces.front(); + const auto& p2 = preces.back(); + if (std::any_of(preces.begin(), preces.end(), + [](const PreceInfo& prece) { return prece.type() == PreceType::WA || + prece.type() == PreceType::WC; })) { + return AnomalyType::WAT_1_DIRTY_WRITE; + } else if (std::any_of(preces.begin(), preces.end(), + [](const PreceInfo& prece) { return prece.type() == PreceType::RA; })) { + return AnomalyType::RAT_1_DIRTY_READ; + } else if (p1.from_txn_id() != p2.to_txn_id()) { + return IdentifyAnomalyMultiple(preces); + // [Note] When build path, later happened precedence is sorted to back, which is DIFFERENT from 3TS-DA + } else if (p1.row_id() != p2.row_id()) { + return IdentifyAnomalyDouble(p1.type(), p2.type()); + } else if (p1.to_ver_id() < p2.to_ver_id() || + (p1.to_ver_id() == p2.to_ver_id() && p1.from_ver_id() < p2.from_ver_id())) { + return IdentifyAnomalySingle(p1.type(), p2.type()); + } else { + return IdentifyAnomalySingle(p2.type(), p1.type()); + } + } + + private: + Path CyclePart_(Txn& txn) + { + // Validate failed if has a from_txn and a to_txn which are both finished but not released. + std::scoped_lock l(txn.node_->mutex()); + std::shared_ptr from_prece; + for (const auto& [_, weak_from_prece] : txn.node_->UnsafeGetFromPreces()) { + from_prece = weak_from_prece.lock(); + if (from_prece == nullptr) { + continue; + } + const auto& from_txn = from_prece->from_txn(); + if (from_txn == nullptr || from_txn->state() == TxnNode::State::ACTIVE) { + from_prece = nullptr; + continue; + } + break; + } + + std::shared_ptr to_prece; + for (const auto& [_, to_prece_tmp] : txn.node_->UnsafeGetToPreces()) { + if (to_prece_tmp->to_txn()->state() != TxnNode::State::ACTIVE) { + to_prece = to_prece_tmp; + break; + } + } + + if (from_prece && to_prece) { + return Path(std::vector{*from_prece, *to_prece}); + } + return {}; + } + + std::mutex m_; + std::vector> cc_txns_; // only for debug +}; + +} + +#endif // TTTS_DENEVA_ALG_DLI_IDENTIFY_CHAIN_H_ diff --git a/contrib/deneva/unified_concurrency_control/alg_dli_identify_cycle.h b/contrib/deneva/unified_concurrency_control/alg_dli_identify_cycle.h index 51d6f764..c7dcb8bf 100644 --- a/contrib/deneva/unified_concurrency_control/alg_dli_identify_cycle.h +++ b/contrib/deneva/unified_concurrency_control/alg_dli_identify_cycle.h @@ -1,3 +1,13 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #pragma once #include "dli_identify_util.h" @@ -55,7 +65,9 @@ class AlgManager(*txn.node_); - txn.cycle_ = std::make_unique(std::move(cycle)); + if (cycle.Passable()) { + txn.cycle_ = std::make_unique(std::move(cycle)); + } } if (const std::unique_ptr& cycle = txn.cycle_) { txn.node_->Abort(true /*clear_to_txns*/); // break cycles to prevent memory leak diff --git a/contrib/deneva/unified_concurrency_control/alg_dli_identify_merge.h b/contrib/deneva/unified_concurrency_control/alg_dli_identify_merge.h index 57a6e35d..80acb011 100644 --- a/contrib/deneva/unified_concurrency_control/alg_dli_identify_merge.h +++ b/contrib/deneva/unified_concurrency_control/alg_dli_identify_merge.h @@ -1,3 +1,13 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #pragma once #include "dli_identify_util.h" diff --git a/contrib/deneva/unified_concurrency_control/dli_identify_util.h b/contrib/deneva/unified_concurrency_control/dli_identify_util.h index cfc68c3f..2c6b07bf 100644 --- a/contrib/deneva/unified_concurrency_control/dli_identify_util.h +++ b/contrib/deneva/unified_concurrency_control/dli_identify_util.h @@ -1,4 +1,15 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #pragma once + #include "row_prece.h" #include #include @@ -39,42 +50,31 @@ class Path { std::vector preces_; }; -template inline Path DirtyCycle(TxnNode& txn_node); - static Path DirtyPath_(const PreceInfo& rw_prece, TxnNode& txn_to_finish, const PreceType type) { PreceInfo dirty_prece(rw_prece.to_txn(), txn_to_finish.shared_from_this(), type, rw_prece.row_id(), rw_prece.to_ver_id(), UINT64_MAX); return Path(std::vector{rw_prece, std::move(dirty_prece)}); } -template <> -inline Path DirtyCycle(TxnNode& txn_to_finish) { +template +inline Path DirtyCycle(TxnNode& txn_to_finish) { std::lock_guard l(txn_to_finish.mutex()); - const auto& prece = txn_to_finish.UnsafeGetDirtyToTxn(); - if (!prece) { - return {}; - } else if (prece->type() == PreceType::WW) { - return DirtyPath_(*prece, txn_to_finish, PreceType::WC); - } else { - assert(prece->type() == PreceType::WR); - return {}; - } -} - -template <> -inline Path DirtyCycle(TxnNode& txn_to_finish) { - std::lock_guard l(txn_to_finish.mutex()); - const auto& prece = txn_to_finish.UnsafeGetDirtyToTxn(); - if (!prece) { - return {}; - } else if (prece->type() == PreceType::WW) { - return DirtyPath_(*prece, txn_to_finish, PreceType::WA); - } else if (prece->type() == PreceType::WR) { - return DirtyPath_(*prece, txn_to_finish, PreceType::RA); - } else { - assert(false); - return {}; + const auto& preces = txn_to_finish.UnsafeGetDirtyToPreces(); + for (const auto& prece : preces) { + if (!prece || prece->to_txn()->state() == TxnNode::State::ABORTED) { + // do nothing and continue + } else if (prece->type() == PreceType::WW) { + return DirtyPath_(*prece, txn_to_finish, IS_COMMIT ? PreceType::WC : PreceType::WA); + } else if (prece->type() == PreceType::WR) { + if (!IS_COMMIT) { + return DirtyPath_(*prece, txn_to_finish, PreceType::RA); + } + } else { + assert(false); + return {}; + } } + return {}; // no dirty cycle } Path::Path() {} @@ -247,3 +247,4 @@ static AnomalyType IdentifyAnomalyMultiple(const std::vector& preces) } } + diff --git a/contrib/deneva/unified_concurrency_control/operation_type.h b/contrib/deneva/unified_concurrency_control/operation_type.h new file mode 100644 index 00000000..f2393ab8 --- /dev/null +++ b/contrib/deneva/unified_concurrency_control/operation_type.h @@ -0,0 +1,24 @@ +#ifdef ENUM_BEGIN +#ifdef ENUM_MEMBER +#ifdef ENUM_END + +ENUM_BEGIN(OperationType) +ENUM_MEMBER(OperationType, W) +ENUM_MEMBER(OperationType, R) +ENUM_MEMBER(OperationType, C) +ENUM_MEMBER(OperationType, A) +ENUM_END(OperationType) + +#endif +#endif +#endif + +#ifndef TTTS_DENEVA_OPERATION_TYPE_H_ +#define TTTS_DENEVA_OPERATION_TYPE_H_ + +namespace ttts { +#define ENUM_FILE "../../../../contrib/deneva/unified_concurrency_control/operation_type.h" +#include "../../../src/3ts/backend/util/extend_enum.h" +} + +#endif // TTTS_DENEVA_OPERATION_TYPE_H_ diff --git a/contrib/deneva/unified_concurrency_control/row_prece.h b/contrib/deneva/unified_concurrency_control/row_prece.h index e223ff51..f1243485 100644 --- a/contrib/deneva/unified_concurrency_control/row_prece.h +++ b/contrib/deneva/unified_concurrency_control/row_prece.h @@ -1,84 +1,15 @@ -/* - Copyright 2016 Massachusetts Institute of Technology - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -#ifdef ENUM_BEGIN -#ifdef ENUM_MEMBER -#ifdef ENUM_END - -ENUM_BEGIN(OperationType) -ENUM_MEMBER(OperationType, W) -ENUM_MEMBER(OperationType, R) -ENUM_MEMBER(OperationType, C) -ENUM_MEMBER(OperationType, A) -ENUM_END(OperationType) - -ENUM_BEGIN(PreceType) -ENUM_MEMBER(PreceType, RW) -ENUM_MEMBER(PreceType, WR) -ENUM_MEMBER(PreceType, WCR) -ENUM_MEMBER(PreceType, WW) -ENUM_MEMBER(PreceType, WCW) -ENUM_MEMBER(PreceType, RA) -ENUM_MEMBER(PreceType, WC) -ENUM_MEMBER(PreceType, WA) -ENUM_END(PreceType) - -ENUM_BEGIN(AnomalyType) -// ======== WAT - 1 ========= -ENUM_MEMBER(AnomalyType, WAT_1_DIRTY_WRITE) -ENUM_MEMBER(AnomalyType, WAT_1_FULL_WRITE) -ENUM_MEMBER(AnomalyType, WAT_1_LOST_SELF_UPDATE) -ENUM_MEMBER(AnomalyType, WAT_1_LOST_UPDATE) -// ======== WAT - 2 ========= -ENUM_MEMBER(AnomalyType, WAT_2_DOUBLE_WRITE_SKEW_1) -ENUM_MEMBER(AnomalyType, WAT_2_DOUBLE_WRITE_SKEW_2) -ENUM_MEMBER(AnomalyType, WAT_2_READ_WRITE_SKEW_1) -ENUM_MEMBER(AnomalyType, WAT_2_READ_WRITE_SKEW_2) -ENUM_MEMBER(AnomalyType, WAT_2_FULL_WRITE_SKEW) -// ======== WAT - 3 ========= -ENUM_MEMBER(AnomalyType, WAT_STEP) -// ======== RAT - 1 ========= -ENUM_MEMBER(AnomalyType, RAT_1_DIRTY_READ) -ENUM_MEMBER(AnomalyType, RAT_1_INTERMEDIATE_READ) -ENUM_MEMBER(AnomalyType, RAT_1_NON_REPEATABLE_READ) -// ======== RAT - 2 ========= -ENUM_MEMBER(AnomalyType, RAT_2_WRITE_READ_SKEW) -ENUM_MEMBER(AnomalyType, RAT_2_DOUBLE_WRITE_SKEW_COMMITTED) -ENUM_MEMBER(AnomalyType, RAT_2_READ_SKEW) -ENUM_MEMBER(AnomalyType, RAT_2_READ_SKEW_2) -// ======== RAT - 3 ========= -ENUM_MEMBER(AnomalyType, RAT_STEP) -// ======== IAT - 1 ========= -ENUM_MEMBER(AnomalyType, IAT_1_LOST_UPDATE_COMMITTED) -// ======== IAT - 2 ========= -ENUM_MEMBER(AnomalyType, IAT_2_READ_WRITE_SKEW_COMMITTED) -ENUM_MEMBER(AnomalyType, IAT_2_WRITE_SKEW) -// ======== IAT - 3 ========= -ENUM_MEMBER(AnomalyType, IAT_STEP) -// ======== Unknown ========= -ENUM_MEMBER(AnomalyType, UNKNOWN_1) -ENUM_MEMBER(AnomalyType, UNKNOWN_2) -ENUM_END(AnomalyType) - -#endif -#endif -#endif - -#ifndef ROW_PRECE_H -#define ROW_PRECE_H +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + +#ifndef TTTS_DENEVA_ROW_PRECE_H_ +#define TTTS_DENEVA_ROW_PRECE_H_ #include #include @@ -88,12 +19,12 @@ ENUM_END(AnomalyType) #include #include #include "util.h" +#include "../../../src/3ts/backend/cca/anomaly_type.h" +#include "../../../src/3ts/backend/cca/prece_type.h" +#include "operation_type.h" namespace ttts { -#define ENUM_FILE "../unified_concurrency_control/row_prece.h" -#include "../system/extend_enum.h" - class TxnNode; inline std::pair DividePreceType(const PreceType prece) @@ -198,10 +129,8 @@ class TxnNode : public std::enable_shared_from_this to_preces_.try_emplace(to_txn_id, prece); to_txn_node->from_preces_.try_emplace(txn_id_, prece); // For dirty precedence, W1W2 has higher priority than W1R2 because W1R2C1 is not dirty read - if ((type == PreceType::WR || type == PreceType::WW) && - (!dirty_to_prece_ || (dirty_to_prece_->type() == PreceType::WR && - type == PreceType::WW))) { - dirty_to_prece_ = prece; + if (type == PreceType::WR || type == PreceType::WW) { + dirty_to_preces_.emplace_back(prece); } } } @@ -210,7 +139,7 @@ class TxnNode : public std::enable_shared_from_this const auto& UnsafeGetToPreces() const { return to_preces_; } const auto& UnsafeGetFromPreces() const { return from_preces_; } - const std::shared_ptr& UnsafeGetDirtyToTxn() const { return dirty_to_prece_; } + const auto& UnsafeGetDirtyToPreces() const { return dirty_to_preces_; } std::mutex& mutex() const { return m_; } @@ -218,14 +147,14 @@ class TxnNode : public std::enable_shared_from_this { std::lock_guard l(m_); state_ = State::COMMITTED; - dirty_to_prece_ = nullptr; + dirty_to_preces_.clear(); } void Abort(const bool clear_to_preces) { std::lock_guard l(m_); state_ = State::ABORTED; - dirty_to_prece_ = nullptr; + dirty_to_preces_.clear(); if (clear_to_preces) { to_preces_.clear(); } @@ -291,7 +220,7 @@ class TxnNode : public std::enable_shared_from_this // reference. Key is txn_id; std::unordered_map> from_preces_; // The prece may also stored in to_preces_. - std::shared_ptr dirty_to_prece_; + std::vector> dirty_to_preces_; }; inline uint64_t PreceInfo::to_txn_id() const { return to_txn_->txn_id(); } @@ -347,7 +276,7 @@ class VersionInfo template class RowManager> + ALG == UniAlgs::UNI_DLI_IDENTIFY_CHAIN>> { public: using Txn = TxnManager; @@ -358,6 +287,7 @@ class RowManager Read(Txn& txn) { std::lock_guard l(m_); @@ -430,4 +360,4 @@ class RowManager +#include +#include + +template +class TxnNode> +{ + public: + TxnNode() : txn_id_(txn_id), start_time_(start_time), state_(State::ACTIVE), w_side_conf_(false), + r_side_conf_(false) {} + + private: + const uint64_t txn_id_; + const uint64_t start_ts_; + std::atomic state_; + uint64_t commit_ts_; + std::atomic w_side_conf_; + std::atomic r_side_conf_; +}; + +template +class Version +{ + + const std::weak_ptr w_txn_; + const uint64_t w_ts_; + Data const data_; + std::vector> r_txns_; +}; + +template +class RowManager> +{ + public: + using Txn = TxnManager; + + RowManager(const uint64_t row_id, Data init_data) {} + + std::optional Read(Txn& txn) + { + std::lock_guard l(m_); + const auto& [latest_w_ts, latest_version] = versions_.back(); + if (latest_w_ts > l) + } + + bool Prewrite(Data data, Txn& txn) + { + std::lock_guard l(m_); + assert(!version_.empty()); + const auto& [latest_w_ts, latest_version] = versions_.back(); + if (latest_w_ts > txn.start_ts_ || w_lock_txn_ != nullptr) { // prevent S1W2 precedence + return false; + } + version.foreach_r_txn([](TxnNode& from_txn) { + if (from_txn.state() == from_txnNode::State::ACTIVE || + from_txn.state() == from_txnNode::State::PREPARING || + (from_txn.state() == from_txnNode::State::COMMITTED && from_txn.commit_ts_ > txn.start_ts_)) { + build_preces_from_r_txns_(from_txn, txn); + } + }); + + } + + void Write(Data _, Txn& txn) + { + } + + void Revoke(Data _, Txn& txn) + { + // do nothing + } + + private: + const uint64_t row_id_; + std::mutex m_; + std::deque>> versions_; // first is w_ts + std::shared_ptr w_lock_txn_; +}; diff --git a/contrib/deneva/unified_concurrency_control/txn_dli_identify.h b/contrib/deneva/unified_concurrency_control/txn_dli_identify.h index 6a54acc7..d90d6bb5 100644 --- a/contrib/deneva/unified_concurrency_control/txn_dli_identify.h +++ b/contrib/deneva/unified_concurrency_control/txn_dli_identify.h @@ -1,3 +1,13 @@ +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + #pragma once #include @@ -11,11 +21,11 @@ namespace ttts { template class TxnManager> + ALG == UniAlgs::UNI_DLI_IDENTIFY_CHAIN>> { public: TxnManager(const uint64_t txn_id) : node_(std::make_shared(txn_id)) {} - + TxnManager() {} const uint64_t txn_id() const { return node_->txn_id(); } std::unique_lock l_; @@ -25,3 +35,4 @@ class TxnManager +#include +#include +#include +#include "util.h" + +namespace ttts { + + +template +class TxnManager> +{ + public: + enum class State { ACTIVE, PREPARING, COMMITTED, ABORTED }; + + TxnManager(const uint64_t txn_id, const uint64_t start_time) : node_(std::make_shared(txn_id)) {} + + private: + std::shared_ptr> node_; +}; + +} diff --git a/contrib/deneva/unified_concurrency_control/util.h b/contrib/deneva/unified_concurrency_control/util.h index 83f4df0e..c80fe1a6 100644 --- a/contrib/deneva/unified_concurrency_control/util.h +++ b/contrib/deneva/unified_concurrency_control/util.h @@ -1,15 +1,39 @@ -#pragma once +/* Tencent is pleased to support the open source community by making 3TS available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software + * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All + * Tencent Modifications are Copyright (C) THL A29 Limited. + * + * Author: williamcliu@tencent.com + * + */ + +#ifdef ENUM_BEGIN +#ifdef ENUM_MEMBER +#ifdef ENUM_END + +ENUM_BEGIN(UniAlgs) +ENUM_MEMBER(UniAlgs, UNI_DLI_IDENTIFY_CHAIN) +ENUM_MEMBER(UniAlgs, UNI_DLI_IDENTIFY_CYCLE) +ENUM_MEMBER(UniAlgs, ALL) +ENUM_END(UniAlgs) + +#endif +#endif +#endif + +#ifndef TTTS_DENEVA_UNI_H_ +#define TTTS_DENEVA_UNI_H_ namespace ttts { -enum class UniAlgs -{ - UNI_DLI_IDENTIFY_CYCLE, - UNI_DLI_IDENTIFY_MERGE, -}; +#define ENUM_FILE "../../../../contrib/deneva/unified_concurrency_control/util.h" +#include "../../../src/3ts/backend/util/extend_enum.h" template class AlgManager; template class RowManager; template class TxnManager; } + +#endif // TTTS_DENEVA_UNI_ALGS_H_ diff --git a/src/3ts/backend/anomaly_identify.cc b/src/3ts/backend/anomaly_identify.cc index bbb7b00a..970205cd 100644 --- a/src/3ts/backend/anomaly_identify.cc +++ b/src/3ts/backend/anomaly_identify.cc @@ -54,18 +54,23 @@ int main() { } } else if (text.find("\\g") != text.npos || text.find("algorithm") != text.npos) { const auto index_space = text.find_first_of(" "); + bool ret = true; if (index_space != text.npos) { std::string input = text.substr(index_space); Printer::TrimSpace(input); if ("DLI_IDENTIFY_CYCLE" == input) { - printer.SetAlg(AlgType::DLI_IDENTIFY_CYCLE); + printer.SetAlg(ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE); } else if ("DLI_IDENTIFY_CHAIN" == input) { - printer.SetAlg(AlgType::DLI_IDENTIFY_CHAIN); - } else if ("All" == input) { - printer.SetAlg(AlgType::ALL); + printer.SetAlg(ttts::UniAlgs::UNI_DLI_IDENTIFY_CHAIN); + } else if ("ALL" == input) { + printer.SetAlg(ttts::UniAlgs::ALL); } else { + ret = false; Printer::Print("Unknown Algorithm"); } + if (ret) { + std::cout << "Set algorithm success" << std::endl; + } } else { Printer::Print("Please check input format, such as \\g DLI_IDENTIFY_CYCLE"); } @@ -98,15 +103,7 @@ int main() { } else if (text.find("\\A") != text.npos || text.find("authors") != text.npos) { Printer::PrintAuthorInfo(); } else if (text.find("R") != text.npos || text.find("W") != text.npos) { - if (printer.Alg() == AlgType::DLI_IDENTIFY_CYCLE) { - checker.ExecDLI(text); - } else if (printer.Alg() == AlgType::DLI_IDENTIFY_CHAIN) { - // to do - } else if (printer.Alg() == AlgType::ALL) { - // to do - } else { - Printer::Print("alg has Unknown value"); - } + checker.ExecAnomalyIdentify(text, printer.Alg()); } } return 0; diff --git a/src/3ts/backend/anomaly_identify.h b/src/3ts/backend/anomaly_identify.h index 72cdeb2c..2f0de45e 100644 --- a/src/3ts/backend/anomaly_identify.h +++ b/src/3ts/backend/anomaly_identify.h @@ -8,41 +8,37 @@ * Author: farrisli@tencent.com * */ -#include +//#include #include #include #include "util/generic.h" #include "cca/conflict_serializable_algorithm.h" +#include "cca/unified_history_algorithm.h" #include "shape.h" - -enum class AlgType { - DLI_IDENTIFY_CYCLE, - DLI_IDENTIFY_CHAIN, - ALL -}; +#include "../../../contrib/deneva/unified_concurrency_control/util.h" class Printer { public: Printer() : anomaly_map_{ {"DirtyWrite", "WAT SDA 'R0a W0a W1a R1a W1a R1a C0 C1' Wi[xm]...Wj[xm+1]"}, - {"LostUpdate", "WAT SDA 'R0a R1a W0a R0a W0a W1a A1 C0' Ri[xm]...Wj[xm+1]...Wi[xm+2]"}, + {"LostUpdate", "WAT SDA 'R0a R1a W0a R0a W0a W1a C1 C0' Ri[xm]...Wj[xm+1]...Wi[xm+2]"}, {"LostSelfUpdate", "WAT SDA 'R0a W0a R0a W1a R0a W1a C0 C1' Wi[xm]...Wj[xm+1]...Ri[xm+1]"}, {"Full-Write", "WAT SDA 'R0a W0a R1a W1a W0a C0 W1a C1' Wi[xm]...Wj[xm+1]...Wi[xm+2]"}, - {"Read-WriteSkew1", "WAT DDA 'R0a W0a R0a R1b W0b W1a A0 C1' Ri[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"}, + {"Read-WriteSkew1", "WAT DDA 'R0a W0a R0a R1b W0b W1a C0 C1' Ri[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"}, {"Read-WriteSkew2", "WAT DDA 'R0a W0a W0b W1a R1b W0b C0 C1' Wi[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"}, - {"Double-WriteSkew1", "WAT DDA 'R0a W0a R1a W1a W1b W0b A1 C0' Wi[xm]...Rj[xm]...Wj[yn]...Wi[yn+1]"}, + {"Double-WriteSkew1", "WAT DDA 'R0a W0a R1a W1a W1b W0b C1 C0' Wi[xm]...Rj[xm]...Wj[yn]...Wi[yn+1]"}, {"Double-WriteSkew2", "WAT DDA 'R0a W0a R0a W1a W1b R0b C1 C0' Wi[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"}, - {"Full-WriteSkew", "WAT DDA 'R0a W0a W1a W1b W0b R1a A0 C1' Wi[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"}, - {"StepWAT", "WAT MDA 'R0a R0b W1b W2c W0c A0 C1 W2b C2' ...Wi[xm]...Wj[xm+1]..."}, + {"Full-WriteSkew", "WAT DDA 'R0a W0a W1a W1b W0b R1a C0 C1' Wi[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"}, + {"StepWAT", "WAT MDA 'R0a R0b W1b W2c W0c C0 C1 W2b C2' ...Wi[xm]...Wj[xm+1]..."}, {"DirtyRead", "RAT SDA 'R0a W0a R1a R0a R1a R0a C1 A0' Wi[xm]...Rj[xm+1]"}, - {"Non-RepeatableRead", "RAT SDA 'R0a R1a R0a W1a R0a R1a C1 C0' Ri[xm]...Wj[xm+1]...Ri[xm+1]"}, + {"Non-RepeatableRead", "RAT SDA 'R0a R1a R0a W1a R0a R1a C1 C0' Ri[xm]...Wj[xm+1]...Ri[xm+1]"}, {"IntermediateRead", "RAT SDA 'R0a W0a R1a W0a R1a W0a C1 C0' Wi[xm]...Rj[xm+1]...Wi[xm+2]"}, - {"ReadSkew", "RAT DDA 'R0a W0a R1b R0b W0b R1a C0 A1' Ri[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"}, + {"ReadSkew", "RAT DDA 'R0a W0a R1b R0b W0b R1a C0 C1' Ri[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"}, {"ReadSkew2", "RAT DDA 'R0a W0a W0b R1b R1a C1 W0a C0' Wi[xm]...Rj[xm]...Rj[yn]...Wi[yn+1]"}, - {"Write-ReadSkew", "RAT DDA 'R0a W0a R1a W1b R0b R1a A0 C1' Wi[xm]...Rj[xm]...Wj[yn]...Ri[yn]"}, + {"Write-ReadSkew", "RAT DDA 'R0a W0a R1a W1b R0b R1a C0 C1' Wi[xm]...Rj[xm]...Wj[yn]...Ri[yn]"}, {"StepRAT", "RAT MDA 'R0a R0b W1a R2a R2c W0c C0 C1 C2' ...Wi[xm]...Rj[xm]..., and not include (...Wii[xm]...Wjj[xm+1]...)"}, {"WriteSkew", "IAT DDA 'R0a R0b R1a W0a R1b W1b C1 C0' Ri[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"}, - {"StepIAT", "IAT MDA 'R0a R0b R1c W1a W2c A1 C2 W0c C0' ...Ri[xm]...Wj[xm+1]..., and not include (...Wii[xm]...Rjj[xm]...and ...Wiii[xm]...Wjjj[xm+1]...)"} + {"StepIAT", "IAT MDA 'R0a R0b R1c W1a W2c C1 C2 W0c C0' ...Ri[xm]...Wj[xm+1]..., and not include (...Wii[xm]...Rjj[xm]...and ...Wiii[xm]...Wjjj[xm+1]...)"} }, info_map_{ {"History", "The sequence of operations that produces the data anomaly, one history contains several operations."}, {"Operation", "One operation contains 3 character, such as R0a, first character is operation type, second character is transaction id, third character is data item.\n Operation Type -> Such as R W C A(R: Read, W: Write, C: Commit, A: Aort)\n Transaction ID -> Such as 0 1 2 ...(must be a number and less than 10)\n Data Item -> Such as a b c ...(must be lowercase letter)"}, @@ -133,124 +129,140 @@ class Printer { str.erase(itor, str.end()); } - AlgType Alg() const { return alg_; }; - void SetAlg(AlgType alg) { alg_ = alg; }; + ttts::UniAlgs Alg() const { return alg_; }; + void SetAlg(ttts::UniAlgs alg) { alg_ = alg; }; std::unordered_map InfoMap() const { return info_map_; }; std::unordered_map AnomalyMap() const { return anomaly_map_; }; private: - AlgType alg_ = AlgType::DLI_IDENTIFY_CYCLE; + ttts::UniAlgs alg_ = ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE; std::unordered_map info_map_; std::unordered_map anomaly_map_; }; class Checker { public: - void ExecDLI(const std::string& text) { + void ExecAnomalyIdentify(const std::string& text, ttts::UniAlgs alg_type) { ttts::History history; std::istringstream is(text); - if ((is >> history)) { - ttts::ConflictSerializableAlgorithm alg; + + const auto get_and_print_anomaly = [&] (auto&& alg, auto&& alg_type) { const std::optional anomaly = alg.GetAnomaly(history, nullptr); - if (!anomaly.has_value()) { - std::cout << "No Data Anomaly\n" << std::endl; - } else { - const std::vector anomaly_info = AnomalyInfo(ttts::ToString(anomaly.value())); - std::cout << "DLI_IDENTIFY_CYCLE's Identification Result:" << std::endl; - std::cout << "Anomaly Type: " << anomaly_info[0] << "\nAnomaly SubType: " << anomaly_info[1] << "\nAnomaly Name: " << anomaly_info[2] << "\nAnomaly Format: " << anomaly_info[3] << "\n" << std::endl; + PrintAnomalyInfo(anomaly, alg_type); + }; + + if ((is >> history)) { + if (alg_type == ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE) { + get_and_print_anomaly(ttts::ConflictSerializableAlgorithm(), alg_type); + } else if (alg_type == ttts::UniAlgs::UNI_DLI_IDENTIFY_CHAIN) { + ttts::UnifiedHistoryAlgorithm alg; + get_and_print_anomaly(ttts::UnifiedHistoryAlgorithm(), alg_type); + } else if (alg_type == ttts::UniAlgs::ALL) { + get_and_print_anomaly(ttts::ConflictSerializableAlgorithm(), ttts::UniAlgs::UNI_DLI_IDENTIFY_CYCLE); + get_and_print_anomaly(ttts::UnifiedHistoryAlgorithm(), ttts::UniAlgs::UNI_DLI_IDENTIFY_CHAIN); } } } - std::vector AnomalyInfo(const std::string& anomaly) { - auto index = anomaly.find_first_of("_"); - std::vector anomaly_info; - if (index != anomaly.npos) { - // get anomaly_type - anomaly_info.emplace_back(anomaly.substr(0, index)); - // get anomaly_subtype - std::string anomaly_subtype = ""; - std::string m = anomaly.substr(index + 1, 1); - if ("1" == m) { - anomaly_subtype = "SDA"; - } else if ("2" == m) { - anomaly_subtype = "DDA"; - } else { - anomaly_subtype = "MDA"; - } - anomaly_info.emplace_back(anomaly_subtype); - // get anomaly_name - std::string name = anomaly.substr(index + 3); - if (anomaly.find("STEP") == anomaly.npos) { - bool is_head = false; - for (size_t i = 0; i < name.size(); i++) { - if (i == 0) { - continue; - } else if (name[i] == '_') { - name[i] = ' '; - is_head = true; - } else if (is_head == true) { - is_head = false; - } else if (name[i] >= 'A' && name[i] <= 'Z') { - name[i] += 'a' - 'A'; // Convert to lowercase - } + void PrintAnomalyInfo(const std::optional anomaly, ttts::UniAlgs alg_type) { + if (!anomaly.has_value()) { + std::cout << "No Data Anomaly\n" << std::endl; + } else { + const std::vector anomaly_info = AnomalyInfo(ttts::ToString(anomaly.value())); + std::cout << ttts::ToString(alg_type) << "'s Identification Result:" << std::endl; + std::cout << "Anomaly Type: " << anomaly_info[0] << "\nAnomaly SubType: " << anomaly_info[1] << "\nAnomaly Name: " << anomaly_info[2] << "\nAnomaly Format: " << anomaly_info[3] << "\n" << std::endl; + } + } + + std::vector AnomalyInfo(const std::string& anomaly) { + auto index = anomaly.find_first_of("_"); + std::vector anomaly_info; + if (index != anomaly.npos) { + // get anomaly_type + anomaly_info.emplace_back(anomaly.substr(0, index)); + // get anomaly_subtype + std::string anomaly_subtype = ""; + std::string m = anomaly.substr(index + 1, 1); + if ("1" == m) { + anomaly_subtype = "SDA"; + } else if ("2" == m) { + anomaly_subtype = "DDA"; + } else { + anomaly_subtype = "MDA"; + } + anomaly_info.emplace_back(anomaly_subtype); + // get anomaly_name + std::string name = anomaly.substr(index + 3); + if (anomaly.find("STEP") == anomaly.npos) { + bool is_head = false; + for (size_t i = 0; i < name.size(); i++) { + if (i == 0) { + continue; + } else if (name[i] == '_') { + name[i] = ' '; + is_head = true; + } else if (is_head == true) { + is_head = false; + } else if (name[i] >= 'A' && name[i] <= 'Z') { + name[i] += 'a' - 'A'; // Convert to lowercase } - } else { - name = "Step " + anomaly_info[0]; - } - anomaly_info.emplace_back(name); - // get anomaly_format - std::string format = ""; - if ("Dirty Write" == name) { - format = "Wi[xm]...Wj[xm+1]"; - } else if ("Full Write" == name) { - format = "Wi[xm]...Wj[xm+1]...Wi[xm+2]"; - } else if ("Lost Self Update" == name) { - format = "Wi[xm]...Wj[xm+1]...Ri[xm+1]"; - } else if ("Lost Update" == name) { - format = "Ri[xm]...Wj[xm+1]...Wi[xm+2]"; - } else if ("Double Write Skew 1" == name) { - format = "Wi[xm]...Rj[xm]...Wj[yn]...Wi[yn+1]"; - } else if ("Double Write Skew 2" == name) { - format = "Wi[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"; - } else if ("Read Write Skew 1" == name) { - format = "Ri[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"; - } else if ("Read Write Skew 2" == name) { - format = "Wi[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"; - } else if ("Full Write Skew" == name) { - format = "Wi[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"; - } else if ("WAT_STEP" == anomaly) { - format = "...Wi[xm]...Wj[xm+1]..."; - } else if ("Dirty Read" == name) { - format = "Wi[xm]...Rj[xm+1]"; - } else if ("Non Repeatable Read" == name) { - format = "Ri[xm]...Wj[xm+1]...Ri[xm+1]"; - } else if ("Intermediate Read" == name) { - format = "Wi[xm]...Rj[xm+1]...Wi[xm+2]"; - } else if ("Write Read Skew" == name) { - format = "Wi[xm]...Rj[xm]...Wj[yn]...Ri[yn]"; - } else if ("DOUBLE_WRITE_SKEW_COMMITTED" == name) { - format = "Wi[xm]...Rj[xm+1]...Wj[yn]...Wi[yn+1]"; - } else if ("Read Skew" == name) { - format = "Ri[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"; - } else if ("Read Skew 2" == name) { - format = "Wi[xm]...Rj[xm]...Rj[yn]...Wi[yn+1]"; - } else if ("RAT_STEP" == anomaly) { - format = "...Wi[xm]...Rj[xm]..., and not include (...Wii[xm]...Wjj[xm+1]...)"; - } else if ("LOST_UPDATE_COMMITTED" == name) { - format = "Ri[xm]...Wj[xm]...Wj[yn]...Wi[yn+1]"; - } else if ("READ_WRITE_SKEW_COMMITTED" == name) { - format = "Ri[xm]...Wj[xm]...Wj[yn]...Wi[yn+1]"; - } else if ("Write Skew" == name) { - format = "Ri[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"; - } else if ("IAT_STEP" == anomaly) { - format = "...Ri[xm]...Wj[xm+1]..., and not include (...Wii[xm]...Rjj[xm]...and ...Wiii[xm]...Wjjj[xm+1]...)"; } - anomaly_info.emplace_back(format); } else { - std::cerr << "get AnomalyType failed" << std::endl; + name = "Step " + anomaly_info[0]; } - return anomaly_info; + anomaly_info.emplace_back(name); + // get anomaly_format + std::string format = ""; + if ("Dirty Write" == name) { + format = "Wi[xm]...Wj[xm+1]"; + } else if ("Full Write" == name) { + format = "Wi[xm]...Wj[xm+1]...Wi[xm+2]"; + } else if ("Lost Self Update" == name) { + format = "Wi[xm]...Wj[xm+1]...Ri[xm+1]"; + } else if ("Lost Update" == name) { + format = "Ri[xm]...Wj[xm+1]...Wi[xm+2]"; + } else if ("Double Write Skew 1" == name) { + format = "Wi[xm]...Rj[xm]...Wj[yn]...Wi[yn+1]"; + } else if ("Double Write Skew 2" == name) { + format = "Wi[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"; + } else if ("Read Write Skew 1" == name) { + format = "Ri[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"; + } else if ("Read Write Skew 2" == name) { + format = "Wi[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"; + } else if ("Full Write Skew" == name) { + format = "Wi[xm]...Wj[xm+1]...Wj[yn]...Wi[yn+1]"; + } else if ("WAT_STEP" == anomaly) { + format = "...Wi[xm]...Wj[xm+1]..."; + } else if ("Dirty Read" == name) { + format = "Wi[xm]...Rj[xm+1]"; + } else if ("Non Repeatable Read" == name) { + format = "Ri[xm]...Wj[xm+1]...Ri[xm+1]"; + } else if ("Intermediate Read" == name) { + format = "Wi[xm]...Rj[xm+1]...Wi[xm+2]"; + } else if ("Write Read Skew" == name) { + format = "Wi[xm]...Rj[xm]...Wj[yn]...Ri[yn]"; + } else if ("DOUBLE_WRITE_SKEW_COMMITTED" == name) { + format = "Wi[xm]...Rj[xm+1]...Wj[yn]...Wi[yn+1]"; + } else if ("Read Skew" == name) { + format = "Ri[xm]...Wj[xm+1]...Wj[yn]...Ri[yn]"; + } else if ("Read Skew 2" == name) { + format = "Wi[xm]...Rj[xm]...Rj[yn]...Wi[yn+1]"; + } else if ("RAT_STEP" == anomaly) { + format = "...Wi[xm]...Rj[xm]..., and not include (...Wii[xm]...Wjj[xm+1]...)"; + } else if ("LOST_UPDATE_COMMITTED" == name) { + format = "Ri[xm]...Wj[xm]...Wj[yn]...Wi[yn+1]"; + } else if ("READ_WRITE_SKEW_COMMITTED" == name) { + format = "Ri[xm]...Wj[xm]...Wj[yn]...Wi[yn+1]"; + } else if ("Write Skew" == name) { + format = "Ri[xm]...Wj[xm+1]...Rj[yn]...Wi[yn+1]"; + } else if ("IAT_STEP" == anomaly) { + format = "...Ri[xm]...Wj[xm+1]..., and not include (...Wii[xm]...Rjj[xm]...and ...Wiii[xm]...Wjjj[xm+1]...)"; + } + anomaly_info.emplace_back(format); + } else { + std::cerr << "get AnomalyType failed" << std::endl; } + return anomaly_info; + } }; diff --git a/src/3ts/backend/cca/conflict_serializable_algorithm.h b/src/3ts/backend/cca/conflict_serializable_algorithm.h index f7edc9c9..7a9ff9f2 100644 --- a/src/3ts/backend/cca/conflict_serializable_algorithm.h +++ b/src/3ts/backend/cca/conflict_serializable_algorithm.h @@ -23,18 +23,18 @@ namespace ttts { -class PreceInfo { +class DAPreceInfo { public: - PreceInfo(const uint64_t pre_trans_id, const uint64_t trans_id, const uint64_t item_id, const PreceType type, const uint32_t order) + DAPreceInfo(const uint64_t pre_trans_id, const uint64_t trans_id, const uint64_t item_id, const PreceType type, const uint32_t order) : pre_trans_id_(pre_trans_id), trans_id_(trans_id), item_id_(item_id), type_(type), order_(order) {} - PreceInfo(const PreceInfo&) = default; + DAPreceInfo(const DAPreceInfo&) = default; - friend std::ostream& operator<<(std::ostream& os, const PreceInfo prece) { + friend std::ostream& operator<<(std::ostream& os, const DAPreceInfo prece) { return os << 'T' << static_cast('0' + prece.pre_trans_id_) << "-[" << prece.type_ << "-" << static_cast('a' + prece.item_id_) << "]->" << 'T' << static_cast('0' + prece.trans_id_); } - bool operator>(const PreceInfo& p) const { return order_ > p.order_; } - bool operator<(const PreceInfo& p) const { return order_ < p.order_; } + bool operator>(const DAPreceInfo& p) const { return order_ > p.order_; } + bool operator<(const DAPreceInfo& p) const { return order_ < p.order_; } uint64_t item_id() const { return item_id_; } PreceType type() const { return type_; } @@ -55,19 +55,19 @@ class ConflictGraphNode { bool HasNoPreTrans() const { return pre_trans_set_.empty(); } void AddPreTrans(const uint64_t pre_trans_id, const uint64_t item_id, const PreceType type, const uint32_t order) { // we only record the first precedence between the two specific transactions - pre_trans_set_.try_emplace(pre_trans_id, PreceInfo{pre_trans_id, trans_id_, item_id, type, order}); + pre_trans_set_.try_emplace(pre_trans_id, DAPreceInfo{pre_trans_id, trans_id_, item_id, type, order}); } void RemovePreTrans(const uint64_t pre_trans_id) { pre_trans_set_.erase(pre_trans_id); } void Remove() { removed_ = true; } bool IsRemoved() const { return removed_; } std::optional& is_committed() { return is_committed_; } const std::optional& is_committed() const { return is_committed_; } - const std::map& pre_trans_set() const { return pre_trans_set_; } + const std::map& pre_trans_set() const { return pre_trans_set_; } uint64_t trans_id() const { return trans_id_; } private: const uint64_t trans_id_; - std::map pre_trans_set_; + std::map pre_trans_set_; bool removed_; std::optional is_committed_; }; @@ -79,17 +79,17 @@ static void sort(Container&& container, Compare&& comp) { } } -class Path { +class DAPath { public: - Path() {} - Path(std::vector&& preces) : preces_((sort(preces, std::greater()), std::move(preces))) {} - Path(const PreceInfo& prece) : preces_{prece} {} - Path(const Path&) = default; - Path(Path&&) = default; - Path& operator=(const Path&) = default; - Path& operator=(Path&&) = default; - - bool operator<(const Path& p) const { + DAPath() {} + DAPath(std::vector&& preces) : preces_((sort(preces, std::greater()), std::move(preces))) {} + DAPath(const DAPreceInfo& prece) : preces_{prece} {} + DAPath(const DAPath&) = default; + DAPath(DAPath&&) = default; + DAPath& operator=(const DAPath&) = default; + DAPath& operator=(DAPath&&) = default; + + bool operator<(const DAPath& p) const { // impassable has the greatest weight if (!passable()) { return false; @@ -100,30 +100,30 @@ class Path { return std::lexicographical_compare(preces_.begin(), preces_.end(), p.preces_.begin(), p.preces_.end()); } - Path operator+(const Path& p) const { + DAPath operator+(const DAPath& p) const { if (!passable() || !p.passable()) { return {}; } - std::vector preces; - std::merge(preces_.begin(), preces_.end(), p.preces_.begin(), p.preces_.end(), std::back_inserter(preces), std::greater()); + std::vector preces; + std::merge(preces_.begin(), preces_.end(), p.preces_.begin(), p.preces_.end(), std::back_inserter(preces), std::greater()); return preces; } - friend std::ostream& operator<<(std::ostream& os, const Path& path) { + friend std::ostream& operator<<(std::ostream& os, const DAPath& path) { if (path.preces_.empty()) { os << "Empty path"; } else { - std::copy(path.preces_.begin(), path.preces_.end(), std::ostream_iterator(os, ", ")); + std::copy(path.preces_.begin(), path.preces_.end(), std::ostream_iterator(os, ", ")); } return os; } bool passable() const { return !preces_.empty(); } - const std::vector& preces() const { return preces_; } + const std::vector& preces() const { return preces_; } private: - std::vector preces_; + std::vector preces_; }; class ConflictGraph { @@ -166,9 +166,9 @@ class ConflictGraph { std::optional& is_committed(const uint64_t trans_id) { return nodes_[trans_id].is_committed(); } // Find the first conflict cycle in history. The latest precedence is at the head of return path. - Path MinCycleByFloyd() const { + DAPath MinCycleByFloyd() const { const size_t trans_num = nodes_.size(); - Path matrix[trans_num][trans_num]; + DAPath matrix[trans_num][trans_num]; // init matrix for (uint64_t pre_trans_id = 0; pre_trans_id < trans_num; ++pre_trans_id) { @@ -181,13 +181,13 @@ class ConflictGraph { } } - static auto update_path = [](Path& path, Path&& new_path) { + static auto update_path = [](DAPath& path, DAPath&& new_path) { if (new_path < path) { path = std::move(new_path); // do not use std::min because there is a copy cost when assign self } }; - Path min_cycle; + DAPath min_cycle; for (uint64_t mid = 0; mid < trans_num; ++mid) { // find mini cycle when pass mid node for (uint64_t start = 0; start < mid; ++start) { @@ -346,7 +346,7 @@ class ConflictSerializableAlgorithm : public HistoryAlgorithm { } private: - static uint64_t ItemCount_(const std::vector& preces) { + static uint64_t ItemCount_(const std::vector& preces) { std::set item_ids; for (const auto prece : preces) { item_ids.emplace(prece.item_id()); @@ -354,12 +354,12 @@ class ConflictSerializableAlgorithm : public HistoryAlgorithm { return item_ids.size(); } - static AnomalyType IdentifyAnomaly_(const std::vector& preces) { + static AnomalyType IdentifyAnomaly_(const std::vector& preces) { assert(preces.size() >= 2); - if (std::any_of(preces.begin(), preces.end(), [](const PreceInfo& prece) { return prece.type() == PreceType::WA || prece.type() == PreceType::WC; })) { + if (std::any_of(preces.begin(), preces.end(), [](const DAPreceInfo& prece) { return prece.type() == PreceType::WA || prece.type() == PreceType::WC; })) { // WA and WC precedence han only appear return AnomalyType::WAT_1_DIRTY_WRITE; - } else if (std::any_of(preces.begin(), preces.end(), [](const PreceInfo& prece) { return prece.type() == PreceType::RA; })) { + } else if (std::any_of(preces.begin(), preces.end(), [](const DAPreceInfo& prece) { return prece.type() == PreceType::RA; })) { return AnomalyType::RAT_1_DIRTY_READ; } else if (preces.size() >= 3) { return IdentifyAnomalyMultiple_(preces); @@ -427,11 +427,11 @@ class ConflictSerializableAlgorithm : public HistoryAlgorithm { } } - static AnomalyType IdentifyAnomalyMultiple_(const std::vector& preces) { - if (std::any_of(preces.begin(), preces.end(), [](const PreceInfo& prece) { return prece.type() == PreceType::WW; })) { + static AnomalyType IdentifyAnomalyMultiple_(const std::vector& preces) { + if (std::any_of(preces.begin(), preces.end(), [](const DAPreceInfo& prece) { return prece.type() == PreceType::WW; })) { return AnomalyType::WAT_STEP; } - if (std::any_of(preces.begin(), preces.end(), [](const PreceInfo& prece) { return prece.type() == PreceType::WR || prece.type() == PreceType::WCR; })) { + if (std::any_of(preces.begin(), preces.end(), [](const DAPreceInfo& prece) { return prece.type() == PreceType::WR || prece.type() == PreceType::WCR; })) { return AnomalyType::RAT_STEP; } return AnomalyType::IAT_STEP; diff --git a/src/3ts/backend/cca/unified_history_algorithm.h b/src/3ts/backend/cca/unified_history_algorithm.h new file mode 100644 index 00000000..da1cc8f8 --- /dev/null +++ b/src/3ts/backend/cca/unified_history_algorithm.h @@ -0,0 +1,105 @@ +#include "algorithm.h" +#include "../../../../contrib/deneva/unified_concurrency_control/util.h" +#include "anomaly_type.h" +#include "prece_type.h" +#include "algorithm.h" +#include "../../../../contrib/deneva/unified_concurrency_control/dli_identify_util.h" +#include "../../../../contrib/deneva/unified_concurrency_control/txn_dli_identify.h" +#include "../../../../contrib/deneva/unified_concurrency_control/row_prece.h" +#include "../../../../contrib/deneva/unified_concurrency_control/alg_dli_identify_chain.h" +#include "../../../../contrib/deneva/unified_concurrency_control/alg_dli_identify_cycle.h" +#include + +namespace ttts { + +template +class UnifiedHistoryAlgorithm : public HistoryAlgorithm { +public: + UnifiedHistoryAlgorithm() : HistoryAlgorithm(ToString(ALG)), anomaly_counts_{0} {} + + virtual bool Check(const History& history, std::ostream* const os) const override { + return !(GetAnomaly(history, os).has_value()); + } + + std::optional GetAnomaly(const History& history, std::ostream* const os) const { + std::unique_ptr> alg_manager = std::make_unique>(); + std::unordered_map>> txn_map; + std::unordered_map>> row_map; + std::unordered_map row_value_map; + std::vector>> trans_write_set_list(history.trans_num()); + for (size_t i = 0, size = history.size(); i < size; ++i) { + const Operation& operation = history.operations()[i]; + const uint64_t trans_id = operation.trans_id(); + // init txn_map + if (txn_map.count(trans_id) == 0) { + std::unique_ptr> txn = std::make_unique>(trans_id); + txn_map.emplace(trans_id, std::move(txn)); + } + // check operation whether R or W + if (operation.IsPointDML()) { + const uint64_t item_id = operation.item_id(); + // If it is an unaccessed variable, the value is initialized to 0. + if (row_value_map.count(0) == 0) { + row_value_map.emplace(item_id, 0); + } + // If it is an unaccessed variable, row obj will be put into row_map. + if (row_map.count(item_id) == 0) { + std::unique_ptr> row = std::make_unique>(item_id, 0); + row_map.emplace(item_id, std::move(row)); + } + // Exec Read + if (Operation::Type::READ == operation.type()) { + row_map[item_id]->Read(*txn_map[trans_id]); + // Exec Prewrite + } else if (Operation::Type::WRITE == operation.type()) { + row_value_map[item_id] += 1; + row_map[item_id]->Prewrite(row_value_map[item_id], *txn_map[trans_id]); + trans_write_set_list[trans_id].emplace_back(std::pair(item_id, row_value_map[item_id])); + //std::cout << "W->tx_id:" << trans_id << " item_id:" << item_id << " value:" << row_value_map[item_id] << std::endl; + // Exec Abort + } + } else if (Operation::Type::ABORT == operation.type()) { + alg_manager->Abort(*txn_map[trans_id]); + // rollback written row + for (const auto& item_write : trans_write_set_list[trans_id]) { + row_map[item_write.first]->Revoke(item_write.second, *txn_map[trans_id]); + } + // check data anomaly in abort + if (txn_map[trans_id]->cycle_ != nullptr) { + //std::cout << "abort::preces_size:" << txn_map[trans_id]->cycle_->Preces().size() << std::endl; + const auto anomaly = AlgManager::IdentifyAnomaly(txn_map[trans_id]->cycle_->Preces()); + ++(anomaly_counts_.at(static_cast(anomaly))); + return anomaly; + } + } else if (Operation::Type::COMMIT == operation.type()) { + bool ret = alg_manager->Validate(*txn_map[trans_id]); + if (ret) { + alg_manager->Commit(*txn_map[trans_id]); + // data persistence + for (const auto& row : row_map) { + row.second->Write(row_value_map[row.first], *txn_map[trans_id]); + } + } else { + alg_manager->Abort(*txn_map[trans_id]); + // rollback written row + for (const auto& item_write : trans_write_set_list[trans_id]) { + row_map[item_write.first]->Revoke(item_write.second, *txn_map[trans_id]); + } + } + // check data anomaly in commit + if (txn_map[trans_id]->cycle_ != nullptr) { + //std::cout << "commit::preces_size" << txn_map[trans_id]->cycle_->Preces().size() << std::endl; + const auto anomaly = AlgManager::IdentifyAnomaly(txn_map[trans_id]->cycle_->Preces()); + ++(anomaly_counts_.at(static_cast(anomaly))); + return anomaly; + } + } + } + return {}; + } + +private: + mutable std::array, Count()> anomaly_counts_; +}; + +} diff --git a/src/3ts/backend/history/parse_config.h b/src/3ts/backend/history/parse_config.h index 8d03bccf..a66e7eed 100644 --- a/src/3ts/backend/history/parse_config.h +++ b/src/3ts/backend/history/parse_config.h @@ -19,6 +19,7 @@ #include "../cca/occ_algorithm/trans/wsi_trans.h" #include "../cca/occ_algorithm/trans/dli_trans.h" #include "../cca/serializable_algorithm.h" +#include "../cca/unified_history_algorithm.h" #include "../util/generic.h" #include "generator.h" #include "outputter.h" @@ -129,6 +130,10 @@ void AlgorithmParseInternal_(const libconfig::Config &cfg, const std::string &al add_algorithm(std::make_shared>()); } else if (algorithm_name == "DLI_IDENTIFY") { add_algorithm(std::make_shared>()); + } else if (algorithm_name == "DLI_IDENTIFY_CYCLE") { + add_algorithm(std::make_shared>()); + } else if (algorithm_name == "DLI_IDENTIFY_CHAIN") { + add_algorithm(std::make_shared>()); } else { throw "Unknown algorithm name " + algorithm_name; } diff --git a/src/3ts/backend/util/extend_enum.h b/src/3ts/backend/util/extend_enum.h index 6fc5277a..2d4b8dec 100644 --- a/src/3ts/backend/util/extend_enum.h +++ b/src/3ts/backend/util/extend_enum.h @@ -1,20 +1,8 @@ -/* - * Tencent is pleased to support the open source community by making 3TS available. - * - * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. The below software - * in this distribution may have been modified by THL A29 Limited ("Tencent Modifications"). All - * Tencent Modifications are Copyright (C) THL A29 Limited. - * - * Author: williamcliu@tencent.com - * - */ - #ifdef ENUM_FILE -template >> constexpr uint32_t Count(); -template >> const char* ToString(const EnumType e); -template >> std::ostream& operator<<(std::ostream& os, const EnumType e); -template >> const std::array()>& Members(); +template >> inline constexpr uint32_t Count(); +template >> inline const char* ToString(const EnumType e); +template >> inline const std::array()>& Members(); #define ENUM_BEGIN(name) enum class name : uint32_t { #define ENUM_MEMBER(_, member) member, @@ -27,7 +15,7 @@ template const char* ToString(const name e)\ +template <> inline const char* ToString(const name e)\ {\ static std::array()> strings { #define ENUM_MEMBER(_, member) #member, @@ -35,7 +23,6 @@ template <> const char* ToString(const name e)\ };\ return strings.at(static_cast(e));\ }\ -template <> std::ostream& operator<<(std::ostream& os, const name e) { return os << ToString(e); } #include ENUM_FILE @@ -44,7 +31,7 @@ template <> std::ostream& operator<<(std::ostream& os, const name e) { ret #undef ENUM_END #define ENUM_BEGIN(name)\ -template <> const std::array()>& Members()\ +template <> inline const std::array()>& Members()\ {\ static const std::array()> members { #define ENUM_MEMBER(name, member) name::member, @@ -61,3 +48,10 @@ template <> const std::array()>& Members()\ #undef ENUM_FILE #endif + +#ifndef EXTEND_ENUM_H_ +#define EXTEND_ENUM_H_ + +template >> inline std::ostream& operator<<(std::ostream& os, const EnumType e) { return os << ToString(e); } + +#endif