Skip to content

Comments

Supporting TTL with Multiple State Variables, using ForkJoinPool instead of single thread#2

Merged
ericm-db merged 3 commits intottl-pocfrom
mult-ttl
Mar 5, 2024
Merged

Supporting TTL with Multiple State Variables, using ForkJoinPool instead of single thread#2
ericm-db merged 3 commits intottl-pocfrom
mult-ttl

Conversation

@ericm-db
Copy link
Owner

@ericm-db ericm-db commented Mar 5, 2024

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

@ericm-db ericm-db changed the title Mult ttl Supporting TTL with Multiple State Variables, using ForkJoinPool instead of single thread Mar 5, 2024
@ericm-db ericm-db merged commit a59763f into ttl-poc Mar 5, 2024
ericm-db pushed a commit that referenced this pull request Jun 26, 2024
… throw internal error

### What changes were proposed in this pull request?

This PR fixes the error messages and classes when Python UDFs are used in higher order functions.

### Why are the changes needed?

To show the proper user-facing exceptions with error classes.

### Does this PR introduce _any_ user-facing change?

Yes, previously it threw internal error such as:

```python
from pyspark.sql.functions import transform, udf, col, array
spark.range(1).select(transform(array("id"), lambda x: udf(lambda y: y)(x))).collect()
```

Before:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o74.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 0.0 failed 1 times, most recent failure: Lost task 15.0 in stage 0.0 (TID 15) (ip-192-168-123-103.ap-northeast-2.compute.internal executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: <lambda>(lambda x_0#3L)#2 SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```

After:

```
pyspark.errors.exceptions.captured.AnalysisException: [INVALID_LAMBDA_FUNCTION_CALL.UNEVALUABLE] Invalid lambda function call. Python UDFs should be used in a lambda function at a higher order function. However, "<lambda>(lambda x_0#3L)" was a Python UDF. SQLSTATE: 42K0D;
Project [transform(array(id#0L), lambdafunction(<lambda>(lambda x_0#3L)#2, lambda x_0#3L, false)) AS transform(array(id), lambdafunction(<lambda>(lambda x_0#3L), namedlambdavariable()))#4]
+- Range (0, 1, step=1, splits=Some(16))
```

### How was this patch tested?

Unittest was added

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47079 from HyukjinKwon/SPARK-48706.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
ericm-db pushed a commit that referenced this pull request Dec 12, 2024
### What changes were proposed in this pull request?
Fix self-join after `applyInArrow`, the same issue of `applyInPandas` was fixed in apache#31429

### Why are the changes needed?
bug fix

before:
```
In [1]: import pyarrow as pa

In [2]: df = spark.createDataFrame([(1, 1)], ("k", "v"))

In [3]: def arrow_func(key, table):
   ...:     return pa.Table.from_pydict({"x": [2], "y": [2]})
   ...:

In [4]: df2 = df.groupby("k").applyInArrow(arrow_func, schema="x long, y long")

In [5]: df2.show()
24/12/04 17:47:43 WARN CheckAllocator: More than one DefaultAllocationManager on classpath. Choosing first found
+---+---+
|  x|  y|
+---+---+
|  2|  2|
+---+---+

In [6]: df2.join(df2)
...
Failure when resolving conflicting references in Join:
'Join Inner
:- FlatMapGroupsInArrow [k#0L], arrow_func(k#0L, v#1L)#2, [x#3L, y#4L]
:  +- Project [k#0L, k#0L, v#1L]
:     +- LogicalRDD [k#0L, v#1L], false
+- FlatMapGroupsInArrow [k#12L], arrow_func(k#12L, v#13L)#2, [x#3L, y#4L]
   +- Project [k#12L, k#12L, v#13L]
      +- LogicalRDD [k#12L, v#13L], false

Conflicting attributes: "x", "y". SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:79)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:798)
```

after:
```
In [6]: df2.join(df2)
Out[6]: DataFrame[x: bigint, y: bigint, x: bigint, y: bigint]

In [7]: df2.join(df2).show()
+---+---+---+---+
|  x|  y|  x|  y|
+---+---+---+---+
|  2|  2|  2|  2|
+---+---+---+---+
```

### Does this PR introduce _any_ user-facing change?
bug fix

### How was this patch tested?
added tests

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#49056 from zhengruifeng/fix_arrow_join.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant