diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 92811c67cdb1d..8b6d2cf96b4bc 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -184,16 +184,10 @@ object FileCommitProtocol extends Logging {
     }
     // If that still doesn't exist, try the one with (jobId: string, outputPath: String).
     require(!dynamicPartitionOverwrite,
-      "Dynamic Partition Overwrite is enabled but" +
-        s" the committer ${className} does not have the appropriate constructor")
-    try {
-      logDebug("Falling back to (String, String) constructor")
-      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-      ctor.newInstance(jobId, outputPath)
-    } catch {
-      case _: NoSuchMethodException =>
-        // no suitable ctor, throw exception
-        throw new RuntimeException("No constructor found for FileCommitProtocol!")
-    }
+      "Dynamic Partition Overwrite is enabled but" +
+        s" the committer ${className} does not have the appropriate constructor")
+    logDebug("Falling back to (String, String) constructor")
+    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+    ctor.newInstance(jobId, outputPath)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index f08fca276867d..e7af53f80e84d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -58,8 +58,8 @@ class SQLHadoopMapReduceCommitProtocol(
   }
 
   // They are only used in driver
-  private var totalPartitions: Set[String] = Set.empty
-  private var totalCreatedFiles: Long = 0L
+  @volatile private var totalPartitions: Set[String] = Set.empty
+  @volatile private var totalCreatedFiles: Long = 0L
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = super.setupCommitter(context)
@@ -101,8 +101,7 @@ class SQLHadoopMapReduceCommitProtocol(
   override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
     logDebug(s"onTaskCommit($taskCommit)")
     if (hasValidPath) {
-      val (addedAbsPathFiles, allPartitionPaths) =
-        taskCommit.obj.asInstanceOf[(Map[String, String], Set[String])]
+      val (_, allPartitionPaths) = taskCommit.obj.asInstanceOf[(Map[String, String], Set[String])]
       val partitionsPerTask = allPartitionPaths.size
       if (partitionsPerTask > maxDynamicPartitionsPerTask) {
         throw new SparkException(s"Task tried to create $partitionsPerTask dynamic partitions," +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index dd12247613f31..388153b77a59d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -871,7 +871,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     }
 
     // total files restriction
-    withSQLConf(SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> "3") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> "2") {
       sql(
         """
           |create table hive_dynamic_partition_bucket(i int) stored as parquet