[SPARK-3007][SQL] Add "Dynamic Partition" support to Spark SQL Hive #1919

Closed · wants to merge 12 commits
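For context, Hive's dynamic partitioning lets the values of some (or all) partition columns come from the query output instead of being fixed in the INSERT statement. A minimal sketch of the kind of statement this patch is meant to support, run through a HiveContext; the table and column names are illustrative, not taken from the diff:

import org.apache.spark.sql.hive.HiveContext

// Assumes an existing SparkContext named `sc`; `src` and `dst` are hypothetical tables.
val hiveContext = new HiveContext(sc)

// Dynamic partitioning has to be switched on in the Hive configuration first.
hiveContext.hql("SET hive.exec.dynamic.partition = true")
hiveContext.hql("SET hive.exec.dynamic.partition.mode = nonstrict")

// `ds` is a static partition value; `hr` is left unbound, so each row's `hr`
// value decides which partition directory it lands in.
hiveContext.hql(
  """INSERT OVERWRITE TABLE dst PARTITION (ds = '2008-04-08', hr)
    |SELECT key, value, hr FROM src""".stripMargin)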
@@ -208,6 +208,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   */
  override def whiteList = Seq(
    "add_part_exist",
    "dynamic_partition_skip_default",
    "infer_bucket_sort_dyn_part",
    "load_dyn_part1",
    "load_dyn_part2",
    "load_dyn_part3",
    "load_dyn_part4",
    "load_dyn_part5",
    "load_dyn_part6",
    "load_dyn_part7",
    "load_dyn_part8",
    "load_dyn_part9",
    "load_dyn_part10",
    "load_dyn_part11",
    "load_dyn_part12",
    "load_dyn_part13",
    "load_dyn_part14",
    "load_dyn_part14_win",
    "add_part_multiple",
    "add_partition_no_whitelist",
    "add_partition_with_whitelist",
sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala (33 additions, 0 deletions)
@@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
      null)
  }

  def open(dynamicPartPath: String) {
    val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get()
    numfmt.setMinimumIntegerDigits(5)
    numfmt.setGroupingUsed(false)

    val extension = Utilities.getFileExtension(
      conf.value,
      fileSinkConf.getCompressed,
      getOutputFormat())

    // Name the output file after the task's split id, zero-padded to five digits.
    val outputName = "part-" + numfmt.format(splitID) + extension
    val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
    if (outputPath == null) {
      throw new IOException("Undefined job output-path")
    }
    // dynamicPartPath starts with a "/"; strip it so the partition directories
    // nest under the job output path.
    val workPath = new Path(outputPath, dynamicPartPath.substring(1))
    val path = new Path(workPath, outputName)
    getOutputCommitter().setupTask(getTaskContext())
    writer = HiveFileFormatUtils.getHiveRecordWriter(
      conf.value,
      fileSinkConf.getTableInfo,
      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
      fileSinkConf,
      path,
      Reporter.NULL)
  }

  def write(value: Writable) {
    if (writer != null) {
      writer.write(value)
@@ -192,4 +219,10 @@ private[hive] object SparkHiveHadoopWriter {
    }
    outputPath.makeQualified(fs)
  }

  // NumberFormat is not thread-safe, so give each writer thread its own instance.
  val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
    override def initialValue() = {
      NumberFormat.getInstance()
    }
  }
}
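The open(dynamicPartPath) method above follows Hadoop's usual part-file naming: the task's split id is zero-padded to five digits and the file is placed under the per-partition directory encoded in dynamicPartPath. A stand-alone sketch of the same path composition, with made-up inputs:

import java.text.NumberFormat
import org.apache.hadoop.fs.Path

// Hypothetical inputs, chosen only to show how the pieces combine.
val jobOutputPath = new Path("hdfs://nn:8020/warehouse/dst")
val dynamicPartPath = "/ds=2008-04-08/hr=11" // built from a row's partition column values
val splitID = 3
val extension = ".gz" // in the patch this comes from Utilities.getFileExtension

val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val outputName = "part-" + numfmt.format(splitID) + extension
// Drop the leading "/" so the partition directories nest under the job output path.
val workPath = new Path(jobOutputPath, dynamicPartPath.substring(1))
val path = new Path(workPath, outputName)
// Prints hdfs://nn:8020/warehouse/dst/ds=2008-04-08/hr=11/part-00003.gz
println(path)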
@@ -822,11 +822,6 @@ private[hive] object HiveQl {
        cleanIdentifier(key.toLowerCase) -> None
      }.toMap).getOrElse(Map.empty)

      if (partitionKeys.values.exists(p => p.isEmpty)) {
        throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
          s"dynamic partitioning.")
      }

      InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)

    case a: ASTNode =>
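The HiveQl hunk above removes the guard that threw a NotImplementedError whenever a partition value was left unspecified, so INSERT statements with dynamic partitions now reach InsertIntoTable. In this representation a static partition column carries Some(value) and a dynamic one carries None; a rough sketch of the partitionKeys map for the earlier example statement (values are invented):

// Partition spec parsed from:
//   INSERT OVERWRITE TABLE dst PARTITION (ds = '2008-04-08', hr) SELECT ...
// `ds` is static (given in the statement), `hr` is dynamic (resolved per row).
val partitionKeys: Map[String, Option[String]] = Map(
  "ds" -> Some("2008-04-08"),
  "hr" -> None)

// The removed guard rejected exactly this situation:
// partitionKeys.values.exists(p => p.isEmpty) // true when any value is dynamic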