
[SPARK-30227][SQL] Add close() on DataWriter interface

### What changes were proposed in this pull request?

This patch adds a close() method to the DataWriter interface, which becomes the place to clean up resources.

### Why are the changes needed?

The lifecycle of a DataWriter instance currently ends at either commit() or abort(). That leads data source implementors to place resource cleanup in both methods, but abort() can be called after commit() fails, so they have to guard against double cleanup if the cleanup is not idempotent.
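
For illustration, here is a minimal sketch (a hypothetical writer, not taken from this patch) of the pattern the old contract pushes implementors toward: cleanup duplicated across commit() and abort(), guarded by a flag to stay idempotent because abort() may follow a failed commit().

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical writer against the pre-close() interface: cleanup lives in both
// commit() and abort(), so it must be guarded to survive being called twice.
class FileBackedDataWriter(out: java.io.OutputStream) extends DataWriter[InternalRow] {
  private var cleaned = false

  override def write(record: InternalRow): Unit =
    out.write(record.toString.getBytes("UTF-8"))

  override def commit(): WriterCommitMessage = {
    out.flush()
    cleanup()                     // cleanup on the success path...
    new WriterCommitMessage {}
  }

  override def abort(): Unit = cleanup()  // ...and on the failure path, possibly after a failed commit()

  private def cleanup(): Unit = {
    if (!cleaned) {               // idempotence guard against double cleanup
      cleaned = true
      out.close()
    }
  }
}
```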

### Does this PR introduce any user-facing change?

It depends on the definition of "user". Developers of a custom DSv2 source have to add close() to their DataWriter implementations. Adding close() with an empty body is fine, since they should already handle resource cleanup in commit()/abort(), but they will likely want to migrate the cleanup logic into close(), as it avoids double cleanup (see the sketch below). End users of existing DSv2 sources (built-in or third-party) see no change.
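
As a hedged sketch of that migration (the same hypothetical writer as above), the cleanup moves into close(), which Spark now always invokes once the writer's work is done, so commit() and abort() no longer need an idempotence guard:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// The same hypothetical writer after migrating cleanup into the new close() hook.
class FileBackedDataWriter(out: java.io.OutputStream) extends DataWriter[InternalRow] {

  override def write(record: InternalRow): Unit =
    out.write(record.toString.getBytes("UTF-8"))

  override def commit(): WriterCommitMessage = {
    out.flush()
    new WriterCommitMessage {}    // nothing to clean up here anymore
  }

  override def abort(): Unit = {} // nothing to roll back in this toy example

  override def close(): Unit = out.close()  // single, unconditional cleanup point
}
```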

### How was this patch tested?

Existing tests.

Closes #26855 from HeartSaVioR/SPARK-30227.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
HeartSaVioR authored and cloud-fan committed Dec 13, 2019
1 parent cb6d2b3 commit 94eb66593a328dd3fcecc5f5f1772d82843ec14f
@@ -63,7 +63,10 @@ private[kafka010] class KafkaDataWriter(
 
   def abort(): Unit = {}
 
-  def close(): Unit = {
+  def close(): Unit = {}
+
+  /** explicitly invalidate producer from pool. only for testing. */
+  private[kafka010] def invalidateProducer(): Unit = {
     checkForErrors()
     if (producer != null) {
       producer.flush()
@@ -370,7 +370,7 @@ class KafkaContinuousSinkSuite extends KafkaSinkStreamingSuiteBase {
         iter.foreach(writeTask.write(_))
         writeTask.commit()
       } finally {
-        writeTask.close()
+        writeTask.invalidateProducer()
       }
     }
   }
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connector.write;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
@@ -31,8 +32,9 @@
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
  * not be processed. If all records are successfully written, {@link #commit()} is called.
  *
- * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
- * is over and Spark will not use it again.
+ * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, Spark will
+ * call {@link #close()} to let DataWriter doing resource cleanup. After calling {@link #close()},
+ * its lifecycle is over and Spark will not use it again.
 *
 * If this data writer succeeds(all records are successfully written and {@link #commit()}
 * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
@@ -56,7 +58,7 @@
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}.
  */
 @Evolving
-public interface DataWriter<T> {
+public interface DataWriter<T> extends Closeable {
 
   /**
    * Writes one record.
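
From the caller's side, the updated javadoc above amounts to: write records, then commit() on success or abort() on failure, and always close() afterwards. A simplified sketch of that driving loop (the real code lives in FileFormatWriter and DataWritingSparkTask below and goes through Spark's error-handling utilities):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Simplified sketch of how a write task drives a DataWriter under the new contract.
def runWriteTask(
    writer: DataWriter[InternalRow],
    records: Iterator[InternalRow]): WriterCommitMessage = {
  try {
    records.foreach(writer.write(_))
    writer.commit()              // success: produce the commit message
  } catch {
    case t: Throwable =>
      writer.abort()             // failure: abort, possibly after a failed commit()
      throw t
  } finally {
    writer.close()               // always called exactly once, success or failure
  }
}
```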
@@ -252,4 +252,6 @@ private class BufferWriter extends DataWriter[InternalRow] {
   override def commit(): WriterCommitMessage = buffer
 
   override def abort(): Unit = {}
+
+  override def close(): Unit = {}
 }
@@ -86,6 +86,8 @@ abstract class FileFormatDataWriter(
       committer.abortTask(taskAttemptContext)
     }
   }
+
+  override def close(): Unit = {}
 }
 
 /** FileFormatWriteTask for empty partitions */
@@ -277,6 +277,8 @@ object FileFormatWriter extends Logging {
         // If there is an error, abort the task
         dataWriter.abort()
         logError(s"Job $jobId aborted.")
+      }, finallyBlock = {
+        dataWriter.close()
       })
     } catch {
       case e: FetchFailedException =>
@@ -72,6 +72,7 @@ private[noop] object NoopWriter extends DataWriter[InternalRow] {
   override def write(record: InternalRow): Unit = {}
   override def commit(): WriterCommitMessage = null
   override def abort(): Unit = {}
+  override def close(): Unit = {}
 }
 
 private[noop] object NoopStreamingWrite extends StreamingWrite {
@@ -467,6 +467,8 @@ object DataWritingSparkTask extends Logging {
         dataWriter.abort()
         logError(s"Aborted commit for partition $partId (task $taskId, attempt $attemptId, " +
           s"stage $stageId.$stageAttempt)")
+      }, finallyBlock = {
+        dataWriter.close()
       })
     }
   }
@@ -80,6 +80,8 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writerFactory: StreamingDat
           logError(s"Writer for partition ${context.partitionId()} is aborting.")
           if (dataWriter != null) dataWriter.abort()
           logError(s"Writer for partition ${context.partitionId()} aborted.")
+        }, finallyBlock = {
+          dataWriter.close()
         })
       }
 
@@ -135,7 +135,7 @@ class ForeachDataWriter[T](
 
   // If open returns false, we should skip writing rows.
   private val opened = writer.open(partitionId, epochId)
-  private var closeCalled: Boolean = false
+  private var errorOrNull: Throwable = _
 
   override def write(record: InternalRow): Unit = {
     if (!opened) return
@@ -144,25 +144,24 @@
       writer.process(rowConverter(record))
     } catch {
       case t: Throwable =>
-        closeWriter(t)
+        errorOrNull = t
         throw t
     }
 
   }
 
   override def commit(): WriterCommitMessage = {
-    closeWriter(null)
     ForeachWriterCommitMessage
   }
 
   override def abort(): Unit = {
-    closeWriter(new SparkException("Foreach writer has been aborted due to a task failure"))
+    if (errorOrNull == null) {
+      errorOrNull = new SparkException("Foreach writer has been aborted due to a task failure")
+    }
   }
 
-  private def closeWriter(errorOrNull: Throwable): Unit = {
-    if (!closeCalled) {
-      closeCalled = true
-      writer.close(errorOrNull)
-    }
+  override def close(): Unit = {
+    writer.close(errorOrNull)
   }
 }
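
The ForeachDataWriter change above defers writer.close(errorOrNull) to the new close() hook: a failed write records the error, abort() fills in a SparkException if nothing was recorded, and close() finally hands the error (or null) to the user's ForeachWriter. A sketch of a user-side ForeachWriter that relies on that ordering (sink and class name hypothetical):

```scala
import org.apache.spark.sql.ForeachWriter

// Hypothetical sink: acquire per-partition resources in open(), write rows in
// process(), release everything in close() and inspect errorOrNull for failures.
class ConsoleAuditWriter extends ForeachWriter[String] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"open partition=$partitionId epoch=$epochId")
    true                                  // return false to skip this partition
  }

  override def process(value: String): Unit = println(s"row: $value")

  // errorOrNull is null on success, the write failure otherwise, or the
  // SparkException set by abort() when the task failed outside process().
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) println(s"partition failed: ${errorOrNull.getMessage}")
    println("resources released")
  }
}
```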

@@ -56,10 +56,12 @@ class PackedRowDataWriter() extends DataWriter[InternalRow] with Logging {
   override def write(row: InternalRow): Unit = data.append(row.copy())
 
   override def commit(): PackedRowCommitMessage = {
-    val msg = PackedRowCommitMessage(data.toArray)
-    data.clear()
-    msg
+    PackedRowCommitMessage(data.toArray)
   }
 
-  override def abort(): Unit = data.clear()
+  override def abort(): Unit = {}
+
+  override def close(): Unit = {
+    data.clear()
+  }
 }
@@ -191,6 +191,8 @@ class MemoryDataWriter(partition: Int, schema: StructType)
   }
 
   override def abort(): Unit = {}
+
+  override def close(): Unit = {}
 }


@@ -240,4 +240,6 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow]
       fs.delete(file, false)
     }
   }
+
+  override def close(): Unit = {}
 }
