diff --git a/src/auth/auth-worker-client.c b/src/auth/auth-worker-client.c index dd5c3753a5..da42e03a81 100644 --- a/src/auth/auth-worker-client.c +++ b/src/auth/auth-worker-client.c @@ -2,6 +2,7 @@ #include "auth-common.h" #include "base64.h" +#include "connection.h" #include "ioloop.h" #include "net.h" #include "istream.h" @@ -24,13 +25,10 @@ #define CLIENT_STATE_STOP "waiting for shutdown" struct auth_worker_client { + struct connection conn; int refcount; struct auth *auth; - int fd; - struct io *io; - struct istream *input; - struct ostream *output; struct timeout *to_idle; time_t cmd_start; @@ -69,11 +67,11 @@ void auth_worker_refresh_proctitle(const char *state) static void auth_worker_client_check_throttle(struct auth_worker_client *client) { - if (o_stream_get_buffer_used_size(client->output) >= + if (o_stream_get_buffer_used_size(client->conn.output) >= OUTBUF_THROTTLE_SIZE) { /* stop reading new requests until client has read the pending replies. */ - io_remove(&client->io); + io_remove(&client->conn.io); } } @@ -123,9 +121,9 @@ static void auth_worker_send_reply(struct auth_worker_client *client, const char *p; if (worker_restart_request) - o_stream_nsend_str(client->output, "RESTART\n"); - o_stream_nsend(client->output, str_data(str), str_len(str)); - if (o_stream_flush(client->output) < 0 && request != NULL && + o_stream_nsend_str(client->conn.output, "RESTART\n"); + o_stream_nsend(client->conn.output, str_data(str), str_len(str)); + if (o_stream_flush(client->conn.output) < 0 && request != NULL && cmd_duration > AUTH_WORKER_WARN_DISCONNECTED_LONG_CMD_SECS) { p = i_strchr_to_next(str_c(str), '\t'); p = p == NULL ? "BUG" : t_strcut(p, '\t'); @@ -535,7 +533,7 @@ static void list_iter_deinit(struct auth_worker_list_context *ctx) struct auth_worker_client *client = ctx->client; string_t *str; - i_assert(client->io == NULL); + i_assert(client->conn.io == NULL); str = t_str_new(32); if (ctx->auth_request->userdb->userdb->iface-> @@ -545,10 +543,12 @@ static void list_iter_deinit(struct auth_worker_list_context *ctx) str_printfa(str, "%u\tOK\n", ctx->auth_request->id); auth_worker_send_reply(client, NULL, str); - client->io = io_add(client->fd, IO_READ, auth_worker_input, client); + client->conn.io = io_add(client->conn.fd_in, IO_READ, + auth_worker_input, client); auth_worker_client_set_idle_timeout(client); - o_stream_set_flush_callback(client->output, auth_worker_output, client); + o_stream_set_flush_callback(client->conn.output, auth_worker_output, + client); auth_request_unref(&ctx->auth_request); auth_worker_client_unref(&client); i_free(ctx); @@ -570,11 +570,11 @@ static void list_iter_callback(const char *user, void *context) } if (!ctx->sending) - o_stream_cork(ctx->client->output); + o_stream_cork(ctx->client->conn.output); T_BEGIN { str = t_str_new(128); str_printfa(str, "%u\t*\t%s\n", ctx->auth_request->id, user); - o_stream_nsend(ctx->client->output, str_data(str), str_len(str)); + o_stream_nsend(ctx->client->conn.output, str_data(str), str_len(str)); } T_END; if (ctx->sending) { @@ -590,27 +590,27 @@ static void list_iter_callback(const char *user, void *context) ctx->auth_request->userdb->userdb->iface-> iterate_next(ctx->iter); } T_END; - if (o_stream_get_buffer_used_size(ctx->client->output) > OUTBUF_THROTTLE_SIZE) { - if (o_stream_flush(ctx->client->output) < 0) { + if (o_stream_get_buffer_used_size(ctx->client->conn.output) > OUTBUF_THROTTLE_SIZE) { + if (o_stream_flush(ctx->client->conn.output) < 0) { ctx->done = TRUE; break; } } } while (ctx->sent && - o_stream_get_buffer_used_size(ctx->client->output) <= OUTBUF_THROTTLE_SIZE); - o_stream_uncork(ctx->client->output); + o_stream_get_buffer_used_size(ctx->client->conn.output) <= OUTBUF_THROTTLE_SIZE); + o_stream_uncork(ctx->client->conn.output); ctx->sending = FALSE; if (ctx->done) list_iter_deinit(ctx); else - o_stream_set_flush_pending(ctx->client->output, TRUE); + o_stream_set_flush_pending(ctx->client->conn.output, TRUE); } static int auth_worker_list_output(struct auth_worker_list_context *ctx) { int ret; - if ((ret = o_stream_flush(ctx->client->output)) < 0) { + if ((ret = o_stream_flush(ctx->client->conn.output)) < 0) { list_iter_deinit(ctx); return 1; } @@ -649,10 +649,10 @@ auth_worker_handle_list(struct auth_worker_client *client, } ctx->auth_request->userdb = userdb; - io_remove(&ctx->client->io); + io_remove(&ctx->client->conn.io); timeout_remove(&ctx->client->to_idle); - o_stream_set_flush_callback(ctx->client->output, + o_stream_set_flush_callback(ctx->client->conn.output, auth_worker_list_output, ctx); ctx->iter = ctx->auth_request->userdb->userdb->iface-> iterate_init(ctx->auth_request, list_iter_callback, ctx); @@ -694,7 +694,7 @@ auth_worker_handle_line(struct auth_worker_client *client, const char *line) i_error("BUG: Auth-worker received unknown command: %s", args[1]); } - if (client->io != NULL) + if (client->conn.io != NULL) auth_worker_refresh_proctitle(CLIENT_STATE_IDLE); return ret; } @@ -722,7 +722,7 @@ static void auth_worker_input(struct auth_worker_client *client) char *line; bool ret; - switch (i_stream_read(client->input)) { + switch (i_stream_read(client->conn.input)) { case 0: return; case -1: @@ -738,7 +738,7 @@ static void auth_worker_input(struct auth_worker_client *client) } if (!client->version_received) { - line = i_stream_next_line(client->input); + line = i_stream_next_line(client->conn.input); if (line == NULL) return; @@ -752,7 +752,7 @@ static void auth_worker_input(struct auth_worker_client *client) client->version_received = TRUE; } if (!client->dbhash_received) { - line = i_stream_next_line(client->input); + line = i_stream_next_line(client->conn.input); if (line == NULL) return; @@ -768,7 +768,7 @@ static void auth_worker_input(struct auth_worker_client *client) } client->refcount++; - while ((line = i_stream_next_line(client->input)) != NULL) { + while ((line = i_stream_next_line(client->conn.input)) != NULL) { T_BEGIN { ret = auth_worker_handle_line(client, line); } T_END; @@ -786,15 +786,15 @@ static void auth_worker_input(struct auth_worker_client *client) static int auth_worker_output(struct auth_worker_client *client) { - if (o_stream_flush(client->output) < 0) { + if (o_stream_flush(client->conn.output) < 0) { auth_worker_client_destroy(&client); return 1; } - if (o_stream_get_buffer_used_size(client->output) <= - OUTBUF_THROTTLE_SIZE/3 && client->io == NULL) { + if (o_stream_get_buffer_used_size(client->conn.output) <= + OUTBUF_THROTTLE_SIZE/3 && client->conn.io == NULL) { /* allow input again */ - client->io = io_add(client->fd, IO_READ, + client->conn.io = io_add(client->conn.fd_in, IO_READ, auth_worker_input, client); } return 1; @@ -811,12 +811,13 @@ auth_worker_client_create(struct auth *auth, client->refcount = 1; client->auth = auth; - client->fd = fd; - client->input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH); - client->output = o_stream_create_fd(fd, (size_t)-1); - o_stream_set_no_error_handling(client->output, TRUE); - o_stream_set_flush_callback(client->output, auth_worker_output, client); - client->io = io_add(fd, IO_READ, auth_worker_input, client); + client->conn.fd_in = fd; + client->conn.input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH); + client->conn.output = o_stream_create_fd(fd, (size_t)-1); + o_stream_set_no_error_handling(client->conn.output, TRUE); + o_stream_set_flush_callback(client->conn.output, auth_worker_output, + client); + client->conn.io = io_add(fd, IO_READ, auth_worker_input, client); auth_worker_client_set_idle_timeout(client); auth_worker_refresh_proctitle(CLIENT_STATE_HANDSHAKE); @@ -831,17 +832,17 @@ static void auth_worker_client_destroy(struct auth_worker_client **_client) struct auth_worker_client *client = *_client; *_client = NULL; - if (client->fd == -1) + if (client->conn.fd_in == -1) return; - i_stream_close(client->input); - o_stream_close(client->output); + i_stream_close(client->conn.input); + o_stream_close(client->conn.output); timeout_remove(&client->to_idle); - io_remove(&client->io); + io_remove(&client->conn.io); - net_disconnect(client->fd); - client->fd = -1; + net_disconnect(client->conn.fd_in); + client->conn.fd_in = -1; auth_worker_client_unref(&client); auth_worker_client = NULL; @@ -858,8 +859,8 @@ static void auth_worker_client_unref(struct auth_worker_client **_client) if (--client->refcount > 0) return; - i_stream_unref(&client->input); - o_stream_unref(&client->output); + i_stream_unref(&client->conn.input); + o_stream_unref(&client->conn.output); i_free(client); } @@ -868,7 +869,7 @@ void auth_worker_client_send_error(void) auth_worker_client_error = TRUE; if (auth_worker_client != NULL && !auth_worker_client->error_sent) { - o_stream_nsend_str(auth_worker_client->output, "ERROR\n"); + o_stream_nsend_str(auth_worker_client->conn.output, "ERROR\n"); auth_worker_client->error_sent = TRUE; } auth_worker_refresh_proctitle(""); @@ -880,17 +881,19 @@ void auth_worker_client_send_success(void) if (auth_worker_client == NULL) return; if (auth_worker_client->error_sent) { - o_stream_nsend_str(auth_worker_client->output, "SUCCESS\n"); + o_stream_nsend_str(auth_worker_client->conn.output, + "SUCCESS\n"); auth_worker_client->error_sent = FALSE; } - if (auth_worker_client->io != NULL) + if (auth_worker_client->conn.io != NULL) auth_worker_refresh_proctitle(CLIENT_STATE_IDLE); } void auth_worker_client_send_shutdown(void) { if (auth_worker_client != NULL) - o_stream_nsend_str(auth_worker_client->output, "SHUTDOWN\n"); + o_stream_nsend_str(auth_worker_client->conn.output, + "SHUTDOWN\n"); auth_worker_refresh_proctitle(CLIENT_STATE_STOP); }