Skip to content

Commit

Permalink
director: HOST-RESET-USERS moves users more slowly now.
Browse files Browse the repository at this point in the history
By default only 100 users can be moved in parallel. This limit can be
overridden with an optional parameter to HOST-RESET-USERS.

This throttling is especially useful when director_flush_socket is used,
because it avoids huge floods to the script service. Even without the socket
it's still good for avoiding unnecessary load spikes when all users are kicked
at once and they all reconnect at the same time.
  • Loading branch information
sirainen committed Oct 25, 2016
1 parent 8ca411e commit 309ad77
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 27 deletions.
149 changes: 122 additions & 27 deletions src/director/doveadm-connection.c
Expand Up @@ -24,6 +24,17 @@
/* Handshake line that doveadm_connection_init() writes to every new
   doveadm connection. */
#define DOVEADM_HANDSHAKE "VERSION\tdirector-doveadm\t1\t0\n"

/* NOTE(review): presumably the largest vhost count accepted from doveadm;
   its use is not visible in this excerpt - confirm. */
#define MAX_VALID_VHOST_COUNT 1000
/* How many users HOST-RESET-USERS lets be in the "moving" state at once when
   the command doesn't give its own limit. */
#define DEFAULT_MAX_MOVING_USERS 100

/* State of one HOST-RESET-USERS command. The command may need several runs
   to finish, so everything needed to resume it is kept here. */
struct director_reset_cmd {
	/* links for the global reset_cmds list */
	struct director_reset_cmd *prev;
	struct director_reset_cmd *next;

	struct director *dir;
	/* doveadm connection that gets the "OK" reply, or NULL if it has
	   already disconnected */
	struct doveadm_connection *_conn;
	/* iterator over dir->users for the host currently being processed;
	   NULL when no iteration is in progress */
	struct user_directory_iter *iter;
	/* index of the host currently being processed */
	unsigned int host_idx;
	/* processing stops before this host index */
	unsigned int hosts_count;
	/* moving is paused while dir->users_moving_count is at or above
	   this limit */
	unsigned int max_moving_users;
};

struct doveadm_connection {
struct doveadm_connection *prev, *next;
Expand All @@ -34,11 +45,15 @@ struct doveadm_connection {
struct ostream *output;
struct director *dir;

struct director_reset_cmd *reset_cmd;

unsigned int handshaked:1;
};

/* Head of the linked list of open doveadm connections. */
static struct doveadm_connection *doveadm_connections;
/* Head of the linked list of HOST-RESET-USERS commands that have been started
   but not yet freed. */
static struct director_reset_cmd *reset_cmds = NULL;

/* Forward declarations for functions that are used before their definition. */
static void doveadm_connection_set_io(struct doveadm_connection *conn);
static void doveadm_connection_deinit(struct doveadm_connection **_conn);

static void doveadm_cmd_host_list(struct doveadm_connection *conn)
Expand Down Expand Up @@ -408,66 +423,126 @@ doveadm_cmd_host_flush(struct doveadm_connection *conn, const char *const *args)
return 1;
}

static void
director_host_reset_users(struct director *dir, struct director_host *src,
static void doveadm_reset_cmd_free(struct director_reset_cmd *cmd)
{
DLLIST_REMOVE(&reset_cmds, cmd);

if (cmd->iter != NULL)
user_directory_iter_deinit(&cmd->iter);
if (cmd->_conn != NULL)
cmd->_conn->reset_cmd = NULL;
i_free(cmd);
}

/* Move the users currently assigned to `host` to the host their hash now maps
   to, without letting more than cmd->max_moving_users users be in the moving
   state at the same time.

   The user iterator is stored in cmd->iter, so that a later call continues
   from where the previous one stopped instead of restarting from the first
   user.

   Returns TRUE when all users have been iterated, FALSE when the moving limit
   was reached and the function has to be called again later. */
static bool
director_host_reset_users(struct director_reset_cmd *cmd,
			  struct mail_host *host)
{
	struct director *dir = cmd->dir;
	struct user *user;
	struct mail_host *new_host;

	/* too many moves already in progress - don't even start */
	if (dir->users_moving_count >= cmd->max_moving_users)
		return FALSE;

	if (dir->right != NULL)
		director_connection_cork(dir->right);

	/* continue an earlier iteration if there is one */
	if (cmd->iter == NULL)
		cmd->iter = user_directory_iter_init(dir->users);
	while ((user = user_directory_iter_next(cmd->iter)) != NULL) {
		if (user->host != host)
			continue;
		new_host = mail_host_get_by_hash(dir->mail_hosts,
						 user->username_hash,
						 mail_host_get_tag(host));
		if (new_host != host) T_BEGIN {
			director_move_user(dir, dir->self_host, NULL,
					   user->username_hash, new_host);
		} T_END;
		if (dir->users_moving_count >= cmd->max_moving_users)
			break;
	}
	/* user == NULL means the iteration reached the end. Otherwise the
	   iterator is kept open for the next call. */
	if (user == NULL)
		user_directory_iter_deinit(&cmd->iter);
	if (dir->right != NULL)
		director_connection_uncork(dir->right);
	return user == NULL;
}

static void
doveadm_cmd_host_reset_users_all(struct doveadm_connection *conn)
/* Run (or continue running) a HOST-RESET-USERS command.

   Hosts are processed one at a time starting from cmd->host_idx and stopping
   before cmd->hosts_count, or before the current number of hosts if that is
   smaller.

   Returns FALSE if the moving limit was reached; the command stays in
   reset_cmds and cmd->host_idx remembers which host to continue with.
   Returns TRUE when everything is done. In that case "OK" has been sent to
   the doveadm connection (if it still exists), its input handler has been
   re-added if it was removed, and cmd has been freed - the caller must not
   touch cmd afterwards. */
static bool
director_reset_cmd_run(struct director_reset_cmd *cmd)
{
	struct mail_host *const *hosts;
	unsigned int count;

	hosts = array_get(mail_hosts_get(cmd->dir->mail_hosts), &count);
	if (count > cmd->hosts_count)
		count = cmd->hosts_count;
	while (cmd->host_idx < count) {
		if (!director_host_reset_users(cmd, hosts[cmd->host_idx]))
			return FALSE;
		cmd->host_idx++;
	}
	if (cmd->_conn != NULL) {
		struct doveadm_connection *conn = cmd->_conn;

		o_stream_nsend(conn->output, "OK\n", 3);
		/* input handling was stopped while the command was running */
		if (conn->io == NULL)
			doveadm_connection_set_io(conn);
	}
	doveadm_reset_cmd_free(cmd);
	return TRUE;
}

/* Handle the HOST-RESET-USERS doveadm command.

   args[0] is an optional host IP. If it's missing or empty, the users of all
   hosts are reset. args[1] is an optional limit for how many users may be
   moving at the same time; it defaults to DEFAULT_MAX_MOVING_USERS.

   Returns -1 if the parameters are invalid, 1 if the command was fully
   handled (including the NOTFOUND reply), and 0 if the command is still
   running. In the last case the connection's input handler has been removed
   and it's re-added by director_reset_cmd_run() once the command finishes. */
static int
doveadm_cmd_host_reset_users(struct doveadm_connection *conn,
			     const char *const *args)
{
	struct director_reset_cmd *cmd;
	struct ip_addr ip;
	struct mail_host *const *hosts;
	unsigned int i = 0, count;
	unsigned int max_moving_users = DEFAULT_MAX_MOVING_USERS;

	/* A limit of 0 must be rejected: director_host_reset_users() would
	   return FALSE every time, so the command would never finish and the
	   connection's input would stay disabled. */
	if (args[0] != NULL && args[1] != NULL &&
	    (str_to_uint(args[1], &max_moving_users) < 0 ||
	     max_moving_users == 0)) {
		i_error("doveadm sent invalid HOST-RESET-USERS parameters");
		return -1;
	}

	hosts = array_get(mail_hosts_get(conn->dir->mail_hosts), &count);
	if (args[0] != NULL && args[0][0] != '\0') {
		if (net_addr2ip(args[0], &ip) < 0) {
			i_error("doveadm sent invalid HOST-RESET-USERS ip: %s",
				args[0]);
			return -1;
		}

		/* find the index of the requested host. net_ip_compare()
		   returns TRUE when the addresses are equal. */
		for (i = 0; i < count; i++) {
			if (net_ip_compare(&hosts[i]->ip, &ip))
				break;
		}
		if (i == count) {
			o_stream_nsend_str(conn->output, "NOTFOUND\n");
			return 1;
		}
		/* process only this one host */
		count = i+1;
	}

	conn->reset_cmd = cmd = i_new(struct director_reset_cmd, 1);
	cmd->dir = conn->dir;
	cmd->_conn = conn;
	cmd->max_moving_users = max_moving_users;
	cmd->host_idx = i;
	cmd->hosts_count = count;
	DLLIST_PREPEND(&reset_cmds, cmd);

	if (!director_reset_cmd_run(cmd)) {
		/* we still have work to do. don't handle any more doveadm
		   input until we're finished. */
		io_remove(&conn->io);
		return 0;
	}
	/* cmd was already freed by director_reset_cmd_run() */
	return 1;
}
Expand Down Expand Up @@ -684,6 +759,11 @@ static void doveadm_connection_input(struct doveadm_connection *conn)
doveadm_connection_deinit(&conn);
}

/* Start calling doveadm_connection_input() whenever the connection's fd
   becomes readable. The created io is stored in conn->io. */
static void doveadm_connection_set_io(struct doveadm_connection *conn)
{
	int fd = conn->fd;

	conn->io = io_add(fd, IO_READ, doveadm_connection_input, conn);
}

struct doveadm_connection *
doveadm_connection_init(struct director *dir, int fd)
{
Expand All @@ -695,7 +775,7 @@ doveadm_connection_init(struct director *dir, int fd)
conn->input = i_stream_create_fd(conn->fd, 1024, FALSE);
conn->output = o_stream_create_fd(conn->fd, (size_t)-1, FALSE);
o_stream_set_no_error_handling(conn->output, TRUE);
conn->io = io_add(conn->fd, IO_READ, doveadm_connection_input, conn);
doveadm_connection_set_io(conn);
o_stream_nsend_str(conn->output, DOVEADM_HANDSHAKE);

DLLIST_PREPEND(&doveadm_connections, conn);
Expand All @@ -708,6 +788,11 @@ static void doveadm_connection_deinit(struct doveadm_connection **_conn)

*_conn = NULL;

if (conn->reset_cmd != NULL) {
/* finish the move even if doveadm disconnected */
conn->reset_cmd->_conn = NULL;
}

DLLIST_REMOVE(&doveadm_connections, conn);
io_remove(&conn->io);
i_stream_unref(&conn->input);
Expand All @@ -721,9 +806,19 @@ static void doveadm_connection_deinit(struct doveadm_connection **_conn)

void doveadm_connections_deinit(void)
{
while (reset_cmds != NULL)
doveadm_reset_cmd_free(reset_cmds);
while (doveadm_connections != NULL) {
struct doveadm_connection *conn = doveadm_connections;

doveadm_connection_deinit(&conn);
}
}

/* Continue the unfinished reset commands, always working on the first one in
   the list. A command that finishes frees itself, which makes the next one
   the list head. Stops as soon as a command can't be finished yet. */
void doveadm_connections_continue_reset_cmds(void)
{
	for (;;) {
		if (reset_cmds == NULL)
			return;
		if (!director_reset_cmd_run(reset_cmds))
			return;
	}
}
2 changes: 2 additions & 0 deletions src/director/doveadm-connection.h
Expand Up @@ -7,4 +7,6 @@ struct doveadm_connection *
doveadm_connection_init(struct director *dir, int fd);
/* Free all unfinished HOST-RESET-USERS commands and deinitialize all doveadm
   connections. */
void doveadm_connections_deinit(void);

/* Continue HOST-RESET-USERS commands that had to be paused because too many
   users were being moved at once. */
void doveadm_connections_continue_reset_cmds(void);

#endif
1 change: 1 addition & 0 deletions src/director/main.c
Expand Up @@ -239,6 +239,7 @@ static void director_state_changed(struct director *dir)

if (dir->to_request != NULL && array_count(&new_requests) == 0)
timeout_remove(&dir->to_request);
doveadm_connections_continue_reset_cmds();
}

static void main_preinit(void)
Expand Down
3 changes: 3 additions & 0 deletions src/director/user-directory.h
Expand Up @@ -83,6 +83,9 @@ bool user_directory_user_is_recently_updated(struct user_directory *dir,
bool user_directory_user_is_near_expiring(struct user_directory *dir,
struct user *user);

/* Iterate through users in the directory. It's safe to modify user directory
while iterators are running. The moved/removed users will just be skipped
over. */
struct user_directory_iter *
user_directory_iter_init(struct user_directory *dir);
struct user *user_directory_iter_next(struct user_directory_iter *iter);
Expand Down

0 comments on commit 309ad77

Please sign in to comment.