Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
432 changes: 419 additions & 13 deletions plugins/out_azure_blob/azure_blob.c

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_time.h>

/* Content-Type */
#define AZURE_BLOB_CT "Content-Type"
Expand Down Expand Up @@ -62,6 +63,7 @@ struct flb_azure_blob {
flb_sds_t shared_key;
flb_sds_t endpoint;
flb_sds_t path;
struct flb_record_accessor *path_accessor;
flb_sds_t date_key;
flb_sds_t auth_type;
flb_sds_t sas_token;
Expand Down Expand Up @@ -166,4 +168,10 @@ struct flb_azure_blob {
struct flb_config *config;
};

int azb_resolve_path(struct flb_azure_blob *ctx,
const char *tag,
int tag_len,
const struct flb_time *timestamp,
flb_sds_t *out_path);

#endif
11 changes: 8 additions & 3 deletions plugins/out_azure_blob/azure_blob_appendblob.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,22 @@
#include "azure_blob_conf.h"
#include "azure_blob_uri.h"

flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag)
flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *tag)
{
flb_sds_t uri;
const char *effective_path;

uri = azb_uri_container(ctx);
if (!uri) {
return NULL;
}

if (ctx->path) {
flb_sds_printf(&uri, "/%s/%s?comp=appendblock", ctx->path, tag);
effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path;

if (effective_path && effective_path[0] != '\0') {
flb_sds_printf(&uri, "/%s/%s?comp=appendblock", effective_path, tag);
}
else {
flb_sds_printf(&uri, "/%s?comp=appendblock", tag);
Expand Down
4 changes: 3 additions & 1 deletion plugins/out_azure_blob/azure_blob_appendblob.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <fluent-bit/flb_output_plugin.h>
#include "azure_blob.h"

flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag);
flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *tag);

#endif
55 changes: 40 additions & 15 deletions plugins/out_azure_blob/azure_blob_blockblob.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,23 @@
#include "azure_blob_uri.h"
#include "azure_blob_http.h"

flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name)
flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *name)
{
flb_sds_t uri;
const char *effective_path;

uri = azb_uri_container(ctx);
if (!uri) {
return NULL;
}

if (ctx->path) {
effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path;

if (effective_path && effective_path[0] != '\0') {
flb_sds_printf(&uri, "/%s/%s?comp=blocklist",
ctx->path, name);
effective_path, name);
}
else {
flb_sds_printf(&uri, "/%s?comp=blocklist", name);
Expand All @@ -55,13 +60,18 @@ flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name)
return uri;
}

flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
char *blockid, uint64_t ms, char *random_str)
flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *name,
char *blockid,
uint64_t ms,
char *random_str)
{
int len;
flb_sds_t uri;
char *ext;
char *encoded_blockid;
const char *effective_path;

len = strlen(blockid);
encoded_blockid = azb_uri_encode(blockid, len);
Expand All @@ -82,14 +92,16 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
ext = "";
}

if (ctx->path) {
effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path;

if (effective_path && effective_path[0] != '\0') {
if (ms > 0) {
flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?blockid=%s&comp=block",
ctx->path, name, random_str, ms, ext, encoded_blockid);
effective_path, name, random_str, ms, ext, encoded_blockid);
}
else {
flb_sds_printf(&uri, "/%s/%s.%s%s?blockid=%s&comp=block",
ctx->path, name, random_str, ext, encoded_blockid);
effective_path, name, random_str, ext, encoded_blockid);
}
}
else {
Expand All @@ -112,10 +124,14 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name,
}

flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx,
char *tag, uint64_t ms, char *str)
const char *path_prefix,
char *tag,
uint64_t ms,
char *str)
{
char *ext;
flb_sds_t uri;
const char *effective_path;

uri = azb_uri_container(ctx);
if (!uri) {
Expand All @@ -129,9 +145,13 @@ flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx,
ext = "";
}

if (ctx->path) {
flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str,
ms, ext);
effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path;

if (effective_path && effective_path[0] != '\0') {
flb_sds_printf(&uri,
"/%s/%s.%s.%" PRIu64 "%s?comp=blocklist",
effective_path, tag, str,
ms, ext);
}
else {
flb_sds_printf(&uri, "/%s.%s.%" PRIu64 "%s?comp=blocklist", tag, str, ms, ext);
Expand Down Expand Up @@ -331,14 +351,19 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb
}

/* Commit a single block */
int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str)
int azb_block_blob_commit_block(struct flb_azure_blob *ctx,
const char *path_prefix,
char *blockid,
char *tag,
uint64_t ms,
char *str)
{
int ret;
flb_sds_t uri = NULL;
flb_sds_t payload;

/* Compose commit URI */
uri = azb_block_blob_uri_commit(ctx, tag, ms, str);
uri = azb_block_blob_uri_commit(ctx, path_prefix, tag, ms, str);
if (!uri) {
return FLB_ERROR;
}
Expand Down Expand Up @@ -419,7 +444,7 @@ int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_i
cfl_sds_cat_safe(&payload, "</BlockList>", 12);
flb_utils_split_free(list);

uri = azb_block_blob_blocklist_uri(ctx, path);
uri = azb_block_blob_blocklist_uri(ctx, NULL, path);
if (!uri) {
flb_sds_destroy(payload);
return -1;
Expand Down
19 changes: 15 additions & 4 deletions plugins/out_azure_blob/azure_blob_blockblob.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,24 @@
#include <fluent-bit/flb_output_plugin.h>
#include "azure_blob.h"

flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name);
flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, char *blockid,
uint64_t ms, char *random_str);
flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *name);
flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx,
const char *path_prefix,
char *tag,
char *blockid,
uint64_t ms,
char *random_str);
char *azb_block_blob_id_logs(uint64_t *ms);
char *azb_block_blob_id_blob(struct flb_azure_blob *ctx, char *path, uint64_t part_id);

int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str);
int azb_block_blob_commit_block(struct flb_azure_blob *ctx,
const char *path_prefix,
char *blockid,
char *tag,
uint64_t ms,
char *str);
int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id,
cfl_sds_t path, cfl_sds_t part_ids);

Expand Down
12 changes: 12 additions & 0 deletions plugins/out_azure_blob/azure_blob_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_record_accessor.h>

#include "azure_blob.h"
#include "azure_blob_conf.h"
Expand Down Expand Up @@ -760,6 +761,13 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
if (ctx->path[flb_sds_len(ctx->path) - 1] == '/') {
ctx->path[flb_sds_len(ctx->path) - 1] = '\0';
}

/* Initialize path record accessor for tag expansion */
ctx->path_accessor = flb_ra_create(ctx->path, FLB_TRUE);
if (!ctx->path_accessor) {
flb_plg_error(ctx->ins, "cannot create path record accessor");
return NULL;
}
}

/* database file for blob signal handling */
Expand Down Expand Up @@ -805,6 +813,10 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx)
flb_sds_destroy(ctx->path);
ctx->path = NULL;
}
if (ctx->path_accessor) {
flb_ra_destroy(ctx->path_accessor);
ctx->path_accessor = NULL;
}

if (ctx->decoded_sk) {
flb_free(ctx->decoded_sk);
Expand Down
11 changes: 8 additions & 3 deletions plugins/out_azure_blob/azure_blob_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,22 @@ flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx)
return uri;
}

flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag)
flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx,
const char *path_prefix,
char *tag)
{
flb_sds_t uri;
const char *effective_path;

uri = azb_uri_container(ctx);
if (!uri) {
return NULL;
}

if (ctx->path) {
flb_sds_printf(&uri, "/%s/%s", ctx->path, tag);
effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path;

if (effective_path && effective_path[0] != '\0') {
flb_sds_printf(&uri, "/%s/%s", effective_path, tag);
}
else {
flb_sds_printf(&uri, "/%s", tag);
Expand Down
4 changes: 3 additions & 1 deletion plugins/out_azure_blob/azure_blob_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

flb_sds_t azb_uri_container(struct flb_azure_blob *ctx);
flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx);
flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag);
flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx,
const char *path_prefix,
char *tag);
flb_sds_t azb_uri_encode(const char *uri, size_t len);
flb_sds_t azb_uri_decode(const char *uri, size_t len);

Expand Down
11 changes: 11 additions & 0 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ set(UNIT_TESTS_FILES
storage_dlq.c
)

if(FLB_OUT_AZURE_BLOB)
set(UNIT_TESTS_FILES
${UNIT_TESTS_FILES}
azure_blob_path.c
)
endif()

# TLS helpers
if(FLB_TLS)
set(UNIT_TESTS_FILES
Expand Down Expand Up @@ -217,6 +224,10 @@ function(prepare_unit_tests TEST_PREFIX SOURCEFILES)
target_link_libraries(${source_file_we} flb-aws)
endif()

if(FLB_OUT_AZURE_BLOB AND "${source_file_we}" STREQUAL "flb-it-azure_blob_path")
target_link_libraries(${source_file_we} flb-plugin-out_azure_blob)
endif()

if(FLB_STREAM_PROCESSOR)
target_link_libraries(${source_file_we} flb-sp)
endif()
Expand Down
Loading