/
db_lmdb.cpp
113 lines (98 loc) · 3.08 KB
/
db_lmdb.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifdef USE_LMDB
#include "caffe/util/db_lmdb.hpp"
#include <sys/stat.h>
#include <string>
namespace caffe { namespace db {
// Creates the LMDB environment handle stored in mdb_env_ and opens the
// database directory `source` with it.
//   source: path of the database directory.
//   mode:   NEW first creates the directory with mkdir; READ opens the
//           environment with MDB_RDONLY | MDB_NOTLS; any other mode opens it
//           with no flags.
// When ALLOW_LMDB_NOLOCK is defined, an EACCES result from the first open is
// answered with one more attempt that adds MDB_NOLOCK to the flags.
void LMDB::Open(const string& source, Mode mode) {
  MDB_CHECK(mdb_env_create(&mdb_env_));
  // A brand-new database needs its directory to exist before it is opened.
  if (mode == NEW) {
    CHECK_EQ(mkdir(source.c_str(), 0744), 0) << "mkdir " << source << " failed";
  }
  // Only READ mode carries open flags.
  int flags = (mode == READ) ? (MDB_RDONLY | MDB_NOTLS) : 0;
  int rc = mdb_env_open(mdb_env_, source.c_str(), flags, 0664);
#ifdef ALLOW_LMDB_NOLOCK
  if (rc != EACCES) {
    // Success, or a failure the fallback below does not cover.
    MDB_CHECK(rc);
  } else {
    LOG(WARNING) << "Permission denied. Trying with MDB_NOLOCK ...";
    // Drop the handle used by the failed open and start from a fresh one.
    mdb_env_close(mdb_env_);
    MDB_CHECK(mdb_env_create(&mdb_env_));
    // Second attempt, same flags plus MDB_NOLOCK.
    flags |= MDB_NOLOCK;
    MDB_CHECK(mdb_env_open(mdb_env_, source.c_str(), flags, 0664));
  }
#else
  MDB_CHECK(rc);
#endif
  LOG_IF(INFO, Caffe::root_solver()) << "Opened lmdb " << source;
}
// Begins a read-only transaction on mdb_env_, opens the unnamed database
// into the member mdb_dbi_, opens a cursor on it, and returns a
// heap-allocated LMDBCursor built from that transaction and cursor.
// The result comes from `new`; who deletes it is decided by the caller.
LMDBCursor* LMDB::NewCursor() {
  MDB_txn* mdb_txn = NULL;
  MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, MDB_RDONLY, &mdb_txn));
  MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi_));

  MDB_cursor* mdb_cursor = NULL;
  MDB_CHECK(mdb_cursor_open(mdb_txn, mdb_dbi_, &mdb_cursor));

  LMDBCursor* wrapped_cursor = new LMDBCursor(mdb_txn, mdb_cursor);
  return wrapped_cursor;
}
// Returns a heap-allocated LMDBTransaction constructed from this database's
// environment handle (mdb_env_). The result comes from `new`.
LMDBTransaction* LMDB::NewTransaction() {
  LMDBTransaction* transaction = new LMDBTransaction(mdb_env_);
  return transaction;
}
// Buffers one key/value pair: `key` is appended to `keys` and `value` to
// `values`. Nothing is sent to LMDB here; Commit() is the function that
// reads these two buffers.
void LMDBTransaction::Put(const string& key, const string& value) {
  keys.insert(keys.end(), key);
  values.insert(values.end(), value);
}
// Writes every buffered key/value pair (see Put) to the unnamed database in
// a single write transaction and commits it.
//
// If mdb_put or mdb_txn_commit reports MDB_MAP_FULL, the map size is doubled
// via DoubleMapSize() and the whole batch is retried by calling Commit()
// again. The buffers are cleared only after a successful commit, so a retry
// replays all pairs from the start. Any other non-success status is handed to
// MDB_CHECK.
void LMDBTransaction::Commit() {
  MDB_dbi mdb_dbi;
  MDB_val mdb_key, mdb_data;
  MDB_txn *mdb_txn;

  // Initialize MDB variables
  MDB_CHECK(mdb_txn_begin(mdb_env_, NULL, 0, &mdb_txn));
  MDB_CHECK(mdb_dbi_open(mdb_txn, NULL, 0, &mdb_dbi));

  // size_t index: keys.size() is unsigned, so an int counter would mix
  // signed and unsigned operands in the loop condition.
  for (size_t i = 0; i < keys.size(); ++i) {
    // MDB_val only points at the bytes; the strings stay owned by the
    // buffers. const_cast is needed because mv_data is a non-const pointer.
    mdb_key.mv_size = keys[i].size();
    mdb_key.mv_data = const_cast<char*>(keys[i].data());
    mdb_data.mv_size = values[i].size();
    mdb_data.mv_data = const_cast<char*>(values[i].data());

    // Add data to the transaction
    int put_rc = mdb_put(mdb_txn, mdb_dbi, &mdb_key, &mdb_data, 0);
    if (put_rc == MDB_MAP_FULL) {
      // Out of memory - double the map size and retry
      mdb_txn_abort(mdb_txn);
      mdb_dbi_close(mdb_env_, mdb_dbi);
      DoubleMapSize();
      Commit();
      return;
    }
    // May have failed for some other reason
    MDB_CHECK(put_rc);
  }

  // Commit the transaction
  int commit_rc = mdb_txn_commit(mdb_txn);
  if (commit_rc == MDB_MAP_FULL) {
    // Out of memory - double the map size and retry
    mdb_dbi_close(mdb_env_, mdb_dbi);
    DoubleMapSize();
    Commit();
    return;
  }
  // May have failed for some other reason
  MDB_CHECK(commit_rc);

  // Cleanup after successful commit
  mdb_dbi_close(mdb_env_, mdb_dbi);
  keys.clear();
  values.clear();
}
// Reads the environment's current map size with mdb_env_info and sets it to
// twice that value with mdb_env_set_mapsize. Commit() calls this after LMDB
// reports MDB_MAP_FULL.
void LMDBTransaction::DoubleMapSize() {
  struct MDB_envinfo current_info;
  // mdb_env_info fills the struct through the pointer passed here.
  MDB_CHECK(mdb_env_info(mdb_env_, &current_info));
  size_t new_size = current_info.me_mapsize * 2;
  // The shift by 20 reports the new size in megabytes.
  DLOG(INFO) << "Doubling LMDB map size to " << (new_size>>20) << "MB ...";
  MDB_CHECK(mdb_env_set_mapsize(mdb_env_, new_size));
}
} // namespace db
} // namespace caffe
#endif // USE_LMDB