KAFKA-3209: KIP-66: more single message transforms #2374

Closed
wants to merge 23 commits into from

shikhar (Contributor) commented Jan 13, 2017

Renames HoistToStruct SMT to HoistField.

Adds the following SMTs:
ExtractField
MaskField
RegexRouter
ReplaceField
SetSchemaMetadata
ValueToKey

Adds HTML doc generation and updates to connect.html.

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/866/
Test FAILed (JDK 7 and Scala 2.10).

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/868/
Test FAILed (JDK 8 and Scala 2.11).

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/863/
Test FAILed (JDK 8 and Scala 2.11).

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/861/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/866/
Test PASSed (JDK 8 and Scala 2.12).

@@ -37,7 +37,7 @@

public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {

public interface Keys {
public interface ConfigName {
Contributor:

Seems this can be private instead? Ditto for other classes. I thought it was possibly for tests but it doesn't seem to be used anywhere anyway.

Contributor Author:

will do

/**
 * Set the schema name, version or both on the record's key schema.
 */
public static class Key<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> {
Contributor:

Lack of multiple inheritance is really annoying in cases like this....

Contributor Author:

Curious, how would multiple inheritance help?

Contributor:

All these classes use the same pattern and define the same overrides for operatingSchema and updatedRecord. If you put those overrides into a single abstract class, then the definition of the Key and Value types would just be one line: the class declaration that inherits from both SetSchemaMetadata and this new abstract class that provides the overrides.

It's obviously not a big deal here since the implementations are trivial, just kind of annoying code bloat.
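
To make the pattern under discussion concrete, here is a minimal, self-contained sketch. `Rec` and `RenameSchema` are hypothetical stand-ins and not Kafka Connect classes; only the method names `operatingSchema` and `updatedRecord` come from the comment above, and their signatures here are assumptions.

```java
import java.util.Objects;

// Hypothetical stand-in for a Connect record: just a key schema name and a value schema name.
final class Rec {
    final String keySchema;
    final String valueSchema;

    Rec(String keySchema, String valueSchema) {
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }
}

// Hypothetical transform: the shared logic lives in the abstract base class,
// and each nested subclass only says which half of the record it operates on.
abstract class RenameSchema {
    private final String newName;

    RenameSchema(String newName) {
        this.newName = Objects.requireNonNull(newName);
    }

    // Shared logic, written once.
    Rec apply(Rec record) {
        if (operatingSchema(record) == null) {
            return record; // nothing to rename
        }
        return updatedRecord(record, newName);
    }

    // The two overrides that every Key/Value pair has to repeat.
    protected abstract String operatingSchema(Rec record);

    protected abstract Rec updatedRecord(Rec record, String updatedSchema);

    static final class Key extends RenameSchema {
        Key(String newName) {
            super(newName);
        }

        @Override
        protected String operatingSchema(Rec record) {
            return record.keySchema;
        }

        @Override
        protected Rec updatedRecord(Rec record, String updatedSchema) {
            return new Rec(updatedSchema, record.valueSchema);
        }
    }

    static final class Value extends RenameSchema {
        Value(String newName) {
            super(newName);
        }

        @Override
        protected String operatingSchema(Rec record) {
            return record.valueSchema;
        }

        @Override
        protected Rec updatedRecord(Rec record, String updatedSchema) {
            return new Rec(record.keySchema, updatedSchema);
        }
    }
}
```

Because a Java class has exactly one superclass, `Key` and `Value` cannot also inherit these two overrides from a second shared base class, so each transform that follows this pattern ends up repeating them.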

public R apply(R record) {
    final Schema valueSchema = record.valueSchema();
    if (valueSchema == null) {
        if (!(record.value() instanceof Map)) {
Contributor:

We end up doing this same set of checks in a bunch of places. Any way we could reduce the duplicated code?

Contributor Author:

I'll factor out a couple of helper functions to call into: requireMap and requireStructSchema.
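
A rough sketch of what such a helper could look like, assuming a `requireMap(value, purpose)` shape. The class name `Checks`, the `purpose` parameter, the message wording, and the use of `IllegalArgumentException` are all assumptions for illustration; the actual helpers in the PR may differ.

```java
import java.util.Map;

// Hypothetical helper class; the name and signature are assumptions, not the PR's code.
final class Checks {
    private Checks() {
    }

    // Returns the value cast to a Map, or fails with a message naming the caller's purpose.
    // IllegalArgumentException stands in for whatever exception type the transforms really use.
    @SuppressWarnings("unchecked")
    static Map<String, Object> requireMap(Object value, String purpose) {
        if (!(value instanceof Map)) {
            throw new IllegalArgumentException(
                "Only Map objects supported in absence of schema for [" + purpose + "], found: " + describe(value));
        }
        return (Map<String, Object>) value;
    }

    private static String describe(Object value) {
        return value == null ? "null" : value.getClass().getName();
    }
}
```

With a helper like this, the repeated `instanceof Map` check in each transform's `apply` collapses to a single call, and the error message is consistent across transforms.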


Schema keySchema = valueToKeySchemaCache.get(valueSchema);
if (keySchema == null) {
    final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
Contributor:

We can probably skip it for now, but a cache for this conversion might be a nice improvement eventually. I'd guess that with schemas you're probably mostly seeing the same conversion.

Contributor Author:

There is a cache here though, valueToKeySchemaCache

Contributor:

Clearly I was reading this without my 👓
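
For readers following along, the get-then-build pattern around `valueToKeySchemaCache` can be sketched in isolation. This is a minimal illustration, not the PR's implementation: `DerivedSchemaCache`, `getOrDerive`, and the LRU bound are hypothetical, and the diff does not show which cache class the PR actually uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded cache; an access-ordered LinkedHashMap gives LRU eviction.
final class DerivedSchemaCache<K, V> {
    private final Map<K, V> cache;

    DerivedSchemaCache(final int maxSize) {
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called by LinkedHashMap after each put; evict once the bound is exceeded.
                return size() > maxSize;
            }
        };
    }

    // Look up first; derive and remember only on a miss.
    synchronized V getOrDerive(K source, Function<K, V> derive) {
        V derived = cache.get(source);
        if (derived == null) {
            derived = derive.apply(source);
            cache.put(source, derived);
        }
        return derived;
    }

    synchronized int entries() {
        return cache.size();
    }
}
```

The point made in the thread is that most records share a schema, so the derivation runs once per distinct value schema rather than once per record.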

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/861/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/947/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/949/
Test PASSed (JDK 8 and Scala 2.11).

shikhar (Contributor Author) commented Jan 17, 2017

@ewencp should be ready for review.

Currently the new generated/connect_transforms.html page is not being included from connect.html. I'll get that into a follow-up PR that adds a section giving an overview of SMTs and their usage.

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/947/
Test FAILed (JDK 7 and Scala 2.10).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/955/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/953/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/957/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/959/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/957/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/953/
Test FAILed (JDK 7 and Scala 2.10).

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/995/
Test FAILed (JDK 8 and Scala 2.11).

final String topic = matcher.replaceFirst(replacement);
return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
return record;
Contributor Author:

Wondering if it may be useful here to have an option to drop records (return null here) that don't match the regex against the record.topic().

Contributor:

Hmm, not sure. I suspect I'd actually prefer an error in that case.
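
The routing step being discussed can be sketched with plain `java.util.regex`. This is an illustration only: `TopicRouting.route` is a hypothetical helper operating on topic names rather than Connect records, and it assumes the regex must match the whole topic name (`Matcher.matches()`), which the diff excerpt above does not show. It implements the pass-through behavior from the diff; dropping the record or raising an error on a non-match would be alternative policies.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: works on topic names only, not on Connect records.
final class TopicRouting {
    private TopicRouting() {
    }

    // Returns the rewritten topic if the whole topic name matches the regex,
    // otherwise returns the topic unchanged (the pass-through behavior).
    static String route(Pattern regex, String replacement, String topic) {
        Matcher matcher = regex.matcher(topic);
        if (matcher.matches()) {
            // replaceFirst resets the matcher and substitutes group references such as $1.
            return matcher.replaceFirst(replacement);
        }
        return topic;
    }
}
```

For example, with the pattern `(.*)-raw` and the replacement `$1-clean`, the topic `orders-raw` is routed to `orders-clean`, while a topic named `orders` is left as it is.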

shikhar (Contributor Author) commented Jan 19, 2017

@ewencp addressed comments

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1031/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1033/
Test FAILed (JDK 8 and Scala 2.11).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1031/
Test FAILed (JDK 7 and Scala 2.10).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1033/
Test FAILed (JDK 8 and Scala 2.12).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1035/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1033/
Test PASSed (JDK 7 and Scala 2.10).

shikhar (Contributor Author) commented Jan 19, 2017

@ewencp sneaked in ReplaceField SMT as well. Won't add any more :-)

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1044/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1042/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1042/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1049/
Test PASSed (JDK 8 and Scala 2.12).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1049/
Test PASSed (JDK 7 and Scala 2.10).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1051/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1073/
Test PASSed (JDK 8 and Scala 2.11).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1071/
Test FAILed (JDK 7 and Scala 2.10).

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1071/
Test PASSed (JDK 8 and Scala 2.12).

ewencp (Contributor) left a comment

@shikhar This LGTM. I have one comment about whether we're inadvertently advertising internal APIs as external, but this isn't really a blocker. I'm going to commit as is and we can always follow up if needed.


import java.util.Map;

public class SchemaUtil {
Contributor:

Are transforms ending up in the javadocs? Just curious if we're making stuff like this appear as public APIs when we really just want this to be an internal utility.

asfgit pushed a commit that referenced this pull request Jan 21, 2017
Renames `HoistToStruct` SMT to `HoistField`.

Adds the following SMTs:
`ExtractField`
`MaskField`
`RegexRouter`
`ReplaceField`
`SetSchemaMetadata`
`ValueToKey`

Adds HTML doc generation and updates to `connect.html`.

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2374 from shikhar/more-smt
@asfgit asfgit closed this in a8aa756 Jan 21, 2017
@shikhar shikhar deleted the more-smt branch January 21, 2017 01:13
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017