KAFKA-3209: KIP-66: single message transforms #2299
Conversation
… a base ConfigDef
 */
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /** Apply transformation to the {@code record} and return another record object (which may be {@code record} itself). Must be thread-safe. **/
We will only support simple 1:{0,1} transformations – i.e. map and filter operations
I think we should add to the javadoc that the returned record object can be null.
@shikhar Trivial change, but this seems like a good idea so folks know they can do filtering.
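To make the filtering contract concrete, here is a minimal sketch of a transformation that drops records by topic name. The class name and the `topic.regex` property are hypothetical, not part of this PR, and depending on the final shape of the interface a `config()` method may also be required; returning `null` from `apply()` is what filters a record out:

```java
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical example: drop any record whose topic matches a configured regex.
public class TopicFilter<R extends ConnectRecord<R>> implements Transformation<R> {

    private Pattern topicPattern;

    @Override
    public void configure(Map<String, ?> props) {
        topicPattern = Pattern.compile((String) props.get("topic.regex"));
    }

    @Override
    public R apply(R record) {
        // 1:0 case -- returning null drops the record; otherwise this is 1:1.
        return topicPattern.matcher(record.topic()).matches() ? null : record;
    }

    @Override
    public void close() {
        // No resources to release in this sketch.
    }
}
```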
Nice work! Currently I'm using Kafka Connect to consume data from Kafka and push it to Elasticsearch using kafka-connect-elasticsearch. In our use case there's one limitation with this implementation. Basically, what we are doing is:
Could we change the
-R apply(R record);
+List<R> apply(R record);
Yes, you can do this with SMTs instead. The difference is when the transformation occurs. SMTs are generic and occur before the data hits any serialization-format-specific changes.
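For illustration, per KIP-66 an SMT chain is declared on the connector config with a list of aliases plus alias-prefixed properties, along these lines (a hedged sketch reusing the hypothetical `TopicFilter` from above; exact property names may differ from the transformations in this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: declaring an SMT on a connector via alias-scoped properties,
// per the transforms.<alias>.* convention from KIP-66.
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("transforms", "dropDebug"); // comma-separated list of aliases
connectorProps.put("transforms.dropDebug.type", "com.example.TopicFilter"); // hypothetical SMT class
connectorProps.put("transforms.dropDebug.topic.regex", "debug-.*");
```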
@shikhar A few more nits, but looks like we're almost there.
@@ -55,6 +55,11 @@ public TimestampType timestampType() {
    }

    @Override
    public SinkRecord newRecord(String topic, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
I just noticed that the partition is omitted from this API. Was this intentional? For sinks it would definitely be weird to modify. Sources can technically specify the partition. Is there any chance we'd want to include the Kafka partition in this API as well?
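For context, a routing-style transformation would invoke this method roughly as follows, given the signature in this diff (which indeed has no partition parameter); the routing rule shown is hypothetical:

```java
// Sketch: route a record to a derived topic, preserving key, value and timestamp,
// using the newRecord() signature from this diff (note: no partition parameter).
@Override
public R apply(R record) {
    final String updatedTopic = record.topic() + "-rerouted"; // hypothetical routing rule
    return record.newRecord(updatedTopic, record.keySchema(), record.key(),
            record.valueSchema(), record.value(), record.timestamp());
}
```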
for (String alias : new LinkedHashSet<>(transformAliases)) {
    final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
    final String groupPrefix = TRANSFORMS_GROUP + ": " + alias;
nit: this isn't really a prefix, it's just a new group
As an arg to newDef.embed(), it can be a prefix (if a transformation's ConfigDef key spec has a group name; otherwise it acts as the group). But it makes sense to just call it group here, so will do.
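For reference, the embedding being discussed looks roughly like this (a sketch; `transformationConfigDef(alias)` is a hypothetical helper standing in for however the transformation's `ConfigDef` is obtained):

```java
// Sketch: embed each transformation's ConfigDef into the connector's ConfigDef,
// scoping its keys with the per-alias prefix and grouping them under "group".
final ConfigDef newDef = new ConfigDef(baseConfigDef);
int orderInGroup = 0;
for (String alias : new LinkedHashSet<>(transformAliases)) {
    final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
    final String group = TRANSFORMS_GROUP + ": " + alias;
    newDef.embed(prefix, group, orderInGroup++, transformationConfigDef(alias));
}
```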
Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
    @Override
    public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
        return a.clazz().compareTo(b.clazz());
The Transformations are sorted by comparing the canonical names of the classes. Should we do that here as well?
ConnectorPluginInfo.clazz() is the canonical class name.
@@ -219,7 +230,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
     log.trace("Wrote record successfully: topic {} partition {} offset {}",
             recordMetadata.topic(), recordMetadata.partition(),
             recordMetadata.offset());
-    commitTaskRecord(record);
+    commitTaskRecord(preTransformRecord);
This is interesting; I don't think we really considered the potential 2x memory usage increase this can cause.
It shouldn't be a 2x increase, since we are not closing over the transformed record in this callback.
The callback uses both the pre- and post-transform records, which is why I was saying 2x. I'm not too worried about it; it's just something we didn't realize during the review.
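To make the memory discussion concrete, the surrounding code ends up looking roughly like this (a sketch; helper names such as `transformationChain.apply` and `convertTransformedRecord` are illustrative):

```java
// Sketch: the transformed record feeds the producer, while the callback captures
// the pre-transform record for offset commit -- so both stay reachable until the
// send completes, though only the pre-transform one is closed over.
final SourceRecord preTransformRecord = record;
final SourceRecord transformedRecord = transformationChain.apply(preTransformRecord);
final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(transformedRecord);
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            log.trace("Wrote record successfully: topic {} partition {} offset {}",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
        }
        commitTaskRecord(preTransformRecord);
    }
});
```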
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
        "Field name for the single field that will be created in the resulting Struct.");

private Cache<Schema, Schema> schemaUpdateCache;
Make it final and initialize it in the constructor instead? It doesn't seem to depend on the config at all.
The Cache API does not provide a clear() method, and I'm relying on nulling out the field in close(). So creating it in init() seems appropriate.
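Concretely, the lifecycle described above looks roughly like this (a sketch using the `LRUCache` and `SynchronizedCache` wrappers from `org.apache.kafka.common.cache`; the cache size is an arbitrary choice, and the init hook is shown here as `configure()`):

```java
// Sketch: create the schema cache when the transformation is configured, and
// drop the reference in close(), since the Cache interface has no clear() method.
@Override
public void configure(Map<String, ?> props) {
    // ... parse transformation config ...
    schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}

@Override
public void close() {
    schemaUpdateCache = null; // release the cached schemas
}
```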
/**
 * This transformation allows inserting configured attributes of the record metadata as fields in the record key.
 * It also allows adding a static data field.
 * The record key is required to be of type {@link Schema.Type#STRUCT}.
I think this is outdated based on the addition of schemaless support.
/**
 * This transformation allows inserting configured attributes of the record metadata as fields in the record value.
 * It also allows adding a static data field.
 * The record value is required to be of type {@link Schema.Type#STRUCT}.
Also out of date here.
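For context on the schemaless support mentioned in these comments: with schemaless data the record value arrives as a `Map` rather than a `Struct`, so `apply` has to branch, roughly like this (a sketch; `applyWithSchema`, `fieldName` and `fieldValue` are hypothetical names for the schema-backed helper and the config-derived values):

```java
// Sketch: handle both schemaless (Map) and schema-backed (Struct) record values.
@Override
public R apply(R record) {
    if (record.valueSchema() == null) {
        // Schemaless path: copy the map and insert the configured field.
        @SuppressWarnings("unchecked")
        final Map<String, Object> updatedValue =
                new HashMap<>((Map<String, Object>) record.value());
        updatedValue.put(fieldName, fieldValue);
        return record.newRecord(record.topic(), record.keySchema(), record.key(),
                null, updatedValue, record.timestamp());
    }
    // Schema-backed path: build an updated Schema and Struct.
    return applyWithSchema(record); // hypothetical helper
}
```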
 */
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {

    /** Apply transformation to the {@code record} and return another record object (which may be {@code record} itself). Must be thread-safe. **/
Is the thread-safe comment actually true? Won't these be instantiated per-task and only execute in that task's thread?
Presently, this is true. However, it makes room for potentially operating in parallel over records -- easier to have the contract now than to add it later. I'm open to not doing this, since it allows simplifications (e.g. not needing SynchronizedCache), if you don't think finer-grained parallelism in the workers is a likely direction.
@Mogztter Sorry, I missed the last part of your question. Transformations are 1:1 or 1:0 only for a reason. Connect tracks offsets for connectors automatically, and allowing a flatMap-like transformation would at best make handling those offsets (which are defined by the connector in the case of sources) a lot harder, or in the worst case break the guarantees the framework can provide. The intent of this feature is to only do lightweight transformations; Connect is still focused on moving data between systems and is not intended to be a fully-featured transformation engine. The basic transformations are being added just to support things like removing PII and doing very basic per-message data cleanup, to avoid having to make extra copies of the data. If you get into more complicated transformations, you should take a look at Kafka Streams, which is designed for that.

@Mogztter What @ewencp said; sorry, I should have included this as a 'Rejected alternative' in the KIP.
LGTM. Going to merge this now, but we'll have to remember to follow up and close the JIRA since JIRA is currently down, although I guess we have the remaining transformations to tackle still anyway.

Thanks @ewencp for the thorough review!
…ourceTask

Followup to #2299 for KAFKA-3209

Author: Shikhar Bhushan <shikhar@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2365 from shikhar/2299-followup
Besides API and runtime changes, this PR also includes 2 data transformations (`InsertField`, `HoistToStruct`) and 1 routing transformation (`TimestampRouter`).

There is some gnarliness in `ConnectorConfig` / `ConfigDef` around creating, parsing and validating a dynamic `ConfigDef`.

Author: Shikhar Bhushan <shikhar@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#2299 from shikhar/smt-2017
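As a rough illustration of the routing transformation named above, `TimestampRouter`-style behavior rewrites the topic from a format string with `${topic}` and `${timestamp}` placeholders (a sketch; the actual config keys and defaults in the PR may differ):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch of TimestampRouter-style routing. Note SimpleDateFormat is not
// thread-safe, so a per-thread instance would be needed to honor the
// thread-safety contract discussed earlier in this review.
public String route(String topicFormat, String topic, long timestampMillis) {
    final SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd"); // hypothetical default format
    return topicFormat
            .replace("${topic}", topic)
            .replace("${timestamp}", fmt.format(new Date(timestampMillis)));
}
```

For example, with `topicFormat = "${topic}-${timestamp}"`, a record on topic `metrics` would be routed to something like `metrics-20170106`, depending on its timestamp.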