diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 9552e2c81bb14..f5382d040f28e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -233,7 +235,15 @@ private[sql] class RocksDBStateStoreProvider
   }
 
   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      // SPARK-46547 - Swallow non-fatal exception in maintenance task to avoid deadlock between
+      // maintenance thread and streaming aggregation operator
+      case NonFatal(ex) =>
+        logWarning(s"Ignoring error while performing maintenance operations with exception=",
+          ex)
+    }
   }
 
   override def close(): Unit = {
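
For context, below is a minimal standalone sketch (not part of the patch) of the scala.util.control.NonFatal pattern this change relies on: non-fatal exceptions thrown by a best-effort background task are logged and swallowed so the maintenance thread keeps running, while fatal conditions (e.g. VirtualMachineError, InterruptedException) are not matched by NonFatal and still propagate. The doMaintenance and safeMaintenance names here are hypothetical stand-ins, not the Spark APIs.

import scala.util.control.NonFatal

object MaintenanceSketch {
  // Hypothetical stand-in for the store's maintenance work (e.g. snapshot upload).
  private def doMaintenance(): Unit =
    throw new java.io.IOException("snapshot upload failed")

  // Wraps the maintenance call the same way the patch does: NonFatal matches
  // ordinary exceptions (IOException, RuntimeException, ...) but deliberately
  // does not match VirtualMachineError, InterruptedException, ControlThrowable,
  // etc., so truly fatal conditions still escape.
  def safeMaintenance(): Unit = {
    try {
      doMaintenance()
    } catch {
      case NonFatal(ex) =>
        // Swallow and log so the background maintenance thread survives and
        // cannot end up blocked against the task thread holding the store lock.
        println(s"Ignoring error while performing maintenance: ${ex.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    safeMaintenance()                      // logs the IOException, does not rethrow
    println("maintenance loop continues")  // reached because the error was swallowed
  }
}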