Skip to content

Commit

Permalink
auth: auth-worker-client - Use connection structure
Browse files Browse the repository at this point in the history
Simplifies following commits
  • Loading branch information
cmouse committed Nov 28, 2018
1 parent f02b659 commit 643ae56
Showing 1 changed file with 52 additions and 49 deletions.
101 changes: 52 additions & 49 deletions src/auth/auth-worker-client.c
Expand Up @@ -2,6 +2,7 @@

#include "auth-common.h"
#include "base64.h"
#include "connection.h"
#include "ioloop.h"
#include "net.h"
#include "istream.h"
Expand All @@ -24,13 +25,10 @@
#define CLIENT_STATE_STOP "waiting for shutdown"

struct auth_worker_client {
struct connection conn;
int refcount;

struct auth *auth;
int fd;
struct io *io;
struct istream *input;
struct ostream *output;
struct timeout *to_idle;
time_t cmd_start;

Expand Down Expand Up @@ -69,11 +67,11 @@ void auth_worker_refresh_proctitle(const char *state)
static void
auth_worker_client_check_throttle(struct auth_worker_client *client)
{
if (o_stream_get_buffer_used_size(client->output) >=
if (o_stream_get_buffer_used_size(client->conn.output) >=
OUTBUF_THROTTLE_SIZE) {
/* stop reading new requests until client has read the pending
replies. */
io_remove(&client->io);
io_remove(&client->conn.io);
}
}

Expand Down Expand Up @@ -123,9 +121,9 @@ static void auth_worker_send_reply(struct auth_worker_client *client,
const char *p;

if (worker_restart_request)
o_stream_nsend_str(client->output, "RESTART\n");
o_stream_nsend(client->output, str_data(str), str_len(str));
if (o_stream_flush(client->output) < 0 && request != NULL &&
o_stream_nsend_str(client->conn.output, "RESTART\n");
o_stream_nsend(client->conn.output, str_data(str), str_len(str));
if (o_stream_flush(client->conn.output) < 0 && request != NULL &&
cmd_duration > AUTH_WORKER_WARN_DISCONNECTED_LONG_CMD_SECS) {
p = i_strchr_to_next(str_c(str), '\t');
p = p == NULL ? "BUG" : t_strcut(p, '\t');
Expand Down Expand Up @@ -535,7 +533,7 @@ static void list_iter_deinit(struct auth_worker_list_context *ctx)
struct auth_worker_client *client = ctx->client;
string_t *str;

i_assert(client->io == NULL);
i_assert(client->conn.io == NULL);

str = t_str_new(32);
if (ctx->auth_request->userdb->userdb->iface->
Expand All @@ -545,10 +543,12 @@ static void list_iter_deinit(struct auth_worker_list_context *ctx)
str_printfa(str, "%u\tOK\n", ctx->auth_request->id);
auth_worker_send_reply(client, NULL, str);

client->io = io_add(client->fd, IO_READ, auth_worker_input, client);
client->conn.io = io_add(client->conn.fd_in, IO_READ,
auth_worker_input, client);
auth_worker_client_set_idle_timeout(client);

o_stream_set_flush_callback(client->output, auth_worker_output, client);
o_stream_set_flush_callback(client->conn.output, auth_worker_output,
client);
auth_request_unref(&ctx->auth_request);
auth_worker_client_unref(&client);
i_free(ctx);
Expand All @@ -570,11 +570,11 @@ static void list_iter_callback(const char *user, void *context)
}

if (!ctx->sending)
o_stream_cork(ctx->client->output);
o_stream_cork(ctx->client->conn.output);
T_BEGIN {
str = t_str_new(128);
str_printfa(str, "%u\t*\t%s\n", ctx->auth_request->id, user);
o_stream_nsend(ctx->client->output, str_data(str), str_len(str));
o_stream_nsend(ctx->client->conn.output, str_data(str), str_len(str));
} T_END;

if (ctx->sending) {
Expand All @@ -590,27 +590,27 @@ static void list_iter_callback(const char *user, void *context)
ctx->auth_request->userdb->userdb->iface->
iterate_next(ctx->iter);
} T_END;
if (o_stream_get_buffer_used_size(ctx->client->output) > OUTBUF_THROTTLE_SIZE) {
if (o_stream_flush(ctx->client->output) < 0) {
if (o_stream_get_buffer_used_size(ctx->client->conn.output) > OUTBUF_THROTTLE_SIZE) {
if (o_stream_flush(ctx->client->conn.output) < 0) {
ctx->done = TRUE;
break;
}
}
} while (ctx->sent &&
o_stream_get_buffer_used_size(ctx->client->output) <= OUTBUF_THROTTLE_SIZE);
o_stream_uncork(ctx->client->output);
o_stream_get_buffer_used_size(ctx->client->conn.output) <= OUTBUF_THROTTLE_SIZE);
o_stream_uncork(ctx->client->conn.output);
ctx->sending = FALSE;
if (ctx->done)
list_iter_deinit(ctx);
else
o_stream_set_flush_pending(ctx->client->output, TRUE);
o_stream_set_flush_pending(ctx->client->conn.output, TRUE);
}

static int auth_worker_list_output(struct auth_worker_list_context *ctx)
{
int ret;

if ((ret = o_stream_flush(ctx->client->output)) < 0) {
if ((ret = o_stream_flush(ctx->client->conn.output)) < 0) {
list_iter_deinit(ctx);
return 1;
}
Expand Down Expand Up @@ -649,10 +649,10 @@ auth_worker_handle_list(struct auth_worker_client *client,
}
ctx->auth_request->userdb = userdb;

io_remove(&ctx->client->io);
io_remove(&ctx->client->conn.io);
timeout_remove(&ctx->client->to_idle);

o_stream_set_flush_callback(ctx->client->output,
o_stream_set_flush_callback(ctx->client->conn.output,
auth_worker_list_output, ctx);
ctx->iter = ctx->auth_request->userdb->userdb->iface->
iterate_init(ctx->auth_request, list_iter_callback, ctx);
Expand Down Expand Up @@ -694,7 +694,7 @@ auth_worker_handle_line(struct auth_worker_client *client, const char *line)
i_error("BUG: Auth-worker received unknown command: %s",
args[1]);
}
if (client->io != NULL)
if (client->conn.io != NULL)
auth_worker_refresh_proctitle(CLIENT_STATE_IDLE);
return ret;
}
Expand Down Expand Up @@ -722,7 +722,7 @@ static void auth_worker_input(struct auth_worker_client *client)
char *line;
bool ret;

switch (i_stream_read(client->input)) {
switch (i_stream_read(client->conn.input)) {
case 0:
return;
case -1:
Expand All @@ -738,7 +738,7 @@ static void auth_worker_input(struct auth_worker_client *client)
}

if (!client->version_received) {
line = i_stream_next_line(client->input);
line = i_stream_next_line(client->conn.input);
if (line == NULL)
return;

Expand All @@ -752,7 +752,7 @@ static void auth_worker_input(struct auth_worker_client *client)
client->version_received = TRUE;
}
if (!client->dbhash_received) {
line = i_stream_next_line(client->input);
line = i_stream_next_line(client->conn.input);
if (line == NULL)
return;

Expand All @@ -768,7 +768,7 @@ static void auth_worker_input(struct auth_worker_client *client)
}

client->refcount++;
while ((line = i_stream_next_line(client->input)) != NULL) {
while ((line = i_stream_next_line(client->conn.input)) != NULL) {
T_BEGIN {
ret = auth_worker_handle_line(client, line);
} T_END;
Expand All @@ -786,15 +786,15 @@ static void auth_worker_input(struct auth_worker_client *client)

static int auth_worker_output(struct auth_worker_client *client)
{
if (o_stream_flush(client->output) < 0) {
if (o_stream_flush(client->conn.output) < 0) {
auth_worker_client_destroy(&client);
return 1;
}

if (o_stream_get_buffer_used_size(client->output) <=
OUTBUF_THROTTLE_SIZE/3 && client->io == NULL) {
if (o_stream_get_buffer_used_size(client->conn.output) <=
OUTBUF_THROTTLE_SIZE/3 && client->conn.io == NULL) {
/* allow input again */
client->io = io_add(client->fd, IO_READ,
client->conn.io = io_add(client->conn.fd_in, IO_READ,
auth_worker_input, client);
}
return 1;
Expand All @@ -811,12 +811,13 @@ auth_worker_client_create(struct auth *auth,
client->refcount = 1;

client->auth = auth;
client->fd = fd;
client->input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH);
client->output = o_stream_create_fd(fd, (size_t)-1);
o_stream_set_no_error_handling(client->output, TRUE);
o_stream_set_flush_callback(client->output, auth_worker_output, client);
client->io = io_add(fd, IO_READ, auth_worker_input, client);
client->conn.fd_in = fd;
client->conn.input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH);
client->conn.output = o_stream_create_fd(fd, (size_t)-1);
o_stream_set_no_error_handling(client->conn.output, TRUE);
o_stream_set_flush_callback(client->conn.output, auth_worker_output,
client);
client->conn.io = io_add(fd, IO_READ, auth_worker_input, client);
auth_worker_client_set_idle_timeout(client);
auth_worker_refresh_proctitle(CLIENT_STATE_HANDSHAKE);

Expand All @@ -831,17 +832,17 @@ static void auth_worker_client_destroy(struct auth_worker_client **_client)
struct auth_worker_client *client = *_client;

*_client = NULL;
if (client->fd == -1)
if (client->conn.fd_in == -1)
return;

i_stream_close(client->input);
o_stream_close(client->output);
i_stream_close(client->conn.input);
o_stream_close(client->conn.output);

timeout_remove(&client->to_idle);
io_remove(&client->io);
io_remove(&client->conn.io);

net_disconnect(client->fd);
client->fd = -1;
net_disconnect(client->conn.fd_in);
client->conn.fd_in = -1;
auth_worker_client_unref(&client);

auth_worker_client = NULL;
Expand All @@ -858,8 +859,8 @@ static void auth_worker_client_unref(struct auth_worker_client **_client)
if (--client->refcount > 0)
return;

i_stream_unref(&client->input);
o_stream_unref(&client->output);
i_stream_unref(&client->conn.input);
o_stream_unref(&client->conn.output);
i_free(client);
}

Expand All @@ -868,7 +869,7 @@ void auth_worker_client_send_error(void)
auth_worker_client_error = TRUE;
if (auth_worker_client != NULL &&
!auth_worker_client->error_sent) {
o_stream_nsend_str(auth_worker_client->output, "ERROR\n");
o_stream_nsend_str(auth_worker_client->conn.output, "ERROR\n");
auth_worker_client->error_sent = TRUE;
}
auth_worker_refresh_proctitle("");
Expand All @@ -880,17 +881,19 @@ void auth_worker_client_send_success(void)
if (auth_worker_client == NULL)
return;
if (auth_worker_client->error_sent) {
o_stream_nsend_str(auth_worker_client->output, "SUCCESS\n");
o_stream_nsend_str(auth_worker_client->conn.output,
"SUCCESS\n");
auth_worker_client->error_sent = FALSE;
}
if (auth_worker_client->io != NULL)
if (auth_worker_client->conn.io != NULL)
auth_worker_refresh_proctitle(CLIENT_STATE_IDLE);
}

void auth_worker_client_send_shutdown(void)
{
if (auth_worker_client != NULL)
o_stream_nsend_str(auth_worker_client->output, "SHUTDOWN\n");
o_stream_nsend_str(auth_worker_client->conn.output,
"SHUTDOWN\n");
auth_worker_refresh_proctitle(CLIENT_STATE_STOP);
}

Expand Down

0 comments on commit 643ae56

Please sign in to comment.