[SPARK-56166][PYTHON] Use ArrowBatchTransformer.enforce_schema to replace column-wise type coercion logic#54967
Conversation
…lace manual type coercion logic

### What changes were proposed in this pull request?

Replace manual column-by-column type coercion with `ArrowBatchTransformer.enforce_schema` in three places:

1. `ArrowStreamArrowUDTFSerializer.apply_type_coercion` in serializers.py
2. `ArrowStreamArrowUDFSerializer.create_batch` in serializers.py
3. `process_results` in worker.py (scalar Arrow iter UDF path)

Also:

- Add `arrow_cast` parameter to `enforce_schema` for strict type matching mode
- Add `KeyError` handling in `enforce_schema` for missing columns with a user-friendly error
- Remove now-unused `coerce_arrow_array` imports from serializers.py and worker.py

### Why are the changes needed?

These three places duplicated the same coerce-and-reassemble logic that `enforce_schema` already provides. Consolidating reduces code duplication and ensures consistent error handling.

### Does this PR introduce _any_ user-facing change?

Error messages for type/schema mismatches in Arrow UDTFs are slightly changed to be consistent with other Arrow UDF error messages.

### How was this patch tested?

Existing tests in `test_arrow_udtf.py` and `test_arrow_udf_scalar.py`.

### Was this patch authored or co-authored using generative AI tooling?

Yes.
```python
# If so, use index-based access (faster than name lookup).
batch_names = [batch.schema.field(i).name for i in range(batch.num_columns)]
target_names = [field.name for field in arrow_schema]
use_index = batch_names == target_names
```
on use_index, do we need to take care of nested cases?
Good question! I believe the answer is no: use_index only controls top-level column lookup. When the column names are in the same order, it uses the positional index for speed; otherwise it falls back to name-based lookup.
Nested type coercion is handled by pa.Array.cast(), which Arrow applies recursively, so nested struct/list fields are covered regardless of the use_index path. I added a unit test (test_enforce_schema_nested_cast) to verify this.
zhengruifeng left a comment
Overall this is a clean refactoring that consolidates three duplicated coerce-and-reassemble code paths into enforce_schema. The coerce_arrow_array removal is complete (no dangling references), the keyword-only signature change is safe for all callers, and the new unit tests cover the added parameters well. One minor nit inline.
merged to master