Commit b20a3dc

Addresses @yhuai's comments

liancheng committed Sep 18, 2014
1 parent 096bbbc commit b20a3dc
Showing 2 changed files with 32 additions and 29 deletions.
@@ -150,11 +150,11 @@ case class InsertIntoHiveTable(
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
-      writerContainer.open()
 
       iterator.foreach { row =>
         var i = 0
         while (i < fieldOIs.length) {
+          // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }
@@ -164,7 +164,6 @@ case class InsertIntoHiveTable(
       }
 
       writerContainer.close()
-      writerContainer.commit()
     }
   }

@@ -71,20 +71,7 @@ private[hive] class SparkHiveWriterContainer(
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
+    initWriters()
   }
 
-  /**
-   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
-   * for writing data to a dynamic partition.
-   */
-  def open() {
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
-      Reporter.NULL)
-    initWriters()
-  }
 
   protected def getOutputName: String = {
@@ -100,9 +87,26 @@ private[hive] class SparkHiveWriterContainer(
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
+    commit()
   }
 
+  def commitJob() {
+    committer.commitJob(jobContext)
+  }
+
-  def commit() {
+  protected def initWriters() {
+    // NOTE this method is executed at the executor side.
+    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+      Reporter.NULL)
+  }
+
+  protected def commit() {
     if (committer.needsTaskCommit(taskContext)) {
       try {
         committer.commitTask(taskContext)
@@ -118,10 +122,6 @@ private[hive] class SparkHiveWriterContainer(
     }
   }
 
-  def commitJob() {
-    committer.commitJob(jobContext)
-  }
-
   // ********* Private Functions *********
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -168,12 +168,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
 
   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
 
-  override def open(): Unit = {
+  override protected def initWriters(): Unit = {
+    // NOTE: This method is executed at the executor side.
+    // Actual writers are created for each dynamic partition on the fly.
     writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
   }
 
   override def close(): Unit = {
     writers.values.foreach(_.close(false))
+    commit()
   }
 
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
@@ -185,20 +188,21 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       }
       .mkString
 
-    val path = {
-      val outputPath = FileOutputFormat.getOutputPath(conf.value)
-      assert(outputPath != null, "Undefined job output-path")
-      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-      new Path(workPath, getOutputName)
-    }
-
     def newWriter = {
       val newFileSinkDesc = new FileSinkDesc(
         fileSinkConf.getDirName + dynamicPartPath,
         fileSinkConf.getTableInfo,
         fileSinkConf.getCompressed)
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
 
+      val path = {
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
+        assert(outputPath != null, "Undefined job output-path")
+        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+        new Path(workPath, getOutputName)
+      }
+
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
         fileSinkConf.getTableInfo,
