[SPARK-31926][SQL][test-hive1.2] Fix concurrency issue for ThriftCLIService to getPortNumber #28751
cc @cloud-fan @juliuszsompolski @maropu please take a look at this PR.
Test build #123623 has finished for PR 28751 at commit
retest this please
Test build #123627 has finished for PR 28751 at commit
@@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession {
  private var hiveServer2: HiveThriftServer2 = _
  private var serverPort: Int = 0

  def mode: ServerMode.Value = ServerMode.binary
it seems weird to have a default value in the base test trait.
hiveServer2.getServices.asScala.foreach {
  case t: ThriftCLIService if t.getPortNumber != 0 =>
    serverPort = t.getPortNumber
    logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
so we may not output this log?
Before this fix, yes. The port binding is in another background thread.
how does this patch fix it? It seems you just added a try-catch?
With https://github.com/apache/spark/pull/28751/files#diff-7610697b4f8f1bc4842c77e50807914cR178 and its implementations, the port binding is done in the same thread where we call getPortNumber later.
See #28651 (comment). There was a discussion with @juliuszsompolski about this before.
ah I see!
Take ThriftBinaryCLIService as an example.
Before: we did both the TThreadPoolServer initialization and the serve call in the same run function of the background thread. If we then called getPortNumber right after startWithContext, a concurrency issue could occur: portNum might not have been reset yet at the time of the call.
After: we do the TThreadPoolServer initialization in the current thread and only call serve in the run function of the background thread.
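The Before/After description above can be illustrated with a minimal sketch. This is not the Spark or Hive code: `EagerBindSketch` and its nested `Service` class are hypothetical stand-ins, and a plain `java.net.ServerSocket` is assumed in place of the Thrift server. The point it shows is that when the bind happens in the constructor (the caller's thread), the port is already resolved by the time the background thread starts serving.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class EagerBindSketch {
    /** Hypothetical stand-in for a Thrift CLI service; not the Spark/Hive class. */
    static final class Service implements AutoCloseable {
        private final ServerSocket socket;
        private final int portNum;

        /** Binds in the caller's thread. Port 0 asks the OS for any free port. */
        Service(int requestedPort) throws IOException {
            this.socket = new ServerSocket(requestedPort);
            // Resolved here, before start() is ever called.
            this.portNum = socket.getLocalPort();
        }

        /** Only the blocking accept loop runs in the background thread. */
        Thread start() {
            Thread t = new Thread(() -> {
                while (!socket.isClosed()) {
                    try (Socket client = socket.accept()) {
                        // A real server would hand the connection to a worker here.
                    } catch (IOException e) {
                        // accept() fails once the socket is closed; leave the loop.
                        return;
                    }
                }
            }, "serve-loop");
            t.setDaemon(true);
            t.start();
            return t;
        }

        int getPortNumber() {
            return portNum;
        }

        @Override
        public void close() throws IOException {
            socket.close();
        }
    }

    public static void main(String[] args) throws Exception {
        try (Service service = new Service(0)) {
            service.start();
            // Safe to read immediately: no race with the background thread.
            System.out.println("bound port: " + service.getPortNumber());
        }
    }
}
```

If the bind were moved into the lambda passed to `new Thread(...)`, a caller reading the port right after `start()` could still observe the configured value 0, which is the race discussed in this thread.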
Ah, I see. Nice catch.
Test build #123628 has finished for PR 28751 at commit
Test build #123633 has finished for PR 28751 at commit
Test build #123634 has finished for PR 28751 at commit
LGTM, but please revert the styling changes in java files.
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
Spark does not have an official styling guide for Java, so I think the previous 4-space indents should stay.
Could you revert the indent/styling changes in those files, to make tracking changes and merging between branches easier?
I find it easier to track which code is directly imported from Hive and which was modified for Spark if it is not touched by styling changes, because I can then diff it directly against the Hive files.
OK
// Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick any free port to use.
// It's much more robust than set a random port generated by ourselves ahead
nit: this duplicates the comment above. The two comments could be merged.
Test build #123688 has finished for PR 28751 at commit
retest this please
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
    + serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
String msg = "Starting " + getName() + " on port " + portNum + " with " + minWorkerThreads +
    "..." + maxWorkerThreads + " worker threads";
Is this log message change needed? Does it introduce any actual changes?
I think there may be consumers that wait for and parse this line to make sure the Thriftserver is running, to extract the port, and so on. If the format changes, it may break such matches.
unnecessary change, you are right. reverted
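To make the concern about log consumers concrete, here is a sketch of the kind of matcher an external script might run against the startup line. The class name `StartupLogParser`, the regular expression, and the sample values (port 10000, 5...500 worker threads) are assumptions for illustration; only the message layout is taken from the diff quoted above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StartupLogParser {
    // Mirrors the layout quoted in the diff:
    // "Starting <service> on port <port> with <min>...<max> worker threads"
    private static final Pattern STARTED = Pattern.compile(
        "Starting (\\w+) on port (\\d+) with (\\d+)\\.\\.\\.(\\d+) worker threads");

    /** Returns the port if the line matches the expected startup format. */
    static OptionalInt parsePort(String logLine) {
        Matcher m = STARTED.matcher(logLine);
        return m.find()
            ? OptionalInt.of(Integer.parseInt(m.group(2)))
            : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Sample values are made up for the demonstration.
        String line = "INFO Starting ThriftBinaryCLIService on port 10000 "
            + "with 5...500 worker threads";
        System.out.println(parsePort(line));
        // A reworded message no longer matches, so the consumer finds no port.
        System.out.println(parsePort("INFO ThriftBinaryCLIService listening on 10000"));
    }
}
```

A matcher this strict stops working as soon as the wording or field order of the message changes, which is why keeping the original format is the safer choice.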
Nice catch. LGTM except for the minor comments.
    """.stripMargin.split("\n").mkString.trim
  } else {
    s"""jdbc:hive2://localhost:$serverPort"""
  }
nit format:
private lazy val jdbcUri = if (mode == ServerMode.http) {
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
} else {
s"jdbc:hive2://localhost:$serverPort"
}
?
I think the existing format is correct.
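For readers following along outside Scala, the URI selection discussed in this thread can be sketched in Java. `JdbcUriSketch` and its `ServerMode` enum are hypothetical names; the two URI shapes are copied from the suggestion above, and the port value in `main` is made up.

```java
public class JdbcUriSketch {
    /** Hypothetical mirror of the two transport modes used by the test trait. */
    enum ServerMode { BINARY, HTTP }

    /** Builds the JDBC URI for a server listening on localhost at serverPort. */
    static String jdbcUri(ServerMode mode, int serverPort) {
        String base = "jdbc:hive2://localhost:" + serverPort;
        // HTTP transport needs the database, transport mode and HTTP path appended.
        return mode == ServerMode.HTTP
            ? base + "/default;transportMode=http;httpPath=cliservice"
            : base;
    }

    public static void main(String[] args) {
        System.out.println(jdbcUri(ServerMode.BINARY, 10000));
        System.out.println(jdbcUri(ServerMode.HTTP, 10000));
    }
}
```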
// Wait for thrift server to be ready to serve the query, via executing simple query
// till the query succeeds. See SPARK-30345 for more details.
eventually(timeout(30.seconds), interval(1.seconds)) {
  withJdbcStatement {_.execute("SELECT 1")}
nit: withJdbcStatement { _.execute("SELECT 1") }
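The readiness check in the hunk above relies on ScalaTest's `eventually`. The same retry-until-success idea can be sketched in plain Java. `EventuallySketch.eventually` is a hypothetical helper written for this illustration, not a ScalaTest or Spark API, and the probe in `main` only simulates a server that becomes ready on the third attempt.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

public class EventuallySketch {
    /**
     * Calls probe repeatedly until it returns without throwing. Rethrows the
     * last failure once another sleep would run past the timeout.
     */
    static <T> T eventually(Duration timeout, Duration interval, Callable<T> probe)
            throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return probe.call();
            } catch (Exception e) {
                // Give up if there is no time left for another wait.
                if (deadline - System.nanoTime() < interval.toNanos()) {
                    throw e;
                }
                Thread.sleep(interval.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Simulated readiness probe: fails twice, then succeeds.
        String result = eventually(Duration.ofSeconds(2), Duration.ofMillis(10), () -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("server not ready yet");
            }
            return "ready";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```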
Seems not related to this PR.
Test build #123696 has finished for PR 28751 at commit
Test build #123695 has finished for PR 28751 at commit
Test build #123697 has finished for PR 28751 at commit
thanks, merging to master/3.0!
…ervice to getPortNumber

### What changes were proposed in this pull request?
When `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext` is called, it starts `ThriftCLIService` in the background with a new Thread. If we call `ThriftCLIService.getPortNumber` at the same time, we might not get the bound port if it's configured with 0.
This PR moves the TServer/HttpServer initialization code out of that new Thread.

### Why are the changes needed?
Fix concurrency issue, improve test robustness.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
add new tests

Closes #28751 from yaooqinn/SPARK-31926.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 02f32cf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Hi, All.
@@ -480,7 +480,8 @@ object SparkParallelTestGrouping {
    "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
    "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
    "org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite",
This looks like an SBT-only workaround, doesn't it?
@@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
    }
  }
}

class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {
This new test suite never passes.
Sorry guys. This introduces a non-trivial consistent failure on both master/3.0 Maven jobs. I have to revert this. Please make another PR that passes with Maven.
Yes, +1 to revert.
Thanks @dongjoon-hyun @HyukjinKwon, and pardon the inconvenience.
@@ -480,7 +480,8 @@ object SparkParallelTestGrouping {
    "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
    "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
    "org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite",
    "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite",
    "org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
does that mean this approach to speed up the test runner never works for maven? cc @gengliangwang @wangyum
Yes, it's not related to maven I think.
Agree with @gengliangwang.
Is there any way to run these tests in separate JVMs with Maven? It does not seem possible to start 2 thrift servers with different transport modes on the shared Spark session in one JVM.
Can we just run these 2 test suites one by one?
The root cause I have found so far: in afterAll(), the Spark session was stopped and detached from the thread-local variable, but Hive's SessionState was not, so it gets reused the next time, which means the configs newly defined in the next test file do not take effect.
If this is the only issue that stops these tests from running together in a single JVM (verified locally, and it went well), I guess we can remove these 2 lines eventually.
Sent a new PR, #28797.
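The root cause described above, a thread-local state object that outlives the suite that created it, can be reproduced in isolation. The sketch below does not use Hive: `FakeSessionState`, `runSuite`, and the `transport.mode` key are hypothetical names chosen for the illustration. It shows a second "suite" silently inheriting the first suite's configuration unless the thread-local is detached during cleanup.

```java
import java.util.HashMap;
import java.util.Map;

public class ThreadLocalLeakSketch {
    /** Hypothetical stand-in for a per-thread session holder such as Hive's SessionState. */
    static final class FakeSessionState {
        private static final ThreadLocal<FakeSessionState> CURRENT = new ThreadLocal<>();
        final Map<String, String> conf;

        private FakeSessionState(Map<String, String> conf) {
            this.conf = new HashMap<>(conf);
        }

        /** Reuses the state already attached to this thread and ignores the new configs. */
        static FakeSessionState getOrCreate(Map<String, String> conf) {
            FakeSessionState state = CURRENT.get();
            if (state == null) {
                state = new FakeSessionState(conf);
                CURRENT.set(state);
            }
            return state;
        }

        static void detach() {
            CURRENT.remove();
        }
    }

    /** Simulates one test suite; returns the transport mode it actually observed. */
    static String runSuite(String transportMode, boolean cleanUp) {
        Map<String, String> conf = new HashMap<>();
        conf.put("transport.mode", transportMode);
        try {
            return FakeSessionState.getOrCreate(conf).conf.get("transport.mode");
        } finally {
            if (cleanUp) {
                FakeSessionState.detach(); // the afterAll() step that was missing
            }
        }
    }

    public static void main(String[] args) {
        // Without cleanup, the second suite sees the first suite's mode.
        System.out.println(runSuite("binary", false));
        System.out.println(runSuite("http", false));
        FakeSessionState.detach();
        // With cleanup, each suite sees its own configuration.
        System.out.println(runSuite("binary", true));
        System.out.println(runSuite("http", true));
    }
}
```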
…currency issue for ThriftCLIService to getPortNumber

### What changes were proposed in this pull request?
This PR brings #28751 back.
- It was first reverted by 4a25200 because of an unavoidable Maven test failure.
- See the related updates in the followup a0187cd.
- It was then reverted again because of the flakiness of the added unit tests.
- In this PR, the flakiness turned out to be caused by the Hive metastore connection that SparkSQLCLIService tries to create, which is not needed at all. This metastore client only points to a dummy metastore server.
- Also, add some cleanups for the SharedThriftServer trait in before and after to prevent its configurations from being polluted or polluting others.

### Why are the changes needed?
fix flaky test

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
passing sbt and maven tests

Closes #28835 from yaooqinn/SPARK-31926-F.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
When org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext is called, it starts ThriftCLIService in the background with a new Thread. If we call ThriftCLIService.getPortNumber at the same time, we might not get the bound port if it's configured with 0.
This PR moves the TServer/HttpServer initialization code out of that new Thread.
Why are the changes needed?
Fix concurrency issue, improve test robustness.
Does this PR introduce any user-facing change?
NO
How was this patch tested?
add new tests