Skip to content

Commit

Permalink
Sync lmdb.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco van Wieringen committed Apr 28, 2015
1 parent f4fcead commit ffc2e2e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 47 deletions.
12 changes: 7 additions & 5 deletions src/lmdb/lmdb.h
Expand Up @@ -1043,14 +1043,16 @@ int mdb_txn_renew(MDB_txn *txn);
* The database handle may be discarded by calling #mdb_dbi_close().
* The old database handle is returned if the database was already open.
* The handle may only be closed once.
*
* The database handle will be private to the current transaction until
* the transaction is successfully committed. If the transaction is
* aborted the handle will be closed automatically.
* After a successful commit the
* handle will reside in the shared environment, and may be used
* by other transactions. This function must not be called from
* multiple concurrent transactions in the same process. A transaction
* that uses this function must finish (either commit or abort) before
* After a successful commit the handle will reside in the shared
* environment, and may be used by other transactions.
*
* This function must not be called from multiple concurrent
* transactions in the same process. A transaction that uses
* this function must finish (either commit or abort) before
* any other transaction in the process may use this function.
*
* To use named databases (with name != NULL), #mdb_env_set_maxdbs()
Expand Down
116 changes: 74 additions & 42 deletions src/lmdb/mdb.c
Expand Up @@ -1086,12 +1086,16 @@ struct MDB_txn {
* @ingroup internal
* @{
*/
#define MDB_TXN_RDONLY 0x01 /**< read-only transaction */
/** #mdb_txn_begin() flags */
#define MDB_TXN_BEGIN_FLAGS (MDB_NOMETASYNC|MDB_NOSYNC|MDB_RDONLY)
#define MDB_TXN_NOMETASYNC MDB_NOMETASYNC /**< don't sync meta for this txn on commit */
#define MDB_TXN_NOSYNC MDB_NOSYNC /**< don't sync this txn on commit */
#define MDB_TXN_RDONLY MDB_RDONLY /**< read-only transaction */
/* internal txn flags */
#define MDB_TXN_WRITEMAP MDB_WRITEMAP /**< copy of #MDB_env flag in writers */
#define MDB_TXN_ERROR 0x02 /**< txn is unusable after an error */
#define MDB_TXN_DIRTY 0x04 /**< must write, even if dirty list is empty */
#define MDB_TXN_SPILLS 0x08 /**< txn or a parent has spilled pages */
#define MDB_TXN_NOSYNC 0x10 /**< don't sync this txn on commit */
#define MDB_TXN_NOMETASYNC 0x20 /**< don't sync meta for this txn on commit */
/** @} */
unsigned int mt_flags; /**< @ref mdb_txn */
/** #dirty_list room: Array size - \#dirty pages visible to this txn.
Expand Down Expand Up @@ -2002,7 +2006,7 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp)
MDB_ID2 mid;
int rc, (*insert)(MDB_ID2L, MDB_ID2 *);

if (txn->mt_env->me_flags & MDB_WRITEMAP) {
if (txn->mt_flags & MDB_TXN_WRITEMAP) {
insert = mdb_mid2l_append;
} else {
insert = mdb_mid2l_insert;
Expand Down Expand Up @@ -2576,6 +2580,7 @@ mdb_txn_renew0(MDB_txn *txn)
int rc, new_notls = 0;

if (txn->mt_flags & MDB_TXN_RDONLY) {
txn->mt_flags &= MDB_TXN_BEGIN_FLAGS;
/* Setup db info */
txn->mt_numdbs = env->me_numdbs;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
Expand Down Expand Up @@ -2641,6 +2646,7 @@ mdb_txn_renew0(MDB_txn *txn)
meta = env->me_metas[txn->mt_txnid & 1];
}
} else {
/* Not yet touching txn == env->me_txn0, it may be active */
if (ti) {
if (LOCK_MUTEX(rc, env, MDB_MUTEX(env, w)))
return rc;
Expand Down Expand Up @@ -2723,33 +2729,36 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
{
MDB_txn *txn;
MDB_ntxn *ntxn;
int rc, size, tsize = sizeof(MDB_txn);
int rc, size, tsize;

flags &= MDB_TXN_BEGIN_FLAGS;
flags |= env->me_flags & MDB_WRITEMAP;

if (env->me_flags & MDB_FATAL_ERROR) {
DPUTS("environment had fatal error, must shutdown!");
return MDB_PANIC;
}
if ((env->me_flags & MDB_RDONLY) && !(flags & MDB_RDONLY))
if (env->me_flags & MDB_RDONLY & ~flags) /* write txn in RDONLY env */
return EACCES;

size = tsize = sizeof(MDB_txn);
if (parent) {
/* Nested transactions: Max 1 child, write txns only, no writemap */
flags |= parent->mt_flags;
if (parent->mt_child ||
(flags & MDB_RDONLY) ||
(parent->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) ||
(env->me_flags & MDB_WRITEMAP))
(flags & (MDB_RDONLY|MDB_WRITEMAP|MDB_TXN_ERROR)))
{
return (parent->mt_flags & MDB_TXN_RDONLY) ? EINVAL : MDB_BAD_TXN;
}
tsize = sizeof(MDB_ntxn);
}
size = tsize;
if (!(flags & MDB_RDONLY)) {
if (!parent) {
txn = env->me_txn0; /* just reuse preallocated write txn */
goto ok;
}
/* child txns use own copy of cursors */
/* Child txns save MDB_pgstate and use own copy of cursors */
size = tsize = sizeof(MDB_ntxn);
size += env->me_maxdbs * sizeof(MDB_cursor *);
} else if (!(flags & MDB_RDONLY)) {
/* Reuse preallocated write txn. However, do not touch it until
* mdb_txn_renew0() succeeds, since it currently may be active.
*/
txn = env->me_txn0;
goto renew;
}
size += env->me_maxdbs * (sizeof(MDB_db)+1);

Expand All @@ -2759,14 +2768,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
}
txn->mt_dbs = (MDB_db *) ((char *)txn + tsize);
if (flags & MDB_RDONLY) {
txn->mt_flags |= MDB_TXN_RDONLY;
txn->mt_dbflags = (unsigned char *)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = env->me_dbiseqs;
} else {
if (flags & MDB_NOSYNC)
txn->mt_flags |= MDB_TXN_NOSYNC;
if (flags & MDB_NOMETASYNC)
txn->mt_flags |= MDB_TXN_NOMETASYNC;
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
if (parent) {
txn->mt_dbiseqs = parent->mt_dbiseqs;
Expand All @@ -2776,9 +2780,9 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
txn->mt_dbflags = (unsigned char *)(txn->mt_dbiseqs + env->me_maxdbs);
}
}
txn->mt_flags = flags;
txn->mt_env = env;

ok:
if (parent) {
unsigned int i;
txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE);
Expand All @@ -2797,7 +2801,6 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
parent->mt_child = txn;
txn->mt_parent = parent;
txn->mt_numdbs = parent->mt_numdbs;
txn->mt_flags = parent->mt_flags;
txn->mt_dbxs = parent->mt_dbxs;
memcpy(txn->mt_dbs, parent->mt_dbs, txn->mt_numdbs * sizeof(MDB_db));
/* Copy parent's mt_dbflags, but clear DB_NEW */
Expand All @@ -2819,15 +2822,17 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
if (rc)
mdb_txn_reset0(txn, "beginchild-fail");
} else {
renew:
rc = mdb_txn_renew0(txn);
}
if (rc) {
if (txn != env->me_txn0)
free(txn);
} else {
txn->mt_flags |= flags; /* for txn==me_txn0, no effect otherwise */
*ret = txn;
DPRINTF(("begin txn %"Z"u%c %p on mdbenv %p, root page %"Z"u",
txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w',
txn->mt_txnid, (flags & MDB_RDONLY) ? 'r' : 'w',
(void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root));
}

Expand Down Expand Up @@ -3237,15 +3242,19 @@ mdb_page_flush(MDB_txn *txn, int keep)
/* Write up to MDB_COMMIT_PAGES dirty pages at a time. */
if (pos!=next_pos || n==MDB_COMMIT_PAGES || wsize+size>MAX_WRITE) {
if (n) {
retry_write:
/* Write previous page(s) */
#ifdef MDB_USE_PWRITEV
wres = pwritev(env->me_fd, iov, n, wpos);
#else
if (n == 1) {
wres = pwrite(env->me_fd, iov[0].iov_base, wsize, wpos);
} else {
retry_seek:
if (lseek(env->me_fd, wpos, SEEK_SET) == -1) {
rc = ErrCode();
if (rc == EINTR)
goto retry_seek;
DPRINTF(("lseek: %s", strerror(rc)));
return rc;
}
Expand All @@ -3255,6 +3264,8 @@ mdb_page_flush(MDB_txn *txn, int keep)
if (wres != wsize) {
if (wres < 0) {
rc = ErrCode();
if (rc == EINTR)
goto retry_write;
DPRINTF(("Write error: %s", strerror(rc)));
} else {
rc = EIO; /* TODO: Use which error code? */
Expand Down Expand Up @@ -3628,7 +3639,8 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
int len;
#define DO_PWRITE(rc, fd, ptr, size, len, pos) do { \
len = pwrite(fd, ptr, size, pos); \
rc = (len >= 0); } while(0)
if (len == -1 && ErrCode() == EINTR) continue; \
rc = (len >= 0); break; } while(1)
#endif

DPUTS("writing new meta page");
Expand Down Expand Up @@ -3665,6 +3677,7 @@ mdb_env_write_meta(MDB_txn *txn)
{
MDB_env *env;
MDB_meta meta, metab, *mp;
unsigned flags;
size_t mapsize;
off_t off;
int rc, len, toggle;
Expand All @@ -3681,13 +3694,14 @@ mdb_env_write_meta(MDB_txn *txn)
toggle, txn->mt_dbs[MAIN_DBI].md_root));

env = txn->mt_env;
flags = txn->mt_flags & env->me_flags;
mp = env->me_metas[toggle];
mapsize = env->me_metas[toggle ^ 1]->mm_mapsize;
/* Persist any increases of mapsize config */
if (mapsize < env->me_mapsize)
mapsize = env->me_mapsize;

if (env->me_flags & MDB_WRITEMAP) {
if (flags & MDB_WRITEMAP) {
mp->mm_mapsize = mapsize;
mp->mm_dbs[0] = txn->mt_dbs[0];
mp->mm_dbs[1] = txn->mt_dbs[1];
Expand All @@ -3701,9 +3715,7 @@ mdb_env_write_meta(MDB_txn *txn)
#endif
#endif
mp->mm_txnid = txn->mt_txnid;
if (txn->mt_flags & (MDB_TXN_NOSYNC|MDB_TXN_NOMETASYNC))
goto done;
if (!(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) {
if (!(flags & (MDB_NOMETASYNC|MDB_NOSYNC))) {
unsigned meta_size = env->me_psize;
rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC;
ptr = env->me_map;
Expand Down Expand Up @@ -3739,9 +3751,8 @@ mdb_env_write_meta(MDB_txn *txn)
off += PAGEHDRSZ;

/* Write to the SYNC fd */
mfd = ((env->me_flags & (MDB_NOSYNC|MDB_NOMETASYNC)) ||
(txn->mt_flags & (MDB_TXN_NOSYNC|MDB_TXN_NOMETASYNC))) ?
env->me_fd : env->me_mfd;
mfd = (flags & (MDB_NOSYNC|MDB_NOMETASYNC)) ? env->me_fd : env->me_mfd;
retry_write:
#ifdef _WIN32
{
memset(&ov, 0, sizeof(ov));
Expand All @@ -3754,6 +3765,8 @@ mdb_env_write_meta(MDB_txn *txn)
#endif
if (rc != len) {
rc = rc < 0 ? ErrCode() : EIO;
if (rc == EINTR)
goto retry_write;
DPUTS("write failed, disk error?");
/* On a failure, the pagecache still contains the new data.
* Write some old data back, to prevent it from being used.
Expand Down Expand Up @@ -4741,15 +4754,13 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode
if (rc)
goto leave;
}
if (!((flags & MDB_RDONLY) ||
(env->me_pbuf = calloc(1, env->me_psize))))
rc = ENOMEM;
if (!(flags & MDB_RDONLY)) {
MDB_txn *txn;
int tsize = sizeof(MDB_txn), size = tsize + env->me_maxdbs *
(sizeof(MDB_db)+sizeof(MDB_cursor *)+sizeof(unsigned int)+1);
txn = calloc(1, size);
if (txn) {
if ((env->me_pbuf = calloc(1, env->me_psize)) &&
(txn = calloc(1, size)))
{
txn->mt_dbs = (MDB_db *)((char *)txn + tsize);
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = (unsigned int *)(txn->mt_cursors + env->me_maxdbs);
Expand Down Expand Up @@ -5142,7 +5153,7 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl)
MDB_page *p = NULL;
int level;

if (!((txn->mt_flags & MDB_TXN_RDONLY) | (env->me_flags & MDB_WRITEMAP))) {
if (! (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_WRITEMAP))) {
MDB_txn *tx2 = txn;
level = 1;
do {
Expand Down Expand Up @@ -7846,12 +7857,12 @@ mdb_rebalance(MDB_cursor *mc)
m3 = m2;
if (m3 == mc || m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[0] == mp) {
m3->mc_snum--;
m3->mc_top--;
for (i=0; i<m3->mc_snum; i++) {
m3->mc_pg[i] = m3->mc_pg[i+1];
m3->mc_ki[i] = m3->mc_ki[i+1];
}
m3->mc_snum--;
m3->mc_top--;
}
}
}
Expand Down Expand Up @@ -7919,9 +7930,23 @@ mdb_rebalance(MDB_cursor *mc)
if (mc->mc_ki[ptop] == 0) {
rc = mdb_page_merge(&mn, mc);
} else {
MDB_cursor dummy;
oldki += NUMKEYS(mn.mc_pg[mn.mc_top]);
mn.mc_ki[mn.mc_top] += mc->mc_ki[mn.mc_top] + 1;
/* We want mdb_rebalance to find mn when doing fixups */
if (mc->mc_flags & C_SUB) {
dummy.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
mc->mc_txn->mt_cursors[mc->mc_dbi] = &dummy;
dummy.mc_xcursor = (MDB_xcursor *)&mn;
} else {
mn.mc_next = mc->mc_txn->mt_cursors[mc->mc_dbi];
mc->mc_txn->mt_cursors[mc->mc_dbi] = &mn;
}
rc = mdb_page_merge(mc, &mn);
if (mc->mc_flags & C_SUB)
mc->mc_txn->mt_cursors[mc->mc_dbi] = dummy.mc_next;
else
mc->mc_txn->mt_cursors[mc->mc_dbi] = mn.mc_next;
mdb_cursor_copy(&mn, mc);
}
mc->mc_flags &= ~C_EOF;
Expand All @@ -7948,6 +7973,13 @@ mdb_cursor_del0(MDB_cursor *mc)
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;

/* DB is totally empty now, just bail out.
* Other cursors adjustments were already done
* by mdb_rebalance and aren't needed here.
*/
if (!mc->mc_snum)
return rc;

mp = mc->mc_pg[mc->mc_top];
nkeys = NUMKEYS(mp);

Expand Down

0 comments on commit ffc2e2e

Please sign in to comment.