Python: Convert Avro to Iceberg schema #4742
Conversation
Force-pushed 8c00b5b to 113e2b2
* Change types into dataclasses

Proposal to change the types into dataclasses. This has several improvements:

- We can use the dataclass `field(repr=True)` to include the fields in the representation, instead of building our own strings
- We can assign the types in `__post_init__` when they are dynamic (List, Map, Struct, etc.), or just override them when they are static (Primitives)
- We don't have to implement any `__eq__` methods because they come for free
- The types are frozen, which is kind of nice since we re-use them
- The code is much more concise
- We can mark the min/max of the int/long/float as `Final` as of 3.8: https://peps.python.org/pep-0591/

My inspiration was the comment by Kyle: #4742 (comment). This would entail implementing `__eq__`, but why not use the generated one since we're comparing all the attributes :) Would love to get your input

* Remove explicit `__repr__` and `__eq__`
* Use `@cached_property` to cache the string; add missing words to spelling
* Add additional guard for initializing StructType using kwargs
* Replace `type` with `field_type`
Force-pushed 113e2b2 to 9d6fa80
Force-pushed 9d6fa80 to 0a1b456
Thanks! I'll take another look.
```python
    """
    # In the case of a primitive field
    if isinstance(field, str):
        return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[field]
```
I find the logic in these methods hard to follow and I think it's because the cases are not cleanly separated by method. This is a good example. This method is handling an Avro field, but this is checking if the field is not a field and is instead a primitive type. That should never happen in a schema so it raises questions about when this method is called.
This is one reason why we use the visitor pattern elsewhere. Keeping the logic to traverse a schema separate from the logic to actually do something with it is useful, but it also keeps you using a consistent and focused pattern to construct this logic: here's how to convert a record, here's how to convert a field, here's how to convert a map, etc.
Since this isn't handling an Avro Schema class, I wasn't originally going to suggest it, but I think this would be cleaner and easier to review/maintain if it were structured around Avro's schema model:
```python
def _convert_schema(schema: str | Dict[str, Any]):
    if isinstance(schema, str):
        return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[schema]
    elif isinstance(schema, dict):
        type_name = schema["type"]
        if type_name == "record":
            return _convert_record(schema)
        elif type_name == "union":
            ...
        elif type_name == "map":
            ...
        elif type_name == "array":
            ...
        else:
            logical_type = schema.get("logicalType")
            if logical_type:
                ...
            else:
                return AvroSchemaConversion.PRIMITIVE_FIELD_TYPE_MAP[type_name]
    else:
        raise ValueError(f"Cannot convert invalid schema: {schema}")

def _convert_record(schema: Dict[str, Any]):
    ...  # calls _convert_field

def _convert_field(field: Dict[str, Any]):
    ...
```
Yes, that's much cleaner indeed. The method grew a bit over time, but I really like the suggestion of decoupling the field and type. I've updated the code.
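The dispatch structure suggested above can be sketched as a self-contained, runnable toy. The `PRIMITIVE_FIELD_TYPE_MAP` contents and the tuple-based return values here are hypothetical placeholders for illustration only; the real implementation returns Iceberg type objects.

```python
from typing import Any, Dict, Union

# Hypothetical minimal mapping for illustration; the real one lives in AvroSchemaConversion
PRIMITIVE_FIELD_TYPE_MAP = {"string": "StringType", "int": "IntegerType", "long": "LongType"}

def convert_schema(schema: Union[str, Dict[str, Any]]) -> Any:
    """Dispatch on the shape of the Avro schema node, one case per Avro concept."""
    if isinstance(schema, str):
        # A bare string is a primitive type name, e.g. "long"
        return PRIMITIVE_FIELD_TYPE_MAP[schema]
    elif isinstance(schema, dict):
        type_name = schema["type"]
        if type_name == "record":
            return [convert_field(f) for f in schema["fields"]]
        elif type_name == "array":
            return ("list", convert_schema(schema["items"]))
        elif type_name == "map":
            # Avro map keys are always strings
            return ("map", "StringType", convert_schema(schema["values"]))
        else:
            return PRIMITIVE_FIELD_TYPE_MAP[type_name]
    else:
        raise ValueError(f"Cannot convert invalid schema: {schema}")

def convert_field(field: Dict[str, Any]) -> Any:
    """Convert a single record field: a (name, type) pair in this sketch."""
    return (field["name"], convert_schema(field["type"]))
```

Each Avro concept gets exactly one function, so the "is this a field or a type?" ambiguity from the original method never arises.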
python/src/iceberg/schema.py
Outdated
```diff
 @property
-def columns(self) -> Iterable[NestedField]:
+def columns(self) -> Tuple[NestedField]:
```
Should this be `Tuple[NestedField, ...]` since it isn't a single field in the tuple? From the `typing.Tuple` docs:

> To specify a variable-length tuple of homogeneous type, use literal ellipsis, e.g. `Tuple[int, ...]`
Yes, good one. I know that we're going a bit back and forth on this annotation, but I've double-checked using the inspector, and it is a Tuple at runtime 👍🏻
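For reference, a minimal sketch of the annotation distinction being discussed (the `columns` function and field names here are illustrative, not the real `Schema` API):

```python
from typing import Tuple

# Tuple[int] means a tuple of exactly one int;
# Tuple[int, ...] is a variable-length homogeneous tuple.
coords: Tuple[int, ...] = (1, 2, 3)

def columns(fields: Tuple[str, ...]) -> Tuple[str, ...]:
    # Returning a tuple keeps the schema's column list immutable to callers
    return fields
```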
```python
    Returns:
        The Iceberg equivalent field
    """
    assert "field-id" in field, "Missing field-id in the Avro field, this is required for converting it to an Iceberg schema"
```
What is thrown when using `assert` rather than `raise ValueError`?

Also, we try to be more direct with error messages by starting with what was attempted, then giving context for what went wrong. It's also assumed that if an exception is raised then some requirement wasn't met, so we'd typically leave out the "this is required ..." part. How about `f"Cannot convert field, missing field-id: {field}"`?
Great point. I'm not against asserts personally, but there is a battle in the Python community over whether you should use them for checks like these. Technically you can strip them using the `python -O` flag, and they are meant for developers. I've changed them to a `ValueError` to be in line with the rest of the exceptions (which I like).

I've updated the message to your suggestion. I think it is good to have the field in there 👍🏻
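A small sketch of the difference: an `assert` is removed entirely under `python -O`, while an explicit `raise` always runs. The function name here is hypothetical.

```python
def require_field_id(field: dict) -> int:
    """Return the field-id, or raise if it is missing (sketch).

    Unlike `assert`, this check survives `python -O`.
    """
    if "field-id" not in field:
        raise ValueError(f"Cannot convert field, missing field-id: {field}")
    return field["field-id"]
```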
```python
plain_type, element_is_optional = self._resolve_union(avro_type[inner_field_name])
if isinstance(plain_type, dict):
    # We need the element-id downstream
    plain_type["field-id"] = avro_type[id_field]
```
I strongly prefer not modifying the incoming data like this is doing. I think `avro_type` and parts of that type should be handled as though they are immutable.

Also, I don't think that this works. `_convert_schema` is going to return a type. For primitive types, there is no way to pass in the `field-id` property, and in all cases the `field-id` is going to be ignored because Iceberg types don't carry field IDs.

I think the simple solution to both problems is to handle inner types in the map and array methods directly, rather than using a common method.
Hey Ryan, I agree. I don't think we really need the method at all. I've removed it and just resolved the type instead of the field. Still some old code dangling around.
```python
LOGICAL_FIELD_TYPE_MAPPING: Dict[str, PrimitiveType] = {
    "date": DateType(),
    "time-millis": TimeType(),
    "timestamp-millis": TimestampType(),
```
Iceberg will only write `time-micros` and `timestamp-micros` because at least microsecond precision is required by the SQL spec. The mappings for millis are correct, but we should also be able to handle micros.
I've added the mapping, but I think we need to correct the scale when reading the data, analogous to the Java impl (`core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java`, lines 137 to 140 in 90225d6):

```java
case "timestamp-millis":
  // adjust to microseconds
  ValueReader<Long> longs = ValueReaders.longs();
  return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;
```
Yes, exactly. We'll need to be aware of whether it is in millis or micros when reading the file!
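In Python, the same millis-to-micros adjustment could be sketched like this (hypothetical helper name; the real reader would do this per-value while decoding):

```python
def to_micros(value: int, logical_type: str) -> int:
    """Normalize an Avro time/timestamp value to microseconds (sketch).

    Avro millis-based logical types store milliseconds; Iceberg expects
    microseconds, so those values are scaled by 1000.
    """
    if logical_type in ("time-millis", "timestamp-millis"):
        return value * 1000
    return value
```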
```python
    Returns:
    """
    if record_type["type"] != "record":
        raise ValueError(f"Expected type, got: {record_type}")
```
Expected record?
```python
elif logical_type == "map":
    return self._convert_logical_map_type(avro_logical_type)
elif (logical_type, physical_type) in LOGICAL_FIELD_TYPE_MAPPING:
    return LOGICAL_FIELD_TYPE_MAPPING[(logical_type, physical_type)]
```
I just also realized that this doesn't check for the `adjustToUtc` flag that we use to distinguish between `TimestampType()` and `TimestamptzType()`. We'll have to add that.
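A sketch of that check (the property key `"adjust-to-utc"` and the string return values are assumptions for illustration; the real code returns Iceberg type objects and should follow the exact key from the Iceberg Avro spec):

```python
def timestamp_type(avro_schema: dict) -> str:
    """Pick tz-aware vs naive timestamp from the adjust-to-utc flag (sketch)."""
    if avro_schema.get("adjust-to-utc", False):
        return "timestamptz"
    return "timestamp"
```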
```python
    Returns:
        A Iceberg DecimalType
    """
    return DecimalType(precision=avro_type["precision"], scale=avro_type["scale"])
```
Since you're validating physical type above, you may want to validate that the physical type here is fixed, long, or int.
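The suggested guard could look like this (hypothetical function name and tuple return; the allowed physical types follow the reviewer's list above):

```python
def convert_decimal(avro_type: dict) -> tuple:
    """Validate the physical type backing a decimal before converting (sketch)."""
    if avro_type["type"] not in ("fixed", "long", "int"):
        raise ValueError(f"Cannot convert decimal, invalid physical type: {avro_type['type']}")
    return ("decimal", avro_type["precision"], avro_type["scale"])
```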
```python
fields = avro_type["items"]["fields"]
if len(fields) != 2:
    raise ValueError(f'Invalid key-value pair schema: {avro_type["items"]}')
key = self._convert_field(list(filter(lambda f: f["name"] == "key", fields))[0])
```
Looks good. Too bad it is more complicated, but thanks for fixing this.
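As an aside, the quoted key/value lookup can also be written with `next()` instead of `list(filter(...))[0]`; a self-contained sketch with a hypothetical helper name:

```python
def key_value_fields(avro_type: dict) -> tuple:
    """Pull the key and value fields out of a map-as-array schema (sketch)."""
    fields = avro_type["items"]["fields"]
    if len(fields) != 2:
        raise ValueError(f'Invalid key-value pair schema: {avro_type["items"]}')
    key = next(f for f in fields if f["name"] == "key")
    value = next(f for f in fields if f["name"] == "value")
    return key, value
```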
There are a few minor things to fix, but overall I think this looks good. I'm going to commit it to unblock reader work and we can follow up with minor updates. Thanks, @Fokko!
Converts an Avro schema into an Iceberg one.
Looked at https://github.com/apache/iceberg/blob/master/python_legacy/iceberg/core/avro/avro_to_iceberg.py but I decided to rewrite it to make it more pythonic.