Skip to content

Commit

Permalink
director: doveadm HOST-* commands now wait for ring sync before retur…
Browse files Browse the repository at this point in the history
…ning OK

This should make it easier for tests and maybe for scripts in general, so
they won't think the command failed when it just takes a while to finish.
  • Loading branch information
sirainen authored and cmouse committed Sep 27, 2017
1 parent 5406956 commit 1d0eb22
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 19 deletions.
130 changes: 113 additions & 17 deletions src/director/doveadm-connection.c
Expand Up @@ -25,13 +25,18 @@

#define MAX_VALID_VHOST_COUNT 1000
#define DEFAULT_MAX_MOVING_USERS 100
#define DOVEADM_CONNECTION_RING_SYNC_TIMEOUT_MSECS (30*1000)

enum doveadm_director_cmd_ret {
DOVEADM_DIRECTOR_CMD_RET_FAIL = -1,
DOVEADM_DIRECTOR_CMD_RET_UNFINISHED = 0,
DOVEADM_DIRECTOR_CMD_RET_OK = 1,
DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK,
};

typedef void
doveadm_connection_ring_sync_callback_t(struct doveadm_connection *);

struct director_reset_cmd {
struct director_reset_cmd *prev, *next;

Expand All @@ -51,16 +56,21 @@ struct doveadm_connection {
struct ostream *output;
struct director *dir;

struct timeout *to_ring_sync_abort;
struct director_reset_cmd *reset_cmd;
doveadm_connection_ring_sync_callback_t *ring_sync_callback;

unsigned int handshaked:1;
};

static struct doveadm_connection *doveadm_connections;
static struct doveadm_connection *doveadm_ring_sync_pending_connections;
static struct director_reset_cmd *reset_cmds = NULL;

static void doveadm_connection_set_io(struct doveadm_connection *conn);
static void doveadm_connection_deinit(struct doveadm_connection **_conn);
static void
doveadm_connection_ring_sync_list_move(struct doveadm_connection *conn);

static enum doveadm_director_cmd_ret
doveadm_cmd_host_list(struct doveadm_connection *conn,
Expand Down Expand Up @@ -326,8 +336,7 @@ doveadm_cmd_host_set_or_update(struct doveadm_connection *conn,
handle. */
director_update_host(dir, dir->self_host, NULL, host);

o_stream_nsend(conn->output, "OK\n", 3);
return DOVEADM_DIRECTOR_CMD_RET_OK;
return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
}

static enum doveadm_director_cmd_ret
Expand Down Expand Up @@ -360,19 +369,19 @@ doveadm_cmd_host_updown(struct doveadm_connection *conn, bool down,
o_stream_nsend_str(conn->output, "NOTFOUND\n");
return DOVEADM_DIRECTOR_CMD_RET_OK;
}
if (host->down == down)
;
else if (host->desynced) {
if (host->down == down) {
o_stream_nsend_str(conn->output, "OK\n");
return DOVEADM_DIRECTOR_CMD_RET_OK;
} else if (host->desynced) {
o_stream_nsend_str(conn->output,
"host is already being updated - try again later\n");
return DOVEADM_DIRECTOR_CMD_RET_OK;
} else {
mail_host_set_down(host, down, ioloop_time, "doveadm: ");
director_update_host(conn->dir, conn->dir->self_host,
NULL, host);
return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
}
o_stream_nsend(conn->output, "OK\n", 3);
return DOVEADM_DIRECTOR_CMD_RET_OK;
}

static enum doveadm_director_cmd_ret
Expand Down Expand Up @@ -401,14 +410,14 @@ doveadm_cmd_host_remove(struct doveadm_connection *conn,
return DOVEADM_DIRECTOR_CMD_RET_FAIL;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL)
if (host == NULL) {
o_stream_nsend_str(conn->output, "NOTFOUND\n");
else {
return DOVEADM_DIRECTOR_CMD_RET_OK;
} else {
director_remove_host(conn->dir, conn->dir->self_host,
NULL, host);
o_stream_nsend(conn->output, "OK\n", 3);
return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
}
return DOVEADM_DIRECTOR_CMD_RET_OK;
}

static void
Expand Down Expand Up @@ -444,14 +453,14 @@ doveadm_cmd_host_flush(struct doveadm_connection *conn, const char *const *args)
return DOVEADM_DIRECTOR_CMD_RET_FAIL;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host == NULL)
if (host == NULL) {
o_stream_nsend_str(conn->output, "NOTFOUND\n");
else {
return DOVEADM_DIRECTOR_CMD_RET_OK;
} else {
director_flush_host(conn->dir, conn->dir->self_host,
NULL, host);
o_stream_nsend(conn->output, "OK\n", 3);
return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
}
return DOVEADM_DIRECTOR_CMD_RET_OK;
}

static void doveadm_reset_cmd_free(struct director_reset_cmd *cmd)
Expand Down Expand Up @@ -771,6 +780,58 @@ struct {
{ "USER-KICK-ALT", doveadm_cmd_user_kick_alt },
};

static void
doveadm_connection_ring_sync_timeout(struct doveadm_connection *conn)
{
doveadm_connection_ring_sync_list_move(conn);
o_stream_nsend_str(conn->output, "Ring sync timed out\n");

doveadm_connection_set_io(conn);
io_set_pending(conn->io);
}

static void
doveadm_connection_set_ring_sync_callback(struct doveadm_connection *conn,
doveadm_connection_ring_sync_callback_t *callback)
{
i_assert(conn->ring_sync_callback == NULL);
i_assert(conn->to_ring_sync_abort == NULL);

conn->ring_sync_callback = callback;
io_remove(&conn->io);
DLLIST_REMOVE(&doveadm_connections, conn);
DLLIST_PREPEND(&doveadm_ring_sync_pending_connections, conn);
conn->to_ring_sync_abort =
timeout_add(DOVEADM_CONNECTION_RING_SYNC_TIMEOUT_MSECS,
doveadm_connection_ring_sync_timeout, conn);
}

static void doveadm_connection_ret_ok(struct doveadm_connection *conn)
{
o_stream_nsend(conn->output, "OK\n", 3);
}

static enum doveadm_director_cmd_ret
doveadm_connection_cmd_run(struct doveadm_connection *conn,
const char *const *args, unsigned int i)
{
enum doveadm_director_cmd_ret ret;

ret = doveadm_director_commands[i].cmd(conn, args);
if (ret != DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK)
return ret;
/* Delay sending OK until ring is synced. This way doveadm will know
whether the call actually succeeded or not. */
if (conn->dir->ring_synced) {
/* director is alone */
i_assert(conn->dir->right == NULL && conn->dir->left == NULL);
o_stream_nsend(conn->output, "OK\n", 3);
return DOVEADM_DIRECTOR_CMD_RET_OK;
}
doveadm_connection_set_ring_sync_callback(conn, doveadm_connection_ret_ok);
return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
}

static enum doveadm_director_cmd_ret
doveadm_connection_cmd(struct doveadm_connection *conn, const char *line)
{
Expand All @@ -786,7 +847,7 @@ doveadm_connection_cmd(struct doveadm_connection *conn, const char *line)

for (unsigned int i = 0; i < N_ELEMENTS(doveadm_director_commands); i++) {
if (strcmp(doveadm_director_commands[i].name, cmd) == 0)
return doveadm_director_commands[i].cmd(conn, args);
return doveadm_connection_cmd_run(conn, args, i);
}
i_error("doveadm sent unknown command: %s", line);
return DOVEADM_DIRECTOR_CMD_RET_FAIL;
Expand Down Expand Up @@ -854,6 +915,8 @@ static void doveadm_connection_deinit(struct doveadm_connection **_conn)

*_conn = NULL;

i_assert(conn->to_ring_sync_abort == NULL);

if (conn->reset_cmd != NULL) {
/* finish the move even if doveadm disconnected */
conn->reset_cmd->_conn = NULL;
Expand All @@ -870,21 +933,54 @@ static void doveadm_connection_deinit(struct doveadm_connection **_conn)
master_service_client_connection_destroyed(master_service);
}

static void
doveadm_connection_ring_sync_list_move(struct doveadm_connection *conn)
{
timeout_remove(&conn->to_ring_sync_abort);
DLLIST_REMOVE(&doveadm_ring_sync_pending_connections, conn);
DLLIST_PREPEND(&doveadm_connections, conn);
}

void doveadm_connections_deinit(void)
{
while (reset_cmds != NULL)
doveadm_reset_cmd_free(reset_cmds);

unsigned int pending_count = 0;
while (doveadm_ring_sync_pending_connections != NULL) {
doveadm_connection_ring_sync_list_move(doveadm_ring_sync_pending_connections);
pending_count++;
}
if (pending_count > 0)
i_warning("Shutting down while %u doveadm connections were waiting for ring sync", pending_count);
while (doveadm_connections != NULL) {
struct doveadm_connection *conn = doveadm_connections;

doveadm_connection_deinit(&conn);
}
}

void doveadm_connections_continue_reset_cmds(void)
static void doveadm_connections_continue_reset_cmds(void)
{
while (reset_cmds != NULL) {
if (!director_reset_cmd_run(reset_cmds))
break;
}
}

void doveadm_connections_ring_synced(void)
{
while (doveadm_ring_sync_pending_connections != NULL) {
struct doveadm_connection *conn =
doveadm_ring_sync_pending_connections;
doveadm_connection_ring_sync_callback_t *callback =
conn->ring_sync_callback;

conn->ring_sync_callback = NULL;
doveadm_connection_ring_sync_list_move(conn);
doveadm_connection_set_io(conn);
io_set_pending(conn->io);
callback(conn);
}
doveadm_connections_continue_reset_cmds();
}
2 changes: 1 addition & 1 deletion src/director/doveadm-connection.h
Expand Up @@ -7,6 +7,6 @@ struct doveadm_connection *
doveadm_connection_init(struct director *dir, int fd);
void doveadm_connections_deinit(void);

void doveadm_connections_continue_reset_cmds(void);
void doveadm_connections_ring_synced(void);

#endif
2 changes: 1 addition & 1 deletion src/director/main.c
Expand Up @@ -249,7 +249,7 @@ static void director_state_changed(struct director *dir)

if (dir->to_request != NULL && array_count(&new_requests) == 0)
timeout_remove(&dir->to_request);
doveadm_connections_continue_reset_cmds();
doveadm_connections_ring_synced();
}

static void main_preinit(void)
Expand Down

0 comments on commit 1d0eb22

Please sign in to comment.