modify according micheal's advice
baishuo authored and liancheng committed Sep 17, 2014
1 parent c3ab36d commit b47c9bf
Showing 2 changed files with 96 additions and 93 deletions.
19 changes: 11 additions & 8 deletions sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -93,8 +93,17 @@ private[hive] class SparkHiveHadoopWriter(
null)
}

/**
* Creates a Hive RecordWriter, mirroring the no-argument open() above.
*
* This overload exists because each dynamic partition needs its own writer:
* from `dynamicPartPath` we derive a partition-specific output path and pass
* it to HiveFileFormatUtils.getHiveRecordWriter.
*
* @param dynamicPartPath the relative directory of the dynamic partition,
* e.g. "/part1=val1/part2=val2"
*/
def open(dynamicPartPath: String) {
val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get()
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

@@ -108,7 +117,7 @@ private[hive] class SparkHiveHadoopWriter(
if (outputPath == null) {
throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/"
val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/"
val path = new Path(workPath, outputName)
getOutputCommitter().setupTask(getTaskContext())
writer = HiveFileFormatUtils.getHiveRecordWriter(
@@ -219,10 +228,4 @@ private[hive] object SparkHiveHadoopWriter {
}
outputPath.makeQualified(fs)
}

val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
override def initialValue() = {
NumberFormat.getInstance()
}
}
}
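
To make the new open(dynamicPartPath) overload above concrete, here is a minimal, self-contained sketch of the same path arithmetic. It is an approximation under stated assumptions: DynamicPartPathSketch and buildTaskOutputPath are illustrative names that do not exist in the patch, and the "part-NNNNN" file name only approximates the writer's real output name.

    import java.text.NumberFormat
    import org.apache.hadoop.fs.Path

    // Illustrative helper, not part of the patch: mirrors how open(dynamicPartPath)
    // derives the per-partition work path and a zero-padded task file name before
    // asking HiveFileFormatUtils for a record writer.
    object DynamicPartPathSketch {
      def buildTaskOutputPath(jobOutputPath: Path, dynamicPartPath: String, splitId: Int): Path = {
        val numfmt = NumberFormat.getInstance()
        numfmt.setMinimumIntegerDigits(5)
        numfmt.setGroupingUsed(false)
        // dynamicPartPath looks like "/part1=val1/part2=val2" and is relative to the
        // job output path, so the leading "/" is dropped before joining the paths.
        val workPath = new Path(jobOutputPath, dynamicPartPath.stripPrefix("/"))
        new Path(workPath, "part-" + numfmt.format(splitId))
      }

      def main(args: Array[String]): Unit = {
        val path = buildTaskOutputPath(new Path("/tmp/hive-staging"), "/ds=2014-09-17/hr=12", 3)
        println(path) // /tmp/hive-staging/ds=2014-09-17/hr=12/part-00003
      }
    }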
@@ -102,6 +102,11 @@ case class InsertIntoHiveTable(
obj
}

/**
* The dynamic partition directory for each row is computed in the upstream RDD
* (see "serializer.serialize(outputData, standardOI) -> dynamicPartPath" below),
* so the element type of `rdd` is (Writable, String).
*/
def saveAsHiveFile(
rdd: RDD[(Writable, String)],
valueClass: Class[_],
@@ -142,96 +147,94 @@ case class InsertIntoHiveTable(
if (dynamicPartNum == 0) {
writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
writer.preSetup()
sc.sparkContext.runJob(rdd, writeToFile _)
writer.commitJob()
} else {
writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
sc.sparkContext.runJob(rdd, writeToFile _)
for ((k,v) <- writerMap) {
v.commitJob()
}
writerMap.clear()
}

def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
// writer for No Dynamic Partition
if (dynamicPartNum == 0) {
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
}
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

var count = 0
// writer for Dynamic Partition
var writer2: SparkHiveHadoopWriter = null
while(iter.hasNext) {
val record = iter.next()
count += 1
if (record._2 == null) { // without Dynamic Partition
writer.write(record._1)
} else { // for Dynamic Partition
val location = fileSinkConf.getDirName
val partLocation = location + record._2 // this is why the writer can write to different file
writer2 = writerMap.get(record._2) match {
case Some(writer)=> writer
case None => {
val tempWriter = new SparkHiveHadoopWriter(conf.value,
new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
tempWriter.open(record._2)
writerMap += (record._2 -> tempWriter)
tempWriter
}
}
writer2.write(record._1)
}
if (dynamicPartNum == 0) { // all partitions are static
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
// writer for Dynamic Partition
while(iter.hasNext) {
val record = iter.next()
writer.write(record._1)
}
if (dynamicPartNum == 0) {
writer.close()
writer.commit()
} else {
} else { // there are dynamic partitions
while(iter.hasNext) {
val record = iter.next()
val location = fileSinkConf.getDirName
val partLocation = location + record._2 // each partition directory gets its own writer and output file
def createNewWriter(): SparkHiveHadoopWriter = {
val tempWriter = new SparkHiveHadoopWriter(conf.value,
new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
tempWriter.open(record._2)
writerMap += (record._2 -> tempWriter)
tempWriter
}
val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter)
writer2.write(record._1)
}
for ((k,v) <- writerMap) {
v.close()
v.commit()
}
}
}

sc.sparkContext.runJob(rdd, writeToFile _)
if (dynamicPartNum == 0) {
writer.commitJob()
} else {
for ((k,v) <- writerMap) {
v.commitJob()
}
writerMap.clear()
}
}
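
The heart of the dynamic-partition branch in writeToFile above is the writerMap.getOrElseUpdate lookup: the first record seen for a partition directory creates a writer, later records reuse it, and every writer is closed and committed at the end of the task. The standalone sketch below shows only that bookkeeping; FakePartitionWriter is a hypothetical stand-in for SparkHiveHadoopWriter.

    import scala.collection.mutable

    // Hypothetical stand-in for SparkHiveHadoopWriter, used only to illustrate the
    // one-writer-per-dynamic-partition bookkeeping performed by writeToFile.
    class FakePartitionWriter(partDir: String) {
      def write(value: String): Unit = println(s"$partDir <- $value")
      def close(): Unit = println(s"closed $partDir")
      def commit(): Unit = println(s"committed $partDir")
    }

    object WriterMapSketch {
      def main(args: Array[String]): Unit = {
        val writerMap = mutable.HashMap.empty[String, FakePartitionWriter]
        // (value, dynamicPartPath) pairs, as produced by the upstream RDD
        val records = Seq(
          "row1" -> "/ds=2014-09-17",
          "row2" -> "/ds=2014-09-18",
          "row3" -> "/ds=2014-09-17")

        // getOrElseUpdate creates a writer the first time a partition directory is
        // seen and reuses it afterwards, the same lookup pattern used in writeToFile.
        for ((value, partDir) <- records) {
          val writer = writerMap.getOrElseUpdate(partDir, new FakePartitionWriter(partDir))
          writer.write(value)
        }

        // each per-partition writer is closed and committed once at the end of the task
        for ((_, writer) <- writerMap) {
          writer.close()
          writer.commit()
        }
      }
    }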
/*
* e.g.
* for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
* return: /part1=val1/part2=val2
* for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
* return: /part2=val2
* for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
* return: /part2=val2/part3=val3
*/

/**
* Returns the dynamic partition directory for the given row.
*
* The last `dynamicPartNum` entries of `partCols` are paired with the last
* `dynamicPartNum` values of the row, and each pair contributes one
* "/column=value" segment to the directory string.
* For example:
* for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
* return: /part1=val1/part2=val2
* for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
* return: /part2=val2
* for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
* return: /part2=val2/part3=val3
*
* @param partCols an array containing the names of the partition columns
*/
private def getDynamicPartDir(partCols: Array[String],
row: Row,
dynamicPartNum: Int,
defaultPartName: String): String = {
assert(dynamicPartNum > 0)
// TODO needs optimization
partCols
.takeRight(dynamicPartNum)
.zip(row.takeRight(dynamicPartNum))
.map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
.mkString
}
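
As a quick illustration of the directory strings documented above, the sketch below restates the same takeRight/zip construction in isolation; a plain Seq[Any] stands in for Row, and "__HIVE_DEFAULT_PARTITION__" stands in for hive.exec.default.partition.name, so treat it as an approximation rather than the patch itself.

    // Self-contained approximation of getDynamicPartDir/handleNull, for illustration only.
    object DynamicPartDirSketch {
      private def handleNull(rowVal: Any, defaultPartName: String): String =
        if (rowVal == null || String.valueOf(rowVal).isEmpty) defaultPartName
        else String.valueOf(rowVal)

      def dynamicPartDir(
          partCols: Seq[String],
          row: Seq[Any],
          dynamicPartNum: Int,
          defaultPartName: String): String =
        partCols
          .takeRight(dynamicPartNum)
          .zip(row.takeRight(dynamicPartNum))
          .map { case (col, value) => s"/$col=${handleNull(value, defaultPartName)}" }
          .mkString

      def main(args: Array[String]): Unit = {
        val default = "__HIVE_DEFAULT_PARTITION__"
        // INSERT ... tablename(part1, part2) SELECT ..., val1, val2 FROM ...
        println(dynamicPartDir(Seq("part1", "part2"), Seq("a", 1, "val1", "val2"), 2, default))
        // prints /part1=val1/part2=val2
        // a null dynamic partition value falls back to the default partition name
        println(dynamicPartDir(Seq("part1", "part2"), Seq("a", 1, "val1", null), 2, default))
        // prints /part1=val1/part2=__HIVE_DEFAULT_PARTITION__
      }
    }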

/*
* if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default
* */
private def handleNull(rowVal: Any, defaultPartName: String): String = {
if (rowVal == null ||String.valueOf(rowVal).length == 0) {
defaultPartName
} else {
String.valueOf(rowVal)
}
/**
* Returns `rowVal` as a String.
* If `rowVal` is null or equal to "", returns the default partition name.
*/
private def handleNull(rowVal: Any, defaultPartName: String): String = {
if (rowVal == null || String.valueOf(rowVal).length == 0) {
defaultPartName
} else {
String.valueOf(rowVal)
}
}

override def execute() = result
@@ -253,36 +256,36 @@ case class InsertIntoHiveTable(
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
var tmpDynamicPartNum = 0
var numStaPart = 0

val numDynamicPartitions = partition.values.filter(_.isEmpty).size
val numStaticPartitions = partition.values.filter(_.isDefined).size
val partitionSpec = partition.map {
case (key, Some(value)) =>
numStaPart += 1
key -> value
case (key, None) =>
tmpDynamicPartNum += 1
key -> ""
}
val dynamicPartNum = tmpDynamicPartNum

val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)
// check if the partition spec is valid
if (dynamicPartNum > 0) {
if (numDynamicPartitions > 0) {
if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
}
if (numStaPart == 0 &&
if (numStaticPartitions == 0 &&
sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
}
// check that no static partition appears after a dynamic partition
var tmpNumStaticPartitions = numStaticPartitions
for ((k,v) <- partitionSpec) {
if (partitionSpec(k) == "") {
if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
if (tmpNumStaticPartitions > 0) { // found a DP, but there exists ST as subpartition
throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
}
} else {
numStaPart -= 1
tmpNumStaticPartitions -= 1
}
}
}
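
The checks above enforce two rules from Hive's dynamic-partition semantics: strict mode requires at least one static partition, and no static partition value may appear after a dynamic one. The sketch below restates those rules with an ordered Seq[(String, Option[String])] in place of the partitionSpec map and plain booleans in place of the HiveConf lookups, so it is only an approximation of the real checks.

    // Hypothetical, simplified re-statement of the partition-spec validation; not part of the patch.
    object PartitionSpecCheckSketch {
      // spec keeps the declared partition-column order; None marks a dynamic partition
      def validate(
          spec: Seq[(String, Option[String])],
          dynamicAllowed: Boolean,
          strictMode: Boolean): Unit = {
        val numDynamic = spec.count(_._2.isEmpty)
        val numStatic = spec.count(_._2.isDefined)
        if (numDynamic > 0) {
          require(dynamicAllowed, "dynamic partitioning is disabled")
          require(!strictMode || numStatic > 0,
            "strict mode requires at least one static partition column")
          // every column after the first dynamic one must also be dynamic
          val firstDynamic = spec.indexWhere(_._2.isEmpty)
          require(spec.drop(firstDynamic).forall(_._2.isEmpty),
            "static partition appears after a dynamic partition")
        }
      }

      def main(args: Array[String]): Unit = {
        // static before dynamic: passes
        validate(Seq("part1" -> Some("val1"), "part2" -> None), dynamicAllowed = true, strictMode = true)
        // dynamic before static: rejected
        println(scala.util.Try(
          validate(Seq("part1" -> None, "part2" -> Some("val2")), dynamicAllowed = true, strictMode = false)))
      }
    }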
@@ -299,27 +302,24 @@ case class InsertIntoHiveTable(
val outputData = new Array[Any](fieldOIs.length)
val defaultPartName = jobConfSer.value.get(
"hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__")
var partColStr: Array[String] = null;
if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
partColStr = fileSinkConf
.getTableInfo
.getProperties
.getProperty("partition_columns")
.split("/")
}

val partitionColumns = fileSinkConf.getTableInfo.
getProperties.getProperty("partition_columns") // a String like "colname1/colname2"
val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull

iter.map { row =>
var dynamicPartPath: String = null
if (dynamicPartNum > 0) {
dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
if (numDynamicPartitions > 0) {
dynamicPartPath = getDynamicPartDir(partitionColumnNames, row,
numDynamicPartitions, defaultPartName)
}
var i = 0
while (i < fieldOIs.length) {
// Casts Strings to HiveVarchars when necessary.
outputData(i) = wrap(row(i), fieldOIs(i))
i += 1
}

// pair each serialized row with its dynamicPartPath for the downstream RDD
serializer.serialize(outputData, standardOI) -> dynamicPartPath
}
}
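
The pairing at the end of the map above is what ties the two halves of the patch together: each serialized row travels through the RDD together with its dynamic partition directory, and a null directory means the query has no dynamic partitions. A tiny hypothetical illustration, not part of the patch, of how such (value, dynamicPartPath) pairs are interpreted downstream:

    // Hypothetical mini-example: shows how the (value, dynamicPartPath) pairs
    // emitted above are routed downstream.
    object RowRoutingSketch {
      def main(args: Array[String]): Unit = {
        // pretend these Strings are the serialized Writable rows
        val rows: Seq[(String, String)] = Seq(
          "serialized-row-1" -> "/ds=2014-09-17",
          "serialized-row-2" -> "/ds=2014-09-18",
          "serialized-row-3" -> null) // a null path would mean no dynamic partitions

        rows.foreach { case (value, dynamicPartPath) =>
          if (dynamicPartPath == null) println(s"$value -> single table-level writer")
          else println(s"$value -> writer for partition directory $dynamicPartPath")
        }
      }
    }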
@@ -329,7 +329,7 @@ case class InsertIntoHiveTable(
fileSinkConf,
jobConfSer,
sc.hiveconf.getBoolean("hive.exec.compress.output", false),
dynamicPartNum)
numDynamicPartitions)

val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
@@ -346,13 +346,13 @@ case class InsertIntoHiveTable(
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (dynamicPartNum > 0) {
if (numDynamicPartitions > 0) {
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
dynamicPartNum,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
)
