@@ -2048,7 +2048,12 @@ logger_load(int debug, const char *fn, const char *logdir, logger *lg, char file
logbat_destroy(lg->seqs_id);
logbat_destroy(lg->seqs_val);
logbat_destroy(lg->dseqs);
ATOMIC_DESTROY(&lg->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
MT_sema_destroy(&lg->flush_queue_semaphore);
MT_lock_destroy(&lg->flush_lock);
MT_lock_destroy(&lg->flush_queue_lock);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->local_dir);
@@ -2108,11 +2113,21 @@ logger_new(int debug, const char *fn, const char *logdir, int version, preversio
GDKfree(lg);
return NULL;
}
MT_lock_init(&lg->lock, fn);
if (lg->debug & 1) {
fprintf(stderr, "#logger_new dir set to %s\n", lg->dir);
}

ATOMIC_INIT(&lg->refcount, 0);
MT_lock_init(&lg->lock, fn);
MT_lock_init(&lg->rotation_lock, "rotation_lock");
MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE, "flush_queue_semaphore");
MT_lock_init(&lg->flush_lock, "flush_lock");
MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock");

// flush variables
lg->flush_queue_begin = 0;
lg->flush_queue_length = 0;

if (logger_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
return lg;
}
@@ -2156,7 +2171,12 @@ logger_destroy(logger *lg)
logbat_destroy(lg->catalog_lid);
logger_unlock(lg);
}
ATOMIC_DESTROY(&lg->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
MT_sema_destroy(&lg->flush_queue_semaphore);
MT_lock_destroy(&lg->flush_lock);
MT_lock_destroy(&lg->flush_queue_lock);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->buf);
@@ -2215,17 +2235,20 @@ logger_cleanup_range(logger *lg)
gdk_return
logger_activate(logger *lg)
{
MT_lock_set(&lg->rotation_lock);
logger_lock(lg);
if (lg->end > 0 && lg->saved_id+1 == lg->id) {
lg->id++;
logger_close_output(lg);
/* start new file */
if (logger_open_output(lg) != GDK_SUCCEED) {
logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_FAIL;
}
}
logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_SUCCEED;
}

@@ -2361,6 +2384,7 @@ log_constant(logger *lg, int type, ptr val, log_id id, lng offset, lng cnt)
(!is_row && !mnstr_writeLng(lg->output_log, nr)) ||
(!is_row && mnstr_write(lg->output_log, &tpe, 1, 1) != 1) ||
(!is_row && !mnstr_writeLng(lg->output_log, offset))) {
(void) ATOMIC_DEC(&lg->refcount);
ok = GDK_FAIL;
goto bailout;
}
@@ -2507,6 +2531,7 @@ internal_log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, int sliced)

bailout:
if (ok != GDK_SUCCEED) {
(void) ATOMIC_DEC(&lg->refcount);
const char *err = mnstr_peek_error(lg->output_log);
TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
}
@@ -2527,6 +2552,8 @@ log_bat_persists(logger *lg, BAT *b, log_id id)

if (logger_add_bat(lg, b, id, -1) != GDK_SUCCEED) {
logger_unlock(lg);
if (!LOG_DISABLED(lg))
(void) ATOMIC_DEC(&lg->refcount);
return GDK_FAIL;
}

@@ -2536,6 +2563,7 @@ log_bat_persists(logger *lg, BAT *b, log_id id)
if (log_write_format(lg, &l) != GDK_SUCCEED ||
mnstr_write(lg->output_log, &ta, 1, 1) != 1) {
logger_unlock(lg);
(void) ATOMIC_DEC(&lg->refcount);
return GDK_FAIL;
}
}
@@ -2544,6 +2572,8 @@ log_bat_persists(logger *lg, BAT *b, log_id id)
fprintf(stderr, "#persists id (%d) bat (%d)\n", id, b->batCacheid);
gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0);
logger_unlock(lg);
if (r != GDK_SUCCEED)
(void) ATOMIC_DEC(&lg->refcount);
return r;
}

@@ -2561,6 +2591,7 @@ log_bat_transient(logger *lg, log_id id)
if (log_write_format(lg, &l) != GDK_SUCCEED) {
TRC_CRITICAL(GDK, "write failed\n");
logger_unlock(lg);
(void) ATOMIC_DEC(&lg->refcount);
return GDK_FAIL;
}
}
@@ -2571,6 +2602,8 @@ log_bat_transient(logger *lg, log_id id)
lg->end += BATcount(BBPquickdesc(bid));
gdk_return r = logger_del_bat(lg, bid);
logger_unlock(lg);
if (r != GDK_SUCCEED)
(void) ATOMIC_DEC(&lg->refcount);
return r;
}

@@ -2596,6 +2629,8 @@ log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
if (BATtdense(uid)) {
ok = internal_log_bat(lg, uval, id, uid->tseqbase, BATcount(uval), 1);
logger_unlock(lg);
if (!LOG_DISABLED(lg) && ok != GDK_SUCCEED)
(void) ATOMIC_DEC(&lg->refcount);
return ok;
}

@@ -2654,6 +2689,7 @@ log_delta(logger *lg, BAT *uid, BAT *uval, log_id id)
if (ok != GDK_SUCCEED) {
const char *err = mnstr_peek_error(lg->output_log);
TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : "");
(void) ATOMIC_DEC(&lg->refcount);
}
logger_unlock(lg);
return ok;
@@ -2678,7 +2714,11 @@ log_bat_clear(logger *lg, int id)

if (lg->debug & 1)
fprintf(stderr, "#Logged clear %d\n", id);
return log_write_format(lg, &l);

gdk_return r = log_write_format(lg, &l);
if(r != GDK_SUCCEED)
(void) ATOMIC_DEC(&lg->refcount);
return r;
}

#define DBLKSZ 8192
@@ -2690,63 +2730,164 @@ log_bat_clear(logger *lg, int id)
static gdk_return
new_logfile(logger *lg)
{
lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
assert(!LOG_DISABLED(lg));
lng p;
p = (lng) getfilepos(getFile(lg->output_log));
if (p == -1)


const lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;

gdk_return result = GDK_SUCCEED;
MT_lock_set(&lg->rotation_lock);
const lng p = (lng) getfilepos(getFile(lg->output_log));
if (p == -1) {
MT_lock_unset(&lg->rotation_lock);
return GDK_FAIL;
if (p > log_large || (lg->end*1024) > log_large) {
lg->id++;
logger_close_output(lg);
return logger_open_output(lg);
}
return GDK_SUCCEED;
if (( p > log_large || (lg->end*1024) > log_large )) {
if (ATOMIC_GET(&lg->refcount) == 1) {
lg->id++;
logger_close_output(lg);
result = logger_open_output(lg);
lg->request_rotation = false;
}
else {
// Delegate wal rotation to next writer or last flusher.
lg->request_rotation = true;
}
}
MT_lock_unset(&lg->rotation_lock);
return result;
}

gdk_return
log_tend(logger *lg)
{
logformat l;

if (lg->debug & 1)
fprintf(stderr, "#log_tend %d\n", lg->tid);

l.flag = LOG_END;
l.id = lg->tid;
if (lg->flushnow) {
lg->flushnow = 0;
return logger_commit(lg);
}

lg->end++;
if (LOG_DISABLED(lg)) {
return GDK_SUCCEED;
}

if (log_write_format(lg, &l) != GDK_SUCCEED ||
mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
(!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) ||
new_logfile(lg) != GDK_SUCCEED) {
TRC_CRITICAL(GDK, "write failed\n");
return GDK_FAIL;
gdk_return result;
logformat l;
l.flag = LOG_END;
l.id = lg->tid;

if ((result = log_write_format(lg, &l)) != GDK_SUCCEED)
(void) ATOMIC_DEC(&lg->refcount);
return result;
}
static int
request_number_flush_queue(logger *lg) {
// Semaphore protects ring buffer structure in queue against overflowing
static int _number = 0;
int result;
MT_sema_down(&lg->flush_queue_semaphore);
MT_lock_set(&lg->flush_queue_lock);
result = ++_number;
const int end = (lg->flush_queue_begin + lg->flush_queue_length) % FLUSH_QUEUE_SIZE;
lg->flush_queue[end] = _number;
lg->flush_queue_length++;
MT_lock_unset(&lg->flush_queue_lock);

return result;
}

static void
left_truncate_flush_queue(logger *lg, int limit) {
MT_lock_set(&lg->flush_queue_lock);
lg->flush_queue_begin = (lg->flush_queue_begin + limit) % FLUSH_QUEUE_SIZE;
lg->flush_queue_length -= limit;
MT_lock_unset(&lg->flush_queue_lock);

for (int i = 0; i < limit; i++)
MT_sema_up(&lg->flush_queue_semaphore);
}

static int
number_in_flush_queue(logger *lg, int number) {
MT_lock_set(&lg->flush_queue_lock);
const int fql = lg->flush_queue_length;
MT_lock_unset(&lg->flush_queue_lock);
for (int i = 0; i < fql; i++) {
const int idx = (lg->flush_queue_begin + i) % FLUSH_QUEUE_SIZE;
if (lg->flush_queue[idx] == number) {
return 1;
}
}
return GDK_SUCCEED;
return 0;
}

gdk_return
static int
flush_queue_length(logger *lg) {
MT_lock_set(&lg->flush_queue_lock);
const int fql = lg->flush_queue_length;
MT_lock_unset(&lg->flush_queue_lock);
return fql;
}

static gdk_return
log_tdone(logger *lg, ulng commit_ts)
{
if (lg->debug & 1)
fprintf(stderr, "#log_tdone %d\n", lg->tid);
fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts);

if (lg->current) {
lg->current->last_tid = lg->tid;
lg->current->last_ts = commit_ts;
}
return GDK_SUCCEED;
}

gdk_return
log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {

if (lg->flushnow) {
lg->flushnow = 0;
log_tdone(lg, commit_ts);
return logger_commit(lg);
}

if (LOG_DISABLED(lg)) {
return GDK_SUCCEED;
}

if (log_file_id == lg->id) {
int number = request_number_flush_queue(lg);

MT_lock_set(&lg->flush_lock);
/* the transaction is not yet flushed */
if (number_in_flush_queue(lg, number)) {
/* number of transactions in the group commit */
const int fqueue_length = flush_queue_length(lg);
/* flush + fsync */
if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
(!(GDKdebug & NOSYNCMASK) && mnstr_fsync(lg->output_log)) ||
new_logfile(lg) != GDK_SUCCEED) {
/* flush failed */
MT_lock_unset(&lg->flush_lock);
(void) ATOMIC_DEC(&lg->refcount);
return GDK_FAIL;
}
else {
/* flush succeeded */
left_truncate_flush_queue(lg, fqueue_length);
}
}
/* else the transaction was already flushed in a group commit.
* No need to do anything */
}
/* else the log file has already rotated and hence my wal messages are already flushed.
* No need to do anything */


log_tdone(lg, commit_ts);
(void) ATOMIC_DEC(&lg->refcount);
MT_lock_unset(&lg->flush_lock);

return GDK_SUCCEED;
}

static gdk_return
log_sequence_(logger *lg, int seq, lng val)
{
@@ -2763,6 +2904,7 @@ log_sequence_(logger *lg, int seq, lng val)
if (log_write_format(lg, &l) != GDK_SUCCEED ||
!mnstr_writeLng(lg->output_log, val)) {
TRC_CRITICAL(GDK, "write failed\n");
(void) ATOMIC_DEC(&lg->refcount);
return GDK_FAIL;
}
return GDK_SUCCEED;
@@ -2910,44 +3052,55 @@ logger_find_bat(logger *lg, log_id id)
return bid;
}


gdk_return
log_tstart(logger *lg, bool flushnow)
log_tstart(logger *lg, bool flushnow, ulng *log_file_id)
{
logformat l;

MT_lock_set(&lg->rotation_lock);
logger_lock(lg);
if (flushnow) {
if (flushnow || lg->request_rotation) {
lg->id++;
logger_close_output(lg);
/* start new file */
if (logger_open_output(lg) != GDK_SUCCEED) {
logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_FAIL;
}
while (lg->saved_id+1 < lg->id) {
logger_unlock(lg);
logger_flush(lg, (1ULL<<63));
logger_lock(lg);
lg->request_rotation = false;
if (flushnow) {
while (lg->saved_id+1 < lg->id) {
logger_unlock(lg);
logger_flush(lg, (1ULL<<63));
logger_lock(lg);
}
lg->flushnow = flushnow;
}
lg->flushnow = flushnow;
}

(void) ATOMIC_INC(&lg->refcount);
*log_file_id = lg->id;
lg->end++;

if (LOG_DISABLED(lg)) {
(void) ATOMIC_DEC(&lg->refcount);
logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_SUCCEED;
}

logformat l;
l.flag = LOG_START;
l.id = ++lg->tid;

if (lg->debug & 1)
fprintf(stderr, "#log_tstart %d\n", lg->tid);
if (log_write_format(lg, &l) != GDK_SUCCEED) {
(void) ATOMIC_DEC(&lg->refcount);
logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_FAIL;
}

logger_unlock(lg);
MT_lock_unset(&lg->rotation_lock);
return GDK_SUCCEED;
}
@@ -68,9 +68,9 @@ gdk_export gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id);
/* mark end of batgroup insert or clear */
//gdk_export gdk_return log_batgroup_end(logger *lg, oid id);

gdk_export gdk_return log_tstart(logger *lg, bool flush);
gdk_export gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id);
gdk_export gdk_return log_tend(logger *lg);
gdk_export gdk_return log_tdone(logger *lg, ulng commit_ts);
gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts); /* Flush the WAL to disk using group commit */

gdk_export gdk_return log_sequence(logger *lg, int seq, lng id);
gdk_export log_bid logger_find_bat(logger *lg, log_id id);
@@ -9,6 +9,8 @@
#ifndef _LOGGER_INTERNALS_H_
#define _LOGGER_INTERNALS_H_

#define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */

typedef struct logged_range_t {
ulng id; /* log file id */
int first_tid; /* first */
@@ -26,6 +28,7 @@ struct logger {
int saved_tid; /* id of transaction which was flushed out (into BBP storage) */
bool flushing;
bool flushnow;
bool request_rotation;
logged_range *pending;
logged_range *current;

@@ -42,6 +45,8 @@ struct logger {
stream *input_log; /* current stream to flush */
lng end; /* end of pre-allocated blocks for faster f(data)sync */

ATOMIC_TYPE refcount; /* Number of active writers and flushers in the logger */ // TODO check refcount in c->log and c->end
MT_Lock rotation_lock;
MT_Lock lock;
/* Store log_bids (int) to circumvent trouble with reference counting */
BAT *catalog_bid; /* int bid column */
@@ -63,6 +68,14 @@ struct logger {

void *buf;
size_t bufsize;

/* flush variables */
int flush_queue[FLUSH_QUEUE_SIZE]; /* circular array with the current transactions' ids waiting to be flushed */
int flush_queue_begin; /* start index of the queue */
int flush_queue_length; /* length of the queue */
MT_Sema flush_queue_semaphore; /*to protect the queue against ring buffer overflows */
MT_Lock flush_queue_lock; /* to protect the queue against concurrent reads and writes */
MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */
};

struct old_logger {
@@ -3138,9 +3138,9 @@ bl_log_isnew(sqlstore *store)
}

static int
bl_tstart(sqlstore *store, bool flush)
bl_tstart(sqlstore *store, bool flush, ulng *log_file_id)
{
return log_tstart(store->logger, flush) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
return log_tstart(store->logger, flush, log_file_id) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
}

static int
@@ -3150,9 +3150,9 @@ bl_tend(sqlstore *store)
}

static int
bl_tdone(sqlstore *store, ulng commit_ts)
bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts)
{
return log_tdone(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
return log_tflush(store->logger, log_file_id, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
}

static int
@@ -3552,7 +3552,7 @@ bat_logger_init( logger_functions *lf )
lf->log_isnew = bl_log_isnew;
lf->log_tstart = bl_tstart;
lf->log_tend = bl_tend;
lf->log_tdone = bl_tdone;
lf->log_tflush = bl_tflush;
lf->log_sequence = bl_sequence;
lf->get_snapshot_files = bl_snapshot;
}
@@ -284,9 +284,9 @@ typedef int (*logger_changes_fptr)(struct sqlstore *store);
typedef int (*logger_get_sequence_fptr) (struct sqlstore *store, int seq, lng *id);

typedef int (*log_isnew_fptr)(struct sqlstore *store);
typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush);
typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush, ulng *log_file_id);
typedef int (*log_tend_fptr) (struct sqlstore *store);
typedef int (*log_tdone_fptr) (struct sqlstore *store, ulng commit_ts);
typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng commit_tis);
typedef lng (*log_save_id_fptr) (struct sqlstore *store);
typedef int (*log_sequence_fptr) (struct sqlstore *store, int seq, lng id);

@@ -319,7 +319,7 @@ typedef struct logger_functions {
log_isnew_fptr log_isnew;
log_tstart_fptr log_tstart;
log_tend_fptr log_tend;
log_tdone_fptr log_tdone;
log_tflush_fptr log_tflush;
log_save_id_fptr log_save_id;
log_sequence_fptr log_sequence;
} logger_functions;
@@ -3909,7 +3909,7 @@ sql_trans_commit(sql_trans *tr)

if (!list_empty(tr->changes)) {
int flush = 0;
ulng commit_ts = 0, oldest = 0;
ulng commit_ts = 0, oldest = 0, log_file_id = 0;

MT_lock_set(&store->commit);

@@ -3922,7 +3922,9 @@ sql_trans_commit(sql_trans *tr)
}
}

if (!tr->parent && (!list_empty(tr->dependencies) || !list_empty(tr->depchanges))) {
if (!tr->parent &&
ATOMIC_GET(&store->nr_active) == 1 &&
(!list_empty(tr->dependencies) || !list_empty(tr->depchanges))) {
ok = transaction_check_dependencies_and_removals(tr);
if (ok != LOG_OK) {
sql_trans_rollback(tr, true);
@@ -3932,12 +3934,17 @@ sql_trans_commit(sql_trans *tr)
}

/* log changes should only be done if there is something to log */
if (!tr->parent && tr->logchanges > 0) {
int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 1000000;
flush = (tr->logchanges > min_changes && list_empty(store->changes));
if (flush)
MT_lock_set(&store->flush);
ok = store->logger_api.log_tstart(store, flush);
const bool log = !tr->parent && tr->logchanges > 0;

if (log) {
const int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 1000000;
flush = log && (tr->logchanges > min_changes && list_empty(store->changes));
}

if (flush)
MT_lock_set(&store->flush);
if (log) {
ok = store->logger_api.log_tstart(store, flush, &log_file_id); /* wal start */
/* log */
for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
sql_change *c = n->data;
@@ -3958,18 +3965,15 @@ sql_trans_commit(sql_trans *tr)
if (ok == LOG_OK && store->prev_oid != store->obj_id)
ok = store->logger_api.log_sequence(store, OBJ_SID, store->obj_id);
store->prev_oid = store->obj_id;
if (ok == LOG_OK && !flush)
ok = store->logger_api.log_tend(store); /* flush/sync */
store_lock(store);
commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
if (ok == LOG_OK && !flush) /* mark as done */
ok = store->logger_api.log_tdone(store, commit_ts);
} else {
store_lock(store);
commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
if (tr->parent)
tr->parent->logchanges += tr->logchanges;


if (ok == LOG_OK)
ok = store->logger_api.log_tend(store); /* wal end */
}
store_lock(store);
commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
if (tr->parent)
tr->parent->logchanges += tr->logchanges;
oldest = tr->parent ? commit_ts : store_oldest(store);
tr->logchanges = 0;
TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
@@ -3986,15 +3990,6 @@ sql_trans_commit(sql_trans *tr)
c->obj->new = 0;
c->ts = commit_ts;
}
/* when directly flushing: flush logger after changes got applied */
if (flush) {
if (ok == LOG_OK) {
ok = store->logger_api.log_tend(store); /* flush/sync */
if (ok == LOG_OK)
ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */
}
MT_lock_unset(&store->flush);
}
/* propagate transaction dependencies to the storage only if other transactions are running */
if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) {
if (!list_empty(tr->dependencies)) {
@@ -4026,6 +4021,17 @@ sql_trans_commit(sql_trans *tr)
}
tr->ts = commit_ts;
store_unlock(store);
/* flush the log structure */
if (log) {
if (!flush)
MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */
if (ok == LOG_OK)
ok = store->logger_api.log_tflush(store, log_file_id, commit_ts); /* flush/sync */
if (!flush)
MT_lock_set(&store->commit); /* release the commit log when flushing to disk */
if (flush)
MT_lock_unset(&store->flush);
}
MT_lock_unset(&store->commit);
list_destroy(tr->changes);
tr->changes = NULL;