[SPARK-47404][SQL] Add hooks to release the ANTLR DFA cache after parsing SQL #45526
base: master
Conversation
Thank you for making this contribution, @markj-db .
Could you provide a reference link to the corresponding code in the PR description?
Spark’s ANTLR SQL parser is derived from the Presto ANTLR SQL Parser, and Presto has added hooks to be able to clear this DFA cache.
(Resolved review thread on sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala)
@@ -800,4 +800,39 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession {
      start = 0,
      stop = 63))
  }

  def getMemoryUsage: Long = {
nit. private
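For context, a minimal sketch of what such a heap-measurement helper might look like (hypothetical, not necessarily the PR's actual implementation): request a GC pass, then read used heap via the JMX memory bean.

```java
import java.lang.management.ManagementFactory;

class MemoryProbe {
    // Hypothetical helper: request a collection so the reading mostly reflects
    // live objects, then report used heap bytes. Readings are inherently noisy,
    // which is why the test compares against a tolerance band, not exact values.
    static long getMemoryUsage() {
        System.gc();
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }
}
```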
(Resolved review threads on sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala)
test("release ANTLR cache after parsing") {
  val baselineMemory = getMemoryUsage // On my system, this is about 61 MiB
  parser.parsePlan(s"select ${awfulQuery(8)} from range(10)")
  val estimatedCacheOverhead = getMemoryUsage - baselineMemory // about 119 MiB
I'm worried that this test case might introduce new flakiness in our GitHub Action environment, especially the Apple Silicon environment. Do you think this test case cleans up its leftovers after the test suite completes?
I worry a little about flakiness too, but the thresholding is intended to prevent that from being an issue.
Why especially Apple Silicon?
What leftovers are you referring to? The only thing I can think of is the ANTLR cache, and this test is explicitly verifying that the ANTLR cache is cleared. So I think yes, all leftovers are cleared?
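The thresholding being discussed can be made concrete: the paired assertions accept any measurement within a relative tolerance of the estimated overhead. A small self-contained sketch (names and numbers are illustrative, not taken from the PR):

```java
class ToleranceCheck {
    // True when `measured` lies within `tolerance` (a fraction) of
    // baseline + expectedOverhead, mirroring the paired asserts in the test.
    static boolean withinBand(long measured, long baseline,
                              long expectedOverhead, double tolerance) {
        return measured > baseline + (long) ((1 - tolerance) * expectedOverhead)
            && measured < baseline + (long) ((1 + tolerance) * expectedOverhead);
    }
}
```

With the rough numbers quoted in the test comments (baseline 61 MiB, overhead 119 MiB), a generous tolerance leaves a wide band, which is what mitigates run-to-run GC noise.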
left some comments. cc @cloud-fan
@@ -4589,6 +4589,20 @@ object SQLConf {
    .stringConf
    .createWithDefault("versionAsOf")

  val RELEASE_ANTLR_CACHE_AFTER_PARSE =
    buildConf("spark.sql.releaseAntlrCacheAfterParse")
nit: afterParsing?
Done.
assert(memoryUsageWithoutRelease > baselineMemory + (1 - tolerance) * estimatedCacheOverhead)
assert(memoryUsageWithoutRelease < baselineMemory + (1 + tolerance) * estimatedCacheOverhead)
withSQLConf("spark.sql.releaseAntlrCacheAfterParse" -> "true") {
  parser.parsePlan("select id from range(10)")
How about running this query multiple times, so the result could be more accurate?
But even with repeated runs, I'm worried about flakiness if the test runs in parallel with others.
How would I measure the flakiness of this test? I think I've made a reasonable attempt to mitigate this risk, but I'm happy to try verifying. Under what circumstances do multiple tests run concurrently in the same JVM?
Running in SBT, my experience is that the test cases in the suite run serially. I observed the new test taking about 2 seconds:
SPARK-47404: release ANTLR cache after parsing (1 second, 855 milliseconds)
(Resolved review threads on sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala)
Instead of recommending this, why don't we do this from the Spark driver side itself? For example, instead of a boolean configuration like this, we could have an integer configuration that controls when to clear the cache. The default value could be Int.MaxValue.
Could you be more specific about what the integer means in your proposal? I had considered a few possibilities for making this simpler to work with:
I left this manual because there's a tradeoff here where neither behaviour is clearly superior. For example, if the driver will be handling many similar queries over its lifetime and none of them are terribly large, the performance benefit of the cache probably outweighs the reclaimed memory (e.g. imagine a cluster that's optimized for minimizing latency on small queries). The current behaviour is probably appropriate for most Spark users most of the time, so leaving this as a manual tuning option seems like the right balance of complexity, effort, and impact.
* Drop the existing parser caches and create a new one.
*
* ANTLR retains caches in its parser that are never released. This speeds up parsing of future
* input, but it can consume a lot of memory depending on the input seen so far.
Does this cache work within a query or only for future queries?
It's my understanding that it works for future tokens in the same query as well as for future queries.
ParserInterpreter.parse calls ParserInterpreter.visitState, which calls ParserInterpreter.visitDecisionState, which calls ParserATNSimulator.adaptivePredict. The ParserATNSimulator was constructed with a reference to the static DFA cache, and ParserATNSimulator.adaptivePredict reads/updates that cache via ParserATNSimulator.addDFAState, which calls ATNConfigSet.optimizeConfigs, which calls ATNSimulator.getCachedContext, which calls PredictionContext.getCachedContext, which calls PredictionContextCache.add and PredictionContextCache.get.
So, in short, any time there's a prediction during parsing, whether it's later in an input stream or in a subsequent input stream, it's using this cache.
When I say "The ParserATNSimulator was constructed with a reference to the static DFA cache", this is best seen in the ANTLR generated code. Here's a snippet:
public class SqlBaseParser extends Parser {
static { RuntimeMetaData.checkVersion("4.8", RuntimeMetaData.VERSION); }
protected static final DFA[] _decisionToDFA;
protected static final PredictionContextCache _sharedContextCache =
new PredictionContextCache();
...
public SqlBaseParser(TokenStream input) {
super(input);
_interp = new ParserATNSimulator(this,_ATN,_decisionToDFA,_sharedContextCache);
}
...
static {
_decisionToDFA = new DFA[_ATN.getNumberOfDecisions()];
for (int i = 0; i < _ATN.getNumberOfDecisions(); i++) {
_decisionToDFA[i] = new DFA(_ATN.getDecisionState(i), i);
}
}
}
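The snippet above shows why the caches persist: they are static fields shared by every parser instance, with no eviction. A toy, self-contained model of that pattern and of the kind of release hook this PR proposes (names are illustrative, not the real Spark/ANTLR API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a parser whose static cache, like ANTLR's _decisionToDFA
// and _sharedContextCache, is shared across all parser instances and only
// ever grows as new inputs are seen.
class CachedParserModel {
    private static Map<String, Integer> cache = new HashMap<>();

    static int parse(String sql) {
        // The cache is consulted on every parse and grows on cache misses.
        return cache.computeIfAbsent(sql, String::length);
    }

    static int cacheSize() {
        return cache.size();
    }

    // Models a release hook: drop the old cache wholesale and let the next
    // parse repopulate a fresh one.
    static void refreshCache() {
        cache = new HashMap<>();
    }
}
```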
Done
  }

  private def conf: SqlApiConf = SqlApiConf.get
}

object AbstractSqlParser {
I'm seeing two sparkr tests failing with:
Error in `handleErrors(returnStatus, conn)`: java.lang.IncompatibleClassChangeError: class org.apache.spark.sql.execution.SparkSqlParser cannot inherit from final class org.apache.spark.sql.catalyst.parser.AbstractSqlParser
https://github.com/markj-db/spark/actions/runs/8288610971/job/22688738738
Is this because of the addition of this companion object? Can I run the sparkr tests locally? I don't see them mentioned on https://spark.apache.org/developer-tools.html, but I'll keep looking around.
I was able to run the tests with:
SKIP_UNIDOC=true JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 dev/run-tests --parallelism 1 --modules sparkr
However, I'm seeing a different error:
── Error ('test_basic.R:25:3'): create DataFrame from list or data.frame ───────
Error in `handleErrors(returnStatus, conn)`: java.lang.NoSuchMethodError: 'void org.eclipse.jetty.servlet.ServletHolder.<init>(jakarta.servlet.Servlet)'
at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:118)
What I suggested is this: a (1) "every N queries" Integer configuration is superior to a (2) "every 1 query" Boolean configuration (this PR) because (1) includes (2) and allows users to control it in various ways. For me, this PR ("every 1 query") sounds a little too extreme.
I left this manual because there's a tradeoff here where neither behaviour is clearly superior.
WDYT, @markj-db , @cloud-fan , @linhongliu-db ?
@dongjoon-hyun I can certainly implement your proposal if the consensus among reviewers is that we'd like to go that direction. The downside I can see in what you suggest is that it adds complexity (the user now has to pick
The two strategies I favour are:
I think adding an integer here increases the configuration space, but not in a way that I think is likely to benefit the end user.
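For concreteness, the integer proposal under discussion could be modeled as a counter that clears the cache on every n-th parse, where n = Integer.MAX_VALUE effectively disables clearing and n = 1 matches this PR's boolean behaviour. A hypothetical sketch, not code from the PR:

```java
// Hypothetical model of the "every N queries" integer configuration.
class PeriodicCacheReleaser {
    private final int n;
    private final Runnable release;
    private int parsesSinceRelease = 0;

    PeriodicCacheReleaser(int n, Runnable release) {
        if (n < 1) throw new IllegalArgumentException("n must be >= 1");
        this.n = n;
        this.release = release;
    }

    // Call after each parse; runs `release` on every n-th call.
    void afterParse() {
        if (++parsesSinceRelease >= n) {
            release.run();
            parsesSinceRelease = 0;
        }
    }
}
```

The cost of this flexibility is exactly the point being debated: the user now has to choose a good N, whereas the boolean only asks whether cache memory or parse latency matters more.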
Ya, I understand the point, @markj-db .
If so, I'd like to see how much performance degrades when this new configuration is enabled. Could you provide some insight?
What changes were proposed in this pull request?
Add hooks to release the ANTLR DFA cache after parsing SQL
Why are the changes needed?
ANTLR builds a DFA cache while parsing to speed up parsing of similar future inputs. However, this cache is never cleared and can only grow. Extremely large SQL inputs can lead to very large DFA caches (>20GiB in one extreme case I've seen).
Spark’s ANTLR SQL parser is derived from the Presto ANTLR SQL Parser (see SPARK-13713 and #11557), and Presto has added hooks to be able to clear this DFA cache (trinodb/trino#3186). I think Spark should have similar hooks.
Does this PR introduce any user-facing change?
New SQLConf to control the behaviour (spark.sql.parser.releaseAntlrCacheAfterParse)
How was this patch tested?
New unit test
Was this patch authored or co-authored using generative AI tooling?
No