Skip to content

Commit

Permalink
imapc: Fix sending initial FETCH after reconnection SELECTs mailbox
Browse files Browse the repository at this point in the history
Move sending the FETCH when the SELECT returns tagged OK reply instead of
delaying it until mailbox is next synced. Most importantly this allows
sending the FETCH before any retried commands that are also sent after
SELECT receives tagged reply.
  • Loading branch information
sirainen authored and villesavolainen committed Jan 18, 2018
1 parent b185c79 commit 259a4ca
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 70 deletions.
87 changes: 72 additions & 15 deletions src/lib-storage/index/imapc/imapc-mailbox.c
Expand Up @@ -157,35 +157,79 @@ static void imapc_mailbox_idle_notify(struct imapc_mailbox *mbox)
}
}

void imapc_mailbox_fetch_state_finish(struct imapc_mailbox *mbox,
struct mail_index_view *sync_view,
struct mail_index_transaction *trans)
static void
imapc_mailbox_fetch_state_finish(struct imapc_mailbox *mbox)
{
uint32_t lseq, uid, msg_count;

if (mbox->sync_next_lseq == 0)
if (mbox->sync_next_lseq == 0) {
/* FETCH n:*, not 1:* */
i_assert(mbox->state_fetched_success ||
(mbox->box.flags & MAILBOX_FLAG_SAVEONLY) != 0);
return;
}

/* if we haven't seen FETCH reply for some messages at the end of
mailbox they've been externally expunged. */
msg_count = mail_index_view_get_messages_count(sync_view);
msg_count = mail_index_view_get_messages_count(mbox->delayed_sync_view);
for (lseq = mbox->sync_next_lseq; lseq <= msg_count; lseq++) {
mail_index_lookup_uid(sync_view, lseq, &uid);
mail_index_lookup_uid(mbox->delayed_sync_view, lseq, &uid);
if (uid >= mbox->sync_uid_next) {
/* another process already added new messages to index
that our IMAP connection hasn't seen yet */
break;
}
mail_index_expunge(trans, lseq);
mail_index_expunge(mbox->delayed_sync_trans, lseq);
}

mbox->sync_next_lseq = 0;
mbox->sync_next_rseq = 0;
mbox->state_fetched_success = TRUE;
}

static void
imapc_mailbox_fetch_state_callback(const struct imapc_command_reply *reply,
void *context)
{
struct imapc_mailbox *mbox = context;

mbox->state_fetching_uid1 = FALSE;
imapc_client_stop(mbox->storage->client->client);

switch (reply->state) {
case IMAPC_COMMAND_STATE_OK:
imapc_mailbox_fetch_state_finish(mbox);
break;
case IMAPC_COMMAND_STATE_NO:
imapc_copy_error_from_reply(mbox->storage, MAIL_ERROR_PARAMS, reply);
break;
case IMAPC_COMMAND_STATE_DISCONNECTED:
mail_storage_set_internal_error(mbox->box.storage);

break;
default:
mail_storage_set_critical(mbox->box.storage,
"imapc: state FETCH failed: %s", reply->text_full);
break;
}
}

void imapc_mailbox_fetch_state(struct imapc_mailbox *mbox, string_t *str,
uint32_t first_uid)
static void
imapc_mailbox_fetch_state(struct imapc_mailbox *mbox, uint32_t first_uid)
{
struct imapc_command *cmd;

if (mbox->exists_count == 0) {
/* empty mailbox - no point in fetching anything */
mbox->state_fetched_success = TRUE;
return;
}
if (mbox->state_fetching_uid1) {
/* retrying after reconnection - don't send duplicate */
return;
}

string_t *str = t_str_new(64);
str_printfa(str, "UID FETCH %u:* (FLAGS", first_uid);
if (imapc_mailbox_has_modseqs(mbox)) {
str_append(str, " MODSEQ");
Expand All @@ -207,6 +251,19 @@ void imapc_mailbox_fetch_state(struct imapc_mailbox *mbox, string_t *str,

}
str_append_c(str, ')');

cmd = imapc_client_mailbox_cmd(mbox->client_box,
imapc_mailbox_fetch_state_callback, mbox);
if (first_uid == 1) {
mbox->sync_next_lseq = 1;
mbox->sync_next_rseq = 1;
mbox->state_fetched_success = FALSE;
/* only the FETCH 1:* is retriable - others will be retried
by the 1:* after the reconnection */
imapc_command_set_flags(cmd, IMAPC_COMMAND_FLAG_RETRIABLE);
}
mbox->state_fetching_uid1 = first_uid == 1;
imapc_command_send(cmd, str_c(str));
}

static void
Expand All @@ -220,19 +277,21 @@ imapc_untagged_exists(const struct imapc_untagged_reply *reply,
if (mbox == NULL)
return;

mbox->exists_count = exists_count;
mbox->exists_received = TRUE;

view = mbox->delayed_sync_view;
if (view == NULL)
view = imapc_mailbox_get_sync_view(mbox);

if (mbox->selecting) {
/* We don't know the latest flags, refresh them. */
mbox->sync_fetch_first_uid = 1;
imapc_mailbox_fetch_state(mbox, 1);
} else if (mbox->sync_fetch_first_uid != 1) {
hdr = mail_index_get_header(view);
mbox->sync_fetch_first_uid = hdr->next_uid;
imapc_mailbox_fetch_state(mbox, hdr->next_uid);
}
mbox->exists_count = exists_count;
mbox->exists_received = TRUE;
imapc_mailbox_idle_notify(mbox);
}

Expand Down Expand Up @@ -337,7 +396,7 @@ imapc_mailbox_msgmap_update(struct imapc_mailbox *mbox,
imapc_msgmap_append(msgmap, rseq, uid);
if (uid < mbox->min_append_uid) {
/* message is already added to index */
} else if (mbox->syncing) {
} else {
mail_index_append(mbox->delayed_sync_trans,
uid, lseq_r);
mbox->min_append_uid = uid + 1;
Expand Down Expand Up @@ -452,8 +511,6 @@ static void imapc_untagged_fetch(const struct imapc_untagged_reply *reply,
if (rseq == mbox->sync_next_rseq) {
/* we're doing the initial full sync of mails. expunge any
mails that no longer exist. */
i_assert(mbox->syncing);

while (mbox->sync_next_lseq < lseq) {
mail_index_expunge(mbox->delayed_sync_trans,
mbox->sync_next_lseq);
Expand Down
10 changes: 5 additions & 5 deletions src/lib-storage/index/imapc/imapc-storage.c
Expand Up @@ -137,7 +137,8 @@ void imapc_mailbox_run_nofetch(struct imapc_mailbox *mbox)
{
do {
imapc_client_run(mbox->storage->client->client);
} while (mbox->storage->reopen_count > 0);
} while (mbox->storage->reopen_count > 0 ||
mbox->state_fetching_uid1);
}

void imapc_simple_callback(const struct imapc_command_reply *reply,
Expand Down Expand Up @@ -625,9 +626,6 @@ static void imapc_mailbox_reopen(void *context)
imapc_mailbox_get_remote_name(mbox));
}
mbox->storage->reopen_count++;

if (mbox->syncing)
imapc_sync_mailbox_reopened(mbox);
}

static void
Expand Down Expand Up @@ -728,8 +726,10 @@ int imapc_mailbox_select(struct imapc_mailbox *mbox)
imapc_mailbox_get_remote_name(mbox));
}

while (ctx.ret == -2)
while (ctx.ret == -2 || mbox->state_fetching_uid1)
imapc_mailbox_run(mbox);
if (!mbox->state_fetched_success)
ctx.ret = -1;
return ctx.ret;
}

Expand Down
8 changes: 2 additions & 6 deletions src/lib-storage/index/imapc/imapc-storage.h
Expand Up @@ -145,6 +145,8 @@ struct imapc_mailbox {
unsigned int initial_sync_done:1;
unsigned int selected:1;
unsigned int exists_received:1;
unsigned int state_fetching_uid1:1;
unsigned int state_fetched_success:1;
};

struct imapc_simple_context {
Expand Down Expand Up @@ -195,12 +197,6 @@ void imapc_mailbox_set_corrupted(struct imapc_mailbox *mbox,
const char *reason, ...) ATTR_FORMAT(2, 3);
const char *imapc_mailbox_get_remote_name(struct imapc_mailbox *mbox);

void imapc_mailbox_fetch_state(struct imapc_mailbox *mbox, string_t *cmd,
uint32_t first_uid);
void imapc_mailbox_fetch_state_finish(struct imapc_mailbox *mbox,
struct mail_index_view *sync_view,
struct mail_index_transaction *trans);

void imapc_storage_client_register_untagged(struct imapc_storage_client *client,
const char *name,
imapc_storage_callback_t *callback);
Expand Down
52 changes: 9 additions & 43 deletions src/lib-storage/index/imapc/imapc-sync.c
Expand Up @@ -352,16 +352,12 @@ imapc_initial_sync_check(struct imapc_sync_context *ctx, bool nooped)
}

static void
imapc_sync_send_commands(struct imapc_sync_context *ctx, uint32_t first_uid)
imapc_sync_send_commands(struct imapc_sync_context *ctx)
{
string_t *cmd = t_str_new(64);

if (ctx->mbox->exists_count == 0) {
/* empty mailbox - no point in fetching anything */
return;
}
imapc_mailbox_fetch_state(ctx->mbox, cmd, first_uid);
imapc_sync_cmd(ctx, str_c(cmd));

if (IMAPC_BOX_HAS_FEATURE(ctx->mbox, IMAPC_FEATURE_GMAIL_MIGRATION) &&
ctx->mbox->storage->set->pop3_deleted_flag[0] != '\0') {
Expand Down Expand Up @@ -407,20 +403,8 @@ static void imapc_sync_index(struct imapc_sync_context *ctx)
imapc_sync_finish_store(ctx);
pool_unref(&ctx->pool);

if (!mbox->initial_sync_done) {
/* with initial syncing we're fetching all messages' flags and
expunge mails from local index that no longer exist on
remote server */
i_assert(mbox->sync_fetch_first_uid == 1);
mbox->sync_next_lseq = 1;
mbox->sync_next_rseq = 1;
}
if (mbox->sync_fetch_first_uid != 0) {
/* we'll resync existing messages' flags and add new messages.
adding new messages requires sync locking to avoid
duplicates. */
imapc_sync_send_commands(ctx, mbox->sync_fetch_first_uid);
}
if (!mbox->initial_sync_done)
imapc_sync_send_commands(ctx);

imapc_sync_expunge_finish(ctx);
while (ctx->sync_command_count > 0)
Expand All @@ -431,10 +415,6 @@ static void imapc_sync_index(struct imapc_sync_context *ctx)
imapc_sync_uid_next(ctx);
imapc_sync_highestmodseq(ctx);

if (!ctx->failed) {
imapc_mailbox_fetch_state_finish(ctx->mbox, ctx->sync_view,
ctx->trans);
}
if (mbox->box.v.sync_notify != NULL)
mbox->box.v.sync_notify(&mbox->box, 0, 0);

Expand All @@ -448,26 +428,6 @@ static void imapc_sync_index(struct imapc_sync_context *ctx)
}
}

void imapc_sync_mailbox_reopened(struct imapc_mailbox *mbox)
{
struct imapc_sync_context *ctx = mbox->sync_ctx;

i_assert(mbox->syncing);

if (!mbox->initial_sync_done) {
/* the same sync commands are automatically already retried by
lib-imap-client. don't duplicate them here. */
return;
}

/* we got disconnected while syncing. need to
re-fetch everything */
mbox->sync_next_lseq = 1;
mbox->sync_next_rseq = 1;

imapc_sync_send_commands(ctx, 1);
}

static int
imapc_sync_begin(struct imapc_mailbox *mbox,
struct imapc_sync_context **ctx_r, bool force)
Expand Down Expand Up @@ -612,6 +572,12 @@ imapc_mailbox_sync_init(struct mailbox *box, enum mailbox_sync_flags flags)

imapc_noop_if_needed(mbox, flags);

if (imapc_storage_client_handle_auth_failure(mbox->storage->client))
ret = -1;
else if (!mbox->state_fetched_success && !mbox->state_fetching_uid1) {
/* initial FETCH failed already */
ret = -1;
}
if (imapc_mailbox_commit_delayed_trans(mbox, &changes) < 0)
ret = -1;
if ((changes || mbox->sync_fetch_first_uid != 0 ||
Expand Down
1 change: 0 additions & 1 deletion src/lib-storage/index/imapc/imapc-sync.h
Expand Up @@ -35,6 +35,5 @@ struct mailbox_sync_context *
imapc_mailbox_sync_init(struct mailbox *box, enum mailbox_sync_flags flags);
int imapc_mailbox_sync_deinit(struct mailbox_sync_context *ctx,
struct mailbox_sync_status *status_r);
void imapc_sync_mailbox_reopened(struct imapc_mailbox *mbox);

#endif

0 comments on commit 259a4ca

Please sign in to comment.