
[SPARK-28227][SQL] Support TRANSFORM with aggregation. #25028

Closed
wants to merge 6 commits

Conversation

AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented Jul 2, 2019

What changes were proposed in this pull request?

Spark SQL can't support SQL like:
SELECT TRANSFORM ( d2, max(d1) as maxd1, cast(sum(d3) as string))
USING 'cat' AS (a,b,c)
FROM script_trans
WHERE d1 <= 100
GROUP BY d2
HAVING maxd1 > 0

But Hive can support this kind of SQL.
This makes our SQL migration difficult and complex.
This PR adds support for using aggregation with TRANSFORM, making SQL migration from Hive to Spark easier.

How was this patch tested?

Added unit test.

@AngersZhuuuu AngersZhuuuu reopened this Jul 2, 2019
@AngersZhuuuu AngersZhuuuu changed the title [WIP][SPARK-28227][SQL] Support TRANSFORM with aggregation. [SPARK-28227][SQL] Support TRANSFORM with aggregation. Jul 3, 2019
@AngersZhuuuu
Contributor Author

@cloud-fan @gatorsmile @HyukjinKwon @jerryshao @wangyum
Hi all, could you help review this and give some advice?

@AngersZhuuuu
Contributor Author

@dongjoon-hyun
Could you help review this, or @-mention whoever works on this part?

@wangyum
Member

wangyum commented Jul 12, 2019

Could we implement ScriptTransformation in sql/core first(SPARK-15694)?

@HyukjinKwon
Member

Yea, I agree with @wangyum's suggestion. Can we? @AngersZhuuuu

@AngersZhuuuu
Contributor Author

Yea, I agree with @wangyum's suggestion. Can we? @AngersZhuuuu

Currently I am working on enabling the Spark Thrift Server to proxy the client user's authentication, and have opened a PR against master:
#25179

After that, I will focus on this problem.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

@AngersZhuuuu, let's fix #25028 (comment) first. Feel free to open another PR.

@AngersZhuuuu
Contributor Author

@AngersZhuuuu, let's fix #25028 (comment) first. Feel free to open another PR.

It's OK.

@alfozan

alfozan commented Oct 29, 2019

@AngersZhuuuu one small regression:

test query:
MAP k / 10 USING 'cat' AS (one) from (select 10 as k);

Results:
with master:
1.0

with this PR:

Exception:
Error in query: cannot resolve '(k / 10)' given input columns: [(CAST(k AS DOUBLE) / CAST(10 AS DOUBLE))];;

This is a simplified test case from HiveCompatibilitySuite:
https://github.com/apache/spark/blob/master/sql/hive/src/test/resources/ql/src/test/queries/clientpositive/mapreduce1.q

@AngersZhuuuu
Contributor Author

@AngersZhuuuu one small regression:

test query:
MAP k / 10 USING 'cat' AS (one) from (select 10 as k);

Results:
with master:
1.0

with this PR:

Exception:
Error in query: cannot resolve '(k / 10)' given input columns: [(CAST(k AS DOUBLE) / CAST(10 AS DOUBLE))];;

This is a simplified test case from HiveCompatibilitySuite:
https://github.com/apache/spark/blob/master/sql/hive/src/test/resources/ql/src/test/queries/clientpositive/mapreduce1.q

Yeah, I found this problem too. For some complex column expressions we can't find a direct attribute name for the column, so I used the pretty SQL name. But some complex columns' final names are not the same as toPrettySQL's output, which causes this problem.

@alfozan

alfozan commented Oct 29, 2019

Regarding the issue in #25028 (comment)

Instead of

    val namedExpressions = expressions.map {
      case e: NamedExpression => e
      case e: Expression => UnresolvedAlias(e)
    }

How about also using a manual aliasing function to create an alias for complex column expressions (e.g. BinaryExpression), instead of relying on toPrettySQL?

Example:

   val aliasFunc = (position: Int, e: Expression) => s"gen_alias_${position}"

    val namedExpressions = expressions.zipWithIndex.map {
      case (e: NamedExpression, _) => e
      case (e: BinaryExpression, index) => UnresolvedAlias(e, Some(aliasFunc(index, _)))
      case (e: Expression, _) => UnresolvedAlias(e)
    }

Test query runs successfully.

@AngersZhuuuu
Contributor Author

Regarding the issue in #25028 (comment)

Instead of

    val namedExpressions = expressions.map {
      case e: NamedExpression => e
      case e: Expression => UnresolvedAlias(e)
    }

How about also using a manual aliasing function to create an alias for complex column expressions (e.g. BinaryExpression), instead of relying on toPrettySQL?

Example:

   val aliasFunc = (position: Int, e: Expression) => s"gen_alias_${position}"

    val namedExpressions = expressions.zipWithIndex.map {
      case (e: NamedExpression, _) => e
      case (e: BinaryExpression, index) => UnresolvedAlias(e, Some(aliasFunc(index, _)))
      case (e: Expression, _) => UnresolvedAlias(e)
    }

Test query runs successfully.

Good suggestion: giving the subquery's output and the transform's input the same alias solves this problem. Nice method.

@alfozan

alfozan commented Oct 29, 2019

Follow-up: I think it's even better to always use a manual aliasing function, not just for a subset of expressions:

   val aliasFunc = (position: Int, e: Expression) => s"gen_alias_${position}"

    val namedExpressions = expressions.zipWithIndex.map {
      case (e: NamedExpression, _) => e
      case (e: Expression, index) => UnresolvedAlias(e, Some(aliasFunc(index, _)))
    }
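The generated-alias idea above can be sketched with a minimal, self-contained model. Note these are toy stand-ins for Spark's Expression/NamedExpression classes (the names `Named`, `Binary`, and `nameAll` are hypothetical, not Spark's API); the point is that positional aliases stay stable regardless of how toPrettySQL would render the expression:

```scala
// Toy expression model: an expression either already carries a name,
// or is a "complex" expression (e.g. a binary op) with no stable name.
sealed trait Expr
final case class Named(name: String) extends Expr
final case class Binary(op: String, l: String, r: String) extends Expr

// Mirrors the comment's aliasFunc: derive the alias from the position only.
def aliasFunc(position: Int): String = s"gen_alias_$position"

// Keep existing names; alias everything else by its position in the list.
def nameAll(exprs: Seq[Expr]): Seq[String] =
  exprs.zipWithIndex.map {
    case (Named(n), _)      => n
    case (_: Binary, index) => aliasFunc(index)
  }

// e.g. MAP k / 10 ... from (select 10 as k)
val names = nameAll(Seq(Named("k"), Binary("/", "k", "10")))
// names == Seq("k", "gen_alias_1")
```

Because the alias depends only on the position, the subquery's output column and the transform's input reference get the same name, sidestepping the toPrettySQL mismatch.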

Comment on lines +514 to +517
val namedExpressions = expressions.map {
case e: NamedExpression => e
case e: Expression => UnresolvedAlias(e)
}

@alfozan alfozan Oct 29, 2019


re discussion in #25028 (comment) and #25028 (comment)

Alternative suggestion:

   val aliasFunc = (position: Int, e: Expression) => s"gen_alias_${position}"

    val namedExpressions = expressions.zipWithIndex.map {
      case (e: NamedExpression, _) => e
      case (e: Expression, index) => UnresolvedAlias(e, Some(aliasFunc(index, _)))
    }

Contributor Author


It is OK to rename all expressions with aliasFunc here.
After my current busy job, I will restart this PR and follow your nice suggestion.

@alfozan

alfozan commented Oct 29, 2019

Another issue:

test query:

FROM (select 1 as key, 100 as value) src
MAP src.*, src.key, CAST(src.key / 10 AS INT), CAST(src.key % 10 AS INT), src.value
USING 'cat' AS (k, v, tkey, ten, one, tvalue);

Results:
with master:
1 100 1 0 1 100

with this PR:
1 100 1 100 1 0

This is a simplified test case from HiveCompatibilitySuite:
https://github.com/apache/spark/blob/master/sql/hive/src/test/resources/ql/src/test/queries/clientpositive/mapreduce7.q

@alfozan

alfozan commented Oct 29, 2019

simpler test case:

FROM (select 1 as key, 100 as value) src
MAP src.*, CAST(src.key % 10 AS INT), src.value
USING 'cat' AS (k, v, one, tvalue);

@AngersZhuuuu
Contributor Author

simpler test case:

FROM (select 1 as key, 100 as value) src
MAP src.*, CAST(src.key % 10 AS INT), src.value
USING 'cat' AS (k, v, one, tvalue);

The star expansion causes the problem. Trying to solve it.

@AngersZhuuuu
Contributor Author

simpler test case:

FROM (select 1 as key, 100 as value) src
MAP src.*, CAST(src.key % 10 AS INT), src.value
USING 'cat' AS (k, v, one, tvalue);

Found the reason. In the current transform mode, here:

case t: ScriptTransformation if containsStar(t.input) =>

it expands the star with t.child.output. After t.child is analyzed, it has the output src.key, src.value, gen_alias_2 src.value; the expand method then takes all of t.child's output matching src as the transform's input. That is how this error happens.

Change it to:

      // If the script transformation input contains Stars, expand it.
      case t: ScriptTransformation if containsStar(t.input) =>
        t.copy(
          input = t.child.output
        )

This change is reasonable since the transform's input is its child's output.
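The fix above can be illustrated with a toy model (the types `Attr`, `Star`, `Col`, and `expandInput` are hypothetical simplifications, not Spark's actual analyzer classes): when the input list contains a star, replace the whole list with the child's output rather than re-resolving each entry against the child.

```scala
// Minimal stand-ins for attributes and transform-input entries.
final case class Attr(name: String)
sealed trait Input
case object Star extends Input
final case class Col(attr: Attr) extends Input

// Sketch of the fixed rule: a star means "use the child's output as-is",
// analogous to t.copy(input = t.child.output).
def expandInput(input: Seq[Input], childOutput: Seq[Attr]): Seq[Attr] =
  if (input.contains(Star)) childOutput
  else input.collect { case Col(a) => a }

// Child analyzed output for `MAP src.*, CAST(src.key % 10 AS INT), src.value`
val childOut = Seq(Attr("key"), Attr("value"), Attr("gen_alias_2"))
val expanded = expandInput(Seq(Star, Col(Attr("value"))), childOut)
// expanded == childOut
```

Duplicated or doubly-resolved columns disappear because the child plan has already produced exactly the columns the transform should consume.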

@alfozan

alfozan commented Nov 1, 2019

Another test query:

SELECT TRANSFORM(key, abs(key))
USING 'cat'
FROM (SELECT DISTINCT key FROM src);

The physical plan looks different with this PR applied. In particular, HashAggregate gets another output column, while it should have just one:

HashAggregate(keys=[key#18], functions=[], output=[key#18, **another_output**])

@AngersZhuuuu
Contributor Author

Another test query:

SELECT TRANSFORM(key, abs(key))
USING 'cat'
FROM (SELECT DISTINCT key FROM src);

The physical plan looks different with this PR applied. In particular, HashAggregate gets another output column, while it should have just one:

HashAggregate(keys=[key#18], functions=[], output=[key#18, **another_output**])

We can solve everything by passing a Seq(UnresolvedStar(None)) to ScriptTransformation.

@alfozan

alfozan commented Nov 1, 2019

Here's what I mean:

test query:

SELECT TRANSFORM(key, abs(key))
USING 'cat'
FROM (SELECT DISTINCT key FROM src);

The physical plan without this PR (on master):

ScriptTransformation [key#7, abs(key#7)], cat, [key#9, value#10], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim,	)),List((field.delim,	)),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- *(2) HashAggregate(keys=[1#16], functions=[], output=[key#7])
   +- Exchange hashpartitioning(1#16, 1), true, [id=#27]
      +- *(1) HashAggregate(keys=[1 AS 1#16], functions=[], output=[1#16])
         +- *(1) Scan OneRowRelation[]

The physical plan with this PR:

ScriptTransformation [key#0, abs(key)#9], cat, [key#2, value#3], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim,	)),List((field.delim,	)),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- *(2) HashAggregate(keys=[1#10], functions=[], output=[key#0, abs(key)#9])
   +- Exchange hashpartitioning(1#10, 1), true, [id=#21]
      +- *(1) HashAggregate(keys=[1 AS 1#10], functions=[], output=[1#10])
         +- *(1) Scan OneRowRelation[]

Difference:
+- *(2) HashAggregate(keys=[1#16], functions=[], output=[key#7]) vs
+- *(2) HashAggregate(keys=[1#10], functions=[], output=[key#0, abs(key)#9])

@AngersZhuuuu
Contributor Author

Another issue:

query:
MAP k / 10 USING 'cat' AS (aa) from (select 10 as k);

Error in query: cannot resolve '(k / 10)' given input columns: [(CAST(k AS DOUBLE) / CAST(10 AS DOUBLE))];;

This can be solved by #25028 (comment)

@alfozan

alfozan commented Nov 1, 2019

Another issue:
query:
MAP k / 10 USING 'cat' AS (aa) from (select 10 as k);
Error in query: cannot resolve '(k / 10)' given input columns: [(CAST(k AS DOUBLE) / CAST(10 AS DOUBLE))];;

This can be solved by #25028 (comment)

Yes, we already discussed a solution in #25028 (comment), which I can confirm works.

Currently the open issue is #25028 (comment)

@AngersZhuuuu
Contributor Author

AngersZhuuuu commented Nov 1, 2019

Here's what I mean:

test query:

SELECT TRANSFORM(key, abs(key))
USING 'cat'
FROM (SELECT DISTINCT key FROM src);

The physical plan without this PR (on master):

ScriptTransformation [key#7, abs(key#7)], cat, [key#9, value#10], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim,	)),List((field.delim,	)),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- *(2) HashAggregate(keys=[1#16], functions=[], output=[key#7])
   +- Exchange hashpartitioning(1#16, 1), true, [id=#27]
      +- *(1) HashAggregate(keys=[1 AS 1#16], functions=[], output=[1#16])
         +- *(1) Scan OneRowRelation[]

The physical plan with this PR:

ScriptTransformation [key#0, abs(key)#9], cat, [key#2, value#3], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim,	)),List((field.delim,	)),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- *(2) HashAggregate(keys=[1#10], functions=[], output=[key#0, abs(key)#9])
   +- Exchange hashpartitioning(1#10, 1), true, [id=#21]
      +- *(1) HashAggregate(keys=[1 AS 1#10], functions=[], output=[1#10])
         +- *(1) Scan OneRowRelation[]

Difference:
+- *(2) HashAggregate(keys=[1#16], functions=[], output=[key#7]) vs
+- *(2) HashAggregate(keys=[1#10], functions=[], output=[key#0, abs(key)#9])

This is right, since we now treat the transform's child as an entire logical plan. Its final output is two columns, which is correct, and the transform uses that output as its input. Reasonable.

@AngersZhuuuu
Contributor Author

@alfozan
You can see my branch's newest change and check the update.

@AngersZhuuuu
Contributor Author

AngersZhuuuu commented Nov 1, 2019

Difference:
+- *(2) HashAggregate(keys=[1#16], functions=[], output=[key#7]) vs
+- *(2) HashAggregate(keys=[1#10], functions=[], output=[key#0, abs(key)#9])

It seems that after my PR the output is right.

@alfozan

alfozan commented Nov 1, 2019

@alfozan
You can see my branch's newest change and check the update.

Could you share a link to the new branch/PR?

@AngersZhuuuu
Contributor Author

@alfozan
You can see my branch's newest change and check the update.

Could you share a link to the new branch/PR?

AngersZhuuuu@f18a6f9

@alfozan

alfozan commented Nov 1, 2019

Looks good! Thank you.

@AngersZhuuuu
Contributor Author

Looks good! Thank you.

Thank you too for providing so many error cases to make this PR better.

@AngersZhuuuu
Contributor Author

@alfozan
Seems you are working on https://issues.apache.org/jira/browse/SPARK-15694.
But I am confused: the current ScriptTransformation execution process has already been implemented, so are you clear on what we need to do?

@alfozan

alfozan commented Nov 5, 2019

@alfozan
Seems you are working on https://issues.apache.org/jira/browse/SPARK-15694
But I am confused: the current ScriptTransformation execution process has already been implemented, so are you clear on what we need to do?

Yes, we presented our work in https://databricks.com/session_eu19/powering-custom-apps-at-facebook-using-spark-script-transformation. I plan on sending the PRs later this month.

@AngersZhuuuu
Contributor Author

yes - we presented the work in https://databricks.com/session_eu19/powering-custom-apps-at-facebook-using-spark-script-transformation and I'll send the PRs later this month.

Please ping me when you raise the PR, so I can learn from your great work.

@AngersZhuuuu
Contributor Author

@HyukjinKwon
Can I reopen this, since @alfozan is working on the transform script part?

@HyukjinKwon
Member

@AngersZhuuuu, I didn't read it all fully, but can we get https://issues.apache.org/jira/browse/SPARK-15694 done first, per @wangyum's advice?

@AngersZhuuuu
Contributor Author

@AngersZhuuuu, I didn't read it all fully, but can we get https://issues.apache.org/jira/browse/SPARK-15694 done first, per @wangyum's advice?

Seems @alfozan is doing that part?

@HyukjinKwon
Member

Shall we wait for that first? I would like to see how it's implemented first, and possibly we should match the impl.

@alfozan

alfozan commented Dec 24, 2019

@AngersZhuuuu, I didn't read it all fully, but can we get https://issues.apache.org/jira/browse/SPARK-15694 done first, per @wangyum's advice?

Seems @alfozan is doing that part?

Yes, I am.

6 participants