Skip to content

Commit

Permalink
Merge pull request #6 from horazont/feature/nested-transactions
Browse files Browse the repository at this point in the history
Implement support for nesting transactions
  • Loading branch information
ahupowerdns authored Nov 20, 2019
2 parents c0cc016 + 7ce9a82 commit 469de3c
Show file tree
Hide file tree
Showing 13 changed files with 681 additions and 304 deletions.
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ transaction is aborted automatically. To commit or abort, use `commit()` or
`abort()`, after which going out of scope has no further effect.

```
txn.put(dbi, "lmdb", "great");
txn->put(dbi, "lmdb", "great");
string_view data;
if(!txn.get(dbi, "lmdb", data)) {
if(!txn->get(dbi, "lmdb", data)) {
cout<< "Within RW transaction, found that lmdb = " << data <<endl;
}
else
cout<<"Found nothing" << endl;
txn.commit();
txn->commit();
```

LMDB is so fast because it does not copy data unless it really needs to.
Expand Down Expand Up @@ -129,8 +129,8 @@ For example, to store `double` values for 64 bit IDs:
auto txn = env->getRWTransaction();
uint64_t id=12345678901;
double score=3.14159;
txn.put(dbi, id, score);
txn.commit();
txn->put(dbi, id, score);
txn->commit();
```

Behind the scenes, the `id` and `score` values are wrapped by `MDBInVal`
Expand All @@ -142,7 +142,7 @@ works similary:
uint64_t id=12345678901;
MDBOutValue val;
txn.get(dbi, id, val);
txn->get(dbi, id, val);
cout << "Score: " << val.get<double>() << "\n";
```
Expand Down Expand Up @@ -170,10 +170,10 @@ struct Coordinate
C c{12.0, 13.0};
txn.put(dbi, MDBInVal::fromStruct(c), 12.0);
txn->put(dbi, MDBInVal::fromStruct(c), 12.0);
MDBOutVal res;
txn.get(dbi, MDBInVal::fromStruct(c), res);
txn->get(dbi, MDBInVal::fromStruct(c), res);
auto c1 = res.get_struct<Coordinate>();
```
Expand All @@ -193,7 +193,7 @@ calls to mdb.
This is the usual opening sequence.

```
auto cursor=txn.getCursor(dbi);
auto cursor=txn->getCursor(dbi);
MDBOutVal key, data;
int count=0;
cout<<"Counting records.. "; cout.flush();
Expand All @@ -212,7 +212,7 @@ records in under a second (!).

```
cout<<"Clearing records.. "; cout.flush();
mdb_drop(txn, dbi, 0); // clear records
mdb_drop(*txn, dbi, 0); // clear records
cout<<"Done!"<<endl;
```

Expand All @@ -224,11 +224,11 @@ native `mdb_drop` function which we did not wrap. This is possible because
```
cout << "Adding "<<limit<<" values .. "; cout.flush();
for(unsigned int n = 0 ; n < limit; ++n) {
txn.put(dbi, n, n);
txn->put(dbi, n, n);
}
cout <<"Done!"<<endl;
cout <<"Calling commit.. "; cout.flush();
txn.commit();
txn->commit();
cout<<"Done!"<<endl;
```

Expand Down
14 changes: 7 additions & 7 deletions basic-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ void checkLMDB(MDBEnv* env, MDBDbi dbi)
{
auto rotxn = env->getROTransaction();
MDBOutVal data;
if(!rotxn.get(dbi, "lmdb", data)) {
if(!rotxn->get(dbi, "lmdb", data)) {
cout<< "Outside RW transaction, found that lmdb = " << data.get<string_view>() <<endl;
}
else
Expand All @@ -18,11 +18,11 @@ int main()
auto dbi = env->openDB("example", MDB_CREATE);

auto txn = env->getRWTransaction();
mdb_drop(txn, dbi, 0);
txn.put(dbi, "lmdb", "great");
mdb_drop(*txn, dbi, 0);
txn->put(dbi, "lmdb", "great");

MDBOutVal data;
if(!txn.get(dbi, "lmdb", data)) {
if(!txn->get(dbi, "lmdb", data)) {
cout<< "Within RW transaction, found that lmdb = " << data.get<string_view>() <<endl;
}
else
Expand All @@ -31,12 +31,12 @@ int main()
std::thread elsewhere(checkLMDB, env.get(), dbi);
elsewhere.join();

txn.commit();
txn->commit();

cout<<"Committed data"<<endl;

checkLMDB(env.get(), dbi);
txn = env->getRWTransaction();
mdb_drop(txn, dbi, 0);
txn.commit();
mdb_drop(*txn, dbi, 0);
txn->commit();
}
188 changes: 162 additions & 26 deletions lmdb-safe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,97 +139,233 @@ MDBDbi MDBEnv::openDB(const string_view dbname, int flags)

if(!(envflags & MDB_RDONLY)) {
auto rwt = getRWTransaction();
MDBDbi ret = rwt.openDB(dbname, flags);
rwt.commit();
MDBDbi ret = rwt->openDB(dbname, flags);
rwt->commit();
return ret;
}

MDBDbi ret;
{
auto rwt = getROTransaction();
ret = rwt.openDB(dbname, flags);
ret = rwt->openDB(dbname, flags);
}
return ret;
}

MDBRWTransaction::MDBRWTransaction(MDBEnv* parent, int flags) : d_parent(parent)
MDBRWTransactionImpl::MDBRWTransactionImpl(MDBEnv *parent, MDB_txn *txn):
MDBROTransactionImpl(parent, txn)

{

}

MDB_txn *MDBRWTransactionImpl::openRWTransaction(MDBEnv *env, MDB_txn *parent, int flags)
{
if(d_parent->getROTX() || d_parent->getRWTX())
MDB_txn *result;
if(env->getROTX() || env->getRWTX())
throw std::runtime_error("Duplicate RW transaction");

for(int tries =0 ; tries < 3; ++tries) { // it might happen twice, who knows
if(int rc=mdb_txn_begin(d_parent->d_env, 0, flags, &d_txn)) {
if(int rc=mdb_txn_begin(env->d_env, parent, flags, &result)) {
if(rc == MDB_MAP_RESIZED && tries < 2) {
// "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
// call mdb_env_set_mapsize with a size of zero to adopt the new size."
mdb_env_set_mapsize(d_parent->d_env, 0);
mdb_env_set_mapsize(env->d_env, 0);
continue;
}
throw std::runtime_error("Unable to start RW transaction: "+std::string(mdb_strerror(rc)));
}
break;
}
d_parent->incRWTX();
env->incRWTX();
return result;
}

MDBRWTransactionImpl::MDBRWTransactionImpl(MDBEnv* parent, int flags):
MDBRWTransactionImpl(parent, openRWTransaction(parent, nullptr, flags))
{
}

MDBRWTransactionImpl::~MDBRWTransactionImpl()
{
abort();
}

void MDBRWTransactionImpl::commit()
{
closeRORWCursors();
if (!d_txn) {
return;
}

if(int rc = mdb_txn_commit(d_txn)) {
throw std::runtime_error("committing: " + std::string(mdb_strerror(rc)));
}
environment().decRWTX();
d_txn = nullptr;
}

void MDBRWTransactionImpl::abort()
{
closeRORWCursors();
if (!d_txn) {
return;
}

mdb_txn_abort(d_txn);
// prevent the RO destructor from cleaning up the transaction itself
environment().decRWTX();
d_txn = nullptr;
}

MDBROTransactionImpl::MDBROTransactionImpl(MDBEnv *parent, MDB_txn *txn):
d_parent(parent),
d_cursors(),
d_txn(txn)
{

}

MDBROTransaction::MDBROTransaction(MDBEnv* parent, int flags) : d_parent(parent)
MDB_txn *MDBROTransactionImpl::openROTransaction(MDBEnv *env, MDB_txn *parent, int flags)
{
if(d_parent->getRWTX())
if(env->getRWTX())
throw std::runtime_error("Duplicate RO transaction");

/*
A transaction and its cursors must only be used by a single thread, and a thread may only have a single transaction at a time. If MDB_NOTLS is in use, this does not apply to read-only transactions. */

MDB_txn *result = nullptr;
for(int tries =0 ; tries < 3; ++tries) { // it might happen twice, who knows
if(int rc=mdb_txn_begin(d_parent->d_env, 0, MDB_RDONLY | flags, &d_txn)) {
if(int rc=mdb_txn_begin(env->d_env, parent, MDB_RDONLY | flags, &result)) {
if(rc == MDB_MAP_RESIZED && tries < 2) {
// "If the mapsize is increased by another process (..) mdb_txn_begin() will return MDB_MAP_RESIZED.
// call mdb_env_set_mapsize with a size of zero to adopt the new size."
mdb_env_set_mapsize(d_parent->d_env, 0);
mdb_env_set_mapsize(env->d_env, 0);
continue;
}

throw std::runtime_error("Unable to start RO transaction: "+string(mdb_strerror(rc)));
}
break;
}
d_parent->incROTX();
env->incROTX();

return result;
}

void MDBROTransactionImpl::closeROCursors()
{
// we need to move the vector away to ensure that the cursors don’t mess with our iteration.
std::vector<MDBROCursor*> buf;
std::swap(d_cursors, buf);
for (auto &cursor: buf) {
cursor->close();
}
}

MDBROTransactionImpl::MDBROTransactionImpl(MDBEnv *parent, int flags):
MDBROTransactionImpl(parent, openROTransaction(parent, nullptr, flags))
{

void MDBRWTransaction::clear(MDB_dbi dbi)
}

MDBROTransactionImpl::~MDBROTransactionImpl()
{
// this is safe because C++ will not call overrides of virtual methods in destructors.
commit();
}

void MDBROTransactionImpl::abort()
{
closeROCursors();
// if d_txn is non-nullptr here, either the transaction object was invalidated earlier (e.g. by moving from it), or it is an RW transaction which has already cleaned up the d_txn pointer (with an abort).
if (d_txn) {
d_parent->decROTX();
mdb_txn_abort(d_txn); // this appears to work better than abort for r/o database opening
d_txn = nullptr;
}
}

void MDBROTransactionImpl::commit()
{
closeROCursors();
// if d_txn is non-nullptr here, either the transaction object was invalidated earlier (e.g. by moving from it), or it is an RW transaction which has already cleaned up the d_txn pointer (with an abort).
if (d_txn) {
d_parent->decROTX();
mdb_txn_commit(d_txn); // this appears to work better than abort for r/o database opening
d_txn = nullptr;
}
}



void MDBRWTransactionImpl::clear(MDB_dbi dbi)
{
if(int rc = mdb_drop(d_txn, dbi, 0)) {
throw runtime_error("Error clearing database: " + MDBError(rc));
}
}

MDBRWCursor MDBRWTransaction::getCursor(const MDBDbi& dbi)
MDBRWCursor MDBRWTransactionImpl::getRWCursor(const MDBDbi& dbi)
{
return MDBRWCursor(this, dbi);
MDB_cursor *cursor;
int rc= mdb_cursor_open(d_txn, dbi, &cursor);
if(rc) {
throw std::runtime_error("Error creating RO cursor: "+std::string(mdb_strerror(rc)));
}
return MDBRWCursor(d_rw_cursors, cursor);
}

MDBRWCursor MDBRWTransactionImpl::getCursor(const MDBDbi &dbi)
{
return getRWCursor(dbi);
}

MDBRWTransaction MDBRWTransactionImpl::getRWTransaction()
{
MDB_txn *txn;
if (int rc = mdb_txn_begin(environment(), *this, 0, &txn)) {
throw std::runtime_error(std::string("failed to start child transaction: ")+mdb_strerror(rc));
}
// we need to increase the counter here because commit/abort on the child transaction will decrease it
environment().incRWTX();
return MDBRWTransaction(new MDBRWTransactionImpl(&environment(), txn));
}

MDBROTransaction MDBRWTransactionImpl::getROTransaction()
{
return std::move(getRWTransaction());
}

MDBROTransaction MDBEnv::getROTransaction()
{
return MDBROTransaction(this);
return MDBROTransaction(new MDBROTransactionImpl(this));
}
MDBRWTransaction MDBEnv::getRWTransaction()
{
return MDBRWTransaction(this);
return MDBRWTransaction(new MDBRWTransactionImpl(this));
}


void MDBRWTransaction::closeCursors()
void MDBRWTransactionImpl::closeRWCursors()
{
for(auto& c : d_cursors)
c->close();
d_cursors.clear();
decltype(d_rw_cursors) buf;
std::swap(d_rw_cursors, buf);
for (auto &cursor: buf) {
cursor->close();
}
}

MDBROCursor MDBROTransaction::getCursor(const MDBDbi& dbi)
MDBROCursor MDBROTransactionImpl::getCursor(const MDBDbi& dbi)
{
return MDBROCursor(this, dbi);
return getROCursor(dbi);
}


MDBROCursor MDBROTransactionImpl::getROCursor(const MDBDbi &dbi)
{
MDB_cursor *cursor;
int rc= mdb_cursor_open(d_txn, dbi, &cursor);
if(rc) {
throw std::runtime_error("Error creating RO cursor: "+std::string(mdb_strerror(rc)));
}
return MDBROCursor(d_cursors, cursor);
}
Loading

0 comments on commit 469de3c

Please sign in to comment.