Skip to content

Commit

Permalink
dsync: Try to commit transactions every dsync_commit_msgs_interval me…
Browse files Browse the repository at this point in the history
…ssages

This was first attempted to be implemented by
ec0cc8f, but it was later partially
reverted by 5973d49. This current
commit should fix its problems.
  • Loading branch information
sirainen authored and GitLab committed May 17, 2017
1 parent e73fbb6 commit a76faea
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/doveadm/doveadm-dsync.c
Expand Up @@ -93,6 +93,7 @@ struct dsync_cmd_context {
const char *error;

unsigned int lock_timeout;
unsigned int import_commit_msgs_interval;

bool lock:1;
bool purge_remote:1;
Expand Down Expand Up @@ -592,6 +593,7 @@ cmd_dsync_run(struct doveadm_mail_cmd_context *_ctx, struct mail_user *user)
set.virtual_all_box = ctx->virtual_all_box;
memcpy(set.sync_box_guid, ctx->mailbox_guid, sizeof(set.sync_box_guid));
set.lock_timeout_secs = ctx->lock_timeout;
set.import_commit_msgs_interval = ctx->import_commit_msgs_interval;
set.state = ctx->state_input;
set.mailbox_alt_char = doveadm_settings->dsync_alt_char[0];

Expand Down Expand Up @@ -1111,6 +1113,7 @@ static struct doveadm_mail_cmd_context *cmd_dsync_alloc(void)
p_array_init(&ctx->namespace_prefixes, ctx->ctx.pool, 4);
if ((doveadm_settings->parsed_features & DSYNC_FEATURE_EMPTY_HDR_WORKAROUND) != 0)
ctx->empty_hdr_workaround = TRUE;
ctx->import_commit_msgs_interval = doveadm_settings->dsync_commit_msgs_interval;
return &ctx->ctx;
}

Expand Down
2 changes: 2 additions & 0 deletions src/doveadm/doveadm-settings.c
Expand Up @@ -72,6 +72,7 @@ static const struct setting_define doveadm_setting_defines[] = {
DEF(SET_STR, director_username_hash),
DEF(SET_STR, doveadm_api_key),
DEF(SET_STR, dsync_features),
DEF(SET_UINT, dsync_commit_msgs_interval),
DEF(SET_STR, doveadm_http_rawlog_dir),

{ SET_STRLIST, "plugin", offsetof(struct doveadm_settings, plugin_envs), NULL },
Expand All @@ -95,6 +96,7 @@ const struct doveadm_settings doveadm_default_settings = {
.dsync_alt_char = "_",
.dsync_remote_cmd = "ssh -l%{login} %{host} doveadm dsync-server -u%u -U",
.dsync_features = "",
.dsync_commit_msgs_interval = 100,
.ssl_client_ca_dir = "",
.ssl_client_ca_file = "",
.director_username_hash = "%Lu",
Expand Down
1 change: 1 addition & 0 deletions src/doveadm/doveadm-settings.h
Expand Up @@ -29,6 +29,7 @@ struct doveadm_settings {
const char *director_username_hash;
const char *doveadm_api_key;
const char *dsync_features;
unsigned int dsync_commit_msgs_interval;
const char *doveadm_http_rawlog_dir;
enum dsync_features parsed_features;
ARRAY(const char *) plugin_envs;
Expand Down
1 change: 1 addition & 0 deletions src/doveadm/dsync/dsync-brain-mailbox.c
Expand Up @@ -236,6 +236,7 @@ dsync_brain_sync_mailbox_init_remote(struct dsync_brain *brain,
brain->sync_until_timestamp,
brain->sync_max_size,
brain->sync_flag,
brain->import_commit_msgs_interval,
import_flags);
}

Expand Down
1 change: 1 addition & 0 deletions src/doveadm/dsync/dsync-brain-private.h
Expand Up @@ -62,6 +62,7 @@ struct dsync_brain {
uoff_t sync_max_size;
const char *sync_flag;
char alt_char;
unsigned int import_commit_msgs_interval;

unsigned int lock_timeout;
int lock_fd;
Expand Down
2 changes: 2 additions & 0 deletions src/doveadm/dsync/dsync-brain.c
Expand Up @@ -221,6 +221,7 @@ dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc,
memcpy(brain->sync_box_guid, set->sync_box_guid,
sizeof(brain->sync_box_guid));
brain->lock_timeout = set->lock_timeout_secs;
brain->import_commit_msgs_interval = set->import_commit_msgs_interval;
brain->master_brain = TRUE;
dsync_brain_set_flags(brain, flags);

Expand Down Expand Up @@ -260,6 +261,7 @@ dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc,
ibc_set.sync_type = sync_type;
ibc_set.hdr_hash_v2 = TRUE;
ibc_set.lock_timeout = set->lock_timeout_secs;
ibc_set.import_commit_msgs_interval = set->import_commit_msgs_interval;
/* reverse the backup direction for the slave */
ibc_set.brain_flags = flags & ~(DSYNC_BRAIN_FLAG_BACKUP_SEND |
DSYNC_BRAIN_FLAG_BACKUP_RECV);
Expand Down
3 changes: 3 additions & 0 deletions src/doveadm/dsync/dsync-brain.h
Expand Up @@ -79,6 +79,9 @@ struct dsync_brain_settings {

/* If non-zero, use dsync lock file for this user */
unsigned int lock_timeout_secs;
/* If non-zero, importing will attempt to commit transaction after
saving this many messages. */
unsigned int import_commit_msgs_interval;
/* Input state for DSYNC_BRAIN_SYNC_TYPE_STATE */
const char *state;
};
Expand Down
14 changes: 13 additions & 1 deletion src/doveadm/dsync/dsync-ibc-stream.c
Expand Up @@ -78,7 +78,7 @@ static const struct {
"send_mail_requests backup_send backup_recv lock_timeout "
"no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote "
"no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp"
"virtual_all_box empty_hdr_workaround"
"virtual_all_box empty_hdr_workaround import_commit_msgs_interval"
},
{ .name = "mailbox_state",
.chr = 'S',
Expand Down Expand Up @@ -704,6 +704,10 @@ dsync_ibc_stream_send_handshake(struct dsync_ibc *_ibc,
dsync_serializer_encode_add(encoder, "lock_timeout",
t_strdup_printf("%u", set->lock_timeout));
}
if (set->import_commit_msgs_interval > 0) {
dsync_serializer_encode_add(encoder, "import_commit_msgs_interval",
t_strdup_printf("%u", set->import_commit_msgs_interval));
}
if (set->sync_since_timestamp > 0) {
dsync_serializer_encode_add(encoder, "sync_since_timestamp",
t_strdup_printf("%ld", (long)set->sync_since_timestamp));
Expand Down Expand Up @@ -820,6 +824,14 @@ dsync_ibc_stream_recv_handshake(struct dsync_ibc *_ibc,
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) {
if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 ||
set->import_commit_msgs_interval == 0) {
dsync_ibc_input_error(ibc, decoder,
"Invalid import_commit_msgs_interval: %s", value);
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) {
if (str_to_time(value, &set->sync_since_timestamp) < 0 ||
set->sync_since_timestamp == 0) {
Expand Down
1 change: 1 addition & 0 deletions src/doveadm/dsync/dsync-ibc.h
Expand Up @@ -69,6 +69,7 @@ struct dsync_ibc_settings {
enum dsync_brain_flags brain_flags;
bool hdr_hash_v2;
unsigned int lock_timeout;
unsigned int import_commit_msgs_interval;
};

void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r,
Expand Down
64 changes: 61 additions & 3 deletions src/doveadm/dsync/dsync-mailbox-import.c
Expand Up @@ -44,6 +44,7 @@ struct importer_new_mail {
bool skip:1;
bool expunged:1;
bool copy_failed:1;
bool saved:1;
};

/* for quickly testing that two-way sync doesn't actually do any unexpected
Expand All @@ -66,6 +67,7 @@ struct dsync_mailbox_importer {
uoff_t sync_max_size;
enum mailbox_transaction_flags transaction_flags;
unsigned int hdr_hash_version;
unsigned int commit_msgs_interval;

enum mail_flags sync_flag;
const char *sync_keyword;
Expand Down Expand Up @@ -106,6 +108,7 @@ struct dsync_mailbox_importer {
uint32_t prev_uid, next_local_seq, local_uid_next;
uint64_t local_initial_highestmodseq, local_initial_highestpvtmodseq;
unsigned int import_pos, import_count;
unsigned int first_unsaved_idx, saves_since_commit;

enum mail_error mail_error;

Expand Down Expand Up @@ -222,6 +225,7 @@ dsync_mailbox_import_init(struct mailbox *box,
time_t sync_until_timestamp,
uoff_t sync_max_size,
const char *sync_flag,
unsigned int commit_msgs_interval,
enum dsync_mailbox_import_flags flags)
{
struct dsync_mailbox_importer *importer;
Expand Down Expand Up @@ -257,6 +261,7 @@ dsync_mailbox_import_init(struct mailbox *box,
else
importer->sync_keyword = p_strdup(pool, sync_flag);
}
importer->commit_msgs_interval = commit_msgs_interval;
importer->transaction_flags = MAILBOX_TRANSACTION_FLAG_SYNC;
if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_NO_NOTIFY) != 0)
importer->transaction_flags |= MAILBOX_TRANSACTION_FLAG_NO_NOTIFY;
Expand Down Expand Up @@ -1817,6 +1822,17 @@ int dsync_mailbox_import_change(struct dsync_mailbox_importer *importer,
return importer->failed ? -1 : 0;
}

static int
importer_new_mail_final_uid_cmp(struct importer_new_mail *const *newmail1,
struct importer_new_mail *const *newmail2)
{
if ((*newmail1)->final_uid < (*newmail2)->final_uid)
return -1;
if ((*newmail1)->final_uid > (*newmail2)->final_uid)
return 1;
return 0;
}

static void
dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
{
Expand All @@ -1829,6 +1845,7 @@ dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
newmail = *newmailp;
if (newmail->skip) {
/* already assigned */
i_assert(newmail->final_uid != 0);
continue;
}

Expand Down Expand Up @@ -1856,6 +1873,9 @@ dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
}
importer->last_common_uid = common_uid_next-1;
importer->new_uids_assigned = TRUE;
/* Sort the newmails by their final_uid. This is used for tracking
whether an intermediate commit is allowed. */
array_sort(&importer->newmails, importer_new_mail_final_uid_cmp);
}

static int
Expand Down Expand Up @@ -1901,6 +1921,45 @@ dsync_mailbox_import_saved_uid(struct dsync_mailbox_importer *importer,
array_append(&importer->wanted_uids, &uid, 1);
}

static void
dsync_mailbox_import_update_first_saved(struct dsync_mailbox_importer *importer)
{
struct importer_new_mail *const *newmails;
unsigned int count;

newmails = array_get(&importer->newmails, &count);
while (importer->first_unsaved_idx < count) {
if (!newmails[importer->first_unsaved_idx]->saved)
break;
importer->first_unsaved_idx++;
}
}

static void
dsync_mailbox_import_saved_newmail(struct dsync_mailbox_importer *importer,
struct importer_new_mail *newmail)
{
dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
newmail->saved = TRUE;

dsync_mailbox_import_update_first_saved(importer);
importer->saves_since_commit++;
/* we can commit only if all the upcoming mails will have UIDs that
are larger than we're committing.
Note that if any existing UIDs have been changed, the new UID is
usually higher than anything that is being saved so we can't do
an intermediate commit. It's too much extra work to try to handle
that situation. So here this never happens, because then
array_count(wanted_uids) is always higher than first_unsaved_idx. */
if (importer->saves_since_commit >= importer->commit_msgs_interval &&
importer->first_unsaved_idx == array_count(&importer->wanted_uids)) {
if (dsync_mailbox_import_commit(importer, FALSE) < 0)
importer->failed = TRUE;
importer->saves_since_commit = 0;
}
}

static bool
dsync_msg_change_uid(struct dsync_mailbox_importer *importer,
uint32_t old_uid, uint32_t new_uid)
Expand Down Expand Up @@ -2367,7 +2426,7 @@ dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
}
if (ret > 0) {
i_assert(save_ctx == NULL);
dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
dsync_mailbox_import_saved_newmail(importer, newmail);
return TRUE;
}
/* fallback to saving from remote stream */
Expand Down Expand Up @@ -2447,8 +2506,7 @@ dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
&importer->mail_error));
importer->failed = TRUE;
} else {
dsync_mailbox_import_saved_uid(importer,
newmail->final_uid);
dsync_mailbox_import_saved_newmail(importer, newmail);
}
}
return TRUE;
Expand Down
1 change: 1 addition & 0 deletions src/doveadm/dsync/dsync-mailbox-import.h
Expand Up @@ -36,6 +36,7 @@ dsync_mailbox_import_init(struct mailbox *box,
time_t sync_until_timestamp,
uoff_t sync_max_size,
const char *sync_flag,
unsigned int commit_msgs_interval,
enum dsync_mailbox_import_flags flags);
int dsync_mailbox_import_attribute(struct dsync_mailbox_importer *importer,
const struct dsync_mailbox_attribute *attr);
Expand Down

0 comments on commit a76faea

Please sign in to comment.