diff --git a/plugins/out_azure_blob/azure_blob.c b/plugins/out_azure_blob/azure_blob.c index a650809c4d4..29658a64336 100644 --- a/plugins/out_azure_blob/azure_blob.c +++ b/plugins/out_azure_blob/azure_blob.c @@ -31,8 +31,10 @@ #include #include #include +#include #include +#include #include "azure_blob.h" #include "azure_blob_db.h" @@ -44,6 +46,7 @@ #include "azure_blob_store.h" #define CREATE_BLOB 1337 +#define AZB_UUID_PLACEHOLDER "$UUID" /* thread_local_storage for workers */ @@ -53,6 +56,8 @@ struct worker_info { FLB_TLS_DEFINE(struct worker_info, worker_info); +static int create_blob(struct flb_azure_blob *ctx, const char *path_prefix, char *name); + static int azure_blob_format(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, @@ -168,7 +173,375 @@ void generate_random_string_blob(char *str, size_t length) str[length] = '\0'; } -static int create_blob(struct flb_azure_blob *ctx, char *name) +/* Simple UUID replacement - replaces $UUID with 8 random chars */ +static flb_sds_t azb_replace_uuid(flb_sds_t path) +{ + char random_buf[9] = {0}; + char *p; + flb_sds_t result; + size_t prefix_len; + size_t suffix_len; + + if (!path) { + return NULL; + } + + p = strstr(path, AZB_UUID_PLACEHOLDER); + if (!p) { + return path; + } + + generate_random_string_blob(random_buf, 8); + + prefix_len = (size_t)(p - path); + suffix_len = flb_sds_len(path) - prefix_len - strlen(AZB_UUID_PLACEHOLDER); + + result = flb_sds_create_size(prefix_len + 8 + suffix_len + 1); + if (!result) { + flb_errno(); + flb_sds_destroy(path); + return NULL; + } + + if (prefix_len > 0) { + memcpy(result, path, prefix_len); + } + memcpy(result + prefix_len, random_buf, 8); + if (suffix_len > 0) { + memcpy(result + prefix_len + 8, p + strlen(AZB_UUID_PLACEHOLDER), suffix_len); + } + result[prefix_len + 8 + suffix_len] = '\0'; + flb_sds_len_set(result, prefix_len + 8 + suffix_len); + + flb_sds_destroy(path); + return result; +} + +/* Simple token replacement helper */ +static flb_sds_t azb_simple_replace(flb_sds_t input, + const char *token, + const char *replacement) +{ + char *pos; + size_t token_len; + size_t replace_len; + size_t prefix_len; + size_t suffix_len; + flb_sds_t result; + + if (!input || !token) { + return input; + } + + pos = strstr(input, token); + if (!pos) { + return input; + } + + token_len = strlen(token); + replace_len = strlen(replacement); + prefix_len = (size_t)(pos - input); + suffix_len = flb_sds_len(input) - prefix_len - token_len; + + result = flb_sds_create_size(prefix_len + replace_len + suffix_len + 1); + if (!result) { + flb_errno(); + flb_sds_destroy(input); + return NULL; + } + + if (prefix_len > 0) { + memcpy(result, input, prefix_len); + } + if (replace_len > 0) { + memcpy(result + prefix_len, replacement, replace_len); + } + if (suffix_len > 0) { + memcpy(result + prefix_len + replace_len, pos + token_len, suffix_len); + } + result[prefix_len + replace_len + suffix_len] = '\0'; + flb_sds_len_set(result, prefix_len + replace_len + suffix_len); + + flb_sds_destroy(input); + return result; +} + +static flb_sds_t azb_apply_time_tokens(flb_sds_t path, const struct flb_time *timestamp) +{ + char ms_buf[4]; + char ns_buf[10]; + flb_sds_t tmp; + + if (!path || !timestamp) { + return path; + } + + snprintf(ms_buf, sizeof(ms_buf), "%03lu", + (unsigned long)(timestamp->tm.tv_nsec / 1000000)); + snprintf(ns_buf, sizeof(ns_buf), "%09lu", + (unsigned long)timestamp->tm.tv_nsec); + + /* Replace %3N with milliseconds */ + tmp = azb_simple_replace(path, "%3N", ms_buf); + if (!tmp) { + return NULL; + } + path = tmp; + + /* Replace %9N with nanoseconds */ + tmp = azb_simple_replace(path, "%9N", ns_buf); + if (!tmp) { + return NULL; + } + path = tmp; + + /* Replace %L with nanoseconds */ + tmp = azb_simple_replace(path, "%L", ns_buf); + if (!tmp) { + return NULL; + } + + return tmp; +} + +static flb_sds_t azb_apply_strftime(flb_sds_t path, const struct flb_time *timestamp) +{ + struct flb_time now; + const struct flb_time *ref; + struct tm tm_utc; + time_t seconds; + size_t buf_size; + size_t out_len; + char *buf; + char *tmp_buf; + flb_sds_t result; + + if (!path) { + return NULL; + } + + if (timestamp) { + ref = timestamp; + } + else { + flb_time_get(&now); + ref = &now; + } + + seconds = ref->tm.tv_sec; + if (!gmtime_r(&seconds, &tm_utc)) { + flb_sds_destroy(path); + return NULL; + } + + buf_size = flb_sds_len(path) + 64; + buf = flb_malloc(buf_size + 1); + if (!buf) { + flb_errno(); + flb_sds_destroy(path); + return NULL; + } + + buf[0] = '\0'; + + while (1) { + out_len = strftime(buf, buf_size + 1, path, &tm_utc); + if (out_len > 0) { + break; + } + + if (buf_size > 4096) { + break; + } + + buf_size *= 2; + tmp_buf = flb_realloc(buf, buf_size + 1); + if (!tmp_buf) { + flb_errno(); + flb_free(buf); + flb_sds_destroy(path); + return NULL; + } + buf = tmp_buf; + } + + result = flb_sds_create_len(buf, out_len); + if (!result) { + flb_errno(); + flb_free(buf); + flb_sds_destroy(path); + return NULL; + } + + flb_free(buf); + flb_sds_destroy(path); + + return result; +} + +static void azb_trim_slashes(flb_sds_t path) +{ + size_t len; + size_t start = 0; + char *buf; + + if (!path) { + return; + } + + buf = path; + len = flb_sds_len(path); + + while (start < len && buf[start] == '/') { + start++; + } + + if (start > 0) { + memmove(buf, buf + start, len - start + 1); + len -= start; + flb_sds_len_set(path, len); + } + + while (len > 0 && buf[len - 1] == '/') { + len--; + } + buf[len] = '\0'; + flb_sds_len_set(path, len); +} + +int azb_resolve_path(struct flb_azure_blob *ctx, + const char *tag, + int tag_len, + const struct flb_time *timestamp, + flb_sds_t *out_path) +{ + flb_sds_t path; + struct flb_time now; + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked result; + msgpack_object root; + struct flb_record_accessor *temp_ra; + flb_sds_t expanded; + + if (!out_path) { + return -1; + } + + *out_path = NULL; + + if (!ctx->path_accessor) { + return 0; + } + + if (!timestamp) { + flb_time_get(&now); + timestamp = &now; + } + + /* Start with the original path template */ + path = flb_sds_create_len(ctx->path, flb_sds_len(ctx->path)); + if (!path) { + flb_errno(); + return -1; + } + + /* Apply UUID replacement before record accessor step. + * Unknown $ tokens get stripped otherwise. + */ + path = azb_replace_uuid(path); + if (!path) { + return -1; + } + + /* Apply time tokens (%3N, %9N, %L) */ + path = azb_apply_time_tokens(path, timestamp); + if (!path) { + return -1; + } + + /* Apply strftime */ + path = azb_apply_strftime(path, timestamp); + if (!path) { + return -1; + } + + /* Now use record accessor to expand $TAG and $TAG[n] */ + /* Create empty msgpack map for record accessor */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + msgpack_pack_map(&pk, 0); + + /* Unpack to get msgpack_object */ + msgpack_unpacked_init(&result); + if (msgpack_unpack_next(&result, + sbuf.data, + sbuf.size, + NULL) != MSGPACK_UNPACK_SUCCESS) { + msgpack_sbuffer_destroy(&sbuf); + msgpack_unpacked_destroy(&result); + flb_sds_destroy(path); + return -1; + } + root = result.data; + + /* Create a temporary record accessor for the partially-processed path */ + temp_ra = flb_ra_create(path, FLB_TRUE); + if (!temp_ra) { + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + flb_sds_destroy(path); + return -1; + } + + /* Use record accessor to expand $TAG and $TAG[n] */ + expanded = flb_ra_translate(temp_ra, (char *)tag, tag_len, root, NULL); + + flb_ra_destroy(temp_ra); + msgpack_unpacked_destroy(&result); + msgpack_sbuffer_destroy(&sbuf); + flb_sds_destroy(path); + + if (!expanded) { + return -1; + } + + azb_trim_slashes(expanded); + + if (flb_sds_len(expanded) == 0) { + flb_sds_destroy(expanded); + return 0; + } + + *out_path = expanded; + return 0; +} + +static int azb_create_blob_with_tag(struct flb_azure_blob *ctx, + const char *tag, + int tag_len, + const char *blob_name) +{ + flb_sds_t prefix = NULL; + int ret; + + if (azb_resolve_path(ctx, tag, tag_len, NULL, &prefix) != 0) { + if (prefix) { + flb_sds_destroy(prefix); + } + return FLB_RETRY; + } + + ret = create_blob(ctx, prefix, (char *) blob_name); + + if (prefix) { + flb_sds_destroy(prefix); + } + + return ret; +} + +static int create_blob(struct flb_azure_blob *ctx, const char *path_prefix, char *name) { int ret; size_t b_sent; @@ -176,7 +549,7 @@ static int create_blob(struct flb_azure_blob *ctx, char *name) struct flb_http_client *c; struct flb_connection *u_conn; - uri = azb_uri_create_blob(ctx, name); + uri = azb_uri_create_blob(ctx, path_prefix, name); if (!uri) { return FLB_RETRY; } @@ -256,7 +629,7 @@ static int delete_blob(struct flb_azure_blob *ctx, char *name) struct flb_http_client *c; struct flb_connection *u_conn; - uri = azb_uri_create_blob(ctx, name); + uri = azb_uri_create_blob(ctx, NULL, name); if (!uri) { return FLB_RETRY; } @@ -460,12 +833,23 @@ static int send_blob(struct flb_config *config, flb_sds_t uri = NULL; flb_sds_t block_id = NULL; flb_sds_t ref_name = NULL; + flb_sds_t path_prefix = NULL; void *payload_buf = data; size_t payload_size = bytes; char *generated_random_string; + struct flb_time now; + + flb_time_get(&now); + + if (azb_resolve_path(ctx, tag, tag_len, &now, &path_prefix) != 0) { + return FLB_RETRY; + } ref_name = flb_sds_create_size(256); if (!ref_name) { + if (path_prefix) { + flb_sds_destroy(path_prefix); + } return FLB_RETRY; } @@ -475,11 +859,14 @@ static int send_blob(struct flb_config *config, flb_errno(); flb_plg_error(ctx->ins, "cannot allocate memory for random string"); flb_sds_destroy(ref_name); + if (path_prefix) { + flb_sds_destroy(path_prefix); + } return FLB_RETRY; } if (blob_type == AZURE_BLOB_APPENDBLOB) { - uri = azb_append_blob_uri(ctx, tag); + uri = azb_append_blob_uri(ctx, path_prefix, tag); } else if (blob_type == AZURE_BLOB_BLOCKBLOB) { generate_random_string_blob(generated_random_string, ctx->blob_uri_length); /* Generate the random string */ @@ -488,15 +875,20 @@ static int send_blob(struct flb_config *config, if (!block_id) { flb_plg_error(ctx->ins, "could not generate block id"); flb_free(generated_random_string); - cfl_sds_destroy(ref_name); + flb_sds_destroy(ref_name); + if (path_prefix) { + flb_sds_destroy(path_prefix); + } return FLB_RETRY; } - uri = azb_block_blob_uri(ctx, tag, block_id, ms, generated_random_string); + uri = azb_block_blob_uri(ctx, path_prefix, tag, + block_id, ms, generated_random_string); ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms); } else if (event_type == FLB_EVENT_TYPE_BLOBS) { block_id = azb_block_blob_id_blob(ctx, name, part_id); - uri = azb_block_blob_uri(ctx, name, block_id, 0, generated_random_string); + uri = azb_block_blob_uri(ctx, path_prefix, name, + block_id, 0, generated_random_string); ref_name = flb_sds_printf(&ref_name, "file=%s:%" PRIu64, name, part_id); } } @@ -507,6 +899,9 @@ static int send_blob(struct flb_config *config, flb_free(block_id); } flb_sds_destroy(ref_name); + if (path_prefix) { + flb_sds_destroy(path_prefix); + } return FLB_RETRY; } @@ -520,11 +915,12 @@ static int send_blob(struct flb_config *config, if (ret == FLB_OK) { /* For Logs type, we need to commit the block right away */ if (event_type == FLB_EVENT_TYPE_LOGS) { - ret = azb_block_blob_commit_block(ctx, block_id, tag, ms, generated_random_string); + ret = azb_block_blob_commit_block(ctx, path_prefix, block_id, + tag, ms, generated_random_string); } } else if (ret == CREATE_BLOB) { - ret = create_blob(ctx, name); + ret = create_blob(ctx, path_prefix, name); if (ret == FLB_OK) { ret = http_send_blob(config, ctx, ref_name, uri, block_id, event_type, payload_buf, payload_size); } @@ -542,6 +938,10 @@ static int send_blob(struct flb_config *config, flb_free(block_id); } + if (path_prefix) { + flb_sds_destroy(path_prefix); + } + return ret; } @@ -1260,7 +1660,9 @@ static void cb_azure_blob_ingest(struct flb_config *config, void *data) { /* Handle blob creation if necessary */ if (ret == CREATE_BLOB) { - ret = create_blob(ctx, tag_sds); + ret = azb_create_blob_with_tag(ctx, tag_sds, + (int) flb_sds_len(tag_sds), + tag_sds); if (ret == FLB_OK) { ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS,ctx->btype, (char *) tag_sds, 0, (char *) tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); @@ -1389,7 +1791,9 @@ static int ingest_all_chunks(struct flb_azure_blob *ctx, struct flb_config *conf ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); if (ret == CREATE_BLOB) { - ret = create_blob(ctx, tag_sds); + ret = azb_create_blob_with_tag(ctx, tag_sds, + (int) flb_sds_len(tag_sds), + tag_sds); if (ret == FLB_OK) { ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype, (char *)tag_sds, 0, (char *)tag_sds, flb_sds_len(tag_sds), payload, flb_sds_len(payload)); } @@ -1559,7 +1963,7 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); if (ret == CREATE_BLOB) { - ret = create_blob(ctx, upload_file->fsf->name); + ret = azb_create_blob_with_tag(ctx, tag_name, tag_len, tag_name); if (ret == FLB_OK) { ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, ctx->btype,(char *)tag_name, 0, (char *)tag_name, tag_len, final_payload, final_payload_size); } @@ -1645,7 +2049,9 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk, json, json_size); if (ret == CREATE_BLOB) { - ret = create_blob(ctx, event_chunk->tag); + ret = azb_create_blob_with_tag(ctx, event_chunk->tag, + (int) flb_sds_len(event_chunk->tag), + event_chunk->tag); if (ret == FLB_OK) { ret = send_blob(config, i_ins, ctx, FLB_EVENT_TYPE_LOGS, diff --git a/plugins/out_azure_blob/azure_blob.h b/plugins/out_azure_blob/azure_blob.h index 8699dda54f8..946e279dd80 100644 --- a/plugins/out_azure_blob/azure_blob.h +++ b/plugins/out_azure_blob/azure_blob.h @@ -24,6 +24,7 @@ #include #include #include +#include /* Content-Type */ #define AZURE_BLOB_CT "Content-Type" @@ -62,6 +63,7 @@ struct flb_azure_blob { flb_sds_t shared_key; flb_sds_t endpoint; flb_sds_t path; + struct flb_record_accessor *path_accessor; flb_sds_t date_key; flb_sds_t auth_type; flb_sds_t sas_token; @@ -166,4 +168,10 @@ struct flb_azure_blob { struct flb_config *config; }; +int azb_resolve_path(struct flb_azure_blob *ctx, + const char *tag, + int tag_len, + const struct flb_time *timestamp, + flb_sds_t *out_path); + #endif diff --git a/plugins/out_azure_blob/azure_blob_appendblob.c b/plugins/out_azure_blob/azure_blob_appendblob.c index 110e9eb5d4d..2ef165ba537 100644 --- a/plugins/out_azure_blob/azure_blob_appendblob.c +++ b/plugins/out_azure_blob/azure_blob_appendblob.c @@ -24,17 +24,22 @@ #include "azure_blob_conf.h" #include "azure_blob_uri.h" -flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag) +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *tag) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s?comp=appendblock", ctx->path, tag); + effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path; + + if (effective_path && effective_path[0] != '\0') { + flb_sds_printf(&uri, "/%s/%s?comp=appendblock", effective_path, tag); } else { flb_sds_printf(&uri, "/%s?comp=appendblock", tag); diff --git a/plugins/out_azure_blob/azure_blob_appendblob.h b/plugins/out_azure_blob/azure_blob_appendblob.h index 5c3f1a7bd4b..fb43ec08bc1 100644 --- a/plugins/out_azure_blob/azure_blob_appendblob.h +++ b/plugins/out_azure_blob/azure_blob_appendblob.h @@ -23,6 +23,8 @@ #include #include "azure_blob.h" -flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, char *tag); +flb_sds_t azb_append_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *tag); #endif diff --git a/plugins/out_azure_blob/azure_blob_blockblob.c b/plugins/out_azure_blob/azure_blob_blockblob.c index 860401b2623..06c9fa829bd 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.c +++ b/plugins/out_azure_blob/azure_blob_blockblob.c @@ -31,18 +31,23 @@ #include "azure_blob_uri.h" #include "azure_blob_http.h" -flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name) +flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *name) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { + effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path; + + if (effective_path && effective_path[0] != '\0') { flb_sds_printf(&uri, "/%s/%s?comp=blocklist", - ctx->path, name); + effective_path, name); } else { flb_sds_printf(&uri, "/%s?comp=blocklist", name); @@ -55,13 +60,18 @@ flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name) return uri; } -flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, - char *blockid, uint64_t ms, char *random_str) +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *name, + char *blockid, + uint64_t ms, + char *random_str) { int len; flb_sds_t uri; char *ext; char *encoded_blockid; + const char *effective_path; len = strlen(blockid); encoded_blockid = azb_uri_encode(blockid, len); @@ -82,14 +92,16 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, ext = ""; } - if (ctx->path) { + effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path; + + if (effective_path && effective_path[0] != '\0') { if (ms > 0) { flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?blockid=%s&comp=block", - ctx->path, name, random_str, ms, ext, encoded_blockid); + effective_path, name, random_str, ms, ext, encoded_blockid); } else { flb_sds_printf(&uri, "/%s/%s.%s%s?blockid=%s&comp=block", - ctx->path, name, random_str, ext, encoded_blockid); + effective_path, name, random_str, ext, encoded_blockid); } } else { @@ -112,10 +124,14 @@ flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *name, } flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, - char *tag, uint64_t ms, char *str) + const char *path_prefix, + char *tag, + uint64_t ms, + char *str) { char *ext; flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { @@ -129,9 +145,13 @@ flb_sds_t azb_block_blob_uri_commit(struct flb_azure_blob *ctx, ext = ""; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", ctx->path, tag, str, - ms, ext); + effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path; + + if (effective_path && effective_path[0] != '\0') { + flb_sds_printf(&uri, + "/%s/%s.%s.%" PRIu64 "%s?comp=blocklist", + effective_path, tag, str, + ms, ext); } else { flb_sds_printf(&uri, "/%s.%s.%" PRIu64 "%s?comp=blocklist", tag, str, ms, ext); @@ -331,14 +351,19 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb } /* Commit a single block */ -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str) +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, + const char *path_prefix, + char *blockid, + char *tag, + uint64_t ms, + char *str) { int ret; flb_sds_t uri = NULL; flb_sds_t payload; /* Compose commit URI */ - uri = azb_block_blob_uri_commit(ctx, tag, ms, str); + uri = azb_block_blob_uri_commit(ctx, path_prefix, tag, ms, str); if (!uri) { return FLB_ERROR; } @@ -419,7 +444,7 @@ int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_i cfl_sds_cat_safe(&payload, "", 12); flb_utils_split_free(list); - uri = azb_block_blob_blocklist_uri(ctx, path); + uri = azb_block_blob_blocklist_uri(ctx, NULL, path); if (!uri) { flb_sds_destroy(payload); return -1; diff --git a/plugins/out_azure_blob/azure_blob_blockblob.h b/plugins/out_azure_blob/azure_blob_blockblob.h index 6949aefde51..c70aaa89385 100644 --- a/plugins/out_azure_blob/azure_blob_blockblob.h +++ b/plugins/out_azure_blob/azure_blob_blockblob.h @@ -23,13 +23,24 @@ #include #include "azure_blob.h" -flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, char *name); -flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, char *tag, char *blockid, - uint64_t ms, char *random_str); +flb_sds_t azb_block_blob_blocklist_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *name); +flb_sds_t azb_block_blob_uri(struct flb_azure_blob *ctx, + const char *path_prefix, + char *tag, + char *blockid, + uint64_t ms, + char *random_str); char *azb_block_blob_id_logs(uint64_t *ms); char *azb_block_blob_id_blob(struct flb_azure_blob *ctx, char *path, uint64_t part_id); -int azb_block_blob_commit_block(struct flb_azure_blob *ctx, char *blockid, char *tag, uint64_t ms, char *str); +int azb_block_blob_commit_block(struct flb_azure_blob *ctx, + const char *path_prefix, + char *blockid, + char *tag, + uint64_t ms, + char *str); int azb_block_blob_commit_file_parts(struct flb_azure_blob *ctx, uint64_t file_id, cfl_sds_t path, cfl_sds_t part_ids); diff --git a/plugins/out_azure_blob/azure_blob_conf.c b/plugins/out_azure_blob/azure_blob_conf.c index ea883a01852..311482ad173 100644 --- a/plugins/out_azure_blob/azure_blob_conf.c +++ b/plugins/out_azure_blob/azure_blob_conf.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "azure_blob.h" #include "azure_blob_conf.h" @@ -760,6 +761,13 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in if (ctx->path[flb_sds_len(ctx->path) - 1] == '/') { ctx->path[flb_sds_len(ctx->path) - 1] = '\0'; } + + /* Initialize path record accessor for tag expansion */ + ctx->path_accessor = flb_ra_create(ctx->path, FLB_TRUE); + if (!ctx->path_accessor) { + flb_plg_error(ctx->ins, "cannot create path record accessor"); + return NULL; + } } /* database file for blob signal handling */ @@ -805,6 +813,10 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx) flb_sds_destroy(ctx->path); ctx->path = NULL; } + if (ctx->path_accessor) { + flb_ra_destroy(ctx->path_accessor); + ctx->path_accessor = NULL; + } if (ctx->decoded_sk) { flb_free(ctx->decoded_sk); diff --git a/plugins/out_azure_blob/azure_blob_uri.c b/plugins/out_azure_blob/azure_blob_uri.c index 75a643079aa..355dd1e51b9 100644 --- a/plugins/out_azure_blob/azure_blob_uri.c +++ b/plugins/out_azure_blob/azure_blob_uri.c @@ -134,17 +134,22 @@ flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx) return uri; } -flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag) +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, + const char *path_prefix, + char *tag) { flb_sds_t uri; + const char *effective_path; uri = azb_uri_container(ctx); if (!uri) { return NULL; } - if (ctx->path) { - flb_sds_printf(&uri, "/%s/%s", ctx->path, tag); + effective_path = (path_prefix && path_prefix[0] != '\0') ? path_prefix : ctx->path; + + if (effective_path && effective_path[0] != '\0') { + flb_sds_printf(&uri, "/%s/%s", effective_path, tag); } else { flb_sds_printf(&uri, "/%s", tag); diff --git a/plugins/out_azure_blob/azure_blob_uri.h b/plugins/out_azure_blob/azure_blob_uri.h index 98ccc8f5b35..2dddc08e49b 100644 --- a/plugins/out_azure_blob/azure_blob_uri.h +++ b/plugins/out_azure_blob/azure_blob_uri.h @@ -27,7 +27,9 @@ flb_sds_t azb_uri_container(struct flb_azure_blob *ctx); flb_sds_t azb_uri_ensure_or_create_container(struct flb_azure_blob *ctx); -flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, char *tag); +flb_sds_t azb_uri_create_blob(struct flb_azure_blob *ctx, + const char *path_prefix, + char *tag); flb_sds_t azb_uri_encode(const char *uri, size_t len); flb_sds_t azb_uri_decode(const char *uri, size_t len); diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 45d769b9c25..03cec4bbbf5 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -59,6 +59,13 @@ set(UNIT_TESTS_FILES storage_dlq.c ) +if(FLB_OUT_AZURE_BLOB) + set(UNIT_TESTS_FILES + ${UNIT_TESTS_FILES} + azure_blob_path.c + ) +endif() + # TLS helpers if(FLB_TLS) set(UNIT_TESTS_FILES @@ -217,6 +224,10 @@ function(prepare_unit_tests TEST_PREFIX SOURCEFILES) target_link_libraries(${source_file_we} flb-aws) endif() + if(FLB_OUT_AZURE_BLOB AND "${source_file_we}" STREQUAL "flb-it-azure_blob_path") + target_link_libraries(${source_file_we} flb-plugin-out_azure_blob) + endif() + if(FLB_STREAM_PROCESSOR) target_link_libraries(${source_file_we} flb-sp) endif() diff --git a/tests/internal/azure_blob_path.c b/tests/internal/azure_blob_path.c new file mode 100644 index 00000000000..96ba7609513 --- /dev/null +++ b/tests/internal/azure_blob_path.c @@ -0,0 +1,218 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include + +#include +#include +#include + +#include "../../plugins/out_azure_blob/azure_blob.h" +#include "flb_tests_internal.h" + +static void ctx_cleanup(struct flb_azure_blob *ctx) +{ + if (ctx->path) { + flb_sds_destroy(ctx->path); + ctx->path = NULL; + } + + if (ctx->path_accessor) { + flb_ra_destroy(ctx->path_accessor); + ctx->path_accessor = NULL; + } +} + +void test_resolve_path_basic_tag(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + const char *tag = "service.app"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("logs/$TAG"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_accessor = flb_ra_create(ctx.path, FLB_TRUE); + TEST_CHECK(ctx.path_accessor != NULL); + if (!ctx.path_accessor) { + ctx_cleanup(&ctx); + return; + } + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, "logs/service.app") == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_custom_delimiter(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + const char *tag = "prod.backend"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("stream/$TAG[0]/$TAG[1]/$TAG"); + + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + ctx_cleanup(&ctx); + return; + } + + ctx.path_accessor = flb_ra_create(ctx.path, FLB_TRUE); + TEST_CHECK(ctx.path_accessor != NULL); + if (!ctx.path_accessor) { + ctx_cleanup(&ctx); + return; + } + + ret = azb_resolve_path(&ctx, tag, (int) strlen(tag), NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, "stream/prod/backend/prod.backend") == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_time_tokens(void) +{ + struct flb_azure_blob ctx; + struct flb_time ts; + flb_sds_t resolved = NULL; + const char *expect = "time/2025/11/17/987/987654321/987654321"; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("time/%Y/%m/%d/%3N/%9N/%L"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_accessor = flb_ra_create(ctx.path, FLB_TRUE); + TEST_CHECK(ctx.path_accessor != NULL); + if (!ctx.path_accessor) { + ctx_cleanup(&ctx); + return; + } + + flb_time_set(&ts, 1763382896, 987654321); + + ret = azb_resolve_path(&ctx, NULL, 0, &ts, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + TEST_CHECK(strcmp(resolved, expect) == 0); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_uuid_token(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + int ret; + size_t i; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("uuid/$UUID"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_accessor = flb_ra_create(ctx.path, FLB_TRUE); + TEST_CHECK(ctx.path_accessor != NULL); + if (!ctx.path_accessor) { + ctx_cleanup(&ctx); + return; + } + + ret = azb_resolve_path(&ctx, "demo", 4, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved != NULL); + + if (resolved) { + const char *suffix; + + TEST_CHECK(strncmp(resolved, "uuid/", 5) == 0); + + suffix = resolved + 5; + TEST_CHECK(strlen(suffix) == 8); + TEST_CHECK(strstr(resolved, "$UUID") == NULL); + + for (i = 0; i < 8 && suffix[i] != '\0'; i++) { + TEST_CHECK(isalnum((unsigned char) suffix[i]) != 0); + } + + TEST_CHECK(i == 8 && suffix[8] == '\0'); + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +void test_resolve_path_empty_result(void) +{ + struct flb_azure_blob ctx; + flb_sds_t resolved = NULL; + int ret; + + memset(&ctx, 0, sizeof(ctx)); + + ctx.path = flb_sds_create("$TAG[5]"); + TEST_CHECK(ctx.path != NULL); + if (!ctx.path) { + return; + } + + ctx.path_accessor = flb_ra_create(ctx.path, FLB_TRUE); + TEST_CHECK(ctx.path_accessor != NULL); + if (!ctx.path_accessor) { + ctx_cleanup(&ctx); + return; + } + + ret = azb_resolve_path(&ctx, "a.b", 3, NULL, &resolved); + TEST_CHECK(ret == 0); + TEST_CHECK(resolved == NULL); + + if (resolved) { + flb_sds_destroy(resolved); + } + + ctx_cleanup(&ctx); +} + +TEST_LIST = { + {"resolve_path_basic_tag", test_resolve_path_basic_tag}, + {"resolve_path_custom_delimiter", test_resolve_path_custom_delimiter}, + {"resolve_path_time_tokens", test_resolve_path_time_tokens}, + {"resolve_path_uuid_token", test_resolve_path_uuid_token}, + {"resolve_path_empty_result", test_resolve_path_empty_result}, + {0} +};