Skip to content

Commit

Permalink
WAL sync performance optimizations (NFSE-5149) (#200) (#201)
Browse files Browse the repository at this point in the history
WAL performance optimizations to support frequent syncs on a slow IO backend

Signed-off-by: Nabeel M Mohamed <50154757+nabeelmmd@users.noreply.github.com>
(cherry picked from commit 4c0668d)
  • Loading branch information
nabeelmmd committed Feb 8, 2022
1 parent 4b58695 commit 049e790
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 71 deletions.
15 changes: 8 additions & 7 deletions lib/include/hse_ikvdb/kvdb_rparams.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ struct kvdb_rparams {
uint64_t csched_leaf_len_params;
uint64_t csched_node_min_ttl;

uint32_t dur_bufsz_mb;
uint32_t dur_intvl_ms;
uint8_t dur_throttle_lo_th;
uint8_t dur_throttle_hi_th;
bool dur_enable;
bool dur_buf_managed;
uint8_t dur_mclass;
uint32_t dur_bufsz_mb;
uint32_t dur_intvl_ms;
uint32_t dur_size_bytes;
bool dur_enable;
bool dur_buf_managed;
uint8_t dur_throttle_lo_th;
uint8_t dur_throttle_hi_th;
uint8_t dur_mclass;

uint64_t throttle_update_ns;
uint throttle_init_policy; /* [HSE_REVISIT]: Make this a fixed width type */
Expand Down
5 changes: 5 additions & 0 deletions lib/include/hse_ikvdb/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
#define HSE_WAL_DUR_MS_DFLT (100)
#define HSE_WAL_DUR_MS_MAX (1000)

/* Durability size threshold for all wal buffers combined */
#define HSE_WAL_DUR_SIZE_BYTES_MIN (32 * 1024u)
#define HSE_WAL_DUR_SIZE_BYTES_DFLT (8 * 1024 * 1024u)
#define HSE_WAL_DUR_SIZE_BYTES_MAX (128 * 1024 * 1024u)

/* Per wal buffer size */
#define HSE_WAL_DUR_BUFSZ_MB_MIN (256ul)
#define HSE_WAL_DUR_BUFSZ_MB_DFLT (4096ul)
Expand Down
21 changes: 21 additions & 0 deletions lib/kvdb/kvdb_rparams.c
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,27 @@ static const struct param_spec pspecs[] = {
},
},
},
{
.ps_name = "durability.size_bytes",
.ps_description = "Maximum amount of application data lost in the event of a crash",
.ps_flags = PARAM_FLAG_EXPERIMENTAL,
.ps_type = PARAM_TYPE_U32,
.ps_offset = offsetof(struct kvdb_rparams, dur_size_bytes),
.ps_size = PARAM_SZ(struct kvdb_rparams, dur_size_bytes),
.ps_convert = param_default_converter,
.ps_validate = param_default_validator,
.ps_stringify = param_default_stringify,
.ps_jsonify = param_default_jsonify,
.ps_default_value = {
.as_uscalar = HSE_WAL_DUR_SIZE_BYTES_DFLT,
},
.ps_bounds = {
.as_uscalar = {
.ps_min = HSE_WAL_DUR_SIZE_BYTES_MIN,
.ps_max = HSE_WAL_DUR_SIZE_BYTES_MAX,
},
},
},
{
.ps_name = "durability.buffer.size",
.ps_description = "durability buffer size in MiB",
Expand Down
138 changes: 109 additions & 29 deletions lib/wal/wal.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct wal {
pthread_t timer_tid;
pthread_t sync_notify_tid;
uint32_t dur_ms;
uint32_t dur_bytes;
size_t dur_bufsz;
enum hse_mclass dur_mclass;
uint32_t version;
Expand All @@ -75,7 +76,7 @@ struct wal {
struct wal_sync_waiter {
struct list_head ws_link;
merr_t ws_err;
int ws_bufcnt;
uint32_t ws_bufcnt;
uint64_t ws_offv[WAL_BUF_MAX];
struct cv ws_cv;
};
Expand All @@ -89,6 +90,73 @@ struct wal_sync_waiter {
void
wal_ionotify_cb(void *cbarg, merr_t err);

static HSE_ALWAYS_INLINE void
wal_throttle_sensor_set(struct wal *wal, uint64_t bufsz, uint64_t buflen)
{
const uint64_t hwm = (bufsz * wal->wal_thr_hwm) / 100;
const uint64_t lwm = (bufsz * wal->wal_thr_lwm) / 100;
uint32_t new;

if (ev(!wal->wal_thr_sensor))
return;

ev(buflen >= bufsz);

new = (buflen > lwm) ? (THROTTLE_SENSOR_SCALE * buflen) / hwm : 0;

throttle_sensor_set(wal->wal_thr_sensor, new);
}

/*
* Wait for at least 'pct' of flushed data to become durable.
* This avoids overloading the WAL IO layer with numerous flushes when IOs are
* backed up due to a slow IO backend.
*/
static HSE_ALWAYS_INLINE void
wal_flush_wait(struct wal *wal, struct wal_flush_stats *stats, uint8_t pct)
{
INVARIANT(wal && stats);

uint64_t flushv[WAL_BUF_MAX];
const uint32_t flushc = stats->bufcnt;
assert(flushc <= WAL_BUF_MAX);

pct = clamp_t(uint8_t, pct, 0, 100);

for (int i = 0; i < flushc; i++)
flushv[i] = stats->flush_soff[i] + ((stats->flush_len[i] * pct) / 100);

while (wal_bufset_durcnt(wal->wbs, WAL_BUF_MAX, flushv) < flushc)
usleep(50);
}

static HSE_ALWAYS_INLINE bool
wal_dirty_exceeds_threshold(struct wal *wal, const uint32_t flushc, uint64_t *flushv)
{
INVARIANT(wal && flushc <= WAL_BUF_MAX);

uint64_t curv[WAL_BUF_MAX], tot_bytes = 0;
uint32_t buf_thresh = wal->dur_bytes / flushc;
uint32_t buf_cnt;

buf_cnt = wal_bufset_curoff(wal->wbs, WAL_BUF_MAX, curv);
assert(buf_cnt == flushc);
if (ev(buf_cnt != flushc))
return false;

buf_cnt = 0;
for (int i = 0; i < flushc; i++) {
const uint64_t bytes = (curv[i] - flushv[i]);

if (bytes > 0) {
tot_bytes += bytes;
buf_cnt++;
}
}

return (tot_bytes > 0 && tot_bytes >= (buf_cnt * buf_thresh));
}

static void *
wal_timer(void *rock)
{
Expand All @@ -103,7 +171,7 @@ wal_timer(void *rock)
dur_ns = MSEC_TO_NSEC(wal->dur_ms) - (long)timer_slack;

while (!closing && !atomic_read(&wal->error)) {
uint64_t tstart, rid, lag, sleep_ns, flushb, bufsz, buflen;
uint64_t tstart, rid, lag, sleep_ns;

closing = !!atomic_read(&wal->closing);

Expand All @@ -112,43 +180,53 @@ wal_timer(void *rock)

rid = atomic_read(&wal->wal_rid);
if (rid != rid_last || closing) {
struct wal_flush_stats stats;
rid_last = rid;

err = wal_bufset_flush(wal->wbs, &flushb, &bufsz, &buflen);
err = wal_bufset_flush(wal->wbs, &stats);
if (err) {
atomic_set(&wal->error, err);
wal_ionotify_cb(wal, err); /* Notify sync waiters on flush error */
continue;
}

/* No dirty data, notify any sync waiters */
if (flushb == 0)
wal_ionotify_cb(wal, 0);

if (wal->wal_thr_sensor) {
const uint64_t hwm = (bufsz * wal->wal_thr_hwm) / 100;
const uint64_t lwm = (bufsz * wal->wal_thr_lwm) / 100;
uint32_t new;
if (stats.flush_tlen == 0)
wal_ionotify_cb(wal, 0); /* No dirty data, notify any sync waiters */

ev(buflen >= bufsz);
wal_throttle_sensor_set(wal, stats.bufsz, stats.max_buflen);

new = (buflen > lwm) ? (THROTTLE_SENSOR_SCALE * buflen) / hwm : 0;

throttle_sensor_set(wal->wal_thr_sensor, new);
}
wal_flush_wait(wal, &stats, WAL_FLUSH_WAIT_PCT);

lag = get_time_ns() - tstart;
sleep_ns = (lag >= sleep_ns || closing) ? 0 : sleep_ns - lag;
} else {
/* No mutations, notify any sync waiters */
wal_ionotify_cb(wal, 0);
wal_ionotify_cb(wal, 0); /* No mutations, notify any sync waiters */
}

mutex_lock(&wal->timer_mutex);
if (wal->sync_pending)

if (wal->sync_pending) {
closing = false;
else if (!closing && sleep_ns > 0)
cv_timedwait(&wal->timer_cv, &wal->timer_mutex, NSEC_TO_MSEC(sleep_ns));
} else if (!closing && sleep_ns > 0) {
uint64_t flushv[WAL_BUF_MAX], intvl, tstart = get_time_ns();
uint32_t flushc;

intvl = max_t(uint64_t, sleep_ns / 10, MSEC_TO_NSEC(1));
flushc = wal_bufset_flushoff(wal->wbs, WAL_BUF_MAX, flushv);

while (!atomic_read(&wal->closing)) {
int rc = cv_timedwait(&wal->timer_cv, &wal->timer_mutex, NSEC_TO_MSEC(intvl));
if (rc != ETIMEDOUT)
break;

if (wal->sync_pending || (get_time_ns() - tstart >= sleep_ns))
break;

if (wal_dirty_exceeds_threshold(wal, flushc, flushv))
break;
}
}

wal->sync_pending = false;
mutex_unlock(&wal->timer_mutex);
}
Expand Down Expand Up @@ -178,7 +256,7 @@ wal_sync_notifier(void *rock)

list_for_each_entry(swait, &wal->sync_waiters, ws_link) {
if (err ||
swait->ws_bufcnt <= wal_bufset_durcnt(wal->wbs, swait->ws_bufcnt, swait->ws_offv)) {
swait->ws_bufcnt <= wal_bufset_durcnt(wal->wbs, WAL_BUF_MAX, swait->ws_offv)) {
swait->ws_err = err;
cv_signal(&swait->ws_cv);
}
Expand Down Expand Up @@ -222,7 +300,7 @@ wal_sync_impl(struct wal *wal, struct wal_sync_waiter *swait)
cv_signal(&wal->timer_cv);
mutex_unlock(&wal->timer_mutex);

while (swait->ws_bufcnt > wal_bufset_durcnt(wal->wbs, swait->ws_bufcnt, swait->ws_offv) &&
while (swait->ws_bufcnt > wal_bufset_durcnt(wal->wbs, WAL_BUF_MAX, swait->ws_offv) &&
!swait->ws_err)
cv_timedwait(&swait->ws_cv, &wal->sync_mutex, wal->dur_ms);

Expand All @@ -246,8 +324,6 @@ wal_sync(struct wal *wal)
INIT_LIST_HEAD(&swait.ws_link);

swait.ws_bufcnt = wal_bufset_curoff(wal->wbs, WAL_BUF_MAX, swait.ws_offv);
if (swait.ws_bufcnt < 0)
return merr(EBUG);

return wal_sync_impl(wal, &swait);
}
Expand All @@ -266,8 +342,6 @@ wal_cond_sync(struct wal *wal, uint64_t gen)
INIT_LIST_HEAD(&swait.ws_link);

swait.ws_bufcnt = wal_bufset_genoff(wal->wbs, gen, WAL_BUF_MAX, swait.ws_offv);
if (swait.ws_bufcnt < 0)
return merr(EBUG);

start = get_time_ns();
err = wal_sync_impl(wal, &swait);
Expand Down Expand Up @@ -571,6 +645,7 @@ wal_open(
wal->buf_flags = wal->buf_managed ? HSE_BTF_MANAGED : 0;

wal->dur_ms = HSE_WAL_DUR_MS_DFLT;
wal->dur_bytes = HSE_WAL_DUR_SIZE_BYTES_DFLT;
wal->dur_bufsz = HSE_WAL_DUR_BUFSZ_MB_DFLT << MB_SHIFT;

err = wal_mdc_open(mp, rinfo->mdcid1, rinfo->mdcid2, wal->read_only, &wal->mdc);
Expand Down Expand Up @@ -605,7 +680,11 @@ wal_open(
wal_fileset_version_update(wal->wfset, wal->version);

if (rp->dur_intvl_ms != HSE_WAL_DUR_MS_DFLT)
wal->dur_ms = clamp_t(long, rp->dur_intvl_ms, HSE_WAL_DUR_MS_MIN, HSE_WAL_DUR_MS_MAX);
wal->dur_ms = clamp_t(uint32_t, rp->dur_intvl_ms, HSE_WAL_DUR_MS_MIN, HSE_WAL_DUR_MS_MAX);

if (rp->dur_size_bytes != HSE_WAL_DUR_SIZE_BYTES_DFLT)
wal->dur_bytes = clamp_t(uint32_t, rp->dur_size_bytes,
HSE_WAL_DUR_SIZE_BYTES_MIN, HSE_WAL_DUR_SIZE_BYTES_MAX);

if (rp->dur_bufsz_mb != HSE_WAL_DUR_BUFSZ_MB_DFLT)
wal->dur_bufsz = (size_t)rp->dur_bufsz_mb << MB_SHIFT;
Expand Down Expand Up @@ -654,7 +733,8 @@ wal_open(

wal->wiocb.iocb = wal_ionotify_cb;
wal->wiocb.cbarg = wal;
wal->wbs = wal_bufset_open(wal->wfset, wal->dur_bufsz, &wal->wal_ingestgen, &wal->wiocb);
wal->wbs = wal_bufset_open(wal->wfset, wal->dur_bufsz, wal->dur_bytes,
&wal->wal_ingestgen, &wal->wiocb);
if (!wal->wbs) {
err = merr(ENOMEM);
goto errout;
Expand Down
11 changes: 11 additions & 0 deletions lib/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#define WAL_ROFF_UNRECOV_ERR (UINT64_MAX)
#define WAL_ROFF_RECOV_ERR (UINT64_MAX - 1)

#define WAL_FLUSH_WAIT_PCT (25)

/* clang-format on */

struct wal_minmax_info {
Expand All @@ -41,6 +43,15 @@ struct wal_iocb {
void (*iocb)(void *cbarg, merr_t err);
};

struct wal_flush_stats {
uint32_t bufcnt;
uint64_t bufsz;
uint64_t max_buflen;
uint64_t flush_soff[WAL_BUF_MAX];
uint64_t flush_len[WAL_BUF_MAX];
uint64_t flush_tlen;
};

struct wal;
struct mpool;

Expand Down

0 comments on commit 049e790

Please sign in to comment.