Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed May 21, 2018
1 parent 28e33ff commit ef45539
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.util.SerializableConfiguration


/**
* Simple metrics collected during an instance of [[FileFormatWriter.ExecuteWriteTask]].
* Simple metrics collected during an instance of [[FileFormatDataWriter]].
* These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
*/
case class BasicWriteTaskStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration

Expand All @@ -38,8 +37,7 @@ import org.apache.spark.util.SerializableConfiguration
abstract class FileFormatDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol
) extends DataWriter[InternalRow] {
committer: FileCommitProtocol) {
/**
* Max number of files a single task writes out due to file size. In most cases the number of
* files written should be very small. This is just a safe guard to protect some really bad
Expand All @@ -54,6 +52,9 @@ abstract class FileFormatDataWriter(
val statsTrackers: Seq[WriteTaskStatsTracker] =
description.statsTrackers.map(_.newTaskInstance())

/** Writes a record */
def write(record: InternalRow): Unit

def releaseResources(): Unit = {
if (currentWriter != null) {
try {
Expand All @@ -70,16 +71,16 @@ abstract class FileFormatDataWriter(
* to the driver and used to update the catalog. Other information will be sent back to the
* driver too and used to e.g. update the metrics in UI.
*/
override def commit(): WriteTaskResult = {
def commit(): WriteTaskResult = {
releaseResources()
committer.commitTask(taskAttemptContext)
val summary = ExecutedWriteSummary(
updatedPartitions = Set.empty,
updatedPartitions = updatedPartitions.toSet,
stats = statsTrackers.map(_.getFinalStats()))
WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
}

override def abort(): Unit = {
def abort(): Unit = {
try {
releaseResources()
} finally {
Expand Down Expand Up @@ -298,7 +299,6 @@ class WriteJobDescription(

/** The result of a successful write task. */
case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
extends WriterCommitMessage

/**
* Wrapper class for the metrics of writing data out.
Expand Down

0 comments on commit ef45539

Please sign in to comment.