KAFKA-4714: Flatten and Cast single message transforms (KIP-66) #2458
Conversation
@kkonstantine @hachikuji Could you review? These are 2 of the last 3 built-in SMTs we have from KIP-66. I'd like to get these in since we've seen asks for both on the mailing lists (directly, for casting when the connector can't constrain the range of values, and indirectly for Flatten for use with connectors that can only handle flat schemas). The final SMT missing is TimestampConverter, which I think is really useful, but probably too complex (both implementation + testing) to squeeze in. However, these two are useful enough that I think we should push them through if possible.
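For context, a rough usage sketch of the two transforms as they might be configured in a test or connector setup. This is illustrative only; the "spec" config key name and the Key/Value class nesting are assumptions based on the diff below, not verified here.

import java.util.Collections;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Cast;
import org.apache.kafka.connect.transforms.Flatten;

public class SmtUsageSketch {
    public static void main(String[] args) {
        // Cast selected fields of the record value to different primitive types.
        Cast<SourceRecord> cast = new Cast.Value<>();
        cast.configure(Collections.singletonMap("spec", "id:int32,score:float64"));

        // Flatten nested Structs/Maps in the record value into a flat structure.
        Flatten<SourceRecord> flatten = new Flatten.Value<>();
        flatten.configure(Collections.<String, String>emptyMap());

        // In a Connect pipeline the framework calls apply() on each record, e.g.:
        //   record = flatten.apply(cast.apply(record));
    }
}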
Just a few comments. Looks ok to me.
@Override
public R apply(R record) {
    if (operatingSchema(record) == null) {
Could have been a wonderful one-liner, but now I see that this version of apply is the same everywhere.
Not the same everywhere, but in many places yes. Can always clean up and simplify later, if it's worth it. I also mentioned in a previous PR to @shikhar that there is a fair amount of duplicated code that might be removed if we provided a common base class for transformations that fit a few requirements (e.g., support schema & schemaless versions requiring struct/map, can be applied to either Key or Value, etc).
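A minimal sketch of that shared base class idea, purely illustrative and not code from this PR: the repeated apply() dispatch and the Key/Value specialization could live in one abstract class.

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class DispatchingTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    @Override
    public R apply(R record) {
        // The duplicated pattern: route to the schemaless or schema-aware implementation.
        return operatingSchema(record) == null ? applySchemaless(record) : applyWithSchema(record);
    }

    protected abstract Schema operatingSchema(R record);   // key schema vs. value schema

    protected abstract R applySchemaless(R record);

    protected abstract R applyWithSchema(R record);
}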
I agree to punt.
public class CastTest {

    @Test
    public void castWholeRecordKeySchemad() {
Is Schemad a typo everywhere in this test, or is it indeed meant as an adjective?
Meant as an adjective. Could easily swap for WithSchema. I debated this and was unhappy with all the options I came up with :/
I'm up for creating new words.
I like withSchema. I also wondered if it was a typo and others probably will too.
changed to WithSchema
// As a special case for casting the entire value (e.g. the incoming key is a int64 but you know it could be an
// int32 and want the smaller width), we use an otherwise invalid field name in the cast spec to track this.
private static final String WHOLE_VALUE_CAST = "";
Does this refer to "identity conversion"? Is this a better name?
No, it's not identity. The original idea for the transformation was casting the type of fields (in a Struct w/ schemas, in a Map w/o schemas). But then I realized that if you apply it to keys, it might need to apply to the entire record (either key or value in the ConnectRecord) rather than a field. I was definitely stretching to find a name to refer to this...
Relatedly, there are some future improvements to be added here. Currently the code only handles casting the entire value, or fields within the value if it is a Struct/Map. Still missing is handling of hierarchical structures, i.e. in the spec, a field would ideally support some sort of dotted notation to allow casting within nested structs.
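To make the whole-value vs. per-field distinction concrete, a hypothetical configuration sketch (the "spec" key name and the field names shown are assumptions for illustration):

import java.util.Collections;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Cast;

public class CastSpecSketch {
    public static void main(String[] args) {
        // Whole-record cast: the key itself is a primitive, so the spec is just a type.
        Cast<SourceRecord> keyCast = new Cast.Key<>();
        keyCast.configure(Collections.singletonMap("spec", "int32"));

        // Per-field cast: the value is a Struct/Map, so the spec lists field:type pairs.
        Cast<SourceRecord> valueCast = new Cast.Value<>();
        valueCast.configure(Collections.singletonMap("spec", "age:int8,weight:float32"));

        // Nested fields (e.g. "address.zip:int32") are not handled yet, per the comment above.
    }
}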
I see now. But that's a case where an example has an edge over prose for building intuition.
throw new ConnectException("Unexpected type in Cast transformation: " + value.getClass());
case BOOLEAN:
    if (value instanceof Number)
        return ((Number) value).longValue() > 0L;
Should this return false only on 0, instead of on <= 0?
Ack. Fixed.
Seems like we still return false on negative numbers. Did the fix get included?
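A self-contained sketch of the semantics being asked for here; this is an assumption about the eventual fix (any non-zero number becomes true, only zero becomes false), not the PR's final code.

public class BooleanCastSketch {
    static boolean numberToBoolean(Number value) {
        // != 0 rather than > 0, so negative numbers also map to true.
        return value.longValue() != 0L;
    }

    public static void main(String[] args) {
        System.out.println(numberToBoolean(-1)); // true
        System.out.println(numberToBoolean(0));  // false
        System.out.println(numberToBoolean(42)); // true
    }
}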
private static Map<String, Schema.Type> parseFieldTypes(List<String> mappings) {
    final Map<String, Schema.Type> m = new HashMap<>();
    for (String mapping : mappings) {
        final String[] parts = mapping.split(":");
Not trimming the parts after the split feels a bit restrictive. Do we want to use a util method that does both? The change shouldn't have a big impact because I see it's only used here and in the ReplaceField transform.
ConfigDef already handles trimming, so we shouldn't need to worry about it here.
I was referring to the parts after the split, though: "int8 : int32" vs "int8:int32".
ack, added trim() calls before using the values
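A small sketch of the agreed trimming behavior, illustrative only; the real method maps field names to Schema.Type rather than to Strings.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldTypeParsingSketch {
    // Both sides of each "field:type" mapping are trimmed, so "int8 : int32"
    // and "int8:int32" parse identically.
    static Map<String, String> parseFieldTypes(List<String> mappings) {
        final Map<String, String> parsed = new HashMap<>();
        for (String mapping : mappings) {
            final String[] parts = mapping.split(":");
            if (parts.length == 2) {
                parsed.put(parts[0].trim(), parts[1].trim());
            }
        }
        return parsed;
    }
}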
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
Can't leave a comment in Requirements, but requireMap needs an annotation to suppress the unchecked cast warning.
What command complains?
I just noticed it by reading and then ran IntelliJ's code inspection. I see -Xlint:unchecked is not enabled in our Gradle script, so we won't see it for now: https://github.com/apache/kafka/blob/trunk/build.gradle#L128. Up to you.
Fixed
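Roughly what that suppression looks like, assuming requireMap performs an unchecked cast from Object to Map<String, Object>; the exact error message below is made up for illustration.

import java.util.Map;
import org.apache.kafka.connect.errors.DataException;

public class RequirementsSketch {
    @SuppressWarnings("unchecked")
    public static Map<String, Object> requireMap(Object value, String purpose) {
        if (!(value instanceof Map)) {
            throw new DataException("Only Map objects are supported in the absence of a schema for [" + purpose + "]");
        }
        return (Map<String, Object>) value;
    }
}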
throw new ConnectException("Flatten transformation does not support " + entry.getValue().getClass()
        + " for record without schemas (for field " + fieldName + ").");
}
entry.getValue();
Seems like a no-op
Good catch, was just some random leftover code.
public void topLevelStructRequired() {
    final Flatten<SourceRecord> xform = new Flatten.Value<>();
    xform.configure(Collections.<String, String>emptyMap());
    xform.apply(new SourceRecord(null, null,
Seems to fit in one line
Yeah, this was copy/paste. It is somewhat cleaner in that the three lines break up source partition/offset, topic/partition, and value schema/value. But I'm not tied to that split, so I've realigned them all on the same line.
I thought so, but wasn't excited about the grouping, and it's not a long line anyway.
public void topLevelMapRequired() {
    final Flatten<SourceRecord> xform = new Flatten.Value<>();
    xform.configure(Collections.<String, String>emptyMap());
    xform.apply(new SourceRecord(null, null,
Just one line?
Same here as well.
private void applySchemaless(Map<String, Object> originalRecord, String fieldNamePrefix, Map<String, Object> newRecord) {
    for (Map.Entry<String, Object> entry : originalRecord.entrySet()) {
        final String fieldName = fieldName(fieldNamePrefix, entry.getKey());
        switch (ConnectSchema.schemaType(entry.getValue().getClass())) {
What's the possibility of a null value in an entry of such a map?
Fair point. I've added handling for nulls that just applies the null to the same field and returns.
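An illustrative, self-contained sketch of that null handling in a schemaless flatten. The "." delimiter and the exact recursion shape are assumptions; this is not the PR's code.

import java.util.LinkedHashMap;
import java.util.Map;

public class SchemalessFlattenSketch {
    static void flatten(Map<String, Object> original, String prefix, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : original.entrySet()) {
            String fieldName = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value == null) {
                out.put(fieldName, null);          // copy nulls through and stop recursing
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flatten(nested, fieldName, out);   // recurse into nested schemaless maps
            } else {
                out.put(fieldName, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("b", null);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("a", nested);
        record.put("c", 42);
        Map<String, Object> out = new LinkedHashMap<>();
        flatten(record, "", out);
        System.out.println(out); // {a.b=null, c=42}
    }
}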
Just a follow-up on the mapping split and the unchecked cast warning.
Minor comments in Cast. I'll get to the rest this evening.
.define(SPEC_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.Validator() {
    @Override
    public void ensureValid(String name, Object valueObject) {
        List<String> value = (List<String>) valueObject;
nit: unchecked cast. Maybe we should add @SuppressWarnings?
Added
}
},
ConfigDef.Importance.HIGH,
"List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of "
Do we document somewhere the list of type names that we expect or is that assumed known?
Ack, added a list here.
}
if (parts.length == 1) {
    if (mappings.size() > 1) {
        throw new ConfigException("Cast transformations that specify a type to cast the entire value to "
As far as I can tell, "int32" and ":int32" have the same effect, but the latter will skip this check since the string will split into two parts.
Hmm, interesting point. That's unintentional -- I wanted a sentinel for the single-type case. I think ":int32" should probably be invalid since an empty string shouldn't be a valid field name. In any case, while refactoring I simplified the use of WHOLE_VALUE_CAST, and it should be safe for it to now be null instead, so it can't conflict with a valid field name.
}

private R applySchemaless(R record) {
    Schema.Type recordCastType = casts.get(WHOLE_VALUE_CAST);
nit: naming is a little inconsistent. Should it be something like wholeValueCastType? Similar in the case with a schema.
Updated the name
}

private static Object castType(Object value, Schema.Type targetType) {
Maybe castTypeTo, castTo, or castAs?
        + " which is not supported by Connect's data API");
}
// Ensure the type we are trying to cast from is supported
validCastType(inferredType);
A little odd that this throws ConfigException.
Ack, adjusted that method to throw ConfigException when validating the target types and DataException when validating the input records.
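A sketch of that exception split, illustrative only; the supported-type set and messages below are assumptions. Configuration-time validation throws ConfigException, while per-record validation throws DataException.

import java.util.EnumSet;
import java.util.Set;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;

public class CastValidationSketch {
    private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of(
            Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
            Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, Schema.Type.STRING);

    // validatingConfig == true  -> checking the user's spec, so misuse is a config error
    // validatingConfig == false -> checking an incoming record, so misuse is a data error
    static void ensureCastable(Schema.Type type, boolean validatingConfig) {
        if (!SUPPORTED_CAST_TYPES.contains(type)) {
            String message = "Cast transformation does not support casting to/from " + type;
            if (validatingConfig) {
                throw new ConfigException(message);
            } else {
                throw new DataException(message);
            }
        }
    }
}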
Schema.Type recordCastType = casts.get(WHOLE_VALUE_CAST);

Schema updatedSchema = schemaUpdateCache.get(valueSchema);
if (updatedSchema == null) {
nit: seems we could turn this block into a function, say getOrBuildSchema?
good catch, refactored.
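A sketch of that get-or-build pattern; the plain HashMap cache and the placeholder builder body are assumptions for illustration, not the refactored code itself.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaCacheSketch {
    private final Map<Schema, Schema> schemaUpdateCache = new HashMap<>();

    // Look up the transformed schema for this input schema, building and caching it on a miss.
    Schema getOrBuildUpdatedSchema(Schema valueSchema) {
        Schema updated = schemaUpdateCache.get(valueSchema);
        if (updated == null) {
            updated = buildUpdatedSchema(valueSchema);
            schemaUpdateCache.put(valueSchema, updated);
        }
        return updated;
    }

    // Placeholder for the real logic, which rewrites field types according to the cast spec.
    private Schema buildUpdatedSchema(Schema valueSchema) {
        SchemaBuilder builder = SchemaBuilder.struct().name(valueSchema.name());
        return builder.build();
    }
}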
if (valueSchema.isOptional())
    builder.optional();
if (valueSchema.defaultValue() != null)
    builder.defaultValue(valueSchema.defaultValue());
Is no conversion needed for the default value?
Good catch, I got it on the fields but not on the entire record. Fixed and added a test.
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
    final Object origFieldValue = value.get(field);
    updatedValue.put(updatedSchema.field(field.name()), casts.containsKey(field.name()) ? castType(origFieldValue, casts.get(field.name())) : origFieldValue);
nit: long line. Maybe we can put the second argument into a variable?
ack
Just a couple questions for Flatten. Looks good.
final Struct updatedValue = new Struct(updatedSchema);
buildWithSchema(value, "", updatedValue);
return newRecord(record, updatedSchema, updatedValue);
nit: unneeded newline
newRecord.put(fieldName(fieldNamePrefix, entry.getKey()), null);
return;
}
switch (ConnectSchema.schemaType(value.getClass())) {
Do we need to check the result of schemaType for null?
yeah, added the same check that's done in Cast
case BYTES:
    newRecord.put(fieldName, record.get(field));
    break;
case STRUCT:
For my own knowledge, is the MAP type only for schemaless data? You would never have a nested schemaless object inside a struct?
You can have MAP for schemas, just as you can in Avro (in fact they are a bit more flexible due to other serialization formats supporting more than just string keys). However, they can't be flattened here since we can't generate the schema without knowing what the keys will be (and in fact because keys have arbitrary structure as well and aren't necessarily just strings, it may not even be easy to generate a meaningful field name for them).
newSchema.field(fieldName, field.schema());
break;
case STRUCT:
    buildUpdatedSchema(field.schema(), fieldName, newSchema);
I wonder if it makes sense to propagate optionality and default values when recursing. I.e., if a parent Struct was optional, all the flattened fields resulting from it should be optional. And in the absence of a default value on a child field, if there was a default for the parent Struct, use that Struct's field value as the default flattened field value.
That was a really good observation. Turns out this isn't too hard to layer onto the existing implementation, although I have to admit that I would think the semantics can be confusing to reason about. Thankfully I don't think most folks use default values on anything but individual primitive fields.
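For illustration, a sketch of how optionality could be propagated while recursing. This is not the PR's exact code; the "." delimiter and the omission of default-value propagation are simplifying assumptions. A leaf becomes optional whenever any ancestor Struct is optional, since the whole subtree may be absent.

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class FlattenOptionalitySketch {
    static void buildUpdatedSchema(Schema schema, String prefix, SchemaBuilder newSchema, boolean ancestorOptional) {
        for (Field field : schema.fields()) {
            String fieldName = prefix.isEmpty() ? field.name() : prefix + "." + field.name();
            boolean optional = ancestorOptional || field.schema().isOptional();
            if (field.schema().type() == Schema.Type.STRUCT) {
                // Recurse, remembering that everything below may be missing if this Struct is optional.
                buildUpdatedSchema(field.schema(), fieldName, newSchema, optional);
            } else {
                SchemaBuilder fieldBuilder = new SchemaBuilder(field.schema().type());
                if (optional) {
                    fieldBuilder.optional();
                }
                newSchema.field(fieldName, fieldBuilder.build());
            }
        }
    }
}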
I miss @shikhar!
Force-pushed from 9c5616d to da5f329.
Any updates here? It'd be nice if KIP-66 was updated with which transformers are implemented in Kafka.
private static final String PURPOSE = "cast types";

private static final Set<Schema.Type> SUPPORTED_CAST_TARGETS = new HashSet<>(
Note to self: we should probably include support for casting to/from the Decimal logical type as well.
Force-pushed from da5f329 to 7b23498.
@hachikuji @kkonstantine Took another pass at this and I think I've addressed all the comments. Interesting how tricky some of these transformations can get, @shikhar's observation about optional/default values for Flatten in particular... @xiongtx I'll update the wiki to include that info.
/cc @gwenshap Not sure how I omitted you thus far, pretty sure you want to see some of these transformations included :)
I see the comments have been addressed and this looks ready to me.
Thanks for the updates. LGTM.