KAFKA-3209: KIP-66: more single message transforms #2374

Closed. Wants to merge 23 commits; changes shown from 14 commits.
9 changes: 8 additions & 1 deletion build.gradle
@@ -508,7 +508,7 @@ project(':core') {

task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
-                         'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs',
+                         'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
':streams:genStreamsConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
@@ -948,6 +948,13 @@ project(':connect:runtime') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
}

+  task genConnectTransformationDocs(type: JavaExec) {
+    classpath = sourceSets.main.runtimeClasspath
+    main = 'org.apache.kafka.connect.tools.TransformationDoc'
+    if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+    standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
+  }
}

project(':connect:file') {
New file: org/apache/kafka/connect/tools/TransformationDoc.java (85 additions)
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.tools;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.MaskField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;

public class TransformationDoc {

private static final class DocInfo {
final String transformationName;
final String overview;
final ConfigDef configDef;

private DocInfo(String transformationName, String overview, ConfigDef configDef) {
this.transformationName = transformationName;
this.overview = overview;
this.configDef = configDef;
}
}

private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
    ---- review thread on this list ----
    Contributor: This will probably grow outdated quickly. Can we use the classpath discovery code we already have to generate this list automatically?
    Contributor: Oh, never mind. I guess the issue is that we can get the class name and ConfigDef via the Transformation interface, but not the docstring. I wonder if a simple naming assumption and reflection could solve this more cleanly than manually maintaining the list.
    Author (@shikhar, Jan 18, 2017): Yeah, I was trying with reflection initially, but I'm not sure how to handle the Key/Value variants, since the PluginDiscovery machinery currently only returns concrete impls, and I want to include a single entry for the SMT in those cases. It could be made to work, but I'm not sure the magic is worth it.
    Author (@shikhar): Minor: I also like that the direct static dependency on Xform.OVERVIEW_DOC prevents it from appearing as an unused field in IntelliJ :)
    Contributor: Yeah, if the gymnastics are too complicated, maintaining it manually seems fine. We probably won't need to change this very often anyway.
    ---- end review thread ----
new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF)
);

private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.transformationName + "\">");

out.print("<h5>");
out.print(docInfo.transformationName);
out.println("</h5>");

out.println(docInfo.overview);

out.println("<p/>");

out.println(docInfo.configDef.toHtmlTable());

out.println("</div>");
}

    private static void printHtml(PrintStream out) {
for (final DocInfo docInfo : TRANSFORMATIONS) {
printTransformationHtml(out, docInfo);
}
}

public static void main(String... args) throws Exception {
printHtml(System.out);
}

}
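
For illustration only, not part of this PR: a minimal sketch of the reflection approach floated in the review thread above. It assumes every transformation follows the convention, used throughout this PR, of exposing public static OVERVIEW_DOC and CONFIG_DEF fields; the hypothetical helper below would live inside TransformationDoc, and the class list itself would still have to come from somewhere such as classpath discovery.

    // Hypothetical sketch: derive a DocInfo entry reflectively instead of listing
    // the OVERVIEW_DOC/CONFIG_DEF references by hand. Assumes the naming convention holds.
    private static DocInfo docInfoFor(Class<?> transformationClass)
            throws NoSuchFieldException, IllegalAccessException {
        // getField finds public fields; get(null) reads a static field.
        final String overview = (String) transformationClass.getField("OVERVIEW_DOC").get(null);
        final ConfigDef configDef = (ConfigDef) transformationClass.getField("CONFIG_DEF").get(null);
        return new DocInfo(transformationClass.getName(), overview, configDef);
    }

As the thread concludes, the manual list trades a little maintenance for simplicity, and the direct static references keep the fields visibly used.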
New file: org/apache/kafka/connect/transforms/ExtractField.java (114 additions)
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String OVERVIEW_DOC =
"Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

private static final String FIELD_CONFIG = "field";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");

private static final String PURPOSE = "field extraction";

private String fieldName;

@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
fieldName = config.getString(FIELD_CONFIG);
}

@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
if (schema == null) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
return newRecord(record, null, value.get(fieldName));
} else {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName));
}
}

@Override
public void close() {
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

protected abstract Schema operatingSchema(R record);

protected abstract Object operatingValue(R record);

protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}

@Override
protected Object operatingValue(R record) {
return record.key();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}

public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}

@Override
protected Object operatingValue(R record) {
return record.value();
}

@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}
}

}
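
As a usage illustration, not part of the PR (record contents invented for the example): on the schemaless path, ExtractField pulls the configured field out of a Map value and leaves the value schema null.

    import java.util.Collections;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.ExtractField;

    public class ExtractFieldExample {
        public static void main(String[] args) {
            // Configure the Value variant to extract the "id" field.
            final ExtractField.Value<SourceRecord> xform = new ExtractField.Value<>();
            xform.configure(Collections.singletonMap("field", "id"));

            // A null value schema marks the record as schemaless; the value is a Map.
            final SourceRecord record = new SourceRecord(null, null, "topic", 0,
                    null, Collections.singletonMap("id", 42));

            final SourceRecord transformed = xform.apply(record);
            System.out.println(transformed.value()); // prints 42; valueSchema() stays null
        }
    }

In a connector configuration the same transform would be wired up with the KIP-66 properties, along the lines of transforms=extract, transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value, transforms.extract.field=id (the alias "extract" is invented for the example).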
Renamed and modified: org/apache/kafka/connect/transforms/HoistToStruct.java → HoistField.java
@@ -27,15 +27,21 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

+import java.util.Collections;
import java.util.Map;

-public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {

-    public static final String FIELD_CONFIG = "field";
+    public static final String OVERVIEW_DOC =
+            "Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
+                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

-    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+    private static final String FIELD_CONFIG = "field";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
-                    "Field name for the single field that will be created in the resulting Struct.");
+                    "Field name for the single field that will be created in the resulting Struct or Map.");

private Cache<Schema, Schema> schemaUpdateCache;

@@ -53,15 +59,19 @@ public R apply(R record) {
final Schema schema = operatingSchema(record);
final Object value = operatingValue(record);

-        Schema updatedSchema = schemaUpdateCache.get(schema);
-        if (updatedSchema == null) {
-            updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
-            schemaUpdateCache.put(schema, updatedSchema);
-        }
+        if (schema == null) {
+            return newRecord(record, null, Collections.singletonMap(fieldName, value));
+        } else {
+            Schema updatedSchema = schemaUpdateCache.get(schema);
+            if (updatedSchema == null) {
+                updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
+                schemaUpdateCache.put(schema, updatedSchema);
+            }

-        final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
+            final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);

-        return newRecord(record, updatedSchema, updatedValue);
+            return newRecord(record, updatedSchema, updatedValue);
+        }
     }

@Override
@@ -80,11 +90,7 @@ public ConfigDef config() {

protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

-    /**
-     * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
+    public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
@@ -99,14 +105,9 @@ protected Object operatingValue(R record) {
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
         }
-
     }

-    /**
-     * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
-     */
-    public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {
-
+    public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
@@ -121,7 +122,6 @@ protected Object operatingValue(R record) {
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
         }
-
     }

}
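
Again as an invented illustration, not part of the PR: the new schemaless branch in apply() wraps the whole value in a singleton Map keyed by the configured field name, where previously only the Struct-wrapping path existed.

    import java.util.Collections;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.HoistField;

    public class HoistFieldExample {
        public static void main(String[] args) {
            // Configure the Value variant to hoist the value under the "line" field.
            final HoistField.Value<SourceRecord> xform = new HoistField.Value<>();
            xform.configure(Collections.singletonMap("field", "line"));

            // Schemaless record: null value schema, plain String value.
            final SourceRecord record = new SourceRecord(null, null, "topic", 0,
                    null, "a line of text");

            final SourceRecord hoisted = xform.apply(record);
            System.out.println(hoisted.value()); // prints {line=a line of text}
        }
    }

With a schema present, the same input would instead come back as a Struct with a single field "line", built via the cached SchemaBuilder path shown in the diff.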