Refactors InsertIntoHiveTable to a Command
liancheng committed Sep 17, 2014
1 parent: 528e84c · commit: fae9eff
Showing 1 changed file with 4 additions and 9 deletions.
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive.execution

import scala.collection.JavaConversions._
import scala.collection.mutable

import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
@@ -31,14 +30,12 @@ import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.SparkContext._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
import org.apache.spark.sql.hive._
import org.apache.spark.{SerializableWritable, SparkException, TaskContext}

Expand All @@ -52,7 +49,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
extends UnaryNode {
extends UnaryNode with Command {

@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -172,16 +169,14 @@ case class InsertIntoHiveTable(
}
}

override def execute() = result

/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
* `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
*
* Note: this is run once and then kept to avoid double insertions.
*/
private lazy val result: RDD[Row] = {
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
@@ -293,6 +288,6 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
sc.sparkContext.makeRDD(Nil, 1)
Seq.empty[Row]
}
}
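
The core of the change is in the last two hunks: instead of materializing the insertion as a cached private RDD and returning it from execute(), the node now mixes in Command and exposes the write as sideEffectResult, a lazy Seq[Row]. Below is a minimal, self-contained sketch of that run-once pattern. The trait body, the simplified Row type, and names such as InsertIntoTableSketch and executeCollect are illustrative stand-ins for this sketch, not Spark's actual definitions.

// Sketch only: simplified stand-ins for the classes involved, showing why holding the
// side effect behind a lazy val prevents double insertion.
object CommandSketch extends App {

  case class Row(values: Any*)

  trait Command {
    // Concrete commands override this; the first evaluation performs the side effect
    // and the cached result is reused afterwards.
    protected def sideEffectResult: Seq[Row] = Seq.empty

    // Every way of running the command reads the same cached result.
    def executeCollect(): Array[Row] = sideEffectResult.toArray
  }

  class InsertIntoTableSketch(rows: Seq[Row]) extends Command {
    var insertCount = 0

    override protected lazy val sideEffectResult: Seq[Row] = {
      insertCount += 1                    // stand-in for the real Hive write
      println(s"inserting ${rows.size} rows")
      Seq.empty[Row]                      // like Hive, INSERT returns no rows
    }
  }

  val insert = new InsertIntoTableSketch(Seq(Row(1, "a"), Row(2, "b")))
  insert.executeCollect()
  insert.executeCollect()                 // lazy val already evaluated: no second insert
  assert(insert.insertCount == 1)
}

The diff above makes the same move for InsertIntoHiveTable: the write is kept behind the shared Command contract, so the empty result becomes Seq.empty[Row] rather than an empty RDD built with makeRDD(Nil, 1).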
