Navigation Menu

Skip to content

Commit

Permalink
groonga-httpd: support streaming dump
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jan 7, 2015
1 parent d216dca commit 99dc6bf
Showing 1 changed file with 142 additions and 39 deletions.
181 changes: 142 additions & 39 deletions src/httpd/nginx-module/ngx_http_groonga_module.c
@@ -1,6 +1,6 @@
/* -*- c-basic-offset: 2 -*- */
/*
Copyright(C) 2012-2014 Brazil
Copyright(C) 2012-2015 Brazil
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -63,9 +63,17 @@ typedef struct {
typedef struct {
grn_bool initialized;
grn_ctx context;
grn_obj head;
grn_obj body;
grn_obj foot;
struct {
grn_bool processed;
grn_bool header_sent;
ngx_http_request_t *r;
ngx_int_t rc;
} raw;
struct {
grn_obj head;
grn_obj body;
grn_obj foot;
} typed;
} ngx_http_groonga_handler_data_t;

typedef struct {
Expand Down Expand Up @@ -358,20 +366,97 @@ ngx_http_groonga_handler_cleanup(void *user_data)
}

context = &(data->context);
GRN_OBJ_FIN(context, &(data->head));
GRN_OBJ_FIN(context, &(data->body));
GRN_OBJ_FIN(context, &(data->foot));
GRN_OBJ_FIN(context, &(data->typed.head));
GRN_OBJ_FIN(context, &(data->typed.body));
GRN_OBJ_FIN(context, &(data->typed.foot));
grn_logger_set(context, NULL);
grn_query_logger_set(context, NULL);
grn_ctx_fin(context);
}

static void
ngx_http_groonga_context_receive_handler(grn_ctx *context,
int flags,
void *callback_data)
ngx_http_groonga_handler_set_content_type(ngx_http_request_t *r,
const char *content_type)
{
r->headers_out.content_type.len = strlen(content_type);
r->headers_out.content_type.data = (u_char *)content_type;
r->headers_out.content_type_len = r->headers_out.content_type.len;
}

static void
ngx_http_groonga_context_receive_handler_raw(grn_ctx *context,
int flags,
ngx_http_groonga_handler_data_t *data)
{
char *chunk = NULL;
unsigned int chunk_size = 0;
int recv_flags;
ngx_http_request_t *r;
ngx_log_t *log;
grn_bool is_last_chunk;

grn_ctx_recv(context, &chunk, &chunk_size, &recv_flags);
data->raw.processed = GRN_TRUE;

if (data->raw.rc != NGX_OK) {
return;
}

r = data->raw.r;
log = r->connection->log;
is_last_chunk = (flags & GRN_CTX_TAIL);

if (!data->raw.header_sent) {
ngx_http_groonga_handler_set_content_type(r, grn_ctx_get_mime_type(context));
r->headers_out.status = NGX_HTTP_OK;
if (is_last_chunk) {
r->headers_out.content_length_n = chunk_size;
} else {
r->headers_out.content_length_n = -1;
}
if (chunk_size == 0) {
r->header_only = 1;
}
data->raw.rc = ngx_http_send_header(r);
data->raw.header_sent = GRN_TRUE;

if (data->raw.rc != NGX_OK) {
return;
}
}

if (chunk_size > 0) {
ngx_chain_t chain;
ngx_buf_t *buffer;

buffer = ngx_palloc(r->pool, sizeof(ngx_buf_t));
if (!buffer) {
ngx_log_error(NGX_LOG_ERR, log, 0,
"http_groonga: failed to allocate memory for chunked body");
data->raw.rc = NGX_ERROR;
return;
}
buffer->pos = (u_char *)chunk;
buffer->last = (u_char *)chunk + chunk_size;
buffer->memory = 1;
buffer->in_file = 0;
if (is_last_chunk) {
buffer->last_buf = 1;
} else {
buffer->last_buf = 0;
}
chain.buf = buffer;
chain.next = NULL;

data->raw.rc = ngx_http_output_filter(r, &chain);
}
}

static void
ngx_http_groonga_context_receive_handler_typed(grn_ctx *context,
int flags,
ngx_http_groonga_handler_data_t *data)
{
ngx_http_groonga_handler_data_t *data = callback_data;
char *result = NULL;
unsigned int result_size = 0;
int recv_flags;
Expand Down Expand Up @@ -402,29 +487,43 @@ ngx_http_groonga_context_receive_handler(grn_ctx *context,
context->stat |= GRN_CTX_QUIT;
} else {
context->rc = GRN_OPERATION_NOT_PERMITTED;
GRN_TEXT_PUTS(context, &(data->body), "false");
GRN_TEXT_PUTS(context, &(data->typed.body), "false");
context->stat &= ~GRN_CTX_QUIT;
}
}
#endif

if (result_size > 0 ||
GRN_TEXT_LEN(&(data->body)) > 0 ||
GRN_TEXT_LEN(&(data->typed.body)) > 0 ||
context->rc != GRN_SUCCESS) {
if (result_size > 0) {
GRN_TEXT_PUT(context, &(data->body), result, result_size);
GRN_TEXT_PUT(context, &(data->typed.body), result, result_size);
}

grn_output_envelope(context,
context->rc,
&(data->head),
&(data->body),
&(data->foot),
&(data->typed.head),
&(data->typed.body),
&(data->typed.foot),
NULL,
0);
}
}

static void
ngx_http_groonga_context_receive_handler(grn_ctx *context,
int flags,
void *callback_data)
{
ngx_http_groonga_handler_data_t *data = callback_data;

if (grn_ctx_get_output_type(context) == GRN_CONTENT_NONE) {
ngx_http_groonga_context_receive_handler_raw(context, flags, data);
} else {
ngx_http_groonga_context_receive_handler_typed(context, flags, data);
}
}

static ngx_int_t
ngx_http_groonga_extract_command_path(ngx_http_request_t *r,
ngx_str_t *command_path)
Expand Down Expand Up @@ -470,15 +569,6 @@ ngx_http_groonga_extract_command_path(ngx_http_request_t *r,
return NGX_OK;
}

static void
ngx_http_groonga_handler_set_content_type(ngx_http_request_t *r,
const char *content_type)
{
r->headers_out.content_type.len = strlen(content_type);
r->headers_out.content_type.data = (u_char *)content_type;
r->headers_out.content_type_len = r->headers_out.content_type.len;
}

static ngx_int_t
ngx_http_groonga_handler_create_data(ngx_http_request_t *r,
ngx_http_groonga_handler_data_t **data_return)
Expand All @@ -505,10 +595,18 @@ ngx_http_groonga_handler_create_data(ngx_http_request_t *r,
if (rc != NGX_OK) {
return rc;
}

data->initialized = GRN_TRUE;
GRN_TEXT_INIT(&(data->head), GRN_NO_FLAGS);
GRN_TEXT_INIT(&(data->body), GRN_NO_FLAGS);
GRN_TEXT_INIT(&(data->foot), GRN_NO_FLAGS);

data->raw.processed = GRN_FALSE;
data->raw.header_sent = GRN_FALSE;
data->raw.r = r;
data->raw.rc = NGX_OK;

GRN_TEXT_INIT(&(data->typed.head), GRN_NO_FLAGS);
GRN_TEXT_INIT(&(data->typed.body), GRN_NO_FLAGS);
GRN_TEXT_INIT(&(data->typed.foot), GRN_NO_FLAGS);

grn_ctx_use(context, grn_ctx_db(&(location_conf->context)));
rc = ngx_http_groonga_context_check_error(r->connection->log, context);
if (rc != NGX_OK) {
Expand Down Expand Up @@ -562,9 +660,10 @@ ngx_http_groonga_handler_validate_post_command(ngx_http_request_t *r,

context = &(data->context);
ngx_http_groonga_handler_set_content_type(r, "text/plain");
GRN_TEXT_PUTS(context, &(data->body), "command for POST must be <load>: <");
GRN_TEXT_PUT(context, &(data->body), command.data, command.len);
GRN_TEXT_PUTS(context, &(data->body), ">");
GRN_TEXT_PUTS(context, &(data->typed.body),
"command for POST must be <load>: <");
GRN_TEXT_PUT(context, &(data->typed.body), command.data, command.len);
GRN_TEXT_PUTS(context, &(data->typed.body), ">");

return NGX_HTTP_BAD_REQUEST;
}
Expand Down Expand Up @@ -674,7 +773,7 @@ ngx_http_groonga_handler_process_body(ngx_http_request_t *r,
body = r->request_body->bufs->buf;
if (!body) {
ngx_http_groonga_handler_set_content_type(r, "text/plain");
GRN_TEXT_PUTS(context, &(data->body), "must send load data as body");
GRN_TEXT_PUTS(context, &(data->typed.body), "must send load data as body");
return NGX_HTTP_BAD_REQUEST;
}

Expand Down Expand Up @@ -753,6 +852,10 @@ ngx_http_groonga_handler_send_response(ngx_http_request_t *r,
ngx_chain_t head_chain, body_chain, foot_chain;
ngx_chain_t *output_chain = NULL;

if (data->raw.processed) {
return data->raw.rc;
}

context = &(data->context);

/* set the 'Content-type' header */
Expand All @@ -762,17 +865,17 @@ ngx_http_groonga_handler_send_response(ngx_http_request_t *r,
}

/* allocate buffers for a response body */
head_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->head));
head_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->typed.head));
if (!head_buf) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

body_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->body));
body_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->typed.body));
if (!body_buf) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

foot_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->foot));
foot_buf = ngx_http_groonga_grn_obj_to_ngx_buf(r->pool, &(data->typed.foot));
if (!foot_buf) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
Expand All @@ -787,9 +890,9 @@ ngx_http_groonga_handler_send_response(ngx_http_request_t *r,

/* set the status line */
r->headers_out.status = NGX_HTTP_OK;
r->headers_out.content_length_n = GRN_TEXT_LEN(&(data->head)) +
GRN_TEXT_LEN(&(data->body)) +
GRN_TEXT_LEN(&(data->foot));
r->headers_out.content_length_n = GRN_TEXT_LEN(&(data->typed.head)) +
GRN_TEXT_LEN(&(data->typed.body)) +
GRN_TEXT_LEN(&(data->typed.foot));
if (r->headers_out.content_length_n == 0) {
r->header_only = 1;
}
Expand Down

0 comments on commit 99dc6bf

Please sign in to comment.