Skip to content

Commit

Permalink
Update SparkHadoopWriter.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
baishuo authored and liancheng committed Oct 1, 2014
1 parent 6bb5880 commit 1867e23
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter(
null)
}

/**
 * Opens a Hive record writer whose output file lives under the job output
 * path extended with the given dynamic-partition sub-directory, and stores
 * it in `writer`. Also runs task setup on the output committer.
 *
 * @param dynamicPartPath the partition sub-path (e.g. "/p1=v1/p2=v2");
 *                        a leading "/" is stripped before resolving it
 *                        against the job output path
 * @throws IOException if the job has no output path configured
 */
def open(dynamicPartPath: String) {
  val numberFormat = NumberFormat.getInstance()
  // Fixed-width, unseparated split ids: "part-00000", "part-00001", ...
  numberFormat.setMinimumIntegerDigits(5)
  numberFormat.setGroupingUsed(false)

  // File extension depends on the configured compression and output format.
  val extension = Utilities.getFileExtension(
    conf.value,
    fileSinkConf.getCompressed,
    getOutputFormat())

  val outputName = "part-" + numberFormat.format(splitID) + extension
  val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
  if (outputPath == null) {
    throw new IOException("Undefined job output-path")
  }
  // stripPrefix is safe even when the leading "/" is absent (or the string
  // is empty), unlike substring(1) which would drop the first real
  // character or throw StringIndexOutOfBoundsException.
  val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
  val path = new Path(workPath, outputName)
  getOutputCommitter().setupTask(getTaskContext())
  writer = HiveFileFormatUtils.getHiveRecordWriter(
    conf.value,
    fileSinkConf.getTableInfo,
    conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
    fileSinkConf,
    path,
    null)
}

def write(value: Writable) {
if (writer != null) {
writer.write(value)
Expand Down

0 comments on commit 1867e23

Please sign in to comment.