Kafka Connect: KafkaMetadataTransform leaks configuration across instances via a static field #16601

@wombatu-kun

Problem

KafkaMetadataTransform stores its per-instance configuration in a static field:

private static RecordAppender recordAppender;
...
@Override
public void configure(Map<String, ?> configs) {
  recordAppender = getRecordAppender(configs);
}

recordAppender captures everything the SMT was configured with: the metadata field name/prefix, whether metadata is nested or flattened, and any external field. Because the field is static, every KafkaMetadataTransform instance in the worker JVM shares it. Kafka Connect creates a separate transform instance for each task (and for each transform alias in a chain) and calls configure() on each one, so the last configure() call wins for all instances.
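The sharing behavior can be reproduced outside Kafka Connect with a minimal, self-contained analog. The class names below are hypothetical, and the real RecordAppender (built by getRecordAppender) is reduced here to just the configured field-name prefix. Only the static-field pattern is meant to match the issue.

```java
import java.util.Map;

public class StaticConfigDemo {

  // Hypothetical stand-in for KafkaMetadataTransform: the "appender" is reduced to a prefix string.
  static final class StaticPrefixTransform {
    // Mirrors the bug: one static field shared by every instance of the class.
    private static String prefix;

    void configure(Map<String, ?> configs) {
      // Each configure() call overwrites the value seen by all instances.
      prefix = String.valueOf(configs.get("field_name"));
    }

    String topicFieldName() {
      return prefix + "_topic";
    }
  }

  public static void main(String[] args) {
    StaticPrefixTransform first = new StaticPrefixTransform();
    StaticPrefixTransform second = new StaticPrefixTransform();
    first.configure(Map.of("field_name", "aaa"));
    second.configure(Map.of("field_name", "bbb"));

    System.out.println(first.topicFieldName());  // bbb_topic (first instance was overwritten)
    System.out.println(second.topicFieldName()); // bbb_topic
  }
}
```

The first instance reports the second instance's prefix even though it was configured with "aaa". This is the same "last configure() wins" effect described above.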

Impact

When two or more KafkaMetadataTransform instances are configured differently in the same worker (for example, two connectors that both use the SMT, or two transform aliases in one chain), every instance emits the field names and structure of whichever instance was configured last. The output is silently wrong: records get the wrong metadata field names or the wrong nested/flat shape. It is also a thread-safety hazard: tasks run on separate threads, and the non-volatile static field is written by each task's configure() and read by each task's apply() without any synchronization.

Sibling SMTs in the same module (CopyValue, DebeziumTransform) correctly hold their configuration in instance fields; KafkaMetadataTransform is the only one using a static field.

Reproduction

This test fails on current code. The second (last-configured) instance behaves correctly, but the first instance picks up the second's configuration:

@Test
public void testConfigIsNotSharedAcrossInstances() {
  SinkRecord record =
      new SinkRecord(
          TOPIC, PARTITION, null, null, null, VALUE_MAP, OFFSET, TIMESTAMP, TimestampType.CREATE_TIME);
  try (KafkaMetadataTransform first = new KafkaMetadataTransform();
      KafkaMetadataTransform second = new KafkaMetadataTransform()) {
    first.configure(ImmutableMap.of("field_name", "aaa"));
    second.configure(ImmutableMap.of("field_name", "bbb"));

    // the second (last-configured) instance works correctly today: it uses its own "bbb" prefix
    Map<?, ?> secondValue = (Map<?, ?>) second.apply(record).value();
    assertThat(secondValue.get("bbb_topic")).isEqualTo(TOPIC);
    assertThat(secondValue.get("aaa_topic")).isNull();

    // the first instance must also use its own "aaa" prefix, not the second's "bbb"; today this fails
    Map<?, ?> firstValue = (Map<?, ?>) first.apply(record).value();
    assertThat(firstValue.get("aaa_topic")).isEqualTo(TOPIC);
    assertThat(firstValue.get("bbb_topic")).isNull();
  }
}

Failure:

org.opentest4j.AssertionFailedError:
expected: "topic"
 but was: null

Fix

Make recordAppender an instance field (remove static). All reads are already in instance methods, so the change is minimal and matches the other SMTs in the module. There is no measurable cost: the appender is still built once per instance in configure().
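The effect of dropping static can be seen with the same kind of minimal, self-contained analog. The class names are hypothetical, and the real RecordAppender is again reduced to the configured prefix. Only the instance-field change is meant to match the proposed fix.

```java
import java.util.Map;

public class InstanceConfigDemo {

  // Hypothetical stand-in for the fixed KafkaMetadataTransform.
  static final class InstancePrefixTransform {
    // Mirrors the fix: no `static`, so each instance keeps its own configuration.
    private String prefix;

    void configure(Map<String, ?> configs) {
      // Still computed once per instance, in configure().
      prefix = String.valueOf(configs.get("field_name"));
    }

    String topicFieldName() {
      return prefix + "_topic";
    }
  }

  public static void main(String[] args) {
    InstancePrefixTransform first = new InstancePrefixTransform();
    InstancePrefixTransform second = new InstancePrefixTransform();
    first.configure(Map.of("field_name", "aaa"));
    second.configure(Map.of("field_name", "bbb"));

    System.out.println(first.topicFieldName());  // aaa_topic
    System.out.println(second.topicFieldName()); // bbb_topic
  }
}
```

Because each task owns its transform instances, the instance field also removes the cross-thread write/read on shared state.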
