Skip to content

Commit

Permalink
out_s3: add content options
Browse files Browse the repository at this point in the history
Signed-off-by: Zhonghui Hu <zh0512xx@gmail.com>
  • Loading branch information
zhonghui12 committed Dec 10, 2020
1 parent 97a2a25 commit b16962d
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 31 deletions.
154 changes: 124 additions & 30 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_aws_util.h>
#include <fluent-bit/flb_signv4.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_gzip.h>
#include <stdlib.h>
#include <msgpack.h>

Expand All @@ -50,6 +51,26 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len);

static struct flb_aws_header content_encoding_header = {
.key = "Content-Encoding",
.key_len = 16,
.val = "gzip",
.val_len = 4,
};

static struct flb_aws_header content_type_header = {
.key = "Content-Type",
.key_len = 12,
.val = "",
.val_len = 0,
};

static struct flb_aws_header canned_acl_header = {
.key = "x-amz-acl",
.key_len = 9,
.val = "",
.val_len = 0,
};

static char *mock_error_response(char *error_env_var)
{
Expand Down Expand Up @@ -83,22 +104,52 @@ int s3_plugin_under_test()
return FLB_FALSE;
}

struct flb_aws_header *create_canned_acl_header(char *canned_acl)
static int create_headers(struct flb_s3 *ctx, struct flb_aws_header **headers, int *num_headers)
{
struct flb_aws_header *acl_header = NULL;
int n = 0;
int headers_len = 0;
struct flb_aws_header *s3_headers = NULL;

if (ctx->content_type != NULL) {
headers_len++;
}
if (ctx->compression != NULL) {
headers_len++;
}
if (ctx->canned_acl != NULL) {
headers_len++;
}
if (headers_len == 0) {
*num_headers = headers_len;
*headers = s3_headers;
return 0;
}

acl_header = flb_malloc(sizeof(struct flb_aws_header));
if (acl_header == NULL) {
s3_headers = flb_malloc(sizeof(struct flb_aws_header) * headers_len);
if (s3_headers == NULL) {
flb_errno();
return NULL;
}

acl_header->key = "x-amz-acl";
acl_header->key_len = 9;
acl_header->val = canned_acl;
acl_header->val_len = strlen(canned_acl);

return acl_header;
return -1;
}

if (ctx->content_type != NULL) {
s3_headers[n] = content_type_header;
s3_headers[n].val = ctx->content_type;
s3_headers[n].val_len = strlen(ctx->content_type);
n++;
}
if (ctx->compression != NULL) {
s3_headers[n] = content_encoding_header;
n++;
}
if (ctx->canned_acl != NULL) {
s3_headers[n] = canned_acl_header;
s3_headers[n].val = ctx->canned_acl;
s3_headers[n].val_len = strlen(ctx->canned_acl);
}

*num_headers = headers_len;
*headers = s3_headers;
return 0;
};

struct flb_http_client *mock_s3_call(char *error_env_var, char *api)
Expand Down Expand Up @@ -431,6 +482,26 @@ static int cb_s3_init(struct flb_output_instance *ins,
if (tmp) {
ctx->canned_acl = (char *) tmp;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
if (strcmp((char *) tmp, "gzip") != 0) {
flb_plg_error(ctx->ins,
"'gzip' is currently the only supported value for 'compression'");
return -1;
} else if (ctx->use_put_object == FLB_FALSE) {
flb_plg_error(ctx->ins,
"use_put_object must be enabled when compression is enabled");
return -1;
}

ctx->compression = (char *) tmp;
}

tmp = flb_output_get_property("content_type", ins);
if (tmp) {
ctx->content_type = (char *) tmp;
}

ctx->client_tls.context = flb_tls_context_new(FLB_TRUE,
ins->tls_debug,
Expand Down Expand Up @@ -930,13 +1001,18 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_t s3_key = NULL;
struct flb_http_client *c = NULL;
struct flb_aws_client *s3_client;
struct flb_aws_header *canned_acl_header;
struct flb_aws_header *headers = NULL;
char *random_alphanumeric;
int append_random = FLB_FALSE;
int len;
int ret;
int num_headers = 0;
char *final_key;
flb_sds_t uri;
flb_sds_t tmp;
void *compressed_body;
char *final_body;
size_t final_body_size;

s3_key = flb_get_s3_key(ctx->s3_key_format, create_time, tag, ctx->tag_delimiters);
if (!s3_key) {
Expand Down Expand Up @@ -980,28 +1056,34 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time
flb_sds_destroy(s3_key);
uri = tmp;

if (ctx->compression != NULL) {
ret = flb_gzip_compress(body, body_size, &compressed_body, &final_body_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to compress data");
flb_sds_destroy(uri);
return -1;
}
final_body = (char *) compressed_body;
} else {
final_body = body;
final_body_size = body_size;
}

s3_client = ctx->s3_client;
if (s3_plugin_under_test() == FLB_TRUE) {
c = mock_s3_call("TEST_PUT_OBJECT_ERROR", "PutObject");
}
else {
if (ctx->canned_acl == NULL) {
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, body, body_size,
NULL, 0);
}
else {
canned_acl_header = create_canned_acl_header(ctx->canned_acl);
if (canned_acl_header == NULL) {
flb_sds_destroy(uri);
flb_plg_error(ctx->ins, "Failed to create canned ACL header");
return -1;
}
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, body, body_size,
canned_acl_header, 1);
flb_free(canned_acl_header);
ret = create_headers(ctx, &headers, &num_headers);
if (ret == -1) {
flb_plg_error(ctx->ins, "Failed to create headers");
flb_sds_destroy(uri);
return -1;
}
c = s3_client->client_vtable->request(s3_client, FLB_HTTP_PUT,
uri, final_body, final_body_size,
headers, num_headers);
flb_free(headers);
}
if (c) {
flb_plg_debug(ctx->ins, "PutObject http status=%d", c->resp.status);
Expand Down Expand Up @@ -1447,6 +1529,18 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Predefined Canned ACL policy for S3 objects."
},
{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for S3 objects. 'gzip' is currently the only supported value. "
"This will be set as the Content-Encoding HTTP header."
},
{
FLB_CONFIG_MAP_STR, "content_type", NULL,
0, FLB_FALSE, 0,
"A standard MIME type for the S3 object; this will be set "
"as the Content-Type HTTP header."
},

{
FLB_CONFIG_MAP_STR, "store_dir", "/tmp/fluent-bit/s3",
Expand Down
3 changes: 2 additions & 1 deletion plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ struct flb_s3 {
char *endpoint;
char *sts_endpoint;
char *canned_acl;
char *compression;
char *content_type;
int free_endpoint;
int use_put_object;

Expand Down Expand Up @@ -151,7 +153,6 @@ void multipart_read_uploads_from_fs(struct flb_s3 *ctx);
void multipart_upload_destroy(struct multipart_upload *m_upload);

struct flb_http_client *mock_s3_call(char *error_env_var, char *api);
struct flb_aws_header *create_canned_acl_header(char *canned_acl);
int s3_plugin_under_test();

#endif
18 changes: 18 additions & 0 deletions plugins/out_s3/s3_multipart.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@

flb_sds_t get_etag(char *response, size_t size);

static struct flb_aws_header *create_canned_acl_header(char *canned_acl)
{
struct flb_aws_header *acl_header = NULL;

acl_header = flb_malloc(sizeof(struct flb_aws_header));
if (acl_header == NULL) {
flb_errno();
return NULL;
}

acl_header->key = "x-amz-acl";
acl_header->key_len = 9;
acl_header->val = canned_acl;
acl_header->val_len = strlen(canned_acl);

return acl_header;
};

static inline int try_to_write(char *buf, int *off, size_t left,
const char *str, size_t str_len)
{
Expand Down

0 comments on commit b16962d

Please sign in to comment.