Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect #11442

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
Expand All @@ -35,29 +36,34 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);

public static final String OVERVIEW_DOC =
"Set the schema name, version or both on the record's key (<code>" + Key.class.getName() + "</code>)"
+ " or value (<code>" + Value.class.getName() + "</code>) schema.";
"Set the name, namespace and version of the schema on the record's key " +
"(<code>" + Key.class.getName() + "</code>) or value " +
"(<code>" + Value.class.getName() + "</code>).";

// Keys of the configuration properties this transformation reads in configure().
private interface ConfigName {
String SCHEMA_NAME = "schema.name";
String SCHEMA_NAMESPACE = "schema.namespace";
String SCHEMA_VERSION = "schema.version";
}

// Definition of the supported settings. Each one defaults to null, i.e. is optional on its own;
// configure() throws a ConfigException when name, namespace and version are all left unset.
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.")
.define(ConfigName.SCHEMA_NAMESPACE, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema namespace to prepend to the schema name.")
.define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set.");

private String schemaName;
private String schemaNamespace;
private Integer schemaVersion;

@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
schemaName = config.getString(ConfigName.SCHEMA_NAME);
schemaNamespace = config.getString(ConfigName.SCHEMA_NAMESPACE);
schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION);

if (schemaName == null && schemaVersion == null) {
throw new ConfigException("Neither schema name nor version configured");
if (schemaName == null && schemaNamespace == null && schemaVersion == null) {
throw new ConfigException("Neither schema name nor namespace nor version configured");
}
}

Expand All @@ -75,7 +81,10 @@ public R apply(R record) {
schema.type(),
schema.isOptional(),
schema.defaultValue(),
schemaName != null ? schemaName : schema.name(),
buildSchemaName(
schemaNamespace,
schemaName != null ? schemaName : schema.name()
),
schemaVersion != null ? schemaVersion : schema.version(),
schema.doc(),
schema.parameters(),
Expand Down Expand Up @@ -170,4 +179,18 @@ protected static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema)
}
return keyOrValue;
}

/**
 * Builds the effective schema name by prepending {@code namespace} to {@code name}.
 *
 * <p>When the namespace is blank (as decided by {@code Utils.isBlank}; {@code null} and the empty
 * string both qualify) the name is returned exactly as given. Otherwise the namespace is trimmed
 * and stripped of trailing dots, the name is trimmed and stripped of leading dots, and the two are
 * joined with a single dot. Leading dots of the namespace and trailing dots of the name are kept.
 *
 * <p>If one side has nothing left after normalization (it was blank or consisted only of dots),
 * no separator is emitted: a dots-only namespace behaves like a blank one, and a blank or dots-only
 * name yields just the normalized namespace.
 *
 * @param namespace the namespace to prepend; may be {@code null} or blank
 * @param name      the schema name; may be {@code null} or blank
 * @return the combined schema name, or {@code name} unchanged when there is no usable namespace
 */
protected static String buildSchemaName(final String namespace, final String name) {
    if (Utils.isBlank(namespace)) {
        return name;
    }

    final String normalizedNamespace = namespace.trim().replaceAll("[.]+$", "");
    if (normalizedNamespace.isEmpty()) {
        // The namespace was nothing but dots (e.g. "..."): treat it like a blank namespace
        // instead of producing ".name" with a dangling leading separator.
        return name;
    }
    if (Utils.isBlank(name)) {
        return normalizedNamespace;
    }

    final String normalizedName = name.trim().replaceAll("^[.]+", "");
    if (normalizedName.isEmpty()) {
        // The name was nothing but dots: avoid returning "namespace." with a trailing separator.
        return normalizedNamespace;
    }
    return normalizedNamespace + "." + normalizedName;
}
}
Expand Up @@ -24,10 +24,14 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand All @@ -37,82 +41,122 @@
public class SetSchemaMetadataTest {
private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();

// Rows for the parameterized buildSchemaName test; each row is (namespace, schema name, expected result).
private static Stream<Arguments> buildSchemaNameTestParams() {
return Stream.of(
// Namespace only: surrounding whitespace and trailing dots are dropped, leading dots are kept.
Arguments.of("foo.bar", "", "foo.bar"),
Arguments.of("foo.bar", null, "foo.bar"),
Arguments.of("foo.bar.", "", "foo.bar"),
Arguments.of("foo.bar...", "", "foo.bar"),
Arguments.of(".foo.bar", "", ".foo.bar"),
Arguments.of(" foo.bar. ", "", "foo.bar"),
// Name only (empty or null namespace): the name comes back exactly as given, untrimmed.
Arguments.of("", "baz", "baz"),
Arguments.of(null, "baz", "baz"),
Arguments.of("", ".baz", ".baz"),
Arguments.of("", "...baz ", "...baz "),
Arguments.of("", " baz ", " baz "),
// Both present: joined with one dot; the name loses surrounding whitespace and leading dots,
// but trailing dots of the name and repeated segments are left alone.
Arguments.of("foo.bar", "baz", "foo.bar.baz"),
Arguments.of("foo.bar", ".baz", "foo.bar.baz"),
Arguments.of("foo.bar", "...baz ", "foo.bar.baz"),
Arguments.of("foo.bar", " .baz ", "foo.bar.baz"),
Arguments.of("foo.bar", "baz.", "foo.bar.baz."),
Arguments.of("foo.bar", "foo.bar.baz", "foo.bar.foo.bar.baz"),
// Neither present: the name argument is passed through, so null stays null and "" stays "".
Arguments.of("", "", ""),
Arguments.of(null, "", ""),
Arguments.of("", null, null),
Arguments.of(null, null, null)
);
}

@AfterEach
public void teardown() {
xform.close();
}

@Test
public void schemaNameUpdate() {
xform.configure(Collections.singletonMap("schema.name", "foo"));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
assertEquals("foo", updatedRecord.valueSchema().name());
@ParameterizedTest
@MethodSource("buildSchemaNameTestParams")
public void buildSchemaNameShouldBeTolerantToRedundantDotsAndSpaces(String namespace,
String schemaName,
String expected) {
String actual = SetSchemaMetadata.buildSchemaName(namespace, schemaName);
assertEquals(expected, actual);
}

@Test
public void schemaVersionUpdate() {
xform.configure(Collections.singletonMap("schema.version", 42));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
assertEquals(Integer.valueOf(42), updatedRecord.valueSchema().version());
public void shouldTakeSchemaNameFromConfig() {
final SinkRecord record = buildRecordAndApplyConfig(
SchemaBuilder.struct().build(), Collections.singletonMap("schema.name", "foo")
);

assertEquals("foo", record.valueSchema().name());
}

@Test
public void schemaNameAndVersionUpdate() {
final Map<String, String> props = new HashMap<>();
props.put("schema.name", "foo");
props.put("schema.version", "42");
public void shouldTakeSchemaNamespaceFromConfig() {
final SinkRecord record = buildRecordAndApplyConfig(
SchemaBuilder.struct().build(), Collections.singletonMap("schema.namespace", "foo.bar")
);

xform.configure(props);
assertEquals("foo.bar", record.valueSchema().name());
}

final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
@Test
public void shouldTakeSchemaNameWithNamespaceFromConfig() {
final Map<String, String> config = new HashMap<>();
config.put("schema.namespace", "foo.bar");
config.put("schema.name", "baz");
Schema schema = SchemaBuilder.struct().build();

final SinkRecord updatedRecord = xform.apply(record);
final SinkRecord record = buildRecordAndApplyConfig(schema, config);

assertEquals("foo", updatedRecord.valueSchema().name());
assertEquals(Integer.valueOf(42), updatedRecord.valueSchema().version());
assertEquals("foo.bar.baz", record.valueSchema().name());
}

@Test
public void schemaNameAndVersionUpdateWithStruct() {
final String fieldName1 = "f1";
final String fieldName2 = "f2";
final String fieldValue1 = "value1";
final int fieldValue2 = 1;
final Schema schema = SchemaBuilder.struct()
.name("my.orig.SchemaDefn")
.field(fieldName1, Schema.STRING_SCHEMA)
.field(fieldName2, Schema.INT32_SCHEMA)
.build();
final Struct value = new Struct(schema).put(fieldName1, fieldValue1).put(fieldName2, fieldValue2);
public void shouldTakeSchemaNameWithNamespaceAndVersionFromConfig() {
final Map<String, String> config = new HashMap<>();
config.put("schema.namespace", "foo.bar");
config.put("schema.name", "baz");
config.put("schema.version", "42");
Schema schema = SchemaBuilder.struct().build();

final Map<String, String> props = new HashMap<>();
props.put("schema.name", "foo");
props.put("schema.version", "42");
xform.configure(props);
final SinkRecord record = buildRecordAndApplyConfig(schema, config);

final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);
assertEquals("foo.bar.baz", record.valueSchema().name());
assertEquals(42, record.valueSchema().version());
}

final SinkRecord updatedRecord = xform.apply(record);
@Test
public void shouldPrependNamespaceFromConfigToCurrentSchemaName() {
final SinkRecord record = buildRecordAndApplyConfig(
SchemaBuilder.struct().name("baz").build(), Collections.singletonMap("schema.namespace", "foo.bar")
);

assertEquals("foo", updatedRecord.valueSchema().name());
assertEquals(Integer.valueOf(42), updatedRecord.valueSchema().version());
assertEquals("foo.bar.baz", record.valueSchema().name());
}

// Make sure the struct's schema and fields all point to the new schema
assertMatchingSchema((Struct) updatedRecord.value(), updatedRecord.valueSchema());
@Test
public void shouldOverrideSchemaNameWithNamespaceAndVersionFromConfig() {
final Map<String, String> config = new HashMap<>();
config.put("schema.namespace", "foo.bar");
config.put("schema.name", "baz");
config.put("schema.version", "42");
Schema schema = SchemaBuilder.struct().name("non.baz").version(0).build();

final SinkRecord record = buildRecordAndApplyConfig(schema, config);

assertEquals("foo.bar.baz", record.valueSchema().name());
assertEquals(42, record.valueSchema().version());
}

@Test
public void valueSchemaRequired() {
public void shouldFailOnVersionWithMissingSchema() {
final SinkRecord record = new SinkRecord("", 0, null, null, null, 42, 0);
assertThrows(DataException.class, () -> xform.apply(record));
assertThrows(DataException.class, () -> xform.apply(record), "Schema required for [updating schema metadata]");
}

@Test
public void ignoreRecordWithNullValue() {
public void shouldNotFailOnEmptyRecord() {
final SinkRecord record = new SinkRecord("", 0, null, null, null, null, 0);

final SinkRecord updatedRecord = xform.apply(record);

assertNull(updatedRecord.key());
Expand All @@ -121,6 +165,33 @@ public void ignoreRecordWithNullValue() {
assertNull(updatedRecord.valueSchema());
}

@Test
public void schemaNameWithNamespaceAndVersionUpdateWithStruct() {
    // Configure the transformation with namespace, name and version all at once.
    final Map<String, String> config = new HashMap<>();
    config.put("schema.namespace", "foo.bar");
    config.put("schema.name", "baz");
    config.put("schema.version", "42");
    xform.configure(config);

    // Build a struct value that is bound to the original (pre-transformation) schema.
    final Schema originalSchema = SchemaBuilder.struct()
        .name("my.origin.schema")
        .field("stringField", Schema.STRING_SCHEMA)
        .field("intField", Schema.INT32_SCHEMA)
        .build();
    final Struct originalValue = new Struct(originalSchema);
    originalValue.put("stringField", "value");
    originalValue.put("intField", 1);

    final SinkRecord transformed =
        xform.apply(new SinkRecord("", 0, null, null, originalSchema, originalValue, 0));
    final Struct transformedValue = (Struct) transformed.value();
    final Schema transformedSchema = transformed.valueSchema();

    assertEquals("foo.bar.baz", transformedSchema.name());
    assertEquals(42, transformedSchema.version());
    // Make sure the struct's schema and fields all point to the new schema
    assertMatchingSchema(transformedValue.schema(), transformedSchema);
}

@Test
public void updateSchemaOfStruct() {
final String fieldName1 = "f1";
Expand All @@ -141,7 +212,7 @@ public void updateSchemaOfStruct() {
.build();

Struct newValue = (Struct) SetSchemaMetadata.updateSchemaIn(value, newSchema);
assertMatchingSchema(newValue, newSchema);
assertMatchingSchema(newValue.schema(), newSchema);
}

@Test
Expand All @@ -151,20 +222,27 @@ public void updateSchemaOfNonStruct() {
assertSame(value, updatedValue);
}

@SuppressWarnings("ConstantConditions")
@Test
public void updateSchemaOfNull() {
    // updateSchemaIn must hand a null input straight back as null.
    assertNull(SetSchemaMetadata.updateSchemaIn(null, Schema.INT32_SCHEMA));
}

protected void assertMatchingSchema(Struct value, Schema schema) {
assertSame(schema, value.schema());
assertEquals(schema.name(), value.schema().name());
for (Field field : schema.fields()) {
protected void assertMatchingSchema(Schema actual, Schema expected) {
assertSame(expected, actual);
assertEquals(expected.name(), actual.name());
for (Field field : expected.fields()) {
String fieldName = field.name();
assertEquals(schema.field(fieldName).name(), value.schema().field(fieldName).name());
assertEquals(schema.field(fieldName).index(), value.schema().field(fieldName).index());
assertSame(schema.field(fieldName).schema(), value.schema().field(fieldName).schema());
assertEquals(expected.field(fieldName).name(), actual.field(fieldName).name());
assertEquals(expected.field(fieldName).index(), actual.field(fieldName).index());
assertSame(expected.field(fieldName).schema(), actual.field(fieldName).schema());
}
}

/**
 * Configures the shared transformation with {@code xformConfig}, then applies it to a freshly
 * built record carrying {@code recordSchema} and returns the transformed record.
 */
private SinkRecord buildRecordAndApplyConfig(Schema recordSchema, Map<String, String> xformConfig) {
    xform.configure(xformConfig);
    return xform.apply(new SinkRecord("", 0, null, null, recordSchema, null, 0));
}
}
2 changes: 1 addition & 1 deletion docs/connect.html
Expand Up @@ -174,7 +174,7 @@ <h5><a id="connect_included_transformation" href="#connect_included_transformati
<li>ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value</li>
<li>HoistField - Wrap the entire event as a single field inside a Struct or a Map</li>
<li>ExtractField - Extract a specific field from Struct and Map and include only this field in results</li>
<li>SetSchemaMetadata - modify the schema name or version</li>
<li>SetSchemaMetadata - modify the schema name, namespace or version</li>
<li>TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps</li>
<li>RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression</li>
<li>Filter - Removes messages from all further processing. This is used with a <a href="#connect_predicates">predicate</a> to selectively filter certain messages.</li>
Expand Down