
[SPARK-44700][SQL] Rule OptimizeCsvJsonExprs should not be applied to expression like from_json(regexp_replace) #42376

Closed

Conversation

@monkeyboy123 (Contributor) commented on Aug 7, 2023:

What changes were proposed in this pull request?

The rule OptimizeCsvJsonExprs should not be applied to expressions like from_json(regexp_replace(...)).

Why are the changes needed?

It causes a performance regression.

Does this PR introduce any user-facing change?

Yes. For SQL like this:

select tmp.*
from (
  select
    device_id, ads_id,
    from_json(regexp_replace(device_personas, '(?<=(\\\\\\{|,))"device_', '"user_device_'), ${device_schema}) as tmp
  from input
)

${device_schema} includes more than 100 fields.

Before this PR, the query takes 42 minutes.

After this PR, it takes 6 minutes.

If the rule OptimizeJsonExprs is not applied, then in the physical plan's ProjectExec, InterpretedUnsafeProjection.createProjection or GenerateUnsafeProjection.generate eliminates the common subexpression, so regexp_replace is computed only once.

If the rule OptimizeJsonExprs is applied, regexp_replace is computed as many times as there are fields in ${device_schema}.
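To make the shape change concrete, here is a minimal sketch (hypothetical one-column data, not the production query; on Spark 3.1.x, or with spark.sql.optimizer.collapseProjectAlwaysInline=true on newer versions, the optimized plan shows one from_json(regexp_replace(...)) per accessed field):

import spark.implicits._  // spark-shell session assumed

val df = Seq("""{"a":1, "b":2}""").toDF("s")
  // one from_json over the full schema, shared by both field accesses
  .selectExpr("from_json(regexp_replace(s, 'x', 'y'), 'a INT, b INT') AS j")
  .selectExpr("j.a", "j.b")

// After the rule prunes the schema per field access and the two projects
// are collapsed, each output column carries its own copy of the child,
// so regexp_replace appears once per field:
//   from_json(StructField(a,...), regexp_replace(s, x, y, 1)).a AS a
//   from_json(StructField(b,...), regexp_replace(s, x, y, 1)).b AS b
df.explain(true)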

By the way, the root cause is hard to find; in this example it took me two days to track it down.

How was this patch tested?

No new tests; it is just a rule optimization for OptimizeJsonExprs.

github-actions bot added the SQL label on Aug 7, 2023
@monkeyboy123 (Contributor, Author) commented:

Gently ping @viirya, could you help review it? Also cc @cloud-fan.

@cloud-fan (Contributor) commented:

cc @wangyum do you have any ideas? It seems any optimization that changes the expression shape may break common subexpression elimination (CSE). It's hard to come up with a good cost model to fix it. I think a better idea is to make CSE a plan-level optimization, so that we can find all common subexpressions before optimizing expressions. But it's hard to do.

@monkeyboy123 is it possible to rewrite your query and use subquery alias or CTE to hold the expression result, to avoid repeated execution? or you can disable this optimization by setting spark.sql.optimizer.excludedRules to include this rule.
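For reference, the rule-exclusion workaround looks like this (a sketch; the fully qualified name assumes the rule lives in the usual catalyst optimizer package):

// Skip OptimizeCsvJsonExprs for this session; excluded rules are ignored
// on every optimizer run as long as they are not marked non-excludable.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.OptimizeCsvJsonExprs")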

@monkeyboy123 (Contributor, Author) commented on Aug 8, 2023:

> cc @wangyum do you have any ideas? It seems any optimization that changes the expression shape may break common subexpression elimination (CSE). It's hard to come up with a good cost model to fix it. I think a better idea is to make CSE a plan-level optimization, so that we can find all common subexpressions before optimizing expressions. But it's hard to do.
>
> @monkeyboy123 is it possible to rewrite your query and use subquery alias or CTE to hold the expression result, to avoid repeated execution? or you can disable this optimization by setting spark.sql.optimizer.excludedRules to include this rule.

@cloud-fan I can disable this optimization by setting spark.sql.optimizer.enableJsonExpressionOptimization to false, but I think this is a common case that others will encounter. Could we add more cases, such as RegExpReplace or RegExpExtract, to handle this temporarily?
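For completeness, the flag mentioned above can be toggled per session (a sketch; note this disables the JSON expression optimization wholesale, not only the regexp-fed cases):

// Turn off OptimizeJsonExprs-style optimization for this session.
spark.conf.set("spark.sql.optimizer.enableJsonExpressionOptimization", "false")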

@wangyum (Member) commented on Aug 9, 2023:

@monkeyboy123 Have you enabled spark.sql.optimizer.collapseProjectAlwaysInline?

Seq("""{"a":1, "b":0.8}""").toDF("s").write.saveAsTable("t")
val df = sql(
  """
    |SELECT j.*
    |FROM   (SELECT from_json(regexp_replace(s, 'a', 'new_a'), 'new_a INT, b DOUBLE') AS j
    |        FROM   t) tmp
    |""".stripMargin)
df.explain(true)

With spark.sql.optimizer.collapseProjectAlwaysInline=false:

== Optimized Logical Plan ==
Project [j#17.new_a AS new_a#20, j#17.b AS b#21]
+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]
   +- Relation spark_catalog.default.t[s#18] parquet

== Physical Plan ==
*(2) Project [j#17.new_a AS new_a#20, j#17.b AS b#21]
+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]
   +- *(1) ColumnarToRow
      +- FileScan parquet spark_catalog.default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>

With spark.sql.optimizer.collapseProjectAlwaysInline=true:

== Optimized Logical Plan ==
Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#20, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#21]
+- Relation spark_catalog.default.t[s#18] parquet

== Physical Plan ==
Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#20, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#21]
+- *(1) ColumnarToRow
   +- FileScan parquet spark_catalog.default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>

monkeyboy123 reopened this on Aug 9, 2023
@monkeyboy123 (Contributor, Author) commented:

> @monkeyboy123 Have you enabled spark.sql.optimizer.collapseProjectAlwaysInline?

@wangyum Actually, I encountered this problem on Spark 3.1.1, but I can check on Spark 3.4.x.

@monkeyboy123 (Contributor, Author) commented:

It seems it happens on Spark 3.1.1 and has been fixed in Spark 3.4.x.
cc @cloud-fan @wangyum

@wangyum (Member) commented on Aug 10, 2023:

Thanks @monkeyboy123. Please upgrade your Spark to the latest version.

wangyum closed this on Aug 10, 2023
@wankunde (Contributor) commented:

I think we can reuse the results of identical regexp_replace calls; for example, we can reuse the result of regexp_replace(s, 'a', 'x'):

SELECT from_json(regexp_replace(s, 'a', 'x'), 'x INT, b DOUBLE').x,
       from_json(regexp_replace(s, 'a', 'x'), 'x INT, b DOUBLE').b
FROM values('{"a":1, "b":0.8}') t(s)

Filed another PR for this purpose: #42450

cc @cloud-fan @wangyum
