Skip to content

Commit

Permalink
[SPARK-27878][SQL] Support ARRAY(sub-SELECT) expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Jul 11, 2019
1 parent 1b23267 commit fcef61e
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 2 deletions.
Expand Up @@ -703,6 +703,7 @@ primaryExpression
FROM srcStr=valueExpression ')' #trim
| OVERLAY '(' input=valueExpression PLACING replace=valueExpression
FROM position=valueExpression (FOR length=valueExpression)? ')' #overlay
| ARRAY '(' query ')' #array
;

constant
Expand Down
Expand Up @@ -1533,6 +1533,18 @@ class Analyzer(
ListQuery(plan, exprs, exprId, plan.output)
})
InSubquery(values, expr.asInstanceOf[ListQuery])
case a @ ArraySubquery(sub, _, exprId) if !sub.resolved =>
resolveSubQuery(a, plans) { (plan, children) =>
// Array subquery must return one column as output.
if (plan.output.size != 1) {
failAnalysis(
s"Array subquery must return only one column, but got ${plan.output.size}")
}
ScalarSubquery(Aggregate(Seq.empty, Seq(
Alias(AggregateExpression(CollectList(plan.output.head), Complete, false), "array()")
()
), plan))
}
}
}

Expand Down
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -70,6 +71,26 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
override def prettyName: String = "array"
}

case class ArraySubquery(
plan: LogicalPlan,
children: Seq[Expression] = Seq.empty,
exprId: ExprId = NamedExpression.newExprId)
extends SubqueryExpression(plan, children, exprId) with Unevaluable {
override def dataType: DataType = {
assert(plan.schema.fields.nonEmpty, "Array subquery should have only one column")
plan.schema.fields.head.dataType
}
override def nullable: Boolean = true
override def withNewPlan(plan: LogicalPlan): ArraySubquery = copy(plan = plan)
override def toString: String = s"array-subquery#${exprId.id} $conditionString"
override lazy val canonicalized: Expression = {
ArraySubquery(
plan.canonicalized,
children.map(_.canonicalized),
ExprId(0))
}
}

private [sql] object GenArrayData {
/**
* Return Java code pieces based on DataType and array size to allocate ArrayData class
Expand Down
Expand Up @@ -1436,6 +1436,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create an Array from a query.
*/
override def visitArray(ctx: ArrayContext): Expression = withOrigin(ctx) {
ArraySubquery(plan(ctx.query))
}

/**
* Create a (windowed) Function expression.
*/
Expand Down
Expand Up @@ -780,4 +780,8 @@ class ExpressionParserSuite extends AnalysisTest {
assertEqual("current_timestamp", UnresolvedAttribute.quoted("current_timestamp"))
}
}

test("Array from subquery") {
assertEqual("array(SELECT c FROM t)", ArraySubquery(table("t").select('c)))
}
}
Expand Up @@ -73,6 +73,8 @@ trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite =>
e.copy(exprId = ExprId(0))
case l: ListQuery =>
l.copy(exprId = ExprId(0))
case a: ArraySubquery =>
a.copy(exprId = ExprId(0))
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
case a: Alias =>
Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/array.sql
Expand Up @@ -90,3 +90,10 @@ select
size(date_array),
size(timestamp_array)
from primitive_arrays;

-- array from subquery
select array(select 1);
select array(select a from data);
select array(select a from data where false);
select array(select 1, 2);
select array(select a, a from data);
Expand Up @@ -28,7 +28,7 @@ create temporary view int4_tbl as select * from values
-- from generate_series(1, 3) s2 group by s2) ss
-- order by 1, 2;

-- [SPARK-27878] Support ARRAY(sub-SELECT) expressions
-- [SPARK-27769] Handling of sublinks within outer-level aggregates.
-- explain (verbose, costs off)
-- select array(select sum(x+y) s
-- from generate_series(1,3) y group by y order by s)
Expand Down
44 changes: 43 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/array.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 12
-- Number of queries: 17


-- !query 0
Expand Down Expand Up @@ -160,3 +160,45 @@ from primitive_arrays
struct<size(boolean_array):int,size(tinyint_array):int,size(smallint_array):int,size(int_array):int,size(bigint_array):int,size(decimal_array):int,size(double_array):int,size(float_array):int,size(date_array):int,size(timestamp_array):int>
-- !query 11 output
1 2 2 2 2 2 2 2 2 2


-- !query 12
select array(select 1)
-- !query 12 schema
struct<scalarsubquery():array<int>>
-- !query 12 output
[1]


-- !query 13
select array(select a from data)
-- !query 13 schema
struct<scalarsubquery():array<string>>
-- !query 13 output
["one","two"]


-- !query 14
select array(select a from data where false)
-- !query 14 schema
struct<scalarsubquery():array<string>>
-- !query 14 output
[]


-- !query 15
select array(select 1, 2)
-- !query 15 schema
struct<>
-- !query 15 output
org.apache.spark.sql.AnalysisException
Array subquery must return only one column, but got 2;


-- !query 16
select array(select a, a from data)
-- !query 16 schema
struct<>
-- !query 16 output
org.apache.spark.sql.AnalysisException
Array subquery must return only one column, but got 2;

0 comments on commit fcef61e

Please sign in to comment.