
Commit

Update InsertIntoHiveTable.scala
baishuo authored and liancheng committed Sep 17, 2014
1 parent 0a50db9 commit 60f70aa
Showing 1 changed file with 44 additions and 12 deletions.
@@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap}
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.parse.SemanticException
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector._
@@ -40,6 +42,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
import org.apache.hadoop.hive.conf.HiveConf

/**
* :: DeveloperApi ::
@@ -159,7 +162,7 @@ case class InsertIntoHiveTable(
writer.commitJob()
}

def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = {
def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf): String = {
dynamicPartNum2 match {
case 0 =>""
case i => {
@@ -169,18 +172,26 @@
var buf = new StringBuffer()
if (partCols.length == dynamicPartNum2) {
for (j <- 0 until partCols.length) {
buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum))
buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf))
}
} else {
for (j <- 0 until dynamicPartNum2) {
buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum))
buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
}
}
buf.toString
}
}
}

def handleNull(obj: Any, jobConf: JobConf): String = {
if (obj == null || obj.toString.length == 0) {
jobConf.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
} else {
obj.toString
}
}
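
// ---------------------------------------------------------------------------
// Editor's illustrative sketch, not part of this diff: a self-contained mirror
// of how the two helpers above assemble a dynamic-partition directory suffix.
// The column names and values below are hypothetical; only the default
// partition name matches Hive's __HIVE_DEFAULT_PARTITION__.
// ---------------------------------------------------------------------------
object DynamicPartDirSketch {
  // Like handleNull: null or empty partition values fall back to the default partition name.
  def handleNull(value: Any, defaultPartName: String = "__HIVE_DEFAULT_PARTITION__"): String =
    if (value == null || value.toString.isEmpty) defaultPartName else value.toString

  // Like getDynamicPartDir in the all-dynamic case: one "/col=value" segment per partition column.
  def dynamicPartDir(partCols: Seq[String], partValues: Seq[Any]): String =
    partCols.zip(partValues).map { case (c, v) => "/" + c + "=" + handleNull(v) }.mkString

  def main(args: Array[String]): Unit = {
    println(dynamicPartDir(Seq("year", "month"), Seq(2014, null)))
    // Prints: /year=2014/month=__HIVE_DEFAULT_PARTITION__
  }
}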

override def execute() = result

/**
@@ -201,11 +212,38 @@
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
var dynamicPartNum = 0
var numStaPart = 0
var dynamicPartPath = ""
val partitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) => { dynamicPartNum += 1; key -> "" }// Should not reach here right now.
case (key, Some(value)) => { numStaPart += 1; key -> value }
case (key, None) => { dynamicPartNum += 1; key -> "" }
}
// ORC stores compression information in table properties. In contrast, other formats
// (e.g. RCFile) rely on Hadoop configurations to store compression information.
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)
// check if the partition spec is valid
if (dynamicPartNum > 0) {
if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SemanticException(
ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
}
if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
}
// check that no static partition appears after a dynamic partition
for ((k, v) <- partitionSpec) {
if (v == "") {
if (numStaPart > 0) { // found a dynamic partition, but a static partition still follows it
throw new SemanticException(
ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
}
} else {
numStaPart -= 1
}
}
}
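
// ---------------------------------------------------------------------------
// Editor's illustrative sketch, not part of this commit: the Hive session
// settings and query shape that the checks above gate. The table names
// (`logs`, `staging_logs`) and columns are hypothetical.
// ---------------------------------------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object DynamicPartitionInsertExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dp-insert-sketch"))
    val hive = new HiveContext(sc)
    // Without this setting, the DYNAMIC_PARTITION_DISABLED check above throws.
    hive.sql("SET hive.exec.dynamic.partition=true")
    // With no static partition column in the spec, strict mode trips DYNAMIC_PARTITION_STRICT_MODE.
    hive.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    // `year` is static, `month` is dynamic and is taken from the last column of the SELECT.
    hive.sql(
      "INSERT INTO TABLE logs PARTITION (year = 2014, month) SELECT value, month FROM staging_logs")
      .count() // an action forces the (lazy) insert plan to run
    sc.stop()
  }
}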

val rdd = childRdd.mapPartitions { iter =>
val serializer = newSerializer(fileSinkConf.getTableInfo)
val standardOI = ObjectInspectorUtils
@@ -221,7 +259,7 @@
var i = 0
while (i < fieldOIs.length) {
if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
}
// Casts Strings to HiveVarchars when necessary.
outputData(i) = wrap(row(i), fieldOIs(i))
@@ -232,10 +270,6 @@
}
}

// ORC stores compression information in table properties. While, there are other formats
// (e.g. RCFile) that rely on hadoop configurations to store compression information.
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)
if (dynamicPartNum > 0) {
if (outputClass == null) {
throw new SparkException("Output value class not set")
@@ -300,8 +334,6 @@
v.commitJob()
}
writerMap.clear()
//writer.commitJob()

} else {
saveAsHiveFile(
rdd,

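The dynamic-partition branch above (the hunk around writerMap, partially elided in this view) follows a one-writer-per-partition-directory pattern: a writer is created lazily the first time a partition path appears, reused for later rows, and all writers are committed before the map is cleared. Below is a minimal, self-contained sketch of that pattern, using a hypothetical PartitionWriter in place of the real SparkHiveHadoopWriter; the partition paths and records are made up for illustration.

import scala.collection.mutable

// Hypothetical stand-in for the real per-partition Hadoop writer; illustrative only.
class PartitionWriter(val dir: String) {
  def write(record: String): Unit = println(s"$dir <- $record")
  def commitJob(): Unit = println(s"committed $dir")
}

object WriterPerPartitionSketch {
  def main(args: Array[String]): Unit = {
    val writerMap = mutable.HashMap.empty[String, PartitionWriter]
    val records = Seq(
      "/year=2014/month=9" -> "a",
      "/year=2014/month=10" -> "b",
      "/year=2014/month=9" -> "c")
    for ((partDir, record) <- records) {
      // Lazily create one writer per dynamic partition directory, reusing it for later rows.
      val writer = writerMap.getOrElseUpdate(partDir, new PartitionWriter(partDir))
      writer.write(record)
    }
    // Commit every partition's output, then drop the writers (mirrors writerMap.clear() above).
    writerMap.values.foreach(_.commitJob())
    writerMap.clear()
  }
}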