KAFKA-3209: KIP-66: more single message transforms #2374

Closed
wants to merge 23 commits
Changes from 1 commit
@@ -23,6 +23,7 @@
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.MaskField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;
@@ -47,6 +48,7 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef)

private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
Contributor

This will probably grow outdated quickly. Can we use the classpath discovery code we already have to generate this list automatically?

Contributor

Oh, nm, I guess the issue is that we can get the class name and ConfigDef via the Transformation interface, but not the docstring. I wonder if a simple naming assumption and reflection could more cleanly solve this than manually maintaining the list.

Contributor Author
@shikhar Jan 18, 2017

Yeah, I was trying with reflection initially, but I'm not sure how to handle Key/Value variants, since the PluginDiscovery business only returns concrete impls currently and I want to include a single entry for the SMT in these cases. It could be made to work, but not sure the magic is worth it.

Contributor Author

Minor: I also like that the direct static dependency on Xform.OVERVIEW_DOC prevents it from appearing as an unused field in IntelliJ :)

Contributor

Yeah, if the gymnastics are too complicated, maintaining it manually seems fine. We probably won't need to change this very often anyway.
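
For reference, a rough sketch of the reflection idea discussed above (not part of this PR). It assumes the convention that Key/Value variants are nested classes of the transformation being documented and that every transformation exposes public static OVERVIEW_DOC and CONFIG_DEF fields; docInfoFor is a hypothetical helper name:

    // Hypothetical helper, assuming the OVERVIEW_DOC/CONFIG_DEF naming convention.
    // Key/Value variants resolve to their enclosing class so each SMT gets a single entry.
    private static DocInfo docInfoFor(Class<?> transformationClass) throws ReflectiveOperationException {
        final Class<?> docClass = transformationClass.getEnclosingClass() != null
                ? transformationClass.getEnclosingClass()
                : transformationClass;
        final String overview = (String) docClass.getField("OVERVIEW_DOC").get(null);
        final ConfigDef configDef = (ConfigDef) docClass.getField("CONFIG_DEF").get(null);
        return new DocInfo(docClass.getName(), overview, configDef);
    }
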

new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF),
new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
@@ -29,6 +29,7 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.transforms.util.SchemaUtil;

import java.util.Date;
import java.util.HashMap;
@@ -165,24 +166,31 @@ private R applyWithSchema(R record) {

final Struct updatedValue = new Struct(updatedSchema);

copyFields(value, updatedValue);
for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}

insertFields(record, updatedValue);
if (topicField != null) {
updatedValue.put(topicField.name, record.topic());
}
if (partitionField != null && record.kafkaPartition() != null) {
updatedValue.put(partitionField.name, record.kafkaPartition());
}
if (offsetField != null) {
updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
}
if (timestampField != null && record.timestamp() != null) {
updatedValue.put(timestampField.name, new Date(record.timestamp()));
}
if (staticField != null && staticValue != null) {
updatedValue.put(staticField.name, staticValue);
}

return newRecord(record, updatedSchema, updatedValue);
}

private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaBuilder.struct();

builder.name(schema.name());
builder.version(schema.version());
builder.doc(schema.doc());

final Map<String, String> params = schema.parameters();
if (params != null) {
builder.parameters(params);
}
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

for (Field field : schema.fields()) {
builder.field(field.name(), field.schema());
@@ -207,30 +215,6 @@ private Schema makeUpdatedSchema(Schema schema) {
return builder.build();
}

private void copyFields(Struct value, Struct updatedValue) {
for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}
}

private void insertFields(R record, Struct value) {
if (topicField != null) {
value.put(topicField.name, record.topic());
}
if (partitionField != null && record.kafkaPartition() != null) {
value.put(partitionField.name, record.kafkaPartition());
}
if (offsetField != null) {
value.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
}
if (timestampField != null && record.timestamp() != null) {
value.put(timestampField.name, new Date(record.timestamp()));
}
if (staticField != null && staticValue != null) {
value.put(staticField.name, staticValue);
}
}

@Override
public void close() {
schemaUpdateCache = null;
@@ -92,7 +92,7 @@ private R applySchemaless(R record) {
for (String field : maskedFields) {
updatedValue.put(field, masked(value.get(field)));
}
return updatedRecord(record, updatedValue);
return newRecord(record, updatedValue);
}

private R applyWithSchema(R record) {
@@ -102,7 +102,7 @@ private R applyWithSchema(R record) {
final Object origFieldValue = value.get(field);
updatedValue.put(field, maskedFields.contains(field.name()) ? masked(origFieldValue) : origFieldValue);
}
return updatedRecord(record, updatedValue);
return newRecord(record, updatedValue);
}

private static Object masked(Object value) {
@@ -133,7 +133,7 @@ public void close() {

protected abstract Object operatingValue(R record);

protected abstract R updatedRecord(R base, Object value);
protected abstract R newRecord(R base, Object value);

public static final class Key<R extends ConnectRecord<R>> extends MaskField<R> {
@Override
@@ -147,7 +147,7 @@ protected Object operatingValue(R record) {
}

@Override
protected R updatedRecord(R record, Object updatedValue) {
protected R newRecord(R record, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
@@ -164,7 +164,7 @@ protected Object operatingValue(R record) {
}

@Override
protected R updatedRecord(R record, Object updatedValue) {
protected R newRecord(R record, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updatedValue, record.timestamp());
}
}
@@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.transforms;

import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String OVERVIEW_DOC = "Filter or rename fields.";

interface ConfigName {
String BLACKLIST = "blacklist";
String WHITELIST = "whitelist";
String RENAME = "renames";
}

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to exclude. This takes precedence over the whitelist.")
.define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to include. If specified, only these fields will be used.")
.define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
parseRenameMappings((List<String>) value);
}

@Override
public String toString() {
return "list of colon-delimited pairs, e.g. <code>foo:bar,abc:xyz</code>";
}
}, ConfigDef.Importance.MEDIUM, "Field rename mappings.");

private static final String PURPOSE = "field replacement";

private List<String> blacklist;
private List<String> whitelist;
private Map<String, String> renames;
private Map<String, String> reverseRenames;

private Cache<Schema, Schema> schemaUpdateCache;

@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
blacklist = config.getList(ConfigName.BLACKLIST);
whitelist = config.getList(ConfigName.WHITELIST);
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);

schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}

static Map<String, String> parseRenameMappings(List<String> mappings) {
final Map<String, String> m = new HashMap<>();
for (String mapping : mappings) {
final String[] parts = mapping.split(":");
if (parts.length != 2) {
throw new ConfigException(ConfigName.RENAME, mappings, "Invalid rename mapping: " + mapping);
}
m.put(parts[0], parts[1]);
}
return m;
}

static Map<String, String> invert(Map<String, String> source) {
final Map<String, String> m = new HashMap<>();
for (Map.Entry<String, String> e : source.entrySet()) {
m.put(e.getValue(), e.getKey());
}
return m;
}

boolean filter(String fieldName) {
return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
}

String renamed(String fieldName) {
final String mapping = renames.get(fieldName);
return mapping == null ? fieldName : mapping;
}

String reverseRenamed(String fieldName) {
final String mapping = reverseRenames.get(fieldName);
return mapping == null ? fieldName : mapping;
}

@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}

private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);

final Map<String, Object> updatedValue = new HashMap<>(value.size());

for (Map.Entry<String, Object> e : value.entrySet()) {
final String fieldName = e.getKey();
if (filter(fieldName)) {
final Object fieldValue = e.getValue();
updatedValue.put(renamed(fieldName), fieldValue);
}
}

return newRecord(record, null, updatedValue);
}

private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);

Schema updatedSchema = schemaUpdateCache.get(value.schema());
if (updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
schemaUpdateCache.put(value.schema(), updatedSchema);
}

final Struct updatedValue = new Struct(updatedSchema);

for (Field field : updatedSchema.fields()) {
final Object fieldValue = value.get(reverseRenamed(field.name()));
updatedValue.put(field.name(), fieldValue);
}

return newRecord(record, updatedSchema, updatedValue);
}

private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
if (filter(field.name())) {
builder.field(renamed(field.name()), field.schema());
}
}
return builder.build();
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public void close() {
schemaUpdateCache = null;
}

protected abstract Schema operatingSchema(R record);

protected abstract Object operatingValue(R record);

protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

public static class Key<R extends ConnectRecord<R>> extends ReplaceField<R> {

@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}

@Override
protected Object operatingValue(R record) {
return record.key();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}

}

public static class Value<R extends ConnectRecord<R>> extends ReplaceField<R> {

@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}

@Override
protected Object operatingValue(R record) {
return record.value();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}

}

}
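
For illustration, a minimal usage sketch of the new ReplaceField transform applied standalone to a schemaless sink record (not part of the diff; the topic, field names, and values are made up):

    import org.apache.kafka.connect.sink.SinkRecord;
    import java.util.HashMap;
    import java.util.Map;

    final Map<String, String> props = new HashMap<>();
    props.put("blacklist", "ssn");                             // drop this field
    props.put("renames", "fname:first_name,lname:last_name");  // rename these fields

    final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
    xform.configure(props);

    final Map<String, Object> value = new HashMap<>();
    value.put("fname", "jane");
    value.put("lname", "doe");
    value.put("ssn", "123-45-6789");

    // Null value schema, so the schemaless path is exercised.
    final SinkRecord record = new SinkRecord("test-topic", 0, null, null, null, value, 0);
    final SinkRecord transformed = xform.apply(record);
    // transformed.value() -> {first_name=jane, last_name=doe}; "ssn" is filtered out

In practice the transform would typically be configured on a connector through its transforms configuration rather than instantiated directly.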