[SPARK-37085][PYTHON][SQL] Add list/tuple overloads to array, struct, create_map, map_concat #34354
zero323 wants to merge 5 commits into apache:master from zero323:SPARK-37085
Conversation
|
New annotations are already implemented, but I think we might have to redefine … FYI @ueshin, @HyukjinKwon, @xinrong-databricks |
|
What's your idea like? |
Long story short, I've been looking into different scenarios for using aliases and types. Adding inline hints definitely introduced use cases that we didn't have before; most notably, I suspect that some of the cases where invariant generics hit us might be addressed with bounded type vars:

```python
from typing import overload, List, TypeVar, Union

from pyspark.sql import Column
from pyspark.sql.functions import col

ColumnOrName = Union[str, Column]
ColumnOrName_ = TypeVar("ColumnOrName_", bound=ColumnOrName)

def array(__cols: List[ColumnOrName_]): ...

column_names = ["a", "b", "c"]
array(column_names)

columns = [col(x) for x in column_names]
array(columns)
```

but these are not universal and there might be some caveats that I don't see at the moment. I hope there will be an opportunity to discuss this stuff in a more interactive manner. (Note: …) |
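For contrast, here is a minimal sketch (the `array_union` / `array_bound` names are made up for illustration, not actual PySpark functions; like the snippet above, it is meant to be fed to mypy) of how a plain `Union`-typed list parameter behaves compared to the bounded type var:

```python
from typing import List, TypeVar, Union

from pyspark.sql import Column
from pyspark.sql.functions import col

ColumnOrName = Union[str, Column]
ColumnOrName_ = TypeVar("ColumnOrName_", bound=ColumnOrName)

def array_union(__cols: List[ColumnOrName]) -> None:
    """Parameter typed with the plain Union alias."""

def array_bound(__cols: List[ColumnOrName_]) -> None:
    """Parameter typed with the bounded type var."""

columns: List[Column] = [col(x) for x in ["a", "b", "c"]]

array_union(columns)  # mypy error: List[Column] is not List[Union[str, Column]] (List is invariant)
array_bound(columns)  # OK: ColumnOrName_ is solved as Column
```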
### What changes were proposed in this pull request?
This PR changes `RDD[~T]` and `DStream[~T]` to `RDD[+T]` and `DStream[+T]`, respectively.
### Why are the changes needed?
To improve usability of the current annotations and simplify further development of type hints. Let's take simple `RDD` to `DataFrame` as an example. Currently, the following code will not type check
```python
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([(1, 2)])
reveal_type(rdd)
spark.createDataFrame(rdd)
```
with
```
main.py:8: note: Revealed type is "pyspark.rdd.RDD[Tuple[builtins.int, builtins.int]]"
main.py:10: error: Argument 1 to "createDataFrame" of "SparkSession" has incompatible type "RDD[Tuple[int, int]]"; expected "Union[RDD[Tuple[Any, ...]], Iterable[Tuple[Any, ...]]]"
Found 1 error in 1 file (checked 1 source file)
```
To type check, `rdd` would have to be annotated with a specific type, matching the signature of the `createDataFrame` method:
```python
rdd: RDD[Tuple[Any, ...]] = sc.parallelize([(1, 2)])
```
Alternatively, one could inline the definition:
```python
spark.createDataFrame(sc.parallelize([(1, 2)]))
```
Similarly, with `pyspark.mllib`:
```python
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import SparseVector, Vectors
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([
    Vectors.sparse(10, [1, 3, 5], [1, 1, 1]),
    Vectors.sparse(10, [2, 4, 6], [1, 1, 1]),
])
KMeans.train(rdd, 2)
```
we'd get
```
main.py:14: error: Argument 1 to "train" of "KMeans" has incompatible type "RDD[SparseVector]"; expected "RDD[Union[ndarray[Any, Any], Vector, List[float], Tuple[float, ...]]]"
Found 1 error in 1 file (checked 1 source file)
```
but this time, we'd need a much more complex annotation (inlining would work as well):
```python
rdd: RDD[Union[ndarray[Any, Any], Vector, List[float], Tuple[float, ...]]] = sc.parallelize([
    Vectors.sparse(10, [1, 3, 5], [1, 1, 1]),
    Vectors.sparse(10, [2, 4, 6], [1, 1, 1]),
])
```
This happens because
- RDD is invariant in terms of stored type.
- mypy doesn't look ahead to infer the types of objects from their later usage context (similar to the Scala console / spark-shell, but unlike the standalone Scala compiler, which allows us to have [examples like this](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala))
This not only makes things verbose, but also fragile and dependent on implementation details. In the first example, where we have a top-level `Union`, we can just use `RDD[...]` and ignore the other members.
In the second case, where the `Union` is a type parameter, we have to match all of its components (it could be simpler if we didn't use `RDD[VectorLike]` but instead defined something like `RDD[ndarray] | RDD[Vector] | RDD[List[float]] | RDD[Tuple[float, ...]]`, which would make it closer to the first case, though not semantically equivalent to the current signature).
Theoretically, we could partially address this with different definitions of the aliases, for example using type bounds (see the discussion under #34354), but that doesn't scale well and requires the same steps to be taken by every library that depends on PySpark.
See also the related discussion about the Scala counterpart ‒ SPARK-1296
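A minimal, self-contained sketch (toy `Box` classes for illustration, not PySpark code) of the difference between the invariant type variable used so far and the covariant one this change switches to:

```python
from typing import Generic, TypeVar, Union

T = TypeVar("T")                        # invariant, like RDD[~T] before this change
T_co = TypeVar("T_co", covariant=True)  # covariant, like RDD[+T] after this change

class InvariantBox(Generic[T]):
    """Stand-in for an invariant container such as the old RDD[~T]."""

class CovariantBox(Generic[T_co]):
    """Stand-in for a covariant container such as the new RDD[+T]."""

def take_invariant(box: InvariantBox[Union[int, str]]) -> None: ...
def take_covariant(box: CovariantBox[Union[int, str]]) -> None: ...

take_invariant(InvariantBox[int]())  # mypy error: InvariantBox[int] is not InvariantBox[Union[int, str]]
take_covariant(CovariantBox[int]())  # OK: int is a subtype of Union[int, str] and T_co is covariant
```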
### Does this PR introduce _any_ user-facing change?
Type hints only.
Users will be able to pass `RDD` / `DStream` objects parametrized with subtypes in contexts that expect a supertype, without explicit annotations or casts (both examples above will pass the type checker in their original form).
### How was this patch tested?
Existing tests and not-yet-released data tests (SPARK-36989).
Closes #34374 from zero323/SPARK-37104.
Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
|
@itholic can you take a look please? |
|
Could you rebase this branch onto master? The mypy annotation tests don't seem to pass with the current status. |
@itholic Done. It seems to pass everything in CI. Did you have any particular issues in mind? (I've seen some weird local issues, until I cleared the mypy cache after the upgrade.) |
|
@zero323 hmm.. I didn't see anything particularly strange, except for a few issues caused by a different mypy version. It seems to be working fine now. |
|
Oh, |
```python
@overload
def struct(__cols: Union[List["ColumnOrName_"], Tuple["ColumnOrName_", ...]]) -> Column:
    ...
```
It seems like `List["ColumnOrName"]` might just work fine.
Could you briefly explain why we need to use a `Union` of `List` and `Tuple`?
And could I ask what the `...` means in `["ColumnOrName_", ...]`?
We still want to support (it was a common user request in the past) calls like `struct(("foo", "bar"))`, which shouldn't be accepted without `Tuple` (or some supertype).
If you try
```diff
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 006d10c9fc..caf17a84b3 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1677,13 +1677,11 @@ def struct(*cols: "ColumnOrName") -> Column:
 @overload
-def struct(__cols: Union[List["ColumnOrName_"], Tuple["ColumnOrName_", ...]]) -> Column:
+def struct(__cols: Union[List["ColumnOrName_"]]) -> Column:
     ...
-def struct(
-    *cols: Union["ColumnOrName", Union[List["ColumnOrName_"], Tuple["ColumnOrName_", ...]]]
-) -> Column:
+def struct(*cols: Union["ColumnOrName", Union[List["ColumnOrName_"]]]) -> Column:
     """Creates a new struct column.
 
     .. versionadded:: 1.4.0
diff --git a/python/pyspark/sql/tests/typing/test_functions.yml b/python/pyspark/sql/tests/typing/test_functions.yml
index efb3293472..f5f2f13c4f 100644
--- a/python/pyspark/sql/tests/typing/test_functions.yml
+++ b/python/pyspark/sql/tests/typing/test_functions.yml
@@ -66,6 +66,8 @@
       create_map(col_objs)
       map_concat(col_objs)
 
+      struct(("foo", "bar"))
+
     out: |
       main:29: error: No overload variant of "array" matches argument types "List[Column]", "List[Column]"  [call-overload]
       main:29: note: Possible overload variant:
```

you should see an error in the data tests:

```
___________________________ varargFunctionsOverloads ___________________________
/path/to/spark/python/pyspark/sql/tests/typing/test_functions.yml:19:
E   pytest_mypy_plugins.utils.TypecheckAssertionError: Invalid output:
E   Actual:
E     main:50: error: No overload variant of "struct" matches argument type "Tuple[str, str]"  [call-overload] (diff)
E     main:50: note: Possible overload variants: (diff)
E     main:50: note:     def struct(*cols: Union[Column, str]) -> Column (diff)
E     main:50: note:     def [ColumnOrName_] struct(List[ColumnOrName_]) -> Column (diff)
E   Expected:
E     (empty)
=========================== short test summary info ============================
```
> And could I ask what the `...` means in `["ColumnOrName_", ...]`?
Tuples are typed like product types, so `Tuple[ColumnOrName_]` matches a tuple with exactly one column or str element. In contrast, `Tuple["ColumnOrName_", ...]` matches tuples of arbitrary size, as long as all elements are columns or strings (there is a mypy docs section that discusses this syntax further).
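To make the distinction concrete, a small sketch (the `exactly_one` / `any_length` helpers are hypothetical) of how mypy treats fixed-length versus variadic tuple types:

```python
from typing import Tuple, Union

from pyspark.sql import Column

ColumnOrName = Union[str, Column]

def exactly_one(t: Tuple[ColumnOrName]) -> None:
    """Accepts a tuple with exactly one column-or-name element."""

def any_length(t: Tuple[ColumnOrName, ...]) -> None:
    """Accepts a tuple of any length, as long as every element is a column or name."""

exactly_one(("foo",))        # OK: one element
exactly_one(("foo", "bar"))  # mypy error: Tuple[str, str] is not a 1-tuple
any_length(("foo", "bar"))   # OK: arbitrary length
any_length(())               # OK: the empty tuple also matches
```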
Got it. Thanks for the comment!! 🙏
I don't think I've seen this one. In my local env I primarily see some errors caused by output mismatches in data tests, dynamically defined methods, and stuff covered by #34946, but all of these are non-deterministic and rare, so I don't even have a good place to start debugging and asking questions. |
itholic left a comment:
It seems to be the best for now, until mypy itself supports looser typing conventions.
HyukjinKwon left a comment:
Haven't taken a close look but I am fine with this.
|
Merged into master. Thanks everyone! |
As @ueshin pointed out, using …
|
[SPARK-37085][PYTHON][SQL] Add list/tuple overloads to array, struct, create_map, map_concat

### What changes were proposed in this pull request?

This PR adds overloads to the following `pyspark.sql.functions`:

- `array`
- `struct`
- `create_map`
- `map_concat`

to support calls with a single `list` or `tuple` argument, i.e.

```python
array(["foo", "bar"])
```

### Why are the changes needed?

These calls are supported by the current implementation, but don't type check.

### Does this PR introduce _any_ user-facing change?

Type checker only, as described above.

### How was this patch tested?

Existing tests and manual tests (to be added in SPARK-36989)

Closes apache#34354 from zero323/SPARK-37085.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: zero323 <mszymkiewicz@gmail.com>
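For reference, a minimal usage sketch (assuming an active Spark installation; the DataFrame and column names are illustrative) of the single-list call shapes these overloads cover:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, map_concat, struct

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("k1", 1, "k2", 2)], ["a", "b", "c", "d"])

df.select(
    array(["b", "d"]),    # single list instead of varargs; now accepted by the type checker
    struct(["a", "b"]),
    map_concat([create_map(["a", "b"]), create_map(["c", "d"])]),
).show()
```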