Skip to content

Commit

Permalink
out_http: add support for concatenated JSON Streaming format. (#477)
Browse files Browse the repository at this point in the history
https://en.wikipedia.org/wiki/JSON_streaming#Concatenated_JSON

This is an independent output format behind flag format=json_stream
Useful when flushing to system that doesn't accept msgpack or JSON arrays

Signed-off-by: Brendan <brendan.wreford@anz.com>
  • Loading branch information
ffscl authored and edsiper committed Feb 28, 2018
1 parent 16422b3 commit 82608bb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 39 deletions.
103 changes: 67 additions & 36 deletions plugins/out_http/http.c
Expand Up @@ -74,7 +74,7 @@ static char *msgpack_to_json(struct flb_out_http_config *ctx, char *data, uint64
}

flb_time_pop_from_msgpack(&tm, &result, &obj);
map = root.via.array.ptr[1];
map = root.via.array.ptr[1];

map_size = map.via.map.size;
msgpack_pack_map(&tmp_pck, map_size + 1);
Expand All @@ -99,6 +99,33 @@ static char *msgpack_to_json(struct flb_out_http_config *ctx, char *data, uint64
/* Format to JSON */
ret = flb_msgpack_raw_to_json_str(tmp_sbuf.data, tmp_sbuf.size,
&json_buf, &json_size);

/* Optionally convert to JSON stream from JSON array */
if (ctx->out_format == FLB_HTTP_OUT_JSON_STREAM) {
char *p;
char *end = json_buf + json_size;
int level = 0;
int in_string = FLB_FALSE;
int in_escape = FLB_FALSE;

for (p = json_buf; p!=end; p++) {
if (in_escape)
in_escape = FLB_FALSE;
else if (*p == '\\')
in_escape = FLB_TRUE;
else if (*p == '"')
in_string = !in_string;
else if (!in_string) {
if (*p == '{')
level++;
else if (*p == '}')
level--;
else if ((*p == '[' || *p == ']' || *p == ',') && level == 0)
*p=' ';
}
}
}

msgpack_sbuffer_destroy(&tmp_sbuf);
if (ret != 0) {
return NULL;
Expand All @@ -109,15 +136,15 @@ static char *msgpack_to_json(struct flb_out_http_config *ctx, char *data, uint64
}

int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
void *data)
void *data)
{
int ulen;
int io_flags = 0;
char *uri = NULL;
char *tmp;
struct flb_upstream *upstream;
struct flb_out_http_config *ctx = NULL;
(void) data;
(void)data;

/* Allocate plugin context */
ctx = flb_calloc(1, sizeof(struct flb_out_http_config));
Expand All @@ -127,15 +154,15 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
}
/*
* Check if a Proxy have been set, if so the Upstream manager will use
* the Proxy end-point and then we let the HTTP client know about it,
* so it can adjust the HTTP requests.
* the Proxy end-point and then we let the HTTP client know about it, so
* it can adjust the HTTP requests.
*/
tmp = flb_output_get_property("proxy", ins);
if (tmp) {
/*
* Here we just want to lookup two things: host and port, we are
* going to skip validations as most of them are handled by the
* HTTP Client in a later stage.
* going to skip validations as most of them are handled by the HTTP
* Client in a later stage.
*/
char *p;
char *addr;
Expand All @@ -145,8 +172,8 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
flb_free(ctx);
return -1;
}
addr += 2; /* get right to the host section */
if (*addr == '[') { /* IPv6 */
addr += 2; /* get right to the host section */
if (*addr == '[') { /* IPv6 */
p = strchr(addr, ']');
if (!p) {
flb_free(ctx);
Expand Down Expand Up @@ -207,13 +234,13 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
upstream = flb_upstream_create(config,
ctx->proxy_host,
ctx->proxy_port,
io_flags, (void *) &ins->tls);
io_flags, (void *)&ins->tls);
}
else {
upstream = flb_upstream_create(config,
ins->host.name,
ins->host.port,
io_flags, (void *) &ins->tls);
io_flags, (void *)&ins->tls);
}

if (!upstream) {
Expand Down Expand Up @@ -269,6 +296,9 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
else if (strcasecmp(tmp, "json") == 0) {
ctx->out_format = FLB_HTTP_OUT_JSON;
}
else if (strcasecmp(tmp, "json_stream") == 0) {
ctx->out_format = FLB_HTTP_OUT_JSON_STREAM;
}
else {
flb_warn("[out_http] unrecognized 'format' option. Using 'msgpack'");
}
Expand All @@ -286,7 +316,7 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
}

ctx->u = upstream;
ctx->uri = uri;
ctx->uri = uri;
ctx->host = ins->host.name;
ctx->port = ins->host.port;

Expand All @@ -296,10 +326,10 @@ int cb_http_init(struct flb_output_instance *ins, struct flb_config *config,
}

void cb_http_flush(void *data, size_t bytes,
char *tag, int tag_len,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
char *tag, int tag_len,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
int ret;
int out_ret = FLB_OK;
Expand All @@ -310,9 +340,9 @@ void cb_http_flush(void *data, size_t bytes,
struct flb_http_client *c;
void *body = NULL;
uint64_t body_len;
(void) i_ins;
(void)i_ins;

if (ctx->out_format == FLB_HTTP_OUT_JSON) {
if ((ctx->out_format == FLB_HTTP_OUT_JSON) || (ctx->out_format == FLB_HTTP_OUT_JSON_STREAM)) {
body = msgpack_to_json(ctx, data, bytes, &body_len);
}
else {
Expand All @@ -338,19 +368,19 @@ void cb_http_flush(void *data, size_t bytes,
ctx->proxy, 0);

/* Append headers */
if (ctx->out_format == FLB_HTTP_OUT_JSON) {
if ((ctx->out_format == FLB_HTTP_OUT_JSON) || (ctx->out_format == FLB_HTTP_OUT_JSON_STREAM)) {
flb_http_add_header(c,
FLB_HTTP_CONTENT_TYPE,
sizeof(FLB_HTTP_CONTENT_TYPE) - 1,
FLB_HTTP_MIME_JSON,
sizeof(FLB_HTTP_MIME_JSON) -1);
sizeof(FLB_HTTP_MIME_JSON) - 1);
}
else {
flb_http_add_header(c,
FLB_HTTP_CONTENT_TYPE,
sizeof(FLB_HTTP_CONTENT_TYPE) - 1,
FLB_HTTP_MIME_MSGPACK,
sizeof(FLB_HTTP_MIME_MSGPACK) -1);
sizeof(FLB_HTTP_MIME_MSGPACK) - 1);
}

if (ctx->http_user && ctx->http_passwd) {
Expand All @@ -361,13 +391,14 @@ void cb_http_flush(void *data, size_t bytes,
if (ret == 0) {
/*
* Only allow the following HTTP status:
*
* - 200: OK
* - 201: Created
* - 202: Accepted
* - 203: no authorative resp
* - 204: No Content
* - 205: Reset content
*
* - 200: OK
* - 201: Created
* - 202: Accepted
* - 203: no authorative resp
* - 204: No Content
* - 205: Reset content
*
*/
if (c->resp.status < 200 || c->resp.status > 205) {
flb_error("[out_http] HTTP STATUS=%i", c->resp.status);
Expand All @@ -393,7 +424,7 @@ void cb_http_flush(void *data, size_t bytes,
/* Release the connection */
flb_upstream_conn_release(u_conn);

if (ctx->out_format == FLB_HTTP_OUT_JSON) {
if ((ctx->out_format == FLB_HTTP_OUT_JSON) || (ctx->out_format == FLB_HTTP_OUT_JSON_STREAM)) {
flb_free(body);
}

Expand All @@ -420,11 +451,11 @@ int cb_http_exit(void *data, struct flb_config *config)

/* Plugin reference */
struct flb_output_plugin out_http_plugin = {
.name = "http",
.description = "HTTP Output",
.cb_init = cb_http_init,
.cb_pre_run = NULL,
.cb_flush = cb_http_flush,
.cb_exit = cb_http_exit,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
.name = "http",
.description = "HTTP Output",
.cb_init = cb_http_init,
.cb_pre_run = NULL,
.cb_flush = cb_http_flush,
.cb_exit = cb_http_exit,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
};
7 changes: 4 additions & 3 deletions plugins/out_http/http.h
Expand Up @@ -20,8 +20,9 @@
#ifndef FLB_OUT_HTTP_H
#define FLB_OUT_HTTP_H

#define FLB_HTTP_OUT_MSGPACK 0
#define FLB_HTTP_OUT_JSON 1
#define FLB_HTTP_OUT_MSGPACK 0
#define FLB_HTTP_OUT_JSON 1
#define FLB_HTTP_OUT_JSON_STREAM 2

#define FLB_HTTP_CONTENT_TYPE "Content-Type"
#define FLB_HTTP_MIME_MSGPACK "application/msgpack"
Expand All @@ -45,7 +46,7 @@ struct flb_out_http_config {
/* HTTP URI */
char *uri;
char *host;
int port;
int port;

/* Upstream connection to the backend server */
struct flb_upstream *u;
Expand Down

0 comments on commit 82608bb

Please sign in to comment.