From 571d3560765e1821d95317431799717e22bca1b3 Mon Sep 17 00:00:00 2001
From: vinodkc
Date: Wed, 26 Nov 2025 09:10:09 -0800
Subject: [PATCH 1/2] Fix flaky tests

---
 .../connect/client/jdbc/SparkConnectStatement.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 3df1ff65498d..4c6ae4adef44 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -35,14 +35,21 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
   override def close(): Unit = synchronized {
     if (!closed) {
       if (operationId != null) {
-        conn.spark.interruptOperation(operationId)
+        try {
+          conn.spark.interruptOperation(operationId)
+        } catch {
+          case _: Exception =>
+            // Ignore exceptions during cleanup as the operation may have already completed
+            // or the server may be unavailable. The important part is marking this statement
+            // as closed to prevent further use.
+        }
         operationId = null
       }
       if (resultSet != null) {
         resultSet.close()
         resultSet = null
       }
-      closed = false
+      closed = true
     }
   }
 
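Note on the close() fix above: JDBC expects Statement.close() to be idempotent
and to succeed even when the backend is gone, and the original code reset
`closed` to false, so the statement never actually entered the closed state.
A minimal sketch of the pattern the patch moves to; the `Cleanup` trait,
`releaseRemoteResources`, and the field names here are illustrative, not
Spark's actual API:

// Illustrative sketch only, not part of the patch.
trait Cleanup {
  @volatile private var closed = false

  // Real teardown goes here; it may throw if the peer is already gone.
  protected def releaseRemoteResources(): Unit

  def close(): Unit = synchronized {
    if (!closed) {
      try releaseRemoteResources()
      catch { case _: Exception => () } // best-effort cleanup, never propagate
      // Mark closed unconditionally so a failed cleanup cannot leave the
      // object half-open (the original bug set this flag back to false).
      closed = true
    }
  }

  def isClosed: Boolean = closed
}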
From ce44c814c07b660a60d20eee11bdcde25011e6cb Mon Sep 17 00:00:00 2001
From: vinodkc
Date: Thu, 27 Nov 2025 08:19:39 -0800
Subject: [PATCH 2/2] Prevent buffer accumulation and ensure the server never
 blocks on logging

---
 .../sql/connect/client/jdbc/SparkConnectStatement.scala    | 4 ++--
 .../apache/spark/sql/connect/test/RemoteSparkSession.scala | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 4c6ae4adef44..d1947ae93a40 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -38,8 +38,8 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
         try {
           conn.spark.interruptOperation(operationId)
         } catch {
-          case _: Exception =>
-            // Ignore exceptions during cleanup as the operation may have already completed
+          case _: java.net.ConnectException =>
+            // Ignore ConnectExceptions during cleanup as the operation may have already completed
             // or the server may be unavailable. The important part is marking this statement
             // as closed to prevent further use.
         }
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index efac3bc7561f..6229e6e299db 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -93,6 +93,11 @@ object SparkConnectServerUtils {
     if (isDebug) {
       builder.redirectError(Redirect.INHERIT)
       builder.redirectOutput(Redirect.INHERIT)
+    } else {
+      // If output is not consumed, the stdout/stderr pipe buffers will fill up,
+      // causing the server process to block on write() calls.
+      builder.redirectError(Redirect.DISCARD)
+      builder.redirectOutput(Redirect.DISCARD)
     }
 
     val process = builder.start()
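Why DISCARD rather than leaving the pipes attached: a child process whose
stdout/stderr are never read eventually fills the OS pipe buffer (commonly
around 64 KiB) and blocks inside write(), which shows up as a hung test
server. A self-contained sketch of the redirect choice, assuming Java 9+ for
Redirect.DISCARD; the startQuiet helper and the echo command are illustrative,
not part of the patch:

import java.lang.ProcessBuilder.Redirect

object RedirectDemo {
  // Start a child process whose output either streams to this JVM's
  // console (debug) or is dropped entirely.
  def startQuiet(debug: Boolean, command: String*): Process = {
    val builder = new ProcessBuilder(command: _*)
    if (debug) {
      builder.redirectError(Redirect.INHERIT)
      builder.redirectOutput(Redirect.INHERIT)
    } else {
      // Route output to the null device so the child can always complete
      // its writes, without a dedicated drain thread on our side.
      builder.redirectError(Redirect.DISCARD)
      builder.redirectOutput(Redirect.DISCARD)
    }
    builder.start()
  }

  def main(args: Array[String]): Unit = {
    val p = startQuiet(debug = false, "echo", "hello") // output is discarded
    p.waitFor()
  }
}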