[SPARK-45357][CONNECT][TESTS][3.5] Normalize dataframeId when comparing `CollectMetrics` in `SparkConnectProtoSuite`

### What changes were proposed in this pull request?
This PR adds a new function `normalizeDataframeId` that sets the `dataframeId` of `CollectMetrics` to the constant 0 before comparing `LogicalPlan`s in the test cases of `SparkConnectProtoSuite`.
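
For reference, a minimal self-contained sketch of the helper (the exact change appears in the diff at the bottom of this page); both the analyzed Connect plan and the expected Spark plan are passed through it before `comparePlans`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, LogicalPlan}

// Rewrite every CollectMetrics node so that plan comparison ignores the dataframeId.
def normalizeDataframeId(plan: LogicalPlan): LogicalPlan = plan transform {
  case cm: CollectMetrics => cm.copy(dataframeId = 0)
}
```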

### Why are the changes needed?
The test scenarios in `SparkConnectProtoSuite` do not need to compare the `dataframeId` in `CollectMetrics`: the id depends on Dataset creation order, so it differs between the plan produced through Spark Connect and the expected plan built locally (the `0` vs `53` mismatch in the failure below), which makes the comparison fail spuriously.
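
For illustration only (not part of this patch, and assuming an active `SparkSession` named `spark`), this is roughly how a `CollectMetrics` node picks up a `dataframeId` in the first place:

```scala
import org.apache.spark.sql.functions.{max, min, sum}

// Dataset.observe records the observing Dataset's internal id in the resulting
// CollectMetrics node; that id reflects how many Datasets were created earlier
// in the run, not the semantics of the query being tested.
val df = spark.range(10).toDF("id")
val observed = df.observe("my_metric",
  min(df("id")).as("min_val"),
  max(df("id")).as("max_val"),
  sum(df("id")))
// observed.queryExecution.analyzed now contains a CollectMetrics node whose
// dataframeId varies with Dataset creation order.
```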

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Manual check, by running:

```
build/mvn clean install -pl connector/connect/server -am -DskipTests
build/mvn test -pl connector/connect/server
```

**Before**

```
- Test observe *** FAILED ***
  == FAIL: Plans do not match ===
  !CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 0   CollectMetrics my_metric, [min(id#0) AS min_val#0, max(id#0) AS max_val#0, sum(id#0) AS sum(id)#0L], 53
   +- LocalRelation <empty>, [id#0, name#0]                                                                 +- LocalRelation <empty>, [id#0, name#0] (PlanTest.scala:179)
```

**After**

```
Run completed in 41 seconds, 631 milliseconds.
Total number of tests run: 882
Suites: completed 24, aborted 0
Tests: succeeded 882, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45141 from LuciferYang/SPARK-45357-35.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Showing 1 changed file with 5 additions and 2 deletions.
```diff
@@ -30,7 +30,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Observation,
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, Distinct, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
@@ -1067,7 +1067,10 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
 
   // Compares proto plan with LogicalPlan.
   private def comparePlans(connectPlan: proto.Relation, sparkPlan: LogicalPlan): Unit = {
+    def normalizeDataframeId(plan: LogicalPlan): LogicalPlan = plan transform {
+      case cm: CollectMetrics => cm.copy(dataframeId = 0)
+    }
     val connectAnalyzed = analyzePlan(transform(connectPlan))
-    comparePlans(connectAnalyzed, sparkPlan, false)
+    comparePlans(normalizeDataframeId(connectAnalyzed), normalizeDataframeId(sparkPlan), false)
   }
 }
```
