
[SPARK-46253][PYTHON] Plan Python data source read using MapInArrow #44170

Closed

Conversation

allisonwang-db
Contributor

What changes were proposed in this pull request?

This PR changes how we plan the Python data source read. Instead of using a regular Python UDTF, we use an Arrow UDF and plan the data source read with the MapInArrow operator.
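The operator the new plan builds on can be exercised directly from the DataFrame API; a minimal sketch below illustrates the idea of producing the reader's output as Arrow record batches inside a function handed to mapInArrow. The partition layout and helper names are hypothetical, not taken from this PR.

import pyarrow as pa
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Pretend each input row identifies a partition of a hypothetical data source.
partitions = spark.range(4)

def read_partitions(batches):
    # `batches` is an iterator of pyarrow.RecordBatch for one Spark partition.
    for batch in batches:
        for pid in batch.column(0).to_pylist():  # the single "id" column from spark.range
            # Hypothetical per-partition read producing Python rows,
            # pivoted into an Arrow batch column by column.
            rows = [(pid, i) for i in range(3)]
            yield pa.RecordBatch.from_arrays(
                [pa.array([r[0] for r in rows]), pa.array([r[1] for r in rows])],
                names=["partition", "value"],
            )

df = partitions.mapInArrow(read_partitions, schema="partition long, value long")
df.show()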

Why are the changes needed?

To improve the performance of Python data source reads.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

No

@allisonwang-db
Contributor Author

cc @ueshin

@allisonwang-db
Contributor Author

cc @cloud-fan @HyukjinKwon

def batched(iterator: Iterator, n: int) -> Iterator:
    return iter(functools.partial(lambda it: list(islice(it, n)), iterator), [])

max_batch_size = int(os.environ.get("ARROW_MAX_RECORDS_PER_BATCH", "10000"))
Member

While this is probably fine for now because the batch size is unlikely to change often, we should ideally send the configuration through the socket. Otherwise, a new Python worker will be created whenever this configuration changes instead of being reused.
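For reference, the helper above chunks the record iterator lazily into lists of at most n rows; a minimal standalone run (imports added here for self-containment):

import functools
import os
from itertools import islice
from typing import Iterator

def batched(iterator: Iterator, n: int) -> Iterator:
    # iter(callable, sentinel): keep pulling n-sized chunks until the empty-list sentinel.
    return iter(functools.partial(lambda it: list(islice(it, n)), iterator), [])

max_batch_size = int(os.environ.get("ARROW_MAX_RECORDS_PER_BATCH", "10000"))

print(list(batched(iter(range(10)), 4)))
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]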

def __init__(self) -> None:
    self.ser = CloudPickleSerializer()

# Wrap the data source read logic in a mapInArrow UDF.
import pyarrow as pa
Member

And we shouldn't forget to document that this requires PyArrow.
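For context, PySpark already ships a version-check helper that raises a descriptive error when PyArrow is missing or too old; whether this exact guard is used on this code path is an assumption, but the usual pattern looks like:

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version

# Fail fast with a clear ImportError if PyArrow is absent or below the supported version.
require_minimum_pyarrow_version()

import pyarrow as pa  # safe to import after the check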

@HyukjinKwon
Member

Merged to master.

pa_schema = to_arrow_schema(return_type)
column_names = return_type.fieldNames()
column_converters = [
    LocalDataToArrowConversion._create_converter(field.dataType)
Contributor

Since LocalDataToArrowConversion and ArrowTableToRowsConversion were not designed for the data source API, I think we should look into them to make sure the behavior is as expected, e.g.:
1. There is a _deduplicate_field_names logic introduced in 71bac15; not sure whether it should be used for the data source.
2. IIRC, the internally used to_arrow_schema doesn't support all SQL types.
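To make the concern concrete: the read path ultimately has to pivot plain Python rows into Arrow record batches that match the declared schema. A simplified, self-contained sketch using only public PyArrow and PySpark type APIs (the internal converters in this PR handle more cases, e.g. nested and null-aware conversions):

import pyarrow as pa
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.pandas.types import to_arrow_schema

return_type = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])
pa_schema = to_arrow_schema(return_type)

rows = [(1, "a"), (2, "b"), (3, "c")]

# Pivot rows into columns and build one Arrow batch that matches the schema.
columns = [
    pa.array([row[i] for row in rows], type=pa_schema.field(i).type)
    for i in range(len(pa_schema))
]
batch = pa.RecordBatch.from_arrays(columns, schema=pa_schema)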
