Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3209: KIP-66: more single message transforms #2374

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f34cc71
SetSchemaMetadata SMT
shikhar Jan 13, 2017
09af66b
Support schemaless and rename HoistToStruct->Hoist; add the inverse E…
shikhar Jan 13, 2017
022f492
InsertField transform -- fix bad name for interface containing config…
shikhar Jan 13, 2017
c5260a7
ValueToKey SMT
shikhar Jan 13, 2017
06b47a8
Clearer naming -- HoistField, ExtractField
shikhar Jan 13, 2017
cdd8855
Address review comments; docgen coming along
shikhar Jan 17, 2017
04dc9f3
Auto-generate docs/generated/connect_transforms.html
shikhar Jan 17, 2017
e90f989
html tweaks
shikhar Jan 17, 2017
692901f
consistent doc style
shikhar Jan 17, 2017
5772e95
Add RegexRouter SMT
shikhar Jan 18, 2017
2b99585
consistent naming
shikhar Jan 18, 2017
1063909
MaskField SMT
shikhar Jan 18, 2017
db7faf0
empty comment
shikhar Jan 18, 2017
b4aa9ee
Missed note in overview doc for MaskField SMT
shikhar Jan 18, 2017
3186c9f
Use Class object rather than canon name in masked value mappings
shikhar Jan 19, 2017
9bb5908
MaskField.fields -> maskedFields for clarity
shikhar Jan 19, 2017
8676dc5
Clearer overview doc for ValueToKey
shikhar Jan 19, 2017
16c9259
Config validation, try to fail-fast on no-op configs
shikhar Jan 19, 2017
a374bf9
checkstyle
shikhar Jan 19, 2017
7ee22e5
ReplaceField SMT and some consistency/cleanups refactoring
shikhar Jan 19, 2017
aa325db
Minor InsertField tweak -- docs and config check
shikhar Jan 20, 2017
be67396
User doc for transformations in connect.html
shikhar Jan 20, 2017
7eae01a
Better word choice
shikhar Jan 20, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ project(':core') {

task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
':streams:genStreamsConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
Expand Down Expand Up @@ -948,6 +948,13 @@ project(':connect:runtime') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
}

task genConnectTransformationDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.connect.tools.TransformationDoc'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
}
}

project(':connect:file') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.tools;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.MaskField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;

public class TransformationDoc {

private static final class DocInfo {
final String transformationName;
final String overview;
final ConfigDef configDef;

private DocInfo(String transformationName, String overview, ConfigDef configDef) {
this.transformationName = transformationName;
this.overview = overview;
this.configDef = configDef;
}
}

private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will probably grow outdated quickly. Can we use the classpath discovery code we already have to generate this list automatically?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nm, I guess the issue is that we can get the class name and ConfigDef via the Transformation interface, but not the docstring. I wonder if a simple naming assumption and reflection could more cleanly solve this than manually maintaining the list.

Copy link
Contributor Author

@shikhar shikhar Jan 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was trying with reflection initially, however not sure how to handleKey/Value variants since the PluginDiscovery business only returns concrete impls currently, and I want to include a single entry for the SMT in these cases. It could be made to work but not sure if the magic is worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I also like that the direct static dependency on Xform.OVERVIEW_DOC prevents it from appearing as an unused field in IntelliJ :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if the gymnastics are too complicated, maintaining it manually seems fine. We probably won't need to change this very often anyway.

new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF),
new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF)
);

private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.transformationName + "\">");

out.print("<h5>");
out.print(docInfo.transformationName);
out.println("</h5>");

out.println(docInfo.overview);

out.println("<p/>");

out.println(docInfo.configDef.toHtmlTable());

out.println("</div>");
}

private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
for (final DocInfo docInfo : TRANSFORMATIONS) {
printTransformationHtml(out, docInfo);
}
}

public static void main(String... args) throws Exception {
printHtml(System.out);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

/**
 * SMT that replaces the record key or value with one of its fields. Works on a
 * {@link Struct} when a schema is present, or on a {@link Map} for schemaless data.
 * Concrete subclasses {@link Key} and {@link Value} select which side of the record
 * the extraction operates on.
 */
public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC =
            "Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data."
                    + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
                    + "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

    private static final String FIELD_CONFIG = "field";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");

    private static final String PURPOSE = "field extraction";

    // Name of the field to pull out; set once in configure().
    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        fieldName = new SimpleConfig(CONFIG_DEF, props).getString(FIELD_CONFIG);
    }

    @Override
    public R apply(R record) {
        final Schema schema = operatingSchema(record);
        if (schema != null) {
            // Schema-ful path: the new schema is the extracted field's schema.
            // NOTE(review): assumes the field exists in the schema; a missing field
            // would surface as an NPE here — confirm against upstream behavior.
            final Struct struct = requireStruct(operatingValue(record), PURPOSE);
            return newRecord(record, schema.field(fieldName).schema(), struct.get(fieldName));
        }
        // Schemaless path: look the field up in the map; absent keys yield null.
        final Map<String, Object> map = requireMap(operatingValue(record), PURPOSE);
        return newRecord(record, null, map.get(fieldName));
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /** Schema of the side of the record (key or value) this transform operates on. */
    protected abstract Schema operatingSchema(R record);

    /** Data of the side of the record (key or value) this transform operates on. */
    protected abstract Object operatingValue(R record);

    /** Builds a copy of {@code record} with the operated-on side replaced. */
    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

    /** Extracts a field from the record key. */
    public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.key();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
        }
    }

    /** Extracts a field from the record value. */
    public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.util.Collections;
import java.util.Map;

public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {

public static final String FIELD_CONFIG = "field";
public static final String OVERVIEW_DOC =
"Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
private static final String FIELD_CONFIG = "field";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
"Field name for the single field that will be created in the resulting Struct.");
"Field name for the single field that will be created in the resulting Struct or Map.");

private Cache<Schema, Schema> schemaUpdateCache;

Expand All @@ -53,15 +59,19 @@ public R apply(R record) {
final Schema schema = operatingSchema(record);
final Object value = operatingValue(record);

Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
schemaUpdateCache.put(schema, updatedSchema);
}
if (schema == null) {
return newRecord(record, null, Collections.singletonMap(fieldName, value));
} else {
Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
schemaUpdateCache.put(schema, updatedSchema);
}

final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);

return newRecord(record, updatedSchema, updatedValue);
return newRecord(record, updatedSchema, updatedValue);
}
}

@Override
Expand All @@ -80,11 +90,7 @@ public ConfigDef config() {

protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);

/**
* Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
*/
public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {

public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
Expand All @@ -99,14 +105,9 @@ protected Object operatingValue(R record) {
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}

}

/**
* Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
*/
public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {

public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
Expand All @@ -121,7 +122,6 @@ protected Object operatingValue(R record) {
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}

}

}
Loading