Commit

address comments
cloud-fan committed Jan 18, 2017
1 parent 2f24c10 commit 150efa2
Showing 1 changed file with 15 additions and 1 deletion.
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@@ -42,7 +43,7 @@ import org.apache.spark.util.SerializableJobConf
  *
  * TODO: implement the read logic.
  */
-class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
@@ -59,6 +60,19 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
     val tableDesc = fileSinkConf.getTableInfo
     conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
 
+    // When speculation is on and the output committer class name contains "Direct", warn
+    // users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = conf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     // Add table properties from storage handler to hadoopConf, so any custom storage
     // handler settings can be set to hadoopConf
     HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
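For context, a minimal, self-contained sketch (not part of this commit) of the two conditions the new check tests: speculation enabled on the Spark conf, and a Hadoop output committer whose class name contains "Direct". The committer class name below is hypothetical and only illustrates what trips the substring match.

import org.apache.spark.sql.SparkSession

object DirectCommitterWarningDemo {
  def main(args: Array[String]): Unit = {
    // First condition: speculative task execution is enabled.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("direct-committer-warning-demo")
      .config("spark.speculation", "true")
      .getOrCreate()

    // Second condition: the configured committer class name contains "Direct".
    // "com.example.DirectParquetOutputCommitter" is a hypothetical class name.
    spark.sparkContext.hadoopConfiguration.set(
      "mapred.output.committer.class",
      "com.example.DirectParquetOutputCommitter")

    // Any subsequent Hive-format write that goes through
    // HiveFileFormat.prepareWrite would now log the SPARK-10063 warning.
    spark.stop()
  }
}

Disabling spark.speculation, or configuring a committer such as org.apache.hadoop.mapred.FileOutputCommitter that writes to a temporary location and commits only on task success, avoids the warning.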
