Navigation Menu

Skip to content

Commit

Permalink
Support streaming dump
Browse files Browse the repository at this point in the history
Currently, "groonga --protocol http" only supports it.
  • Loading branch information
kou committed Jan 6, 2015
1 parent 4978fff commit 95d2b3f
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 77 deletions.
1 change: 1 addition & 0 deletions include/groonga/groonga.h
Expand Up @@ -1302,6 +1302,7 @@ GRN_API void grn_output_envelope(grn_ctx *ctx, grn_rc rc,
grn_obj *head, grn_obj *body, grn_obj *foot,
const char *file, int line);

GRN_API void grn_ctx_output_flush(grn_ctx *ctx, int flags);
GRN_API void grn_ctx_output_array_open(grn_ctx *ctx,
const char *name, int nelements);
GRN_API void grn_ctx_output_array_close(grn_ctx *ctx);
Expand Down
12 changes: 12 additions & 0 deletions lib/ctx.c
Expand Up @@ -3002,6 +3002,18 @@ grn_set_term_handler(void)
return rc;
}

void
grn_ctx_output_flush(grn_ctx *ctx, int flags)
{
if (flags & GRN_CTX_QUIET) {
return;
}
if (!ctx->impl->output) {
return;
}
ctx->impl->output(ctx, 0, ctx->impl->data.ptr);
}

void
grn_ctx_output_array_open(grn_ctx *ctx, const char *name, int nelements)
{
Expand Down
6 changes: 6 additions & 0 deletions lib/proc.c
Expand Up @@ -2629,6 +2629,8 @@ exit :
return NULL;
}

static const size_t DUMP_FLUSH_THRESHOLD_SIZE = 256 * 1024;

static void
dump_name(grn_ctx *ctx, grn_obj *outbuf, const char *name, int name_len)
{
Expand Down Expand Up @@ -2966,6 +2968,9 @@ dump_records(grn_ctx *ctx, grn_obj *outbuf, grn_obj *table)
}
}
GRN_TEXT_PUTC(ctx, outbuf, ']');
if (GRN_TEXT_LEN(outbuf) >= DUMP_FLUSH_THRESHOLD_SIZE) {
grn_ctx_output_flush(ctx, 0);
}
}
GRN_TEXT_PUTS(ctx, outbuf, "\n]\n");
GRN_TEXT_PUT(ctx, outbuf, GRN_TEXT_VALUE(&delete_commands),
Expand Down Expand Up @@ -3217,6 +3222,7 @@ proc_dump(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data)
ctx->impl->output_type = GRN_CONTENT_NONE;
ctx->impl->mime_type = "text/x-groonga-command-list";
dump_schema(ctx, outbuf);
grn_ctx_output_flush(ctx, 0);
/* To update index columns correctly, we first create the whole schema, then
load non-derivative records, while skipping records of index columns. That
way, groonga will silently do the job of updating index columns for us. */
Expand Down
294 changes: 217 additions & 77 deletions src/groonga.c
Expand Up @@ -649,108 +649,244 @@ start_service(grn_ctx *ctx, const char *db_path,

typedef struct {
grn_msg *msg;
grn_bool in_body;
grn_bool is_chunked;
} ht_context;

static void
h_output(grn_ctx *ctx, int flags, void *arg)
h_output_set_header(grn_ctx *ctx, grn_obj *header,
grn_rc rc, long long int content_length)
{
grn_rc expr_rc = ctx->rc;
ht_context *hc = (ht_context *)arg;
grn_sock fd = hc->msg->u.fd;
grn_obj header, head, foot, *outbuf = ctx->impl->outbuf;
grn_bool should_return_body = (hc->msg->header.qtype == 'G');
if (!(flags & GRN_CTX_TAIL)) { return; }
GRN_TEXT_INIT(&header, 0);
GRN_TEXT_INIT(&head, 0);
GRN_TEXT_INIT(&foot, 0);
output_envelope(ctx, expr_rc, &head, outbuf, &foot);
switch (expr_rc) {
switch (rc) {
case GRN_SUCCESS :
GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 200 OK\r\n");
GRN_TEXT_SETS(ctx, header, "HTTP/1.1 200 OK\r\n");
break;
case GRN_INVALID_ARGUMENT :
case GRN_SYNTAX_ERROR :
GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 400 Bad Request\r\n");
GRN_TEXT_SETS(ctx, header, "HTTP/1.1 400 Bad Request\r\n");
break;
case GRN_NO_SUCH_FILE_OR_DIRECTORY :
GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 404 Not Found\r\n");
GRN_TEXT_SETS(ctx, header, "HTTP/1.1 404 Not Found\r\n");
break;
default :
GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 500 Internal Server Error\r\n");
GRN_TEXT_SETS(ctx, header, "HTTP/1.1 500 Internal Server Error\r\n");
break;
}
GRN_TEXT_PUTS(ctx, &header, "Connection: close\r\n");
GRN_TEXT_PUTS(ctx, &header, "Content-Type: ");
GRN_TEXT_PUTS(ctx, &header, grn_ctx_get_mime_type(ctx));
GRN_TEXT_PUTS(ctx, &header, "\r\nContent-Length: ");
grn_text_lltoa(ctx, &header,
GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot));
GRN_TEXT_PUTS(ctx, &header, "\r\n\r\n");
{
ssize_t ret, len;
GRN_TEXT_PUTS(ctx, header, "Content-Type: ");
GRN_TEXT_PUTS(ctx, header, grn_ctx_get_mime_type(ctx));
GRN_TEXT_PUTS(ctx, header, "\r\n");
if (content_length >= 0) {
GRN_TEXT_PUTS(ctx, header, "Connection: close\r\n");
GRN_TEXT_PUTS(ctx, header, "Content-Length: ");
grn_text_lltoa(ctx, header, content_length);
GRN_TEXT_PUTS(ctx, header, "\r\n");
} else {
GRN_TEXT_PUTS(ctx, header, "Transfer-Encoding: chunked\r\n");
}
GRN_TEXT_PUTS(ctx, header, "\r\n");
}

static void
h_output_send(grn_ctx *ctx, grn_sock fd,
grn_obj *header, grn_obj *head, grn_obj *body, grn_obj *foot)
{
ssize_t ret;
ssize_t len = 0;
#ifdef WIN32
int n_buffers;
WSABUF wsabufs[4];
wsabufs[0].buf = GRN_TEXT_VALUE(&header);
wsabufs[0].len = GRN_TEXT_LEN(&header);
n_buffers = 1;
len = GRN_TEXT_LEN(&header);
if (should_return_body) {
wsabufs[1].buf = GRN_TEXT_VALUE(&head);
wsabufs[1].len = GRN_TEXT_LEN(&head);
wsabufs[2].buf = GRN_TEXT_VALUE(outbuf);
wsabufs[2].len = GRN_TEXT_LEN(outbuf);
wsabufs[3].buf = GRN_TEXT_VALUE(&foot);
wsabufs[3].len = GRN_TEXT_LEN(&foot);
n_buffers += 3;
len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot);
}
{
DWORD sent;
if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) {
SERR("WSASend");
}
ret = sent;
int n_buffers = 0;
WSABUF wsabufs[4];
if (header) {
wsabufs[n_buffers].buf = GRN_TEXT_VALUE(header);
wsabufs[n_buffers].len = GRN_TEXT_LEN(header);
len += GRN_TEXT_LEN(header);
n_buffers++;
}
if (head) {
wsabufs[n_buffers].buf = GRN_TEXT_VALUE(head);
wsabufs[n_buffers].len = GRN_TEXT_LEN(head);
len += GRN_TEXT_LEN(head);
n_buffers++;
}
if (body) {
wsabufs[n_buffers].buf = GRN_TEXT_VALUE(body);
wsabufs[n_buffers].len = GRN_TEXT_LEN(body);
len += GRN_TEXT_LEN(body);
n_buffers++;
}
if (foot) {
wsabufs[n_buffers].buf = GRN_TEXT_VALUE(foot);
wsabufs[n_buffers].len = GRN_TEXT_LEN(foot);
len += GRN_TEXT_LEN(foot);
n_buffers++;
}
{
DWORD sent;
if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) {
SERR("WSASend");
}
ret = sent;
}
#else /* WIN32 */
struct iovec msg_iov[4];
struct msghdr msg;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = msg_iov;
msg.msg_iovlen = 1;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msg_iov[0].iov_base = GRN_TEXT_VALUE(&header);
msg_iov[0].iov_len = GRN_TEXT_LEN(&header);
len = GRN_TEXT_LEN(&header);
if (should_return_body) {
msg_iov[1].iov_base = GRN_TEXT_VALUE(&head);
msg_iov[1].iov_len = GRN_TEXT_LEN(&head);
msg_iov[2].iov_base = GRN_TEXT_VALUE(outbuf);
msg_iov[2].iov_len = GRN_TEXT_LEN(outbuf);
msg_iov[3].iov_base = GRN_TEXT_VALUE(&foot);
msg_iov[3].iov_len = GRN_TEXT_LEN(&foot);
msg.msg_iovlen += 3;
len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot);
}
if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) {
SERR("sendmsg");
}
struct iovec msg_iov[4];
struct msghdr msg;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = msg_iov;
msg.msg_iovlen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;

if (header) {
msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(header);
msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(header);
len += GRN_TEXT_LEN(header);
msg.msg_iovlen++;
}
if (head) {
msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(head);
msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(head);
len += GRN_TEXT_LEN(head);
msg.msg_iovlen++;
}
if (body) {
msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(body);
msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(body);
len += GRN_TEXT_LEN(body);
msg.msg_iovlen++;
}
if (foot) {
msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(foot);
msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(foot);
len += GRN_TEXT_LEN(foot);
msg.msg_iovlen++;
}
if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) {
SERR("sendmsg");
}
#endif /* WIN32 */
if (ret != len) {
GRN_LOG(&grn_gctx, GRN_LOG_NOTICE,
"couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")",
(long long int)ret, (long long int)len);
if (ret != len) {
GRN_LOG(&grn_gctx, GRN_LOG_NOTICE,
"couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")",
(long long int)ret, (long long int)len);
}
}

static void
h_output_raw(grn_ctx *ctx, int flags, ht_context *hc)
{
grn_rc expr_rc = ctx->rc;
grn_sock fd = hc->msg->u.fd;
grn_obj header_;
grn_obj head_;
grn_obj body_;
grn_obj foot_;
grn_obj *header = NULL;
grn_obj *head = NULL;
grn_obj *body = NULL;
grn_obj *foot = NULL;
char *chunk = NULL;
unsigned int chunk_size = 0;
int recv_flags;
grn_bool is_last_message = (flags & GRN_CTX_TAIL);

GRN_TEXT_INIT(&header_, 0);
GRN_TEXT_INIT(&head_, 0);
GRN_TEXT_INIT(&body_, GRN_OBJ_DO_SHALLOW_COPY);
GRN_TEXT_INIT(&foot_, 0);

grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
GRN_TEXT_SET(ctx, &body_, chunk, chunk_size);

if (!hc->in_body) {
if (is_last_message) {
h_output_set_header(ctx, &header_, expr_rc, GRN_TEXT_LEN(&body_));
hc->is_chunked = GRN_FALSE;
} else {
h_output_set_header(ctx, &header_, expr_rc, -1);
hc->is_chunked = GRN_TRUE;
}
header = &header_;
hc->in_body = GRN_TRUE;
}

if (GRN_TEXT_LEN(&body_) > 0) {
if (hc->is_chunked) {
grn_text_printf(ctx, &head_,
"%x\r\n", (unsigned int)GRN_TEXT_LEN(&body_));
head = &head_;
GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
foot = &foot_;
}
body = &body_;
}

if (is_last_message) {
if (hc->is_chunked) {
GRN_TEXT_PUTS(ctx, &foot_, "0\r\n");
GRN_TEXT_PUTS(ctx, &foot_, "Connection: close\r\n");
GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
foot = &foot_;
}
}
GRN_BULK_REWIND(outbuf);

h_output_send(ctx, fd, header, head, body, foot);

GRN_OBJ_FIN(ctx, &foot_);
GRN_OBJ_FIN(ctx, &body_);
GRN_OBJ_FIN(ctx, &head_);
GRN_OBJ_FIN(ctx, &header_);
}

static void
h_output_typed(grn_ctx *ctx, int flags, ht_context *hc)
{
grn_rc expr_rc = ctx->rc;
grn_sock fd = hc->msg->u.fd;
grn_obj header, head, body, foot;
char *chunk = NULL;
unsigned int chunk_size = 0;
int recv_flags;
grn_bool should_return_body = (hc->msg->header.qtype == 'G');

if (!(flags & GRN_CTX_TAIL)) { return; }

GRN_TEXT_INIT(&header, 0);
GRN_TEXT_INIT(&head, 0);
GRN_TEXT_INIT(&body, GRN_OBJ_DO_SHALLOW_COPY);
GRN_TEXT_INIT(&foot, 0);

grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
GRN_TEXT_SET(ctx, &body, chunk, chunk_size);

output_envelope(ctx, expr_rc, &head, &body, &foot);
h_output_set_header(ctx, &header, expr_rc,
GRN_TEXT_LEN(&head) +
GRN_TEXT_LEN(&body) +
GRN_TEXT_LEN(&foot));
if (should_return_body) {
h_output_send(ctx, fd, &header, &head, &body, &foot);
} else {
h_output_send(ctx, fd, &header, NULL, NULL, NULL);
}
GRN_OBJ_FIN(ctx, &foot);
GRN_OBJ_FIN(ctx, &body);
GRN_OBJ_FIN(ctx, &head);
GRN_OBJ_FIN(ctx, &header);
}

static void
h_output(grn_ctx *ctx, int flags, void *arg)
{
ht_context *hc = (ht_context *)arg;

if (grn_ctx_get_output_type(ctx) == GRN_CONTENT_NONE) {
h_output_raw(ctx, flags, hc);
} else {
h_output_typed(ctx, flags, hc);
}
}

static void
do_htreq_get(grn_ctx *ctx, grn_msg *msg)
{
Expand Down Expand Up @@ -982,6 +1118,8 @@ do_htreq_post(grn_ctx *ctx, grn_msg *msg)
if (ctx->rc != GRN_SUCCESS) {
ht_context context;
context.msg = msg;
context.in_body = GRN_FALSE;
context.is_chunked = GRN_FALSE;
h_output(ctx, GRN_CTX_TAIL, &context);
return;
}
Expand Down Expand Up @@ -1698,6 +1836,8 @@ h_worker(void *arg)
nfthreads--;
MUTEX_UNLOCK(q_mutex);
hc.msg = (grn_msg *)msg;
hc.in_body = GRN_FALSE;
hc.is_chunked = GRN_FALSE;
do_htreq(ctx, (grn_msg *)msg);
MUTEX_LOCK(q_mutex);
} while (nfthreads < max_nfthreads && grn_gctx.stat != GRN_CTX_QUIT);
Expand Down

0 comments on commit 95d2b3f

Please sign in to comment.