
[SPARK-32159][SQL] Fix integration between Aggregator[Array[_], _, _] and UnresolvedMapObjects #28983

Closed
erikerlandson wants to merge 19 commits into apache:master from erikerlandson:fix-spark-32159

Conversation

@erikerlandson
Contributor

erikerlandson commented Jul 2, 2020

Context: The fix for SPARK-27296 introduced by #25024 allows `Aggregator` objects to appear in queries. This works fine for aggregators with atomic input types, e.g. `Aggregator[Double, _, _]`.

However, it can cause a null pointer exception if the input type is `Array[_]`. This was historically considered an ignorable case for serialization of `UnresolvedMapObjects`, but the new `ScalaAggregator` class causes these expressions to be serialized over to executors because the resolve-and-bind is being deferred.

### What changes were proposed in this pull request?

This PR adds a new rule, `ResolveEncodersInScalaAgg`, that resolves the expressions contained in the encoders, so that properly resolved expressions are serialized over to executors.

### Why are the changes needed?

Applying an aggregator of the form `Aggregator[Array[_], _, _]` using `functions.udaf()` currently causes a null pointer error in Catalyst.
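
For concreteness, here is a minimal sketch of an aggregator of this shape; the `ArraySum` name and logic are illustrative, not from the PR:

import org.apache.spark.sql.{Encoder, Encoders, functions}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical aggregator with an Array input type. Registering it via
// functions.udaf() and applying it in a query is the pattern that hit
// the null pointer error before this fix.
object ArraySum extends Aggregator[Array[Double], Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Array[Double]): Double = b + a.sum
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(r: Double): Double = r
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val arraySum = functions.udaf(ArraySum)
// df.agg(arraySum(df("values")))   // values: array<double>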

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A unit test has been added that does aggregation with array types for input, buffer, and output. I have done additional testing with my own custom aggregators in the Spark REPL.

@erikerlandson
Contributor Author

The fix I coded here works if the function being applied to the array elements is the identity. Experimentally, it also seems to work if values are being cast (e.g. the array elements are float but the aggregator expects an array of double); the casting expressions I see are added externally, like `(do map objects).toDoubleArray`. This fix will not work if some non-identity transform is applied to the array elements. I'm not sure whether that case actually arises, but I do not think it does in the scenario of aggregator inputs.

@erikerlandson
Contributor Author

We can follow ResolveEncodersInUDF: add a rule to resolve the encoders in ScalaAggregator on the driver side.

@cloud-fan if we do this and resolve these on the driver, does that avoid having to resolve these `UnresolvedMapObjects` on the executor side?

@cloud-fan
Contributor

does that avoid having to resolve these `UnresolvedMapObjects` on the executor side?

Yes. An encoder is a container of expressions. If the expressions are resolved, then when we serialize and send encoders to executors, we don't need to resolve them again on the executor side and can use them directly.
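
A rough illustration of the encoder lifecycle described above, using the standard `ExpressionEncoder` API:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// An encoder is created with unresolved (de)serializer expressions.
val enc = ExpressionEncoder[Array[Double]]()

// resolveAndBind() resolves those expressions (against the encoder's own
// schema by default), so the encoder can be serialized to executors and
// used directly, with no executor-side resolution.
val resolved = enc.resolveAndBind()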

@erikerlandson
Contributor Author

@cloud-fan thanks, I will try adding such a rule for `ScalaAggregator`.

@erikerlandson
Contributor Author

erikerlandson commented Jul 3, 2020

When trying to refer to either `ScalaAggregator` or `Aggregator` over in catalyst, I'm running into scoping problems, all similar to:

[error] /home/eje/git/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala:27: object expressions is not a member of package org.apache.spark.sql

I believe these are related to a scoping firewall around catalyst, for example:

package org.apache.spark.sql

/**
 * The physical execution component of Spark SQL. Note that this is a private package.
 * All classes in catalyst are considered an internal API to Spark SQL and are subject
 * to change between minor releases.
 */
package object execution

I tried moving `ScalaAggregator` over to `org.apache.spark.sql.catalyst.expressions`, but then it can't see `Aggregator`, and I can't move that without breaking backward compatibility.

@maropu
Member

maropu commented Jul 4, 2020

I don't think you need to move the classes; how about using `extendedResolutionRules` for that purpose?
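
For reference, a sketch of what injecting a rule through the extensions API looks like; the `AggExtensions` class name is illustrative, while `injectResolutionRule` is the `SparkSessionExtensions` hook that feeds the analyzer's `extendedResolutionRules`:

import org.apache.spark.sql.SparkSessionExtensions

// Illustrative extension that registers the new analyzer rule. It can be
// wired in via the spark.sql.extensions config or Builder.withExtensions().
class AggExtensions extends (SparkSessionExtensions => Unit) {
  def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectResolutionRule(_ => ResolveEncodersInScalaAgg)
  }
}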

@erikerlandson
Contributor Author

how about using `extendedResolutionRules` for that purpose?

Would that be safe? My reading of the extensions API is that a user could completely reset any pre-applied extensions. I don't see any other predefined rules being applied this way in the Spark code currently.

@maropu
Member

maropu commented Jul 4, 2020

Have you checked the existing predefined ones?

@erikerlandson
Contributor Author

Have you checked the existing predefined ones?

Tentatively, this looks to be working in my REPL testing. The unit tests appear to bypass the use of the session builder and are currently failing. I'm playing with configuring an instance of SparkSessionExtensions in the unit-test Spark session.

@SparkQA

SparkQA commented Jul 5, 2020

Test build #124931 has finished for PR 28983 at commit 1a501d9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson
Contributor Author

Passing the existing aggregation unit tests, but I still need to write a new test for array input types.

@SparkQA

SparkQA commented Jul 5, 2020

Test build #124971 has finished for PR 28983 at commit bc2d880.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class TestHiveExtensions extends (SparkSessionExtensions => Unit)

@SparkQA

SparkQA commented Jul 5, 2020

Test build #124975 has finished for PR 28983 at commit 399cbab.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@erikerlandson
Contributor Author

@cloud-fan @maropu using an extension rule works. The main caveat is that if a Spark session is constructed via a non-standard path that sidesteps BaseSessionStateBuilder, it won't pick this rule up; TestHive is an example.

@maropu
Member

maropu commented Jul 6, 2020

Probably, you also need to add a new rule in HiveSessionStateBuilder.

@cloud-fan
Contributor

Probably, you also need to add a new rule in HiveSessionStateBuilder.

Yeah, this is not good and should be refactored, but it's the case for now: the extra analyzer rules have to be repeated in HiveSessionStateBuilder.
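
As context, a sketch of the pattern being described: each session state builder overrides the analyzer and prepends its extra rules. This is abbreviated; the real builders add several other rules as well:

// Inside BaseSessionStateBuilder (and repeated in HiveSessionStateBuilder):
protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    ResolveEncodersInScalaAgg +:    // the rule added by this PR
      customResolutionRules         // rules injected via SparkSessionExtensions
}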

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125220 has finished for PR 28983 at commit d3c5d4d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125259 has started for PR 28983 at commit aca7b51.

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125269 has finished for PR 28983 at commit ee96cc0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Jul 8, 2020

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125284 has finished for PR 28983 at commit ee96cc0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125323 has finished for PR 28983 at commit ee96cc0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125359 has started for PR 28983 at commit ee96cc0.

@SparkQA

SparkQA commented Jul 9, 2020

Test build #125397 has finished for PR 28983 at commit 622ac1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/3.0!

cloud-fan pushed a commit that referenced this pull request Jul 9, 2020
[SPARK-32159][SQL] Fix integration between Aggregator[Array[_], _, _] and UnresolvedMapObjects

Closes #28983 from erikerlandson/fix-spark-32159.

Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 1cb5bfc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan closed this in 1cb5bfc Jul 9, 2020
case p => p.transformExpressionsUp {
  case agg: ScalaAggregator[_, _, _] =>
    agg.copy(
      inputEncoder = agg.inputEncoder.resolveAndBind(),
@cloud-fan
Contributor

A follow-up we can do is to resolve and bind using the actual input data types, so that we can do casting or reorder fields.

@erikerlandson
Contributor Author

That would be nice. I tried this, but the way I did it wasn't having any effect.

@erikerlandson
Contributor Author

@cloud-fan what I had done earlier was:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.aggregate.ScalaAggregator
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object ResolveEncodersInScalaAgg extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Skip operators whose children are not yet resolved.
    case p if !p.resolved => p
    case p => p.transformExpressionsUp {
      case agg: ScalaAggregator[_, _, _] =>
        val children = agg.children
        require(children.nonEmpty, "Missing aggregator input")
        // Reconstruct the input schema from the aggregator's child
        // expressions: one child contributes its type directly; multiple
        // children become a struct with positional field names.
        val dataType: DataType = if (children.length == 1) children.head.dataType else {
          StructType(children.map(_.dataType).zipWithIndex.map { case (dt, j) =>
            StructField(s"_$j", dt, nullable = true)
          })
        }
        val attrs = if (agg.inputEncoder.isSerializedAsStructForTopLevel) {
          dataType.asInstanceOf[StructType].toAttributes
        } else {
          new StructType().add("input", dataType).toAttributes
        }
        // Resolve and bind on the driver, so the expressions shipped to
        // executors are already resolved.
        agg.copy(
          inputEncoder = agg.inputEncoder.resolveAndBind(attrs),
          bufferEncoder = agg.bufferEncoder.resolveAndBind())
    }
  }
}

This also passes unit tests, but it would still fail if I tried to give it Float data, so it's not automatically casting.
