Skip to content

Commit

Permalink
ut
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Sep 23, 2020
1 parent 1583f9a commit fdab8b9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 16 deletions.
Expand Up @@ -184,16 +184,10 @@ object FileCommitProtocol extends Logging {
}
// If that still doesn't exist, try the one with (jobId: string, outputPath: String).
require(!dynamicPartitionOverwrite,
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
try {
logDebug("Falling back to (String, String) constructor")
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
} catch {
case _: NoSuchMethodException =>
// no suitable ctor, throw exception
throw new RuntimeException("No constructor found for FileCommitProtocol!")
}
"Dynamic Partition Overwrite is enabled but" +
s" the committer ${className} does not have the appropriate constructor")
logDebug("Falling back to (String, String) constructor")
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
}
Expand Up @@ -58,8 +58,8 @@ class SQLHadoopMapReduceCommitProtocol(
}

// They are only used in driver
private var totalPartitions: Set[String] = Set.empty
private var totalCreatedFiles: Long = 0L
@volatile private var totalPartitions: Set[String] = Set.empty
@volatile private var totalCreatedFiles: Long = 0L

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = super.setupCommitter(context)
Expand Down Expand Up @@ -101,8 +101,7 @@ class SQLHadoopMapReduceCommitProtocol(
override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
logDebug(s"onTaskCommit($taskCommit)")
if (hasValidPath) {
val (addedAbsPathFiles, allPartitionPaths) =
taskCommit.obj.asInstanceOf[(Map[String, String], Set[String])]
val (_, allPartitionPaths) = taskCommit.obj.asInstanceOf[(Map[String, String], Set[String])]
val partitionsPerTask = allPartitionPaths.size
if (partitionsPerTask > maxDynamicPartitionsPerTask) {
throw new SparkException(s"Task tried to create $partitionsPerTask dynamic partitions," +
Expand Down
Expand Up @@ -871,7 +871,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}

// total files restriction
withSQLConf(SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> "3") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> "2") {
sql(
"""
|create table hive_dynamic_partition_bucket(i int) stored as parquet
Expand Down

0 comments on commit fdab8b9

Please sign in to comment.