diff --git a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
index 3df1ff65498d..d1947ae93a40 100644
--- a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
+++ b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -35,14 +35,21 @@ class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
   override def close(): Unit = synchronized {
     if (!closed) {
       if (operationId != null) {
-        conn.spark.interruptOperation(operationId)
+        try {
+          conn.spark.interruptOperation(operationId)
+        } catch {
+          case _: java.net.ConnectException =>
+            // Ignore ConnectExceptions during cleanup as the operation may have already
+            // completed or the server may be unavailable. The important part is marking
+            // this statement as closed to prevent further use.
+        }
         operationId = null
       }
       if (resultSet != null) {
         resultSet.close()
         resultSet = null
       }
-      closed = false
+      closed = true
     }
   }
 
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index efac3bc7561f..6229e6e299db 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -93,6 +93,11 @@ object SparkConnectServerUtils {
     if (isDebug) {
       builder.redirectError(Redirect.INHERIT)
       builder.redirectOutput(Redirect.INHERIT)
+    } else {
+      // If output is not consumed, the stdout/stderr pipe buffers will fill up,
+      // causing the server process to block on write() calls.
+      builder.redirectError(Redirect.DISCARD)
+      builder.redirectOutput(Redirect.DISCARD)
     }
 
     val process = builder.start()
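
Note on the first hunk: before this change, `close()` assigned `closed = false` where `closed = true` was intended, so the statement never actually transitioned into the closed state, and a `ConnectException` thrown by `interruptOperation` (e.g. when the server is already gone) aborted cleanup entirely. Below is a minimal sketch of the contract the fix restores, written against the standard `java.sql` API; the JDBC URL is a placeholder assumption, not taken from this patch:

```scala
import java.sql.{DriverManager, Statement}

object CloseContractSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder URL: assumes a Spark Connect server reachable locally and
    // the Spark Connect JDBC driver on the classpath.
    val conn = DriverManager.getConnection("jdbc:sc://localhost:15002")
    val stmt: Statement = conn.createStatement()

    stmt.close()
    // With the old `closed = false` bug, isClosed() kept returning false and
    // every subsequent close() re-ran the whole cleanup path.
    assert(stmt.isClosed, "statement must report closed after close()")
    stmt.close() // must be a no-op, even if the server is unreachable

    conn.close()
  }
}
```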
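
Note on the second hunk: this is the standard `ProcessBuilder` pitfall. Under the default `Redirect.PIPE`, a child process whose stdout/stderr is never drained blocks in `write()` once the OS pipe buffer (commonly ~64 KiB on Linux) fills. A self-contained sketch of the failure mode and the `Redirect.DISCARD` (Java 9+) remedy, assuming a POSIX `sh` is available:

```scala
import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.TimeUnit

object PipeBufferSketch {
  def main(args: Array[String]): Unit = {
    // Child writes ~10 MiB to stdout. Under the default PIPE redirect, with
    // nobody draining process.getInputStream, it stalls once the pipe fills.
    val builder = new ProcessBuilder("sh", "-c", "head -c 10000000 /dev/zero")

    // Fix: route output to the null device so the child can always write.
    builder.redirectOutput(Redirect.DISCARD)
    builder.redirectError(Redirect.DISCARD)

    val process = builder.start()
    val exited = process.waitFor(10, TimeUnit.SECONDS)
    println(s"exited cleanly: $exited") // true: nothing accumulates in a pipe
  }
}
```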