
Commit

update file after test
baishuo authored and liancheng committed Sep 17, 2014
1 parent a3961d9 commit 88d0110
Showing 11 changed files with 137 additions and 146 deletions.
10 changes: 8 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -94,7 +94,7 @@ private[hive] class SparkHiveHadoopWriter(
}

def open(dynamicPartPath: String) {
val numfmt = NumberFormat.getInstance()
val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

@@ -108,7 +108,7 @@ private[hive] class SparkHiveHadoopWriter(
if (outputPath == null) {
throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/"
val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/"
val path = new Path(workPath, outputName)
getOutputCommitter().setupTask(getTaskContext())
writer = HiveFileFormatUtils.getHiveRecordWriter(
@@ -219,4 +219,10 @@ private[hive] object SparkHiveHadoopWriter {
}
outputPath.makeQualified(fs)
}

val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
override def initialValue() = {
NumberFormat.getInstance()
}
}
}
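The change in this file swaps the per-call NumberFormat.getInstance() for a formatter kept in a ThreadLocal on the companion object: java.text.NumberFormat is mutable and not thread-safe, so a single shared instance would be unsafe, while creating a fresh one on every open() is wasteful. A minimal self-contained sketch of the same pattern (the object and method names below are illustrative, not part of the patch):

import java.text.NumberFormat

object PartFileNaming {
  // One formatter per thread: NumberFormat instances must not be shared across threads.
  private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
    override def initialValue(): NumberFormat = NumberFormat.getInstance()
  }

  // Formats a split id roughly the way the writer builds its part-file names, e.g. 3 -> "part-00003".
  def partFileName(splitId: Int): String = {
    val numfmt = threadLocalNumberFormat.get()
    numfmt.setMinimumIntegerDigits(5) // zero-pad to five digits
    numfmt.setGroupingUsed(false)     // no thousands separators
    "part-" + numfmt.format(splitId)
  }
}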
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.parse.SemanticException
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector._
@@ -104,91 +103,132 @@ case class InsertIntoHiveTable(
}

def saveAsHiveFile(
rdd: RDD[Writable],
rdd: RDD[(Writable, String)],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
conf: JobConf,
isCompressed: Boolean) {
conf: SerializableWritable[JobConf],
isCompressed: Boolean,
dynamicPartNum: Int) {
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
conf.setOutputValueClass(valueClass)
conf.value.setOutputValueClass(valueClass)
if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
throw new SparkException("Output format class not set")
}
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
conf.set("mapred.output.compress", "true")
conf.value.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
}
conf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(
conf,
SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
conf.value.setOutputCommitter(classOf[FileOutputCommitter])

FileOutputFormat.setOutputPath(
conf.value,
SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
var writer: SparkHiveHadoopWriter = null
// Map that stores one writer per dynamic partition
var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
if (dynamicPartNum == 0) {
writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
writer.preSetup()
} else {
writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
}

val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
writer.preSetup()

def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt

def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
// writer for the non-dynamic-partition case
if (dynamicPartNum == 0) {
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
} else {

var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record)
}

writer.close()
writer.commit()
}

sc.sparkContext.runJob(rdd, writeToFile _)
writer.commitJob()
}

def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = {
dynamicPartNum2 match {
case 0 =>""
case i => {
val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length
val partColStr = tableInfo.getProperties.getProperty("partition_columns")
val partCols = partColStr.split("/")
var buf = new StringBuffer()
if (partCols.length == dynamicPartNum2) {
for (j <- 0 until partCols.length) {
buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf))
}
} else {
for (j <- 0 until dynamicPartNum2) {
buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
var count = 0
// writer for the dynamic-partition case
var writer2: SparkHiveHadoopWriter = null
while(iter.hasNext) {
val record = iter.next()
count += 1
if (record._2 == null) { // no dynamic partition for this record
writer.write(record._1)
} else { // record belongs to a dynamic partition
val location = fileSinkConf.getDirName
val partLocation = location + record._2 // appending the partition path lets each writer target a different file
writer2 = writerMap.get(record._2) match {
case Some(writer)=> writer
case None => {
val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
tempWriter.open(record._2)
writerMap += (record._2 -> tempWriter)
tempWriter
}
}
writer2.write(record._1)
}
}
buf.toString
if (dynamicPartNum == 0) {
writer.close()
writer.commit()
} else {
for ((k,v) <- writerMap) {
v.close()
v.commit()
}
}
}

sc.sparkContext.runJob(rdd, writeToFile _)
if (dynamicPartNum == 0) {
writer.commitJob()
} else {
for ((k,v) <- writerMap) {
v.commitJob()
}
writerMap.clear()
}
}
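The heart of the dynamic-partition branch above is a per-partition writer cache: the first record seen for a given partition directory creates, sets up, and opens a SparkHiveHadoopWriter, and every later record for that directory reuses it; at the end all cached writers are closed and committed. A compact stand-alone sketch of that caching step, with getOrElseUpdate in place of the explicit Some/None match (Writer and the other names are hypothetical stand-ins, not Spark APIs):

import scala.collection.mutable

trait Writer {
  def write(value: AnyRef): Unit
  def close(): Unit
}

class PartitionedSink(newWriter: String => Writer) {
  private val writers = mutable.HashMap.empty[String, Writer]

  def write(partPath: String, value: AnyRef): Unit = {
    // Create and cache a writer the first time a partition directory is seen.
    val writer = writers.getOrElseUpdate(partPath, newWriter(partPath))
    writer.write(value)
  }

  // Mirrors the close/commit loop at the end of writeToFile.
  def closeAll(): Unit = writers.values.foreach(_.close())
}

getOrElseUpdate expresses the lookup-or-create step in one call; the behaviour is the same as the match used in the patch.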

def handleNull(obj :Any, jobConf: JobConf) :String = {
if (obj == null ||obj.toString.length == 0) {
jobConf.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__")


}
/*
* Builds the dynamic partition directory for a row, e.g.
* for SQL: INSERT ... tablename(part1, part2) SELECT ..., val1, val2 FROM ...
*   returns: /part1=val1/part2=val2
* for SQL: INSERT ... tablename(part1=val1, part2) SELECT ..., val2 FROM ...
*   returns: /part2=val2
* for SQL: INSERT ... tablename(part1=val1, part2, part3) SELECT ..., val2, val3 FROM ...
*   returns: /part2=val2/part3=val3
*/
private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = {
assert(dynamicPartNum > 0)
partCols
.takeRight(dynamicPartNum)
.zip(row.takeRight(dynamicPartNum))
.map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
.mkString
}
/*
* If rowVal is null or "", returns the default partition name
* (hive.exec.default.partition.name); otherwise returns rowVal as a string.
*/
private def handleNull(rowVal: Any, defaultPartName: String): String = {
if (rowVal == null ||String.valueOf(rowVal).length == 0) {
defaultPartName
} else {
obj.toString
String.valueOf(rowVal)
}
}
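To make the two new helpers concrete, here is a self-contained sketch that mirrors their behaviour outside the class. Row is stood in for by Seq[Any] (which supports the same takeRight call); the column names and sample values are made up:

// Re-implementation of the helpers above, for illustration only.
def handleNull(rowVal: Any, defaultPartName: String): String =
  if (rowVal == null || String.valueOf(rowVal).isEmpty) defaultPartName
  else String.valueOf(rowVal)

def dynamicPartDir(partCols: Seq[String], row: Seq[Any],
    dynamicPartNum: Int, defaultPartName: String): String = {
  require(dynamicPartNum > 0)
  partCols
    .takeRight(dynamicPartNum)          // only the dynamic partition columns
    .zip(row.takeRight(dynamicPartNum)) // paired with the trailing row values
    .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
    .mkString
}

// INSERT ... tablename(part1, part2) SELECT ..., "val1", "val2" ...
dynamicPartDir(Seq("part1", "part2"), Seq(1, "a", "val1", "val2"), 2, "__HIVE_DEFAULT_PARTITION__")
// returns "/part1=val1/part2=val2"

// A null or empty dynamic value falls back to hive.exec.default.partition.name:
dynamicPartDir(Seq("part1", "part2"), Seq(1, "a", "val1", null), 1, "__HIVE_DEFAULT_PARTITION__")
// returns "/part2=__HIVE_DEFAULT_PARTITION__"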

@@ -211,32 +251,32 @@ case class InsertIntoHiveTable(
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
var dynamicPartNum = 0
var tmpDynamicPartNum = 0
var numStaPart = 0
var dynamicPartPath = "";
val partitionSpec = partition.map {
case (key, Some(value)) => { numStaPart += 1; key -> value }
case (key, None) => { dynamicPartNum += 1; key -> "" }
case (key, Some(value)) =>
numStaPart += 1
key -> value
case (key, None) =>
tmpDynamicPartNum += 1
key -> ""
}
// ORC stores compression information in table properties, while other formats
// (e.g. RCFile) rely on Hadoop configurations to store compression information.
val dynamicPartNum = tmpDynamicPartNum
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)
// check if the partition spec is valid
if (dynamicPartNum > 0) {
if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SemanticException(
ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
}
if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
}
// check whether a static partition appears after a dynamic partition
for ((k,v) <- partitionSpec) {
if (partitionSpec(k) == "") {
if (numStaPart > 0) { // found a dynamic partition, but a static partition still follows it
throw new SemanticException(
ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg());
throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
}
} else {
numStaPart -= 1
@@ -252,96 +292,40 @@
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]


val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
val outputData = new Array[Any](fieldOIs.length)
val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__")
var partColStr: Array[String] = null;
if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
partColStr = fileSinkConf
.getTableInfo
.getProperties
.getProperty("partition_columns")
.split("/")
}

iter.map { row =>
var dynamicPartPath: String = null
if (dynamicPartNum > 0) {
dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
}
var i = 0
while (i < fieldOIs.length) {
if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
}
// Casts Strings to HiveVarchars when necessary.
outputData(i) = wrap(row(i), fieldOIs(i))
i += 1
}

serializer.serialize(outputData, standardOI)
serializer.serialize(outputData, standardOI) -> dynamicPartPath
}
}

if (dynamicPartNum > 0) {
if (outputClass == null) {
throw new SparkException("Output value class not set")
}
jobConfSer.value.setOutputValueClass(outputClass)
if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
throw new SparkException("Output format class not set")
}
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
jobConfSer.value.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type"))
}
jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter])

FileOutputFormat.setOutputPath(
jobConfSer.value,
SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value))

var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
def writeToFile2(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
val serializer = newSerializer(fileSinkConf.getTableInfo)
var count = 0
var writer2:SparkHiveHadoopWriter = null
while(iter.hasNext) {
val record = iter.next();
val location = fileSinkConf.getDirName
val partLocation = location + dynamicPartPath
writer2=writerMap.get(dynamicPartPath) match {
case Some(writer)=> writer
case None => {
val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
tempWriter.open(dynamicPartPath);
writerMap += (dynamicPartPath -> tempWriter)
tempWriter
}
}
count += 1
writer2.write(record)
}
for((k,v) <- writerMap) {
v.close()
v.commit()
}
}

sc.sparkContext.runJob(rdd, writeToFile2 _)

for((k,v) <- writerMap) {
v.commitJob()
}
writerMap.clear()
} else {
saveAsHiveFile(
rdd,
outputClass,
fileSinkConf,
jobConf,
sc.hiveconf.getBoolean("hive.exec.compress.output", false))
}
jobConfSer,
sc.hiveconf.getBoolean("hive.exec.compress.output", false),
dynamicPartNum)

val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
@@ -358,13 +342,13 @@ case class InsertIntoHiveTable(
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (dynamicPartNum > 0) {
if (dynamicPartNum>0) {
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
dynamicPartNum/*dpCtx.getNumDPCols()*/,
dynamicPartNum,
holdDDLTime,
isSkewedStoreAsSubdir
)
