@@ -32,9 +32,9 @@ import org.apache.spark.util.Utils


object FileCommitProtocol {
class TaskCommitMessage(obj: Any) extends Serializable
class TaskCommitMessage(val obj: Any) extends Serializable

object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
object EmptyTaskCommitMessage extends TaskCommitMessage(null)
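
As a toy illustration of this change (not part of the diff; the string payload below is made up), promoting `obj` to a `val` is what makes the payload a task sends back readable on the driver. In a REPL session:

// Mirrors the new declaration in this hunk.
class TaskCommitMessage(val obj: Any) extends Serializable

val msg = new TaskCommitMessage("part-00000 committed")  // illustrative payload
val payload: Any = msg.obj  // readable only because `obj` is now a constructor val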

/**
* Instantiates a FileCommitProtocol using the given className.
@@ -62,8 +62,11 @@ object FileCommitProtocol


/**
* An interface to define how a Spark job commits its outputs. Implementations must be serializable,
* as the committer instance instantiated on the driver will be used for tasks on executors.
* An interface to define how a single Spark job commits its outputs. Two notes:
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
* 2. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
*
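
The rest of that comment is collapsed in this view. As a rough sketch of how a caller might drive a committer (not part of the diff; the helper name, the `.parquet` extension, the import package, and the method signatures are assumptions based on the FileCommitProtocol trait as of this change):

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol  // package assumed
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Driver: set up the job, collect one TaskCommitMessage per task, then commit
// (or abort) the job as a whole. Task bodies run on executors in practice.
def runWriteJob(committer: FileCommitProtocol, job: Job, tasks: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(job)
  try {
    val messages: Seq[TaskCommitMessage] = tasks.map { taskContext =>
      committer.setupTask(taskContext)
      try {
        val path = committer.newTaskTempFile(taskContext, None, ".parquet")
        // ... write the task's rows to `path` ...
        committer.commitTask(taskContext)
      } catch {
        case t: Throwable =>
          committer.abortTask(taskContext)
          throw t
      }
    }
    committer.commitJob(job, messages)
  } catch {
    case t: Throwable =>
      committer.abortJob(job)
      throw t
  }
}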
@@ -55,20 +55,6 @@ trait FileFormat
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory

/**
* Returns a [[OutputWriterFactory]] for generating output writers that can write data.
* This method is current used only by FileStreamSinkWriter to generate output writers that
* does not use output committers to write data. The OutputWriter generated by the returned
* [[OutputWriterFactory]] must implement the method `newWriter(path)`..
*/
def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
// TODO: Remove this default implementation when the other formats have been ported
throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
}

/**
* Returns whether this format support returning columnar batch or not.
*
@@ -43,8 +43,8 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter


/** A helper object for writing data out to a location. */
object WriteOutput extends Logging {
/** A helper object for writing FileFormat data out to a location. */
object FileFormatWriter extends Logging {

/** A shared job description for all the write tasks. */
private class WriteJobDescription(
@@ -55,7 +55,6 @@ object WriteOutput extends Logging {
val partitionColumns: Seq[Attribute],
val nonPartitionColumns: Seq[Attribute],
val bucketSpec: Option[BucketSpec],
val isAppend: Boolean,
val path: String)
extends Serializable {

@@ -82,18 +81,18 @@
sparkSession: SparkSession,
plan: LogicalPlan,
fileFormat: FileFormat,
outputPath: Path,
committer: FileCommitProtocol,
outputPath: String,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
refreshFunction: (Seq[TablePartitionSpec]) => Unit,
options: Map[String, String],
isAppend: Boolean): Unit = {
options: Map[String, String]): Unit = {

val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, outputPath)
FileOutputFormat.setOutputPath(job, new Path(outputPath))

val partitionSet = AttributeSet(partitionColumns)
val dataColumns = plan.output.filterNot(partitionSet.contains)
@@ -111,16 +110,11 @@
partitionColumns = partitionColumns,
nonPartitionColumns = dataColumns,
bucketSpec = bucketSpec,
isAppend = isAppend,
path = outputPath.toString)
path = outputPath)

SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
// This call shouldn't be put into the `try` block below because it only initializes and
// prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
outputPath.toString,
isAppend)
committer.setupJob(job)

try {
@@ -84,17 +84,22 @@ case class InsertIntoHadoopFsRelationCommand(
val isAppend = pathExists && (mode == SaveMode.Append)

if (doInsertion) {
WriteOutput.write(
sparkSession,
query,
fileFormat,
qualifiedOutputPath,
hadoopConf,
partitionColumns,
bucketSpec,
refreshFunction,
options,
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
outputPath.toString,
isAppend)

FileFormatWriter.write(
sparkSession = sparkSession,
plan = query,
fileFormat = fileFormat,
committer = committer,
outputPath = qualifiedOutputPath.toString,
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
refreshFunction = refreshFunction,
options = options)
} else {
logInfo("Skipping insertion into a relation that already exists.")
}
@@ -415,17 +415,6 @@ class ParquetFileFormat
}
}
}

override def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
new ParquetOutputWriterFactory(
sqlContext.conf,
dataSchema,
sqlContext.sessionState.newHadoopConf(),
options)
}
}

object ParquetFileFormat extends Logging {
@@ -17,125 +17,13 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter}
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration


/**
* A factory for generating OutputWriters for writing parquet files. This implemented is different
* from the [[ParquetOutputWriter]] as this does not use any [[OutputCommitter]]. It simply
* writes the data to the path used to generate the output writer. Callers of this factory
* has to ensure which files are to be considered as committed.
*/
private[parquet] class ParquetOutputWriterFactory(
sqlConf: SQLConf,
dataSchema: StructType,
hadoopConf: Configuration,
options: Map[String, String])
extends OutputWriterFactory {

private val serializableConf: SerializableConfiguration = {
val job = Job.getInstance(hadoopConf)
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sqlConf)

// We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
// it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
// we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
conf.set(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sqlConf.isParquetBinaryAsString.toString)

conf.set(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlConf.isParquetINT96AsTimestamp.toString)

conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sqlConf.writeLegacyParquetFormat.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
new SerializableConfiguration(conf)
}

/**
* Returns a [[OutputWriter]] that writes data to the give path without using
* [[OutputCommitter]].
*/
override def newWriter(path: String): OutputWriter = new OutputWriter {

// Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(
serializableConf.value, hadoopTaskAttemptId)

// Instance of ParquetRecordWriter that does not use OutputCommitter
private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)

override def write(row: Row): Unit = {
throw new UnsupportedOperationException("call writeInternal")
}

protected[sql] override def writeInternal(row: InternalRow): Unit = {
recordWriter.write(null, row)
}

override def close(): Unit = recordWriter.close(hadoopAttemptContext)
}

/** Create a [[ParquetRecordWriter]] that writes the given path without using OutputCommitter */
private def createNoCommitterRecordWriter(
path: String,
hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
// Custom ParquetOutputFormat that disable use of committer and writes to the given path
val outputFormat = new ParquetOutputFormat[InternalRow]() {
override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
}
outputFormat.getRecordWriter(hadoopAttemptContext)
}

/** Disable the use of the older API. */
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
throw new UnsupportedOperationException("this version of newInstance not supported for " +
"ParquetOutputWriterFactory")
}

override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}
}

import org.apache.spark.sql.execution.datasources.OutputWriter
Contributor: Why is this down here?

Contributor Author: It's not. This is the top.

Contributor: oh... i see

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)