Skip to content

kv: add new flb_kv_item_set() api (upsert)#10894

Merged
edsiper merged 3 commits intomasterfrom
kv_item_set
Sep 18, 2025
Merged

kv: add new flb_kv_item_set() api (upsert)#10894
edsiper merged 3 commits intomasterfrom
kv_item_set

Conversation

@edsiper
Copy link
Copy Markdown
Member

@edsiper edsiper commented Sep 18, 2025

Fixes #10866

This PR introduces a new upsert-style API for the kv interface. In addition we modify OpenTelemetry output plugin to use this new function when adding labels to avoid duplicates.


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Enhanced key-value handling to support update-or-add behavior, improving configuration robustness across components.
  • Bug Fixes
    • OpenTelemetry output: duplicate label keys now overwrite previous values instead of creating multiple entries, ensuring a single, correct value per key.
  • Tests
    • Added unit tests validating duplicate key overwrite behavior in key-value lists.

Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
@edsiper edsiper added this to the Fluent Bit v4.1 milestone Sep 18, 2025
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Sep 18, 2025

Walkthrough

Introduces a new KV API function flb_kv_item_set for update-or-insert behavior, updates the OpenTelemetry output plugin to use it for labels, and adds corresponding unit tests and build integration to prevent duplicate label keys.

Changes

Cohort / File(s) Summary of Changes
KV setter API
include/fluent-bit/flb_kv.h, src/flb_kv.c
Added declaration and implementation of flb_kv_item_set(list, k_buf, v_buf) to update existing key values or create a new item if absent; includes key lookup, value replacement, and error handling.
OpenTelemetry labels handling
plugins/out_opentelemetry/opentelemetry_conf.c
Replaced flb_kv_item_create with flb_kv_item_set in config_add_labels to ensure key uniqueness by updating existing entries.
Internal tests
tests/internal/CMakeLists.txt, tests/internal/kv.c
Added kv.c to unit test build and introduced a test validating duplicate key insertion results in a single entry with the latest value.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor User as User config
  participant OTel as OpenTelemetry plugin (config_add_labels)
  participant KV as flb_kv_item_set
  participant List as KV List

  User->>OTel: add_label entries
  loop For each label (k, v)
    OTel->>KV: flb_kv_item_set(List, k, v)
    KV->>List: Scan for key k (case-insensitive)
    alt Key exists
      KV->>List: Replace value for k
      KV-->>OTel: Return kv item
    else Key not found
      KV->>List: Create new kv item (k, v)
      KV-->>OTel: Return kv item
    end
    alt Allocation error
      KV-->>OTel: Return NULL
      OTel->>OTel: Log error, abort with -1
    end
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

I hop through keys in moonlit sets,
One “color” now—no duplicates.
Blue hops out as green hops in,
A tidy list, no twin or twin.
With whiskers twitching, tests all pass—
Carrots cached, unique-as-grass. 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "kv: add new flb_kv_item_set() api (upsert)" concisely and accurately describes the primary change—adding an upsert-style KV API. It names the new public symbol and the intent (upsert) without extraneous detail, making it clear to reviewers. The title is appropriate for the changeset.
Linked Issues Check ✅ Passed The PR adds flb_kv_item_set with update-or-create semantics, updates plugins/out_opentelemetry/opentelemetry_conf.c to call that API when adding labels, and includes a unit test that verifies duplicate keys overwrite the previous value; together these changes address issue #10866 by preventing duplicate label keys at insertion time. The code-level change and the test map directly to the linked issue's objective to enforce unique attribute keys for add_label. I therefore assess the PR as compliant with the linked issue.
Out of Scope Changes Check ✅ Passed All modifications are limited to adding the new KV API (header and implementation), switching the OpenTelemetry plugin to use it, and adding a unit test plus CMake entry; there are no edits to unrelated plugins, features, or core behavior. The change set is narrowly scoped to the linked issue's intent and does not introduce extraneous changes. I detect no out-of-scope changes.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch kv_item_set

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (4)
include/fluent-bit/flb_kv.h (1)

39-40: Clarify API semantics (case-insensitive match, NULL/empty value) and future-proof signature.

Document that matching is case-insensitive and define behavior when v_buf is NULL or empty. Optionally add const qualifiers later (kept out here to avoid wider churn).

 struct flb_kv *flb_kv_item_create(struct mk_list *list,
                                   char *k_buf, char *v_buf);
-struct flb_kv *flb_kv_item_set(struct mk_list *list,
-                               char *k_buf, char *v_buf);
+/* Upsert key/value into list:
+ * - Key match is case-insensitive.
+ * - If v_buf is NULL or empty, the value is unset (kv->val == NULL).
+ * - Returns existing/new item on success; NULL on allocation failure (no removal).
+ */
+struct flb_kv *flb_kv_item_set(struct mk_list *list,
+                               char *k_buf, char *v_buf);
plugins/out_opentelemetry/opentelemetry_conf.c (1)

170-174: Nit: adjust log text to reflect “set” (not “append”).

Small UX polish for clarity.

-        if (!kv) {
-            flb_plg_error(ins, "could not append label %s=%s\n", k->str, v->str);
+        if (!kv) {
+            flb_plg_error(ins, "could not set label %s=%s\n", k->str, v->str);
             return -1;
         }
tests/internal/kv.c (1)

1-34: Add tests for NULL/empty value semantics and case-insensitive overwrite.

Covers edge cases and guards against regressions.

@@
 static void test_kv_item_set_duplicate()
 {
@@
 }
 
+static void test_kv_item_set_null_and_empty()
+{
+    struct mk_list list;
+    const char *val;
+
+    flb_kv_init(&list);
+
+    /* NULL value -> unset (kv->val == NULL) */
+    TEST_CHECK(flb_kv_item_set(&list, "mode", NULL) != NULL);
+    TEST_CHECK(mk_list_size(&list) == 1);
+    val = flb_kv_get_key_value("mode", &list);
+    TEST_CHECK(val == NULL);
+
+    /* Non-empty value */
+    TEST_CHECK(flb_kv_item_set(&list, "mode", "on") != NULL);
+    val = flb_kv_get_key_value("mode", &list);
+    TEST_CHECK(val != NULL && strcmp(val, "on") == 0);
+
+    /* Empty string -> unset (kv->val == NULL) */
+    TEST_CHECK(flb_kv_item_set(&list, "mode", "") != NULL);
+    val = flb_kv_get_key_value("mode", &list);
+    TEST_CHECK(val == NULL);
+
+    flb_kv_release(&list);
+}
+
+static void test_kv_item_set_case_insensitive()
+{
+    struct mk_list list;
+    const char *val;
+
+    flb_kv_init(&list);
+
+    TEST_CHECK(flb_kv_item_set(&list, "Color", "red") != NULL);
+    TEST_CHECK(flb_kv_item_set(&list, "color", "blue") != NULL);
+
+    TEST_CHECK(mk_list_size(&list) == 1);
+    val = flb_kv_get_key_value("COLOR", &list);
+    TEST_CHECK(val != NULL && strcmp(val, "blue") == 0);
+
+    flb_kv_release(&list);
+}
@@
 TEST_LIST = {
     {"kv_item_set_duplicate", test_kv_item_set_duplicate},
+    {"kv_item_set_null_and_empty", test_kv_item_set_null_and_empty},
+    {"kv_item_set_case_insensitive", test_kv_item_set_case_insensitive},
     {0}
 };
src/flb_kv.c (1)

79-107: Optional: pre-check length to avoid full strcasecmp on mismatched keys.

Micro-optimization in long lists; not critical.

-        if (strcasecmp(kv->key, k_buf) == 0) {
+        if (flb_sds_len(kv->key) == (v_buf ? strlen(k_buf) : strlen(k_buf)) &&
+            strcasecmp(kv->key, k_buf) == 0) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 054ac4e and dcfbf27.

📒 Files selected for processing (5)
  • include/fluent-bit/flb_kv.h (1 hunks)
  • plugins/out_opentelemetry/opentelemetry_conf.c (1 hunks)
  • src/flb_kv.c (1 hunks)
  • tests/internal/CMakeLists.txt (1 hunks)
  • tests/internal/kv.c (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
tests/internal/kv.c (1)
src/flb_kv.c (4)
  • flb_kv_init (25-28)
  • flb_kv_item_set (79-107)
  • flb_kv_get_key_value (135-162)
  • flb_kv_release (123-133)
include/fluent-bit/flb_kv.h (1)
src/flb_kv.c (1)
  • flb_kv_item_set (79-107)
plugins/out_opentelemetry/opentelemetry_conf.c (1)
src/flb_kv.c (1)
  • flb_kv_item_set (79-107)
src/flb_kv.c (2)
src/flb_sds.c (2)
  • flb_sds_destroy (389-399)
  • flb_sds_create (78-90)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (3)
tests/internal/CMakeLists.txt (1)

13-13: Test wiring LGTM.

Adding kv.c to internal tests is correct.

plugins/out_opentelemetry/opentelemetry_conf.c (1)

155-175: Behavioral check: dedupe is last-wins and preserves item order. OK for OTLP?

Using flb_kv_item_set ensures uniqueness with last value winning; the existing node is updated in place (order preserved). Confirm this matches OpenTelemetry expectations for attributes added via add_label.

src/flb_kv.c (1)

61-77: Consistency note: create() treats empty value as unset (NULL).

The patch above aligns set() to the same rule. If you intentionally want empty strings stored, we can switch to creating zero-length SDS and update tests accordingly.

Comment on lines +89 to +107
mk_list_foreach(head, list) {
kv = mk_list_entry(head, struct flb_kv, _head);
if (strcasecmp(kv->key, k_buf) == 0) {
if (kv->val) {
flb_sds_destroy(kv->val);
}
kv->val = flb_sds_create(v_buf);
if (!kv->val) {
mk_list_del(&kv->_head);
flb_sds_destroy(kv->key);
flb_free(kv);
return NULL;
}
return kv;
}
}

return flb_kv_item_create(list, k_buf, v_buf);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don’t delete existing entry on allocation failure; align empty/NULL value semantics.

Current code destroys the old value and removes the entire kv on failure to allocate the new value, causing data loss. Also, it creates an empty SDS for NULL/empty values, diverging from create() semantics (which leave kv->val == NULL). Fix both.

 struct flb_kv *flb_kv_item_set(struct mk_list *list,
                                char *k_buf, char *v_buf)
 {
-    struct mk_list *head;
-    struct flb_kv *kv;
-
-    if (!k_buf) {
-        return NULL;
-    }
-
-    mk_list_foreach(head, list) {
-        kv = mk_list_entry(head, struct flb_kv, _head);
-        if (strcasecmp(kv->key, k_buf) == 0) {
-            if (kv->val) {
-                flb_sds_destroy(kv->val);
-            }
-            kv->val = flb_sds_create(v_buf);
-            if (!kv->val) {
-                mk_list_del(&kv->_head);
-                flb_sds_destroy(kv->key);
-                flb_free(kv);
-                return NULL;
-            }
-            return kv;
-        }
-    }
-
-    return flb_kv_item_create(list, k_buf, v_buf);
+    struct mk_list *head;
+    struct flb_kv *kv;
+    size_t v_len = 0;
+    flb_sds_t new_val = NULL;
+
+    if (!k_buf) {
+        return NULL;
+    }
+    if (v_buf != NULL) {
+        v_len = strlen(v_buf);
+    }
+
+    mk_list_foreach(head, list) {
+        kv = mk_list_entry(head, struct flb_kv, _head);
+        if (strcasecmp(kv->key, k_buf) == 0) {
+            /* Empty/NULL => unset value (kv->val == NULL), keep entry */
+            if (v_len == 0) {
+                if (kv->val) {
+                    flb_sds_destroy(kv->val);
+                    kv->val = NULL;
+                }
+                return kv;
+            }
+            /* Allocate new first; preserve old on failure */
+            new_val = flb_sds_create_len(v_buf, v_len);
+            if (!new_val) {
+                return NULL;
+            }
+            if (kv->val) {
+                flb_sds_destroy(kv->val);
+            }
+            kv->val = new_val;
+            return kv;
+        }
+    }
+
+    /* Not found: creation already handles NULL/empty values */
+    return flb_kv_item_create(list, k_buf, v_buf);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
mk_list_foreach(head, list) {
kv = mk_list_entry(head, struct flb_kv, _head);
if (strcasecmp(kv->key, k_buf) == 0) {
if (kv->val) {
flb_sds_destroy(kv->val);
}
kv->val = flb_sds_create(v_buf);
if (!kv->val) {
mk_list_del(&kv->_head);
flb_sds_destroy(kv->key);
flb_free(kv);
return NULL;
}
return kv;
}
}
return flb_kv_item_create(list, k_buf, v_buf);
}
struct flb_kv *flb_kv_item_set(struct mk_list *list,
char *k_buf, char *v_buf)
{
struct mk_list *head;
struct flb_kv *kv;
size_t v_len = 0;
flb_sds_t new_val = NULL;
if (!k_buf) {
return NULL;
}
if (v_buf != NULL) {
v_len = strlen(v_buf);
}
mk_list_foreach(head, list) {
kv = mk_list_entry(head, struct flb_kv, _head);
if (strcasecmp(kv->key, k_buf) == 0) {
/* Empty/NULL => unset value (kv->val == NULL), keep entry */
if (v_len == 0) {
if (kv->val) {
flb_sds_destroy(kv->val);
kv->val = NULL;
}
return kv;
}
/* Allocate new first; preserve old on failure */
new_val = flb_sds_create_len(v_buf, v_len);
if (!new_val) {
return NULL;
}
if (kv->val) {
flb_sds_destroy(kv->val);
}
kv->val = new_val;
return kv;
}
}
/* Not found: creation already handles NULL/empty values */
return flb_kv_item_create(list, k_buf, v_buf);
}

@edsiper edsiper merged commit a638bc9 into master Sep 18, 2025
56 checks passed
@edsiper edsiper deleted the kv_item_set branch September 18, 2025 02:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

OpenTelemetry plugin should not allow duplicate labels for labels defined via add_label

1 participant