Skip to content

Commit

Permalink
2021-p4: Concurrency Control with deadlock prevention (#202)
Browse files Browse the repository at this point in the history
* Removed Deadlock Detection and added Deadlock Prevention

* Using promise for timing in basic test
  • Loading branch information
Mike Xu committed Nov 15, 2021
1 parent 6efd294 commit a865ed8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 176 deletions.
19 changes: 0 additions & 19 deletions src/concurrency/lock_manager.cpp
Expand Up @@ -39,23 +39,4 @@ bool LockManager::Unlock(Transaction *txn, const RID &rid) {
return true;
}

void LockManager::AddEdge(txn_id_t t1, txn_id_t t2) {}

void LockManager::RemoveEdge(txn_id_t t1, txn_id_t t2) {}

bool LockManager::HasCycle(txn_id_t *txn_id) { return false; }

std::vector<std::pair<txn_id_t, txn_id_t>> LockManager::GetEdgeList() { return {}; }

void LockManager::RunCycleDetection() {
while (enable_cycle_detection_) {
std::this_thread::sleep_for(cycle_detection_interval);
{
std::unique_lock<std::mutex> l(latch_);
// TODO(student): remove the continue and add your cycle detection and abort code here
continue;
}
}
}

} // namespace bustub
59 changes: 15 additions & 44 deletions src/include/concurrency/lock_manager.h
Expand Up @@ -21,6 +21,7 @@
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/rid.h"
#include "concurrency/transaction.h"

Expand All @@ -46,31 +47,27 @@ class LockManager {
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
std::condition_variable cv_; // for notifying blocked transactions on this rid
bool upgrading_ = false;
// for notifying blocked transactions on this rid
std::condition_variable cv_;
// txn_id of an upgrading transaction (if any)
txn_id_t upgrading_ = INVALID_TXN_ID;
};

public:
/**
* Creates a new lock manager configured for the deadlock detection policy.
* Creates a new lock manager configured for the deadlock prevention policy.
*/
LockManager() {
enable_cycle_detection_ = true;
cycle_detection_thread_ = new std::thread(&LockManager::RunCycleDetection, this);
}
LockManager() = default;

~LockManager() {
enable_cycle_detection_ = false;
cycle_detection_thread_->join();
delete cycle_detection_thread_;
}
~LockManager() = default;

/*
* [LOCK_NOTE]: For all locking functions, we:
* 1. return false if the transaction is aborted; and
* 2. block on wait, return true when the lock request is granted; and
* 3. it is undefined behavior to try locking an already locked RID in the same transaction, i.e. the transaction
* is responsible for keeping track of its current locks.
* 3. it is undefined behavior to try locking an already locked RID in the
* same transaction, i.e. the transaction is responsible for keeping track of
* its current locks.
*/

/**
Expand All @@ -92,52 +89,26 @@ class LockManager {
/**
* Upgrade a lock from a shared lock to an exclusive lock.
* @param txn the transaction requesting the lock upgrade
* @param rid the RID that should already be locked in shared mode by the requesting transaction
* @param rid the RID that should already be locked in shared mode by the
* requesting transaction
* @return true if the upgrade is successful, false otherwise
*/
bool LockUpgrade(Transaction *txn, const RID &rid);

/**
* Release the lock held by the transaction.
* @param txn the transaction releasing the lock, it should actually hold the lock
* @param txn the transaction releasing the lock, it should actually hold the
* lock
* @param rid the RID that is locked by the transaction
* @return true if the unlock is successful, false otherwise
*/
bool Unlock(Transaction *txn, const RID &rid);

/*** Graph API ***/
/**
* Adds edge t1->t2
*/

/** Adds an edge from t1 -> t2. */
void AddEdge(txn_id_t t1, txn_id_t t2);

/** Removes an edge from t1 -> t2. */
void RemoveEdge(txn_id_t t1, txn_id_t t2);

/**
* Checks if the graph has a cycle, returning the newest transaction ID in the cycle if so.
* @param[out] txn_id if the graph has a cycle, will contain the newest transaction ID
* @return false if the graph has no cycle, otherwise stores the newest transaction ID in the cycle to txn_id
*/
bool HasCycle(txn_id_t *txn_id);

/** @return the set of all edges in the graph, used for testing only! */
std::vector<std::pair<txn_id_t, txn_id_t>> GetEdgeList();

/** Runs cycle detection in the background. */
void RunCycleDetection();

private:
std::mutex latch_;
std::atomic<bool> enable_cycle_detection_;
std::thread *cycle_detection_thread_;

/** Lock table for lock requests. */
std::unordered_map<RID, LockRequestQueue> lock_table_;
/** Waits-for graph representation. */
std::unordered_map<txn_id_t, std::vector<txn_id_t>> waits_for_;
};

} // namespace bustub
139 changes: 38 additions & 101 deletions test/concurrency/lock_manager_test.cpp
Expand Up @@ -152,119 +152,56 @@ void UpgradeTest() {
}
TEST(LockManagerTest, DISABLED_UpgradeLockTest) { UpgradeTest(); }

TEST(LockManagerTest, DISABLED_GraphEdgeTest) {
void WoundWaitBasicTest() {
LockManager lock_mgr{};
TransactionManager txn_mgr{&lock_mgr};
const int num_nodes = 100;
const int num_edges = num_nodes / 2;
const int seed = 15445;
std::srand(seed);

// Create txn ids and shuffle
std::vector<txn_id_t> txn_ids;
txn_ids.reserve(num_nodes);
for (int i = 0; i < num_nodes; i++) {
txn_ids.emplace_back(i);
}
EXPECT_EQ(num_nodes, txn_ids.size());
auto rng = std::default_random_engine{};
std::shuffle(std::begin(txn_ids), std::end(txn_ids), rng);
EXPECT_EQ(num_nodes, txn_ids.size());

// Create edges by pairing adjacent txn_ids
std::vector<std::pair<txn_id_t, txn_id_t>> edges;
for (int i = 0; i < num_nodes; i += 2) {
EXPECT_EQ(i / 2, lock_mgr.GetEdgeList().size());
auto t1 = txn_ids[i];
auto t2 = txn_ids[i + 1];
lock_mgr.AddEdge(t1, t2);
edges.emplace_back(t1, t2);
EXPECT_EQ((i / 2) + 1, lock_mgr.GetEdgeList().size());
}
RID rid{0, 0};

auto lock_mgr_edges = lock_mgr.GetEdgeList();
EXPECT_EQ(num_edges, lock_mgr_edges.size());
EXPECT_EQ(num_edges, edges.size());
int id_hold = 0;
int id_die = 1;

std::sort(lock_mgr_edges.begin(), lock_mgr_edges.end());
std::sort(edges.begin(), edges.end());
std::promise<void> t1done;
std::shared_future<void> t1_future(t1done.get_future());

for (int i = 0; i < num_edges; i++) {
EXPECT_EQ(edges[i], lock_mgr_edges[i]);
}
}
auto wait_die_task = [&]() {
// younger transaction acquires lock first
Transaction txn_die(id_die);
txn_mgr.Begin(&txn_die);
bool res = lock_mgr.LockExclusive(&txn_die, rid);
EXPECT_TRUE(res);

TEST(LockManagerTest, DISABLED_BasicCycleTest) {
LockManager lock_mgr{}; /* Use Deadlock detection */
TransactionManager txn_mgr{&lock_mgr};
CheckGrowing(&txn_die);
CheckTxnLockSize(&txn_die, 0, 1);

/*** Create 0->1->0 cycle ***/
lock_mgr.AddEdge(0, 1);
lock_mgr.AddEdge(1, 0);
EXPECT_EQ(2, lock_mgr.GetEdgeList().size());
t1done.set_value();

txn_id_t txn;
EXPECT_EQ(true, lock_mgr.HasCycle(&txn));
EXPECT_EQ(1, txn);
// wait for txn 0 to call lock_exclusive(), which should wound us
std::this_thread::sleep_for(std::chrono::milliseconds(300));

lock_mgr.RemoveEdge(1, 0);
EXPECT_EQ(false, lock_mgr.HasCycle(&txn));
}
CheckAborted(&txn_die);

TEST(LockManagerTest, DISABLED_BasicDeadlockDetectionTest) {
LockManager lock_mgr{};
cycle_detection_interval = std::chrono::milliseconds(500);
TransactionManager txn_mgr{&lock_mgr};
RID rid0{0, 0};
RID rid1{1, 1};
auto *txn0 = txn_mgr.Begin();
auto *txn1 = txn_mgr.Begin();
EXPECT_EQ(0, txn0->GetTransactionId());
EXPECT_EQ(1, txn1->GetTransactionId());

std::thread t0([&] {
// Lock and sleep
bool res = lock_mgr.LockExclusive(txn0, rid0);
EXPECT_EQ(true, res);
EXPECT_EQ(TransactionState::GROWING, txn0->GetState());
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// This will block
lock_mgr.LockExclusive(txn0, rid1);

lock_mgr.Unlock(txn0, rid0);
lock_mgr.Unlock(txn0, rid1);

txn_mgr.Commit(txn0);
EXPECT_EQ(TransactionState::COMMITTED, txn0->GetState());
});

std::thread t1([&] {
// Sleep so T0 can take necessary locks
std::this_thread::sleep_for(std::chrono::milliseconds(50));
bool res = lock_mgr.LockExclusive(txn1, rid1);
EXPECT_EQ(res, true);
EXPECT_EQ(TransactionState::GROWING, txn1->GetState());

// This will block
try {
res = lock_mgr.LockExclusive(txn1, rid0);
EXPECT_EQ(TransactionState::ABORTED, txn1->GetState());
txn_mgr.Abort(txn1);
} catch (TransactionAbortException &e) {
// std::cout << e.GetInfo() << std::endl;
EXPECT_EQ(TransactionState::ABORTED, txn1->GetState());
txn_mgr.Abort(txn1);
}
});
// unlock
txn_mgr.Abort(&txn_die);
};

Transaction txn_hold(id_hold);
txn_mgr.Begin(&txn_hold);

// Sleep for enough time to break cycle
std::this_thread::sleep_for(cycle_detection_interval * 2);
// launch the waiter thread
std::thread wait_thread{wait_die_task};

t0.join();
t1.join();
// wait for txn1 to lock
t1_future.wait();

delete txn0;
delete txn1;
bool res = lock_mgr.LockExclusive(&txn_hold, rid);
EXPECT_TRUE(res);

wait_thread.join();

CheckGrowing(&txn_hold);
txn_mgr.Commit(&txn_hold);
CheckCommitted(&txn_hold);
}
TEST(LockManagerTest, DISABLED_WoundWaitBasicTest) { WoundWaitBasicTest(); }

} // namespace bustub
24 changes: 12 additions & 12 deletions test/concurrency/transaction_test.cpp
Expand Up @@ -185,9 +185,9 @@ TEST_F(TransactionTest, DISABLED_SimpleInsertRollbackTest) {
auto txn2 = GetTxnManager()->Begin();
auto exec_ctx2 = std::make_unique<ExecutorContext>(txn2, GetCatalog(), GetBPM(), GetTxnManager(), GetLockManager());
auto &schema = table_info->schema_;
auto col_a = MakeColumnValueExpression(schema, 0, "col_a");
auto col_b = MakeColumnValueExpression(schema, 0, "col_b");
auto out_schema = MakeOutputSchema({{"col_a", col_a}, {"col_b", col_b}});
auto col_a = MakeColumnValueExpression(schema, 0, "colA");
auto col_b = MakeColumnValueExpression(schema, 0, "colB");
auto out_schema = MakeOutputSchema({{"colA", col_a}, {"colB", col_b}});
SeqScanPlanNode scan_plan{out_schema, nullptr, table_info->oid_};

std::vector<Tuple> result_set;
Expand Down Expand Up @@ -223,9 +223,9 @@ TEST_F(TransactionTest, DISABLED_DirtyReadsTest) {
auto txn2 = GetTxnManager()->Begin(nullptr, IsolationLevel::READ_UNCOMMITTED);
auto exec_ctx2 = std::make_unique<ExecutorContext>(txn2, GetCatalog(), GetBPM(), GetTxnManager(), GetLockManager());
auto &schema = table_info->schema_;
auto col_a = MakeColumnValueExpression(schema, 0, "col_a");
auto col_b = MakeColumnValueExpression(schema, 0, "col_b");
auto out_schema = MakeOutputSchema({{"col_a", col_a}, {"col_b", col_b}});
auto col_a = MakeColumnValueExpression(schema, 0, "colA");
auto col_b = MakeColumnValueExpression(schema, 0, "colB");
auto out_schema = MakeOutputSchema({{"colA", col_a}, {"colB", col_b}});
SeqScanPlanNode scan_plan{out_schema, nullptr, table_info->oid_};

std::vector<Tuple> result_set;
Expand All @@ -235,16 +235,16 @@ TEST_F(TransactionTest, DISABLED_DirtyReadsTest) {
delete txn1;

// First value
ASSERT_EQ(result_set[0].GetValue(out_schema, out_schema->GetColIdx("col_a")).GetAs<int32_t>(), 200);
ASSERT_EQ(result_set[0].GetValue(out_schema, out_schema->GetColIdx("col_b")).GetAs<int32_t>(), 20);
ASSERT_EQ(result_set[0].GetValue(out_schema, out_schema->GetColIdx("colA")).GetAs<int32_t>(), 200);
ASSERT_EQ(result_set[0].GetValue(out_schema, out_schema->GetColIdx("colB")).GetAs<int32_t>(), 20);

// Second value
ASSERT_EQ(result_set[1].GetValue(out_schema, out_schema->GetColIdx("col_a")).GetAs<int32_t>(), 201);
ASSERT_EQ(result_set[1].GetValue(out_schema, out_schema->GetColIdx("col_b")).GetAs<int32_t>(), 21);
ASSERT_EQ(result_set[1].GetValue(out_schema, out_schema->GetColIdx("colA")).GetAs<int32_t>(), 201);
ASSERT_EQ(result_set[1].GetValue(out_schema, out_schema->GetColIdx("colB")).GetAs<int32_t>(), 21);

// Third value
ASSERT_EQ(result_set[2].GetValue(out_schema, out_schema->GetColIdx("col_a")).GetAs<int32_t>(), 202);
ASSERT_EQ(result_set[2].GetValue(out_schema, out_schema->GetColIdx("col_b")).GetAs<int32_t>(), 22);
ASSERT_EQ(result_set[2].GetValue(out_schema, out_schema->GetColIdx("colA")).GetAs<int32_t>(), 202);
ASSERT_EQ(result_set[2].GetValue(out_schema, out_schema->GetColIdx("colB")).GetAs<int32_t>(), 22);

// Size
ASSERT_EQ(result_set.size(), 3);
Expand Down

0 comments on commit a865ed8

Please sign in to comment.