New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-3209: KIP-66: more single message transforms #2374
Changes from 5 commits
f34cc71
09af66b
022f492
c5260a7
06b47a8
cdd8855
04dc9f3
e90f989
692901f
5772e95
2b99585
1063909
db7faf0
b4aa9ee
3186c9f
9bb5908
8676dc5
16c9259
a374bf9
7ee22e5
aa325db
be67396
7eae01a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* <p/> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p/> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
**/ | ||
|
||
package org.apache.kafka.connect.transforms; | ||
|
||
import org.apache.kafka.common.config.ConfigDef; | ||
import org.apache.kafka.connect.connector.ConnectRecord; | ||
import org.apache.kafka.connect.data.Schema; | ||
import org.apache.kafka.connect.data.Struct; | ||
import org.apache.kafka.connect.errors.DataException; | ||
import org.apache.kafka.connect.transforms.util.SimpleConfig; | ||
|
||
import java.util.Map; | ||
|
||
public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> { | ||
|
||
public static final String FIELD_CONFIG = "field"; | ||
|
||
private static final ConfigDef CONFIG_DEF = new ConfigDef() | ||
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); | ||
|
||
private String fieldName; | ||
|
||
@Override | ||
public void configure(Map<String, ?> props) { | ||
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); | ||
fieldName = config.getString(FIELD_CONFIG); | ||
} | ||
|
||
@Override | ||
public R apply(R record) { | ||
final Schema schema = operatingSchema(record); | ||
final Object value = operatingValue(record); | ||
|
||
if (schema == null) { | ||
if (!(value instanceof Map)) { | ||
throw new DataException("Only Map supported in schemaless mode"); | ||
} | ||
return newRecord(record, null, ((Map) value).get(fieldName)); | ||
} else { | ||
if (schema.type() != Schema.Type.STRUCT) { | ||
throw new DataException("Only STRUCT schema type supported, was: " + schema.type()); | ||
} | ||
return newRecord(record, schema.field(fieldName).schema(), ((Struct) value).get(fieldName)); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
|
||
@Override | ||
public ConfigDef config() { | ||
return CONFIG_DEF; | ||
} | ||
|
||
protected abstract Schema operatingSchema(R record); | ||
|
||
protected abstract Object operatingValue(R record); | ||
|
||
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); | ||
|
||
public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> { | ||
@Override | ||
protected Schema operatingSchema(R record) { | ||
return record.keySchema(); | ||
} | ||
|
||
@Override | ||
protected Object operatingValue(R record) { | ||
return record.key(); | ||
} | ||
|
||
@Override | ||
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { | ||
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); | ||
} | ||
} | ||
|
||
public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> { | ||
@Override | ||
protected Schema operatingSchema(R record) { | ||
return record.valueSchema(); | ||
} | ||
|
||
@Override | ||
protected Object operatingValue(R record) { | ||
return record.value(); | ||
} | ||
|
||
@Override | ||
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { | ||
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* <p> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
**/ | ||
|
||
package org.apache.kafka.connect.transforms; | ||
|
||
import org.apache.kafka.common.config.ConfigDef; | ||
import org.apache.kafka.connect.connector.ConnectRecord; | ||
import org.apache.kafka.connect.data.ConnectSchema; | ||
import org.apache.kafka.connect.data.Schema; | ||
import org.apache.kafka.connect.errors.DataException; | ||
import org.apache.kafka.connect.transforms.util.SimpleConfig; | ||
|
||
import java.util.Map; | ||
|
||
public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> { | ||
|
||
public interface ConfigName { | ||
String SCHEMA_NAME = "schema.name"; | ||
String SCHEMA_VERSION = "schema.version"; | ||
} | ||
|
||
private static final ConfigDef CONFIG_DEF = new ConfigDef() | ||
.define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.") | ||
.define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set."); | ||
|
||
private String schemaName; | ||
private Integer schemaVersion; | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs) { | ||
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); | ||
schemaName = config.getString(ConfigName.SCHEMA_NAME); | ||
schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION); | ||
} | ||
|
||
@Override | ||
public R apply(R record) { | ||
if (schemaName == null && schemaVersion == null) return record; // no-op | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In some cases like these I wonder if we should throw an exception during There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good point. There are a few such transformations. I'll add an exception if the configuration is turning it into a no-op. |
||
final Schema schema = operatingSchema(record); | ||
if (schema == null) { | ||
throw new DataException("Cannot update metadata on null Schema"); | ||
} | ||
final boolean isArray = schema.type() == Schema.Type.ARRAY; | ||
final boolean isMap = schema.type() == Schema.Type.MAP; | ||
final Schema updatedSchema = new ConnectSchema( | ||
schema.type(), | ||
schema.isOptional(), | ||
schema.defaultValue(), | ||
schemaName != null ? schemaName : schema.name(), | ||
schemaVersion != null ? schemaVersion : schema.version(), | ||
schema.doc(), | ||
schema.parameters(), | ||
schema.fields(), | ||
isMap ? schema.keySchema() : null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you even need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These accessors throw an exception if the type doesn't line up... |
||
isMap || isArray ? schema.valueSchema() : null | ||
); | ||
return updatedRecord(record, updatedSchema); | ||
} | ||
|
||
@Override | ||
public ConfigDef config() { | ||
return CONFIG_DEF; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
} | ||
|
||
protected abstract Schema operatingSchema(R record); | ||
|
||
protected abstract R updatedRecord(R record, Schema updatedSchema); | ||
|
||
/** | ||
* Set the schema name, version or both on the record's key schema. | ||
*/ | ||
public static class Key<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lack of multiple inheritance is really annoying in cases like this.... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious, how would multiple inheritance help? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All these classes use the same pattern are defining the same overrides for It's obviously not a big deal here since the implementations are trivial, just kind of annoying code bloat. |
||
@Override | ||
protected Schema operatingSchema(R record) { | ||
return record.keySchema(); | ||
} | ||
|
||
@Override | ||
protected R updatedRecord(R record, Schema updatedSchema) { | ||
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp()); | ||
} | ||
} | ||
|
||
/** | ||
* Set the schema name, version or both on the record's value schema. | ||
*/ | ||
public static class Value<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> { | ||
@Override | ||
protected Schema operatingSchema(R record) { | ||
return record.valueSchema(); | ||
} | ||
|
||
@Override | ||
protected R updatedRecord(R record, Schema updatedSchema) { | ||
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp()); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems this can be private instead? Ditto for other classes. I thought it was possibly for tests but it doesn't seem to be used anywhere anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do