
Conversation

@zhangyt26

This will create a stable hash for the plan when there is more than one projection to break down.

What changes were proposed in this pull request?

Use LinkedHashSet so that the projection order is always stable.
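
For context, a minimal sketch (not taken from the patch itself) of why the collection choice matters: mutable.HashSet iterates in hash-code order, and for identity-hashed objects such as freshly created plan expressions that order can differ from run to run, while LinkedHashSet iterates in insertion order, so anything derived from iteration order, like a plan hash, stays deterministic.

import scala.collection.mutable

// Stand-ins for projections: plain objects hash by identity, so their hash
// codes (and hence HashSet iteration order) can vary from one JVM run to the next.
class Proj(val name: String)
val projs = Seq(new Proj("p1"), new Proj("p2"), new Proj("p3"))

val unordered = mutable.HashSet(projs: _*)       // iteration order is run-dependent
val ordered   = mutable.LinkedHashSet(projs: _*) // iteration order = insertion order

println(unordered.toSeq.map(_.name)) // may print in any order across runs
println(ordered.toSeq.map(_.name))   // always List(p1, p2, p3)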

Why are the changes needed?

The same Spark plan should compare equal even after the analyzer phase.

Does this PR introduce any user-facing change?

No

How was this patch tested?
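
(The repro below is runnable as-is in spark-shell, where spark and its implicits are in scope.)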

import spark.implicits._
import org.apache.spark.sql.{Row, functions => SparkFuncs}
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap

val df1 = Seq((1, 2, 3, 4), (5, 6, 7, 8), (9, 10, 11, 12))
  .toDF("a", "b", "c", "d")
  .select(SparkFuncs.struct("a", "b", "c", "d").alias("df1_struct"))

val df2 =
  df1.groupBy().agg(SparkFuncs.first("df1_struct").as("df1_struct_first")).withColumn("e", SparkFuncs.lit("100"))

val df3 = spark
  .createDataFrame(List(Row(0)).asJava, StructType(Seq(StructField("EMPTY", IntegerType))))
  .withColumn("num1", SparkFuncs.lit(1000))
  .withColumn("num2", SparkFuncs.lit(2000))
  .withColumn("num3", SparkFuncs.lit(3000))
  .withColumn("num4", SparkFuncs.lit(4000))
  .select(SparkFuncs.struct("num1", "num2", "num3", "num4").alias("df3_struct"))
  .cache()

val df4 = df2
  .join(
      df2
        .join(df3, Seq(), "inner")
        .groupBy()
        .agg(SparkFuncs.first("df3_struct").as("df3_struct_first")),
      Seq(),
      "left")
  .select(
      SparkFuncs.col("*"),
      SparkFuncs
        .posexplode(SparkFuncs.array(SparkFuncs.lit("x"), SparkFuncs.lit("y"), SparkFuncs.lit("z")))
        .as(Seq("pos", "val"))
  )

// Run this multiple times: the hash is unstable on master but stable after the patch.
// The lateral column aliases below (col3 references col2; col8 references col4 and
// col5; col9 references col8) force the analyzer to break this projection down.
df4
  .withColumns(
      ListMap(
          "col1" -> SparkFuncs.lit(100),
          "col2" -> SparkFuncs
            .when(
                SparkFuncs.col("val").eqNullSafe(SparkFuncs.lit("x")),
                SparkFuncs.col("df1_struct_first").getItem("a"))
            .otherwise(
                SparkFuncs
                  .when(
                      SparkFuncs.col("val").eqNullSafe(SparkFuncs.lit("y")),
                      SparkFuncs.col("df1_struct_first").getItem("b"))
                  .otherwise(SparkFuncs
                    .when(
                        SparkFuncs.col("val").eqNullSafe(SparkFuncs.lit("z")),
                        SparkFuncs.col("df1_struct_first").getItem("c")))),
          "col3" -> SparkFuncs.col("col2").plus(SparkFuncs.lit(1000)),
          "col4" -> SparkFuncs.col("df3_struct_first").getItem("num1").multiply(SparkFuncs.lit(10)),
          "col5" -> SparkFuncs.col("df3_struct_first").getItem("num2").multiply(SparkFuncs.lit(10)),
          "col6" -> SparkFuncs.col("df3_struct_first").getItem("num3").multiply(SparkFuncs.lit(10)),
          "col7" -> SparkFuncs.col("df3_struct_first").getItem("num4").multiply(SparkFuncs.lit(10)),
          "col8" -> SparkFuncs.col("col4").minus(SparkFuncs.col("col5")),
          "col9" -> SparkFuncs.col("col8").plus(SparkFuncs.lit(1000))
      ))
  .semanticHash()
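
To make the repro self-checking, one could wrap the call above in a small helper (a sketch; assertStableHash and the reproCols binding for the ListMap literal above are hypothetical, not part of the patch):

import org.apache.spark.sql.{Column, DataFrame}
import scala.collection.immutable.ListMap

// Rebuild the plan n times and collect the semantic hashes. Each withColumns
// call mints fresh expressions, which is presumably what perturbed the
// unordered set before the patch; a stable hash yields a single value.
def assertStableHash(df: DataFrame, cols: ListMap[String, Column], n: Int = 20): Unit = {
  val hashes = (1 to n).map(_ => df.withColumns(cols).semanticHash()).toSet
  assert(hashes.size == 1, s"semanticHash unstable: $hashes")
}

// val reproCols: ListMap[String, Column] = ListMap(/* the literal passed above */)
// assertStableHash(df4, reproCols)  // flaky on master, passes with the patch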

Was this patch authored or co-authored using generative AI tooling?

No

This will create a stable hash for the plan when there is more than one projection to break down
github-actions bot added the SQL label on Aug 1, 2024
@zhangyt26 (Author)

An even easier repro: spark.sql("SELECT 100 as col1, 200 as col2, col1 * col2").semanticHash
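
For what it's worth (my reading of why this minimal query hits the same path, not stated in the thread): col1 * col2 is a lateral column alias reference, so the analyzer must split the single SELECT projection into more than one Project node, roughly the shape of the nested query below; hashing the generated projections through an unordered set is what made the result vary.

// A sketch of the rewrite the analyzer effectively performs when resolving
// the lateral aliases (illustrative, not actual plan output):
spark.sql("""
  SELECT col1, col2, col1 * col2
  FROM (SELECT 100 AS col1, 200 AS col2)
""").semanticHash()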

@allisonwang-db (Contributor)

cc @anchovYu

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label on Nov 16, 2024
github-actions bot closed this on Nov 17, 2024
