Skip to content

Commit

Permalink
lib-program-client: Add support for iostream-dot
Browse files Browse the repository at this point in the history
This is also made mandatory for TCP client.
  • Loading branch information
cmouse committed Oct 19, 2016
1 parent 541446a commit a371ea8
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 44 deletions.
4 changes: 3 additions & 1 deletion src/lib-program-client/Makefile.am
Expand Up @@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libprogram_client.la
AM_CPPFLAGS = \
-I$(top_srcdir)/src/lib \
-I$(top_srcdir)/src/lib-test \
-I$(top_srcdir)/src/lib-dns
-I$(top_srcdir)/src/lib-dns \
-I$(top_srcdir)/src/lib-mail

libprogram_client_la_SOURCES = \
program-client.c \
Expand All @@ -30,6 +31,7 @@ test_libs = \
libprogram_client.la \
../lib-dns/libdns.la \
../lib-test/libtest.la \
../lib-mail/libmail.la \
../lib/liblib.la \
$(MODULE_LIBS)

Expand Down
5 changes: 4 additions & 1 deletion src/lib-program-client/program-client-private.h
Expand Up @@ -39,7 +39,8 @@ struct program_client {
struct timeval start_time;

struct istream *input, *program_input, *seekable_output;
struct ostream *output, *program_output;
struct istream *dot_input;
struct ostream *output, *program_output, *dot_output;
char *temp_prefix;

ARRAY(struct program_client_extra_fd) extra_fds;
Expand All @@ -60,6 +61,8 @@ struct program_client {
bool debug:1;
bool disconnected:1;
bool output_seekable:1;
bool input_dot_created:1;
bool output_dot_created:1;
};

void program_client_init(struct program_client *pclient, pool_t pool, const char *path,
Expand Down
6 changes: 5 additions & 1 deletion src/lib-program-client/program-client-remote.c
Expand Up @@ -493,6 +493,8 @@ int program_client_remote_close_output(struct program_client *pclient)
int fd_out = pclient->fd_out, fd_in = pclient->fd_in;

pclient->fd_out = -1;
if (fd_out >= 0 && pclient->set.use_dotstream)
return 1;

/* Shutdown output; program stdin will get EOF */
if (fd_out >= 0) {
Expand Down Expand Up @@ -589,7 +591,7 @@ program_client_net_create(const char *host, in_port_t port,
pclient->client.close_output = program_client_remote_close_output;
pclient->client.disconnect = program_client_remote_disconnect;
pclient->noreply = noreply;

pclient->client.set.use_dotstream = TRUE;
return &pclient->client;
}

Expand All @@ -614,8 +616,10 @@ program_client_net_create_ips(const struct ip_addr *ips, size_t ips_count,
pclient->client.disconnect = program_client_remote_disconnect;
pclient->client.switch_ioloop = program_client_remote_switch_ioloop;
pclient->noreply = noreply;
pclient->client.set.use_dotstream = TRUE;
pclient->ips = p_memdup(pool, ips,
sizeof(struct ip_addr)*ips_count);
pclient->ips_count = ips_count;
return &pclient->client;
}

74 changes: 65 additions & 9 deletions src/lib-program-client/program-client.c
Expand Up @@ -8,6 +8,8 @@
#include "safe-mkstemp.h"
#include "istream-private.h"
#include "istream-seekable.h"
#include "ostream-dot.h"
#include "istream-dot.h"
#include "ostream.h"
#include "lib-signals.h"

Expand Down Expand Up @@ -123,7 +125,8 @@ void program_client_disconnect_extra_fds(struct program_client *pclient)
void program_client_disconnected(struct program_client *pclient)
{
if (pclient->program_input != NULL) {
if (pclient->output_seekable)
if (pclient->output_seekable ||
pclient->set.use_dotstream)
i_stream_unref(&pclient->program_input);
else
i_stream_destroy(&pclient->program_input);
Expand Down Expand Up @@ -230,11 +233,33 @@ int program_client_program_output(struct program_client *pclient)
return ret;
}

if (!pclient->output_dot_created &&
pclient->set.use_dotstream &&
output != NULL) {
pclient->dot_output = o_stream_create_dot(output, FALSE);
pclient->output_dot_created = TRUE;
}
if (pclient->output_dot_created &&
pclient->dot_output != NULL)
output = pclient->dot_output;

if (input != NULL && output != NULL) {
res = o_stream_send_istream(output, input);

switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
if (pclient->output_dot_created) {
if ((ret = o_stream_flush(pclient->dot_output)) <= 0) {
if (ret < 0) {
i_error("write(%s) failed: %s",
o_stream_get_name(pclient->dot_output),
o_stream_get_error(pclient->dot_output));
program_client_fail(pclient,
PROGRAM_CLIENT_ERROR_IO);
}
}
o_stream_unref(&pclient->dot_output);
output = pclient->program_output;
}
i_stream_unref(&pclient->input);
input = NULL;
break;
Expand All @@ -255,6 +280,20 @@ int program_client_program_output(struct program_client *pclient)
program_client_fail(pclient, PROGRAM_CLIENT_ERROR_IO);
return -1;
}
} else if (input == NULL &&
output != NULL &&
pclient->output_dot_created) {
if ((ret = o_stream_flush(pclient->dot_output)) <= 0) {
if (ret < 0) {
i_error("write(%s) failed: %s",
o_stream_get_name(output),
o_stream_get_error(output));
program_client_fail(pclient,
PROGRAM_CLIENT_ERROR_IO);
}
return ret;
}
o_stream_unref(&pclient->dot_output);
}

if (input == NULL) {
Expand Down Expand Up @@ -291,11 +330,30 @@ void program_client_program_input(struct program_client *pclient)
}

if (input != NULL) {
if (!pclient->input_dot_created &&
pclient->set.use_dotstream) {
pclient->dot_input = i_stream_create_dot(input, FALSE);
pclient->input_dot_created = TRUE;
}
if (pclient->input_dot_created &&
pclient->dot_input != NULL)
input = pclient->dot_input;
else if (pclient->set.use_dotstream) {
/* just read it empty */
while(i_stream_read_more(input, &data,
&size) > 0)
i_stream_skip(input, size);
output = NULL;
}
if (output != NULL) {
res = o_stream_send_istream(output, input);

switch (res) {
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
if (pclient->set.use_dotstream &&
pclient->dot_input != NULL) {
i_stream_unref(&pclient->dot_input);
input = pclient->program_input;
}
break;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
Expand Down Expand Up @@ -487,13 +545,9 @@ void program_client_init_streams(struct program_client *pclient)
o_stream_set_name(pclient->program_output, "program stdin");
}
if (pclient->fd_in >= 0) {
struct istream *input;

input = i_stream_create_fd(pclient->fd_in, (size_t)-1);

pclient->program_input = input;
pclient->program_input =
i_stream_create_fd(pclient->fd_in, (size_t)-1);
i_stream_set_name(pclient->program_input, "program stdout");

pclient->io = io_add(pclient->fd_in, IO_READ,
program_client_program_input, pclient);
}
Expand Down Expand Up @@ -527,6 +581,8 @@ void program_client_destroy(struct program_client **_pclient)

if (pclient->input != NULL)
i_stream_unref(&pclient->input);
if (pclient->dot_input != NULL)
i_stream_unref(&pclient->dot_input);
if (pclient->program_input != NULL)
i_stream_unref(&pclient->program_input);
if (pclient->program_output != NULL)
Expand Down
2 changes: 2 additions & 0 deletions src/lib-program-client/program-client.h
Expand Up @@ -22,6 +22,8 @@ struct program_client_settings {
bool allow_root:1;
bool debug:1;
bool drop_stderr:1;
/* use o_stream_dot */
bool use_dotstream:1;
};

typedef void program_client_fd_callback_t(void *context, struct istream *input);
Expand Down
88 changes: 56 additions & 32 deletions src/lib-program-client/test-program-client-net.c
Expand Up @@ -9,20 +9,22 @@
#include "array.h"
#include "istream.h"
#include "ostream.h"
#include "istream-dot.h"
#include "ostream-dot.h"
#include "net.h"
#include "iostream-temp.h"
#include "program-client.h"

#include <unistd.h>

static const char *pclient_test_io_string = "Lorem ipsum dolor sit amet, consectetur adipiscing elit.\n"
"Praesent vehicula ac leo vel placerat. Nullam placerat \n"
"volutpat leo, sed ultricies felis pulvinar quis. Nam \n"
"tempus, augue ut tempor cursus, neque felis commodo lacus, \n"
"sit amet tincidunt arcu justo vel augue. Proin dapibus \n"
"vulputate maximus. Mauris congue lacus felis, sed varius \n"
"leo finibus sagittis. Cum sociis natoque penatibus et magnis \n"
"dis parturient montes, nascetur ridiculus mus. Aliquam \n"
static const char *pclient_test_io_string = "Lorem ipsum dolor sit amet, consectetur adipiscing elit.\r\n"
"Praesent vehicula ac leo vel placerat. Nullam placerat \r\n"
"volutpat leo, sed ultricies felis pulvinar quis. Nam \r\n"
"tempus, augue ut tempor cursus, neque felis commodo lacus, \r\n"
"sit amet tincidunt arcu justo vel augue. Proin dapibus \r\n"
"vulputate maximus. Mauris congue lacus felis, sed varius \r\n"
"leo finibus sagittis. Cum sociis natoque penatibus et magnis \r\n"
"dis parturient montes, nascetur ridiculus mus. Aliquam \r\n"
"laoreet arcu a hendrerit consequat. Duis vitae erat tellus.";

static
Expand Down Expand Up @@ -102,6 +104,7 @@ int test_program_input_handle(struct test_client *client, const char *line)
{
int cmp = -1;
const char *arg;
struct istream *is;

switch(client->state) {
case CLIENT_STATE_INIT:
Expand Down Expand Up @@ -132,16 +135,21 @@ int test_program_input_handle(struct test_client *client, const char *line)
break;
case CLIENT_STATE_BODY:
client->os_body =
iostream_temp_create_named(".dovecot.test.",
0, "test_program_input body");
iostream_temp_create_named(".dovecot.test.", 0,
"test_program_input body");
is = client->in;
client->in = i_stream_create_dot(is, FALSE);
i_stream_unref(&is);

switch(o_stream_send_istream(client->os_body, client->in)) {
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
return -1;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
break;
case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
client->body = iostream_temp_finish(&client->os_body, -1);
client->body = iostream_temp_finish(&client->os_body,
-1);
return 1;
case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
i_panic("Cannot write to ostream-temp");
Expand All @@ -156,38 +164,56 @@ void test_program_run(struct test_client *client)
{
const char *arg;

timeout_remove(&test_globals.to);
struct ostream *os;

test_assert(array_count(&client->args) > 0);
arg = *array_idx(&client->args, 0);
if (strcmp(arg, "test_program_success")==0) {
/* return hello world */
o_stream_nsend_str(client->out, t_strdup_printf("%s %s\n+\n",
*array_idx(&client->args, 1),
*array_idx(&client->args, 2)));
} else if (strcmp(arg, "test_program_io")==0) {
o_stream_send_istream(client->out, client->body);
o_stream_nsend_str(client->out, "+\n");
} else if (strcmp(arg, "test_program_failure")==0) {
o_stream_nsend_str(client->out, "-\n");
timeout_remove(&test_globals.to);
test_assert(array_is_created(&client->args));
if (array_is_created(&client->args)) {
test_assert(array_count(&client->args) > 0);
if (array_count(&client->args) > 0) {
arg = *array_idx(&client->args, 0);
if (strcmp(arg, "test_program_success")==0) {
/* return hello world */
o_stream_nsend_str(client->out, t_strdup_printf(
"%s %s\r\n.\n+\n",
*array_idx(&client->args, 1),
*array_idx(&client->args, 2)));
} else if (strcmp(arg, "test_program_io")==0) {
os = o_stream_create_dot(client->out, FALSE);
o_stream_send_istream(os, client->body);
o_stream_flush(os);
o_stream_unref(&os);
o_stream_nsend_str(client->out, "+\n");
} else if (strcmp(arg, "test_program_failure")==0) {
o_stream_nsend_str(client->out, ".\n-\n");
}
} else
o_stream_nsend_str(client->out, ".\n-\n");
}
test_program_client_destroy(&client);
}

static
void test_program_input(struct test_client *client)

{
const char *line = "";
int ret = 0;

if (client->state == CLIENT_STATE_BODY) {
if (test_program_input_handle(client, NULL)==0 &&
!client->in->eof)
return;
} else {
line = i_stream_read_next_line(client->in);
while((line = i_stream_read_next_line(client->in)) != NULL) {
ret = test_program_input_handle(client, line);
if (client->state == CLIENT_STATE_BODY)
ret = test_program_input_handle(client, NULL);
if (ret != 0) break;
}

if ((line == NULL && !client->in->eof) ||
(line != NULL && test_program_input_handle(client, line) == 0))
(line != NULL &&
ret == 0))
return;
}

Expand All @@ -202,8 +228,6 @@ void test_program_input(struct test_client *client)
else
i_warning("Client sent invalid line: %s", line);
}

test_program_client_destroy(&client);
}

static
Expand Down Expand Up @@ -240,7 +264,7 @@ void test_program_setup(void) {
net_addr2ip("127.0.0.1", &ip);

test_globals.listen_fd = net_listen(&ip, &test_globals.port, 1);
;

if (test_globals.listen_fd < 0)
i_fatal("Cannot create TCP listener: %m");

Expand Down Expand Up @@ -278,7 +302,7 @@ void test_program_success(void) {
};

struct program_client *pc =
program_client_net_create("95.175.99.158", 33333, args,
program_client_net_create("127.0.0.1", test_globals.port, args,
&pc_set, FALSE);

buffer_t *output = buffer_create_dynamic(default_pool, 16);
Expand All @@ -291,7 +315,7 @@ void test_program_success(void) {
test_program_io_loop_run();

test_assert(ret == 1);
test_assert(strcmp(str_c(output), "hello world\n") == 0);
test_assert(strcmp(str_c(output), "hello world") == 0);

program_client_destroy(&pc);

Expand Down

0 comments on commit a371ea8

Please sign in to comment.