modify code to pass scala style checks
baishuo authored and liancheng committed Sep 17, 2014
1 parent 37c1c43 commit 7ce2d9f
Showing 1 changed file with 14 additions and 7 deletions.
@@ -119,7 +119,8 @@ case class InsertIntoHiveTable(
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    conf.value.set("mapred.output.format.class",
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName)
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
@@ -136,7 +137,7 @@ case class InsertIntoHiveTable(
       SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
     var writer: SparkHiveHadoopWriter = null
-    //Map restore writesr for Dynamic Partition
+    // Map restore writesr for Dynamic Partition
     var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null
     if (dynamicPartNum == 0) {
       writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
@@ -169,7 +170,8 @@ case class InsertIntoHiveTable(
           writer2 = writerMap.get(record._2) match {
             case Some(writer)=> writer
             case None => {
-              val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+              val tempWriter = new SparkHiveHadoopWriter(conf.value,
+                new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
               tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
               tempWriter.open(record._2)
               writerMap += (record._2 -> tempWriter)
@@ -211,8 +213,11 @@ case class InsertIntoHiveTable(
    * return: /part2=val2
    * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
    * return: /part2=val2/part3=val3
-   * */
-  private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = {
+   */
+  private def getDynamicPartDir(partCols: Array[String],
+      row: Row,
+      dynamicPartNum: Int,
+      defaultPartName: String): String = {
     assert(dynamicPartNum > 0)
     partCols
       .takeRight(dynamicPartNum)
@@ -269,7 +274,8 @@ case class InsertIntoHiveTable(
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
       }
-      if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+      if (numStaPart == 0 &&
+        sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
       }
       // check if static partition appear after dynamic partitions
@@ -294,7 +300,8 @@ case class InsertIntoHiveTable(
 
       val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
       val outputData = new Array[Any](fieldOIs.length)
-      val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__")
+      val defaultPartName = jobConfSer.value.get(
+        "hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__")
       var partColStr: Array[String] = null;
       if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
         partColStr = fileSinkConf
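
The wrapped call sites above belong to the dynamic-partition support this branch adds: getDynamicPartDir turns the trailing dynamic partition columns of a row into a subdirectory such as /part2=val2/part3=val3 (presumably substituting defaultPartName for null or empty values, which is what the sketch assumes), and writerMap lazily creates one SparkHiveHadoopWriter per such path. The standalone sketch below only illustrates those two ideas; the names DynamicPartitionSketch, PartWriter, and dynamicPartDir are hypothetical, all Hadoop/Hive plumbing is left out, and it is not the code in this commit.

import scala.collection.mutable

object DynamicPartitionSketch {

  // Stand-in for SparkHiveHadoopWriter: only records which partition path it writes under.
  final class PartWriter(val partPath: String) {
    def write(record: String): Unit = println(s"write '$record' under $partPath")
  }

  // Builds "/colN=valN/..." from the last `dynamicPartNum` partition columns,
  // substituting `defaultPartName` for null or empty values (the behaviour the
  // getDynamicPartDir scaladoc in the diff describes).
  def dynamicPartDir(
      partCols: Seq[String],
      rowValues: Seq[Any],
      dynamicPartNum: Int,
      defaultPartName: String): String = {
    require(dynamicPartNum > 0, "need at least one dynamic partition column")
    partCols.takeRight(dynamicPartNum)
      .zip(rowValues.takeRight(dynamicPartNum))
      .map { case (col, value) =>
        val v = Option(value).map(_.toString).filter(_.nonEmpty).getOrElse(defaultPartName)
        s"/$col=$v"
      }
      .mkString
  }

  def main(args: Array[String]): Unit = {
    val partCols = Seq("part1", "part2", "part3")
    val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

    // INSERT ... tablename(part1=val1, part2, part3) SELECT ..., val2, val3 FROM ...
    // => two dynamic partition columns => "/part2=val2/part3=val3"
    println(dynamicPartDir(partCols, Seq("val1", "val2", "val3"), 2, defaultPartName))

    // One writer per dynamic-partition path, created on first use: the same idea
    // as the writerMap lookup in the diff, minus the Hadoop plumbing.
    val writerMap = mutable.HashMap.empty[String, PartWriter]
    val rows = Seq(Seq[Any]("val1", "val2", "val3"), Seq[Any]("val1", "val2", null))
    rows.foreach { row =>
      val partPath = dynamicPartDir(partCols, row, 2, defaultPartName)
      val writer = writerMap.getOrElseUpdate(partPath, new PartWriter(partPath))
      writer.write(row.mkString(","))
    }
  }
}

Here getOrElseUpdate plays the role of the Some/None match on writerMap in the diff: the first row that maps to a new partition path creates and caches a writer, and later rows reuse it.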
