Skip to content

Commit

Permalink
[SPARK-28273][SQL][PYTHON] Convert and port 'pgSQL/case.sql' into UDF…
Browse files Browse the repository at this point in the history
… test base

## What changes were proposed in this pull request?

This PR adds some tests converted from `pgSQL/case.sql'` to test UDFs. Please see contribution guide of this umbrella ticket - [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

This PR also contains two minor fixes:

1. Change name of Scala UDF from `UDF:name(...)` to `name(...)` to be consistent with Python'

2. Fix Scala UDF at `IntegratedUDFTestUtils.scala ` to handle `null` in strings.

<details><summary>Diff comparing to 'pgSQL/case.sql'</summary>
<p>

```diff
diff --git a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
index fa078d16d6d..55bef64338f 100644
--- a/sql/core/src/test/resources/sql-tests/results/pgSQL/case.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/pgSQL/udf-case.sql.out
 -115,7 +115,7  struct<>
 -- !query 13
 SELECT '3' AS `One`,
   CASE
-    WHEN 1 < 2 THEN 3
+    WHEN CAST(udf(1 < 2) AS boolean) THEN 3
   END AS `Simple WHEN`
 -- !query 13 schema
 struct<One:string,Simple WHEN:int>
 -126,10 +126,10  struct<One:string,Simple WHEN:int>
 -- !query 14
 SELECT '<NULL>' AS `One`,
   CASE
-    WHEN 1 > 2 THEN 3
+    WHEN 1 > 2 THEN udf(3)
   END AS `Simple default`
 -- !query 14 schema
-struct<One:string,Simple default:int>
+struct<One:string,Simple default:string>
 -- !query 14 output
 <NULL> NULL

 -137,17 +137,17  struct<One:string,Simple default:int>
 -- !query 15
 SELECT '3' AS `One`,
   CASE
-    WHEN 1 < 2 THEN 3
-    ELSE 4
+    WHEN udf(1) < 2 THEN udf(3)
+    ELSE udf(4)
   END AS `Simple ELSE`
 -- !query 15 schema
-struct<One:string,Simple ELSE:int>
+struct<One:string,Simple ELSE:string>
 -- !query 15 output
 3      3

 -- !query 16
-SELECT '4' AS `One`,
+SELECT udf('4') AS `One`,
   CASE
     WHEN 1 > 2 THEN 3
     ELSE 4
 -159,10 +159,10  struct<One:string,ELSE default:int>

 -- !query 17
-SELECT '6' AS `One`,
+SELECT udf('6') AS `One`,
   CASE
-    WHEN 1 > 2 THEN 3
-    WHEN 4 < 5 THEN 6
+    WHEN CAST(udf(1 > 2) AS boolean) THEN 3
+    WHEN udf(4) < 5 THEN 6
     ELSE 7
   END AS `Two WHEN with default`
 -- !query 17 schema
 -173,7 +173,7  struct<One:string,Two WHEN with default:int>

 -- !query 18
 SELECT '7' AS `None`,
-  CASE WHEN rand() < 0 THEN 1
+  CASE WHEN rand() < udf(0) THEN 1
   END AS `NULL on no matches`
 -- !query 18 schema
 struct<None:string,NULL on no matches:int>
 -182,36 +182,36  struct<None:string,NULL on no matches:int>

 -- !query 19
-SELECT CASE WHEN 1=0 THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
+SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END
 -- !query 19 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN CAST(udf((1 = 0)) AS BOOLEAN) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
 -- !query 19 output
 1.0

 -- !query 20
-SELECT CASE 1 WHEN 0 THEN 1/0 WHEN 1 THEN 1 ELSE 2/0 END
+SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END
 -- !query 20 schema
-struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
+struct<CASE WHEN (1 = 0) THEN (CAST(1 AS DOUBLE) / CAST(CAST(udf(0) AS DOUBLE) AS DOUBLE)) WHEN (1 = 1) THEN CAST(1 AS DOUBLE) ELSE (CAST(2 AS DOUBLE) / CAST(0 AS DOUBLE)) END:double>
 -- !query 20 output
 1.0

 -- !query 21
-SELECT CASE WHEN i > 100 THEN 1/0 ELSE 0 END FROM case_tbl
+SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl
 -- !query 21 schema
-struct<CASE WHEN (i > 100) THEN (CAST(1 AS DOUBLE) / CAST(0 AS DOUBLE)) ELSE CAST(0 AS DOUBLE) END:double>
+struct<CASE WHEN (i > 100) THEN udf((cast(1 as double) / cast(0 as double))) ELSE udf(0) END:string>
 -- !query 21 output
-0.0
-0.0
-0.0
-0.0
+0
+0
+0
+0

 -- !query 22
-SELECT CASE 'a' WHEN 'a' THEN 1 ELSE 2 END
+SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END
 -- !query 22 schema
-struct<CASE WHEN (a = a) THEN 1 ELSE 2 END:int>
+struct<CASE WHEN (a = a) THEN udf(1) ELSE udf(2) END:string>
 -- !query 22 output
 1

 -283,7 +283,7  big

 -- !query 27
-SELECT * FROM CASE_TBL WHERE COALESCE(f,i) = 4
+SELECT * FROM CASE_TBL WHERE udf(COALESCE(f,i)) = 4
 -- !query 27 schema
 struct<i:int,f:double>
 -- !query 27 output
 -291,7 +291,7  struct<i:int,f:double>

 -- !query 28
-SELECT * FROM CASE_TBL WHERE NULLIF(f,i) = 2
+SELECT * FROM CASE_TBL WHERE udf(NULLIF(f,i)) = 2
 -- !query 28 schema
 struct<i:int,f:double>
 -- !query 28 output
 -299,10 +299,10  struct<i:int,f:double>

 -- !query 29
-SELECT COALESCE(a.f, b.i, b.j)
+SELECT udf(COALESCE(a.f, b.i, b.j))
   FROM CASE_TBL a, CASE2_TBL b
 -- !query 29 schema
-struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>
+struct<udf(coalesce(f, cast(i as double), cast(j as double))):string>
 -- !query 29 output
 -30.3
 -30.3
 -332,8 +332,8  struct<coalesce(f, CAST(i AS DOUBLE), CAST(j AS DOUBLE)):double>

 -- !query 30
 SELECT *
-  FROM CASE_TBL a, CASE2_TBL b
-  WHERE COALESCE(a.f, b.i, b.j) = 2
+   FROM CASE_TBL a, CASE2_TBL b
+   WHERE udf(COALESCE(a.f, b.i, b.j)) = 2
 -- !query 30 schema
 struct<i:int,f:double,i:int,j:int>
 -- !query 30 output
 -342,7 +342,7  struct<i:int,f:double,i:int,j:int>

 -- !query 31
-SELECT '' AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
+SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
   NULLIF(b.i, 4) AS `NULLIF(b.i,4)`
   FROM CASE_TBL a, CASE2_TBL b
 -- !query 31 schema
 -377,7 +377,7  struct<Five:string,NULLIF(a.i,b.i):int,NULLIF(b.i,4):int>
 -- !query 32
 SELECT '' AS `Two`, *
   FROM CASE_TBL a, CASE2_TBL b
-  WHERE COALESCE(f,b.i) = 2
+  WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean)
 -- !query 32 schema
 struct<Two:string,i:int,f:double,i:int,j:int>
 -- !query 32 output
 -388,15 +388,15  struct<Two:string,i:int,f:double,i:int,j:int>
 -- !query 33
 SELECT CASE
   (CASE vol('bar')
-    WHEN 'foo' THEN 'it was foo!'
-    WHEN vol(null) THEN 'null input'
+    WHEN udf('foo') THEN 'it was foo!'
+    WHEN udf(vol(null)) THEN 'null input'
     WHEN 'bar' THEN 'it was bar!' END
   )
-  WHEN 'it was foo!' THEN 'foo recognized'
-  WHEN 'it was bar!' THEN 'bar recognized'
-  ELSE 'unrecognized' END
+  WHEN udf('it was foo!') THEN 'foo recognized'
+  WHEN 'it was bar!' THEN udf('bar recognized')
+  ELSE 'unrecognized' END AS col
 -- !query 33 schema
-struct<CASE WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
+struct<col:string>
 -- !query 33 output
 bar recognized
```

</p>
</details>

#25069 contains the same minor fixes as it's required to write the tests.

## How was this patch tested?

Tested as guided in [SPARK-27921](https://issues.apache.org/jira/browse/SPARK-27921).

Closes #25070 from HyukjinKwon/SPARK-28273.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
HyukjinKwon authored and cloud-fan committed Jul 9, 2019
1 parent a5ff922 commit fe3e34d
Show file tree
Hide file tree
Showing 8 changed files with 713 additions and 13 deletions.
6 changes: 3 additions & 3 deletions python/pyspark/sql/udf.py
Expand Up @@ -376,17 +376,17 @@ def registerJavaFunction(self, name, javaClassName, returnType=None):
>>> spark.udf.registerJavaFunction(
... "javaStringLength", "test.org.apache.spark.sql.JavaStringLength", IntegerType())
>>> spark.sql("SELECT javaStringLength('test')").collect()
[Row(UDF:javaStringLength(test)=4)]
[Row(javaStringLength(test)=4)]
>>> spark.udf.registerJavaFunction(
... "javaStringLength2", "test.org.apache.spark.sql.JavaStringLength")
>>> spark.sql("SELECT javaStringLength2('test')").collect()
[Row(UDF:javaStringLength2(test)=4)]
[Row(javaStringLength2(test)=4)]
>>> spark.udf.registerJavaFunction(
... "javaStringLength3", "test.org.apache.spark.sql.JavaStringLength", "integer")
>>> spark.sql("SELECT javaStringLength3('test')").collect()
[Row(UDF:javaStringLength3(test)=4)]
[Row(javaStringLength3(test)=4)]
"""

jdt = None
Expand Down
Expand Up @@ -57,8 +57,7 @@ case class ScalaUDF(

override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)

override def toString: String =
s"${udfName.map(name => s"UDF:$name").getOrElse("UDF")}(${children.mkString(", ")})"
override def toString: String = s"${udfName.getOrElse("UDF")}(${children.mkString(", ")})"

// scalastyle:off line.size.limit

Expand Down
272 changes: 272 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/udf/pgSQL/udf-case.sql
@@ -0,0 +1,272 @@
--
-- Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
--
--
-- CASE
-- https://github.com/postgres/postgres/blob/REL_12_BETA1/src/test/regress/sql/case.sql
-- Test the CASE statement
--
-- This test suite contains two Cartesian products without using explicit CROSS JOIN syntax.
-- Thus, we set spark.sql.crossJoin.enabled to true.

-- This test file was converted from pgSQL/case.sql.
-- Note that currently registered UDF returns a string. So there are some differences, for instance
-- in string cast within UDF in Scala and Python.

set spark.sql.crossJoin.enabled=true;
CREATE TABLE CASE_TBL (
i integer,
f double
) USING parquet;

CREATE TABLE CASE2_TBL (
i integer,
j integer
) USING parquet;

INSERT INTO CASE_TBL VALUES (1, 10.1);
INSERT INTO CASE_TBL VALUES (2, 20.2);
INSERT INTO CASE_TBL VALUES (3, -30.3);
INSERT INTO CASE_TBL VALUES (4, NULL);

INSERT INTO CASE2_TBL VALUES (1, -1);
INSERT INTO CASE2_TBL VALUES (2, -2);
INSERT INTO CASE2_TBL VALUES (3, -3);
INSERT INTO CASE2_TBL VALUES (2, -4);
INSERT INTO CASE2_TBL VALUES (1, NULL);
INSERT INTO CASE2_TBL VALUES (NULL, -6);

--
-- Simplest examples without tables
--

SELECT '3' AS `One`,
CASE
WHEN CAST(udf(1 < 2) AS boolean) THEN 3
END AS `Simple WHEN`;

SELECT '<NULL>' AS `One`,
CASE
WHEN 1 > 2 THEN udf(3)
END AS `Simple default`;

SELECT '3' AS `One`,
CASE
WHEN udf(1) < 2 THEN udf(3)
ELSE udf(4)
END AS `Simple ELSE`;

SELECT udf('4') AS `One`,
CASE
WHEN 1 > 2 THEN 3
ELSE 4
END AS `ELSE default`;

SELECT udf('6') AS `One`,
CASE
WHEN CAST(udf(1 > 2) AS boolean) THEN 3
WHEN udf(4) < 5 THEN 6
ELSE 7
END AS `Two WHEN with default`;

SELECT '7' AS `None`,
CASE WHEN rand() < udf(0) THEN 1
END AS `NULL on no matches`;

-- Constant-expression folding shouldn't evaluate unreachable subexpressions
SELECT CASE WHEN CAST(udf(1=0) AS boolean) THEN 1/0 WHEN 1=1 THEN 1 ELSE 2/0 END;
SELECT CASE 1 WHEN 0 THEN 1/udf(0) WHEN 1 THEN 1 ELSE 2/0 END;

-- [SPARK-27923] PostgreSQL throws an exception but Spark SQL is NULL
-- However we do not currently suppress folding of potentially
-- reachable subexpressions
SELECT CASE WHEN i > 100 THEN udf(1/0) ELSE udf(0) END FROM case_tbl;

-- Test for cases involving untyped literals in test expression
SELECT CASE 'a' WHEN 'a' THEN udf(1) ELSE udf(2) END;

--
-- Examples of targets involving tables
--

SELECT '' AS `Five`,
CASE
WHEN i >= 3 THEN i
END AS `>= 3 or Null`
FROM CASE_TBL;

SELECT '' AS `Five`,
CASE WHEN i >= 3 THEN (i + i)
ELSE i
END AS `Simplest Math`
FROM CASE_TBL;

SELECT '' AS `Five`, i AS `Value`,
CASE WHEN (i < 0) THEN 'small'
WHEN (i = 0) THEN 'zero'
WHEN (i = 1) THEN 'one'
WHEN (i = 2) THEN 'two'
ELSE 'big'
END AS `Category`
FROM CASE_TBL;

SELECT '' AS `Five`,
CASE WHEN ((i < 0) or (i < 0)) THEN 'small'
WHEN ((i = 0) or (i = 0)) THEN 'zero'
WHEN ((i = 1) or (i = 1)) THEN 'one'
WHEN ((i = 2) or (i = 2)) THEN 'two'
ELSE 'big'
END AS `Category`
FROM CASE_TBL;

--
-- Examples of qualifications involving tables
--

--
-- NULLIF() and COALESCE()
-- Shorthand forms for typical CASE constructs
-- defined in the SQL standard.
--

SELECT * FROM CASE_TBL WHERE udf(COALESCE(f,i)) = 4;

SELECT * FROM CASE_TBL WHERE udf(NULLIF(f,i)) = 2;

SELECT udf(COALESCE(a.f, b.i, b.j))
FROM CASE_TBL a, CASE2_TBL b;

SELECT *
FROM CASE_TBL a, CASE2_TBL b
WHERE udf(COALESCE(a.f, b.i, b.j)) = 2;

SELECT udf('') AS Five, NULLIF(a.i,b.i) AS `NULLIF(a.i,b.i)`,
NULLIF(b.i, 4) AS `NULLIF(b.i,4)`
FROM CASE_TBL a, CASE2_TBL b;

SELECT '' AS `Two`, *
FROM CASE_TBL a, CASE2_TBL b
WHERE CAST(udf(COALESCE(f,b.i) = 2) AS boolean);

-- We don't support update now.
--
-- Examples of updates involving tables
--

-- UPDATE CASE_TBL
-- SET i = CASE WHEN i >= 3 THEN (- i)
-- ELSE (2 * i) END;

-- SELECT * FROM CASE_TBL;

-- UPDATE CASE_TBL
-- SET i = CASE WHEN i >= 2 THEN (2 * i)
-- ELSE (3 * i) END;

-- SELECT * FROM CASE_TBL;

-- UPDATE CASE_TBL
-- SET i = CASE WHEN b.i >= 2 THEN (2 * j)
-- ELSE (3 * j) END
-- FROM CASE2_TBL b
-- WHERE j = -CASE_TBL.i;

-- SELECT * FROM CASE_TBL;

--
-- Nested CASE expressions
--

-- This test exercises a bug caused by aliasing econtext->caseValue_isNull
-- with the isNull argument of the inner CASE's CaseExpr evaluation. After
-- evaluating the vol(null) expression in the inner CASE's second WHEN-clause,
-- the isNull flag for the case test value incorrectly became true, causing
-- the third WHEN-clause not to match. The volatile function calls are needed
-- to prevent constant-folding in the planner, which would hide the bug.

-- Wrap this in a single transaction so the transient '=' operator doesn't
-- cause problems in concurrent sessions
-- BEGIN;

-- CREATE FUNCTION vol(text) returns text as
-- 'begin return $1; end' language plpgsql volatile;

SELECT CASE
(CASE vol('bar')
WHEN udf('foo') THEN 'it was foo!'
WHEN udf(vol(null)) THEN 'null input'
WHEN 'bar' THEN 'it was bar!' END
)
WHEN udf('it was foo!') THEN 'foo recognized'
WHEN 'it was bar!' THEN udf('bar recognized')
ELSE 'unrecognized' END AS col;

-- We don't support the features below:
-- 1. CREATE DOMAIN ...
-- 2. CREATE OPERATOR ...
-- 3. CREATE TYPE ...

-- In this case, we can't inline the SQL function without confusing things.
-- CREATE DOMAIN foodomain AS text;

-- CREATE FUNCTION volfoo(text) returns foodomain as
-- 'begin return $1::foodomain; end' language plpgsql volatile;

-- CREATE FUNCTION inline_eq(foodomain, foodomain) returns boolean as
-- 'SELECT CASE $2::text WHEN $1::text THEN true ELSE false END' language sql;

-- CREATE OPERATOR = (procedure = inline_eq,
-- leftarg = foodomain, rightarg = foodomain);

-- SELECT CASE volfoo('bar') WHEN 'foo'::foodomain THEN 'is foo' ELSE 'is not foo' END;

-- ROLLBACK;

-- Test multiple evaluation of a CASE arg that is a read/write object (#14472)
-- Wrap this in a single transaction so the transient '=' operator doesn't
-- cause problems in concurrent sessions
-- BEGIN;

-- CREATE DOMAIN arrdomain AS int[];

-- CREATE FUNCTION make_ad(int,int) returns arrdomain as
-- 'declare x arrdomain;
-- begin
-- x := array[$1,$2];
-- return x;
-- end' language plpgsql volatile;

-- CREATE FUNCTION ad_eq(arrdomain, arrdomain) returns boolean as
-- 'begin return array_eq($1, $2); end' language plpgsql;

-- CREATE OPERATOR = (procedure = ad_eq,
-- leftarg = arrdomain, rightarg = arrdomain);

-- SELECT CASE make_ad(1,2)
-- WHEN array[2,4]::arrdomain THEN 'wrong'
-- WHEN array[2,5]::arrdomain THEN 'still wrong'
-- WHEN array[1,2]::arrdomain THEN 'right'
-- END;

-- ROLLBACK;

-- Test interaction of CASE with ArrayCoerceExpr (bug #15471)
-- BEGIN;

-- CREATE TYPE casetestenum AS ENUM ('e', 'f', 'g');

-- SELECT
-- CASE 'foo'::text
-- WHEN 'foo' THEN ARRAY['a', 'b', 'c', 'd'] || enum_range(NULL::casetestenum)::text[]
-- ELSE ARRAY['x', 'y']
-- END;

-- ROLLBACK;

--
-- Clean up
--

DROP TABLE CASE_TBL;
DROP TABLE CASE2_TBL;
set spark.sql.crossJoin.enabled=false;
Expand Up @@ -396,7 +396,7 @@ SELECT CASE
WHEN 'it was bar!' THEN 'bar recognized'
ELSE 'unrecognized' END
-- !query 33 schema
struct<CASE WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (UDF:vol(bar) = foo) THEN it was foo! WHEN (UDF:vol(bar) = UDF:vol(null)) THEN null input WHEN (UDF:vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
struct<CASE WHEN (CASE WHEN (vol(bar) = foo) THEN it was foo! WHEN (vol(bar) = vol(null)) THEN null input WHEN (vol(bar) = bar) THEN it was bar! END = it was foo!) THEN foo recognized WHEN (CASE WHEN (vol(bar) = foo) THEN it was foo! WHEN (vol(bar) = vol(null)) THEN null input WHEN (vol(bar) = bar) THEN it was bar! END = it was bar!) THEN bar recognized ELSE unrecognized END:string>
-- !query 33 output
bar recognized

Expand Down

0 comments on commit fe3e34d

Please sign in to comment.