From b6da2287b8d7924a978106b953a83dabad288980 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sun, 22 May 2016 16:54:11 +0800
Subject: [PATCH 1/2] Some refactor

---
 .../datasources/parquet/ParquetRelation.scala          |  4 ++--
 .../spark/sql/execution/streaming/FileStreamSink.scala | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index cf5c8e94f468d..ea323a9060d78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -494,8 +494,8 @@ private[sql] class ParquetOutputWriterFactory(
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     throw new UnsupportedOperationException(
-      "this verison of newInstance not supported for " +
-        "ParquetOutputWriterFactory")
+      "this version of newInstance is not supported for " +
+        classOf[ParquetOutputWriterFactory].getSimpleName)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e19101032967b..ee6657ca6a0ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -40,11 +40,11 @@ object FileStreamSink {
 }
 
 /**
- * A sink that writes out results to parquet files. Each batch is written out to a unique
- * directory. After all of the files in a batch have been successfully written, the list of
- * file paths is appended to the log atomically. In the case of partial failures, some duplicate
- * data may be present in the target directory, but only one copy of each file will be present
- * in the log.
+ * A sink that writes out results to files on an HDFS-compatible file system. Each batch is written
+ * out to a unique directory. After all of the files in a batch have been successfully written, the
+ * list of file paths is appended to the log atomically. In the case of partial failures, some
+ * duplicate data may be present in the target directory, but only one copy of each file will be
+ * present in the log.
*/ class FileStreamSink( sparkSession: SparkSession, From b58e97f97930a81183d737319ba431e6722905a5 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 22 May 2016 16:55:26 +0800 Subject: [PATCH 2/2] Add `text` format support for FileStreamSink --- .../execution/datasources/DataSource.scala | 7 +- .../datasources/text/DefaultSource.scala | 86 ++++++++++- .../sql/streaming/FileStreamSinkSuite.scala | 139 +++++++++++++----- 3 files changed, 193 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e5dd4d81d6779..81d36a157b394 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -218,12 +218,15 @@ case class DataSource( providingClass.newInstance() match { case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns) - case parquet: parquet.DefaultSource => + // TODO: Remove the `isInstanceOf` check when other formats have been ported + case fileFormat: FileFormat + if fileFormat.isInstanceOf[parquet.DefaultSource] + || fileFormat.isInstanceOf[text.DefaultSource] => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") }) - new FileStreamSink(sparkSession, path, parquet, partitionColumns, options) + new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options) case _ => throw new UnsupportedOperationException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index f091615a9a714..52dc71774e355 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -20,14 +20,16 @@ package org.apache.spark.sql.execution.datasources.text import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.{NullWritable, Text} -import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -39,6 +41,8 @@ class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "text" + override def toString: String = "Text" + private def verifySchema(schema: StructType): Unit = { if (schema.size != 1) { throw new AnalysisException( @@ -108,6 +112,17 @@ class DefaultSource extends FileFormat with DataSourceRegister { } 
     }
   }
+
+  override def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    new TextOutputWriterFactory(
+      sqlContext.conf,
+      dataSchema,
+      sqlContext.sparkContext.hadoopConfiguration,
+      options)
+  }
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
@@ -139,3 +154,70 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     recordWriter.close(context)
   }
 }
+
+/**
+ * A factory for generating [[OutputWriter]]s for writing text files. This implementation differs
+ * from [[TextOutputWriter]] in that it does not use any [[OutputCommitter]]. It simply
+ * writes the data to the path used to generate the output writer. Callers of this factory
+ * have to ensure which files are to be considered as committed.
+ */
+private[sql] class TextOutputWriterFactory(
+    sqlConf: SQLConf,
+    dataSchema: StructType,
+    hadoopConf: Configuration,
+    options: Map[String, String]) extends OutputWriterFactory {
+
+  private val serializableConf =
+    new SerializableConfiguration(Job.getInstance(hadoopConf).getConfiguration)
+
+  /**
+   * Returns an [[OutputWriter]] that writes data to the given path without using an
+   * [[OutputCommitter]].
+   */
+  override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
+
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopAttemptContext =
+      new TaskAttemptContextImpl(serializableConf.value, hadoopTaskAttemptId)
+
+    // Instance of RecordWriter that does not use OutputCommitter
+    private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
+
+    private[this] val buffer = new Text()
+
+    override def write(row: Row): Unit = {
+      throw new UnsupportedOperationException("call writeInternal")
+    }
+
+    protected[sql] override def writeInternal(row: InternalRow): Unit = {
+      val utf8string = row.getUTF8String(0)
+      buffer.set(utf8string.getBytes)
+      recordWriter.write(NullWritable.get(), buffer)
+    }
+
+    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
+  }
+
+  /** Creates a [[RecordWriter]] that writes to the given path without using an [[OutputCommitter]]. */
+  private def createNoCommitterRecordWriter(
+      path: String,
+      hadoopAttemptContext: TaskAttemptContext): RecordWriter[NullWritable, Text] = {
+    // Custom TextOutputFormat that disables the use of a committer and writes to the given path
+    val outputFormat = new TextOutputFormat[NullWritable, Text]() {
+      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
+      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
+    }
+    outputFormat.getRecordWriter(hadoopAttemptContext)
+  }
+
+  /** Disable the use of the older API.
*/ + def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + throw new UnsupportedOperationException( + "this version of newInstance is not supported for " + + classOf[TextOutputWriterFactory].getSimpleName) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 6238b74ffac56..86636da420f30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -26,27 +26,44 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest with SharedSQLContext { import testImplicits._ + test("FileStreamSinkWriter - parquet - unpartitioned data") { + testUnpartitionedData(new parquet.DefaultSource()) + } + + test("FileStreamSinkWriter - text - unpartitioned data") { + testUnpartitionedData(new text.DefaultSource()) + } - test("FileStreamSinkWriter - unpartitioned data") { + private def testUnpartitionedData(fileFormat: FileFormat with DataSourceRegister) { val path = Utils.createTempDir() path.delete() val hadoopConf = spark.sparkContext.hadoopConfiguration - val fileFormat = new parquet.DefaultSource() + val testingAgainstText = fileFormat.isInstanceOf[text.DefaultSource] def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = { - val df = spark - .range(start, end, 1, numPartitions) - .select($"id", lit(100).as("data")) + val df = if (testingAgainstText) { + spark + .range(start, end, 1, numPartitions) + .map(_.toString) + .toDF("id") + } + else { + spark + .range(start, end, 1, numPartitions) + .select($"id", lit(100).as("data")) + } + val writer = new FileStreamSinkWriter( df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty) writer.write().map(_.path.stripPrefix("file://")) @@ -56,7 +73,9 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { val files1 = writeRange(0, 10, 2) assert(files1.size === 2, s"unexpected number of files: $files1") checkFilesExist(path, files1, "file not written") - checkAnswer(spark.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100))) + checkAnswer( + spark.read.format(fileFormat.shortName()).load(path.getCanonicalPath), + (0 until 10).map { id => if (testingAgainstText) Row(id.toString) else Row(id, 100)} ) // Append and check whether new files are written correctly and old files still exist val files2 = writeRange(10, 20, 3) @@ -64,23 +83,40 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { assert(files2.intersect(files1).isEmpty, "old files returned") checkFilesExist(path, files2, s"New file not written") checkFilesExist(path, files1, s"Old file not found") - checkAnswer(spark.read.load(path.getCanonicalPath), (0 
until 20).map(Row(_, 100))) + checkAnswer( + spark.read.format(fileFormat.shortName()).load(path.getCanonicalPath), + (0 until 20).map { id => if (testingAgainstText) Row(id.toString) else Row(id, 100)} ) + } + + test("FileStreamSinkWriter - parquet - partitioned data") { + testPartitionedData(new parquet.DefaultSource()) + } + + test("FileStreamSinkWriter - text - partitioned data") { + testPartitionedData(new text.DefaultSource()) } - test("FileStreamSinkWriter - partitioned data") { + private def testPartitionedData(fileFormat: FileFormat with DataSourceRegister) { implicit val e = ExpressionEncoder[java.lang.Long] val path = Utils.createTempDir() path.delete() val hadoopConf = spark.sparkContext.hadoopConfiguration - val fileFormat = new parquet.DefaultSource() + val testingAgainstText = fileFormat.isInstanceOf[text.DefaultSource] def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = { - val df = spark - .range(start, end, 1, numPartitions) - .flatMap(x => Iterator(x, x, x)).toDF("id") - .select($"id", lit(100).as("data1"), lit(1000).as("data2")) - + val df = if (testingAgainstText) { + spark + .range(start, end, 1, numPartitions) + .map(_.toString) + .flatMap(x => Iterator(x, x, x)).toDF("id") + .select($"id", lit("100").as("data")) + } else { + spark + .range(start, end, 1, numPartitions) + .flatMap(x => Iterator(x, x, x)).toDF("id") + .select($"id", lit(100).as("data1"), lit(1000).as("data2")) + } require(df.rdd.partitions.size === numPartitions) val writer = new FileStreamSinkWriter( df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty) @@ -102,8 +138,12 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { checkFilesExist(path, files1, "file not written") checkOneFileWrittenPerKey(0 until 10, files1) - val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) - checkAnswer(spark.read.load(path.getCanonicalPath), answer1) + val answer1 = if (testingAgainstText) { + (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row("100", _)) + } else { + (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) + } + checkAnswer(spark.read.format(fileFormat.shortName()).load(path.getCanonicalPath), answer1) // Append and check whether new files are written correctly and old files still exist val files2 = writeRange(0, 20, 3) @@ -113,12 +153,30 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { checkFilesExist(path, files1, s"Old file not found") checkOneFileWrittenPerKey(0 until 20, files2) - val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) - checkAnswer(spark.read.load(path.getCanonicalPath), answer1 ++ answer2) + val answer2 = if (testingAgainstText) { + (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row("100", _)) + } + else { + (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) + } + + checkAnswer( + spark.read.format(fileFormat.shortName()).load(path.getCanonicalPath), + answer1 ++ answer2) } - test("FileStreamSink - unpartitioned writing and batch reading") { - val inputData = MemoryStream[Int] + test("FileStreamSink - parquet - unpartitioned writing and batch reading") { + testUnpartitionedWritingAndBatchReading(new parquet.DefaultSource()) + } + + test("FileStreamSink - text - unpartitioned writing and batch reading") { + testUnpartitionedWritingAndBatchReading(new text.DefaultSource()) + } + + private def testUnpartitionedWritingAndBatchReading( + fileFormat: FileFormat with DataSourceRegister) { + + val 
inputData = MemoryStream[String] val df = inputData.toDF() val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath @@ -129,18 +187,18 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { try { query = df.write - .format("parquet") + .format(fileFormat.shortName()) .option("checkpointLocation", checkpointDir) .startStream(outputDir) - inputData.addData(1, 2, 3) + inputData.addData("1", "2", "3") failAfter(streamingTimeout) { query.processAllAvailable() } - val outputDf = spark.read.parquet(outputDir).as[Int] - checkDataset(outputDf, 1, 2, 3) + val outputDf = spark.read.format(fileFormat.shortName()).load(outputDir).as[String] + checkDataset(outputDf, "1", "2", "3") } finally { if (query != null) { @@ -149,7 +207,17 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { } } - test("FileStreamSink - partitioned writing and batch reading") { + test("FileStreamSink - parquet - partitioned writing and batch reading") { + testPartitionedWritingAndBatchReading(new parquet.DefaultSource()) + } + + test("FileStreamSink - text - partitioned writing and batch reading") { + testPartitionedWritingAndBatchReading(new text.DefaultSource()) + } + + private def testPartitionedWritingAndBatchReading( + fileFormat: FileFormat with DataSourceRegister) { + val inputData = MemoryStream[Int] val ds = inputData.toDS() @@ -160,10 +228,10 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { try { query = - ds.map(i => (i, i * 1000)) + ds.map(i => (i, (i * 1000).toString)) .toDF("id", "value") .write - .format("parquet") + .format(fileFormat.shortName()) .partitionBy("id") .option("checkpointLocation", checkpointDir) .startStream(outputDir) @@ -173,9 +241,9 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { query.processAllAvailable() } - val outputDf = sqlContext.read.parquet(outputDir) + val outputDf = sqlContext.read.format(fileFormat.shortName()).load(outputDir) val expectedSchema = new StructType() - .add(StructField("value", IntegerType)) + .add(StructField("value", StringType)) .add(StructField("id", IntegerType)) assert(outputDf.schema === expectedSchema) @@ -192,8 +260,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { // Verify the data is correctly read checkDataset( - outputDf.as[(Int, Int)], - (1000, 1), (2000, 2), (3000, 3)) + outputDf.as[(String, Int)], + ("1000", 1), ("2000", 2), ("3000", 3)) /** Check some condition on the partitions of the FileScanRDD generated by a DF */ def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { @@ -262,10 +330,11 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { testFormat(None) // should not throw error as default format parquet when not specified testFormat(Some("parquet")) + testFormat(Some("text")) val e = intercept[UnsupportedOperationException] { - testFormat(Some("text")) + testFormat(Some("csv")) } - Seq("text", "not support", "stream").foreach { s => + Seq("csv", "not support", "stream").foreach { s => assert(e.getMessage.contains(s)) } }
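
A note on the design choice behind TextOutputWriterFactory.newWriter: the writer goes straight to the target file and never asks Hadoop for an OutputCommitter, so there is no _temporary directory and no task-commit rename; it is the sink's metadata log, not Hadoop, that decides which files count as committed. The same trick can be exercised on its own with the Hadoop APIs the patch already uses; the sketch below is illustrative only (the NoCommitterTextWriter object and its writeLines helper are not part of the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, JobID, OutputCommitter, RecordWriter, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Illustrative helper (not part of the patch): writes lines directly to `path`
// without any OutputCommitter, mirroring createNoCommitterRecordWriter above.
object NoCommitterTextWriter {
  def writeLines(path: String, lines: Seq[String], conf: Configuration): Unit = {
    val attemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
    val context = new TaskAttemptContextImpl(Job.getInstance(conf).getConfiguration, attemptId)

    // TextOutputFormat with the committer disabled and the work file pinned to `path`.
    val format = new TextOutputFormat[NullWritable, Text]() {
      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = null
      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = new Path(path)
    }

    val writer: RecordWriter[NullWritable, Text] = format.getRecordWriter(context)
    val buffer = new Text()
    try {
      lines.foreach { line =>
        buffer.set(line)
        writer.write(NullWritable.get(), buffer)
      }
    } finally {
      writer.close(context)
    }
  }
}
```

Because nothing is ever committed at the Hadoop layer, a crashed task simply leaves behind a file that the metadata log never references, which is the duplicate-tolerant behaviour described in the FileStreamSink scaladoc.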
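End to end, the new sink is reachable through the ordinary writer API, as the "FileStreamSink - text - unpartitioned writing and batch reading" test shows. A minimal sketch of that flow, assuming the pre-2.0 startStream API this branch targets; the wrapper object, app name, and /tmp paths are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object TextSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("text-sink-example").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext  // required by MemoryStream.apply

    // In-memory source, as used by FileStreamSinkSuite.
    val inputData = MemoryStream[String]

    // Stream into the newly supported "text" format; DataSource.createSink now
    // routes this to text.DefaultSource instead of throwing.
    val query = inputData.toDF()
      .write
      .format("text")
      .option("checkpointLocation", "/tmp/text-sink-checkpoint")  // placeholder
      .startStream("/tmp/text-sink-output")                       // placeholder

    inputData.addData("1", "2", "3")
    query.processAllAvailable()

    // Committed batches can be read back with the ordinary batch reader.
    spark.read.format("text").load("/tmp/text-sink-output").show()

    query.stop()
    spark.stop()
  }
}
```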
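The partitioned case relies on the same machinery: partitionBy("id") moves id into the directory layout, so the text format only has to serialize the single remaining string column, and the partition column is recovered on read (the test asserts a read-back schema of value: String, id: Int). A condensed sketch, assuming a shell session on this branch where spark and its implicits are available; paths are placeholders:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val inputData = MemoryStream[Int]
val query = inputData.toDS()
  .map(i => (i, (i * 1000).toString))
  .toDF("id", "value")
  .write
  .format("text")
  .partitionBy("id")                                          // written as .../id=1/, .../id=2/, ...
  .option("checkpointLocation", "/tmp/part-text-checkpoint")  // placeholder
  .startStream("/tmp/part-text-output")                       // placeholder

inputData.addData(1, 2, 3)
query.processAllAvailable()

// Rows come back as ("1000", 1), ("2000", 2), ("3000", 3).
spark.read.format("text").load("/tmp/part-text-output").show()
query.stop()
```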