From 3efa7b615f7c37538edb0afca30d4f300ac07aee Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 20:44:47 +0100
Subject: [PATCH 01/10] Added a test for text datasource

---
 .../spark/sql/execution/datasources/text/TextSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0e7f3afa9c3ab..4400952f77ad3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
+
+  test("do not produce empty files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.text(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("txt")))
+    }
+  }
 }

From 80aadf645ab63885ce6f43ac74b0c02871e10883 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 21:11:00 +0100
Subject: [PATCH 02/10] Creating output stream on the first write

---
 .../execution/datasources/text/TextFileFormat.scala | 13 +++++++++----
 .../sql/execution/datasources/text/TextSuite.scala  |  2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 268297148b522..40e20c5387a83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.OutputStream
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,20 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val writer = CodecStreams.createOutputStream(context, new Path(path))
+  private var writer: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
+    val os = writer.getOrElse(CodecStreams.createOutputStream(context, new Path(path)))
+    writer = Some(os)
+
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(writer)
+      utf8string.writeTo(os)
     }
-    writer.write(lineSeparator)
+    os.write(lineSeparator)
   }
 
   override def close(): Unit = {
-    writer.close()
+    writer.map(_.close())
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 4400952f77ad3..a86d5ee37f3db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -234,7 +234,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data.length == 4)
   }
 
-  test("do not produce empty files") {
+  test("do not produce empty files for empty partitions") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       spark.emptyDataset[String].write.text(path)
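PATCH 02 replaces the eagerly opened stream with an Option that is filled only on the first write(), so a task that receives no rows never creates a file on disk. A minimal standalone sketch of the pattern, where the LazyWriter class and the open() callback are illustrative stand-ins (not Spark API) for the writer class and CodecStreams.createOutputStream above:

    import java.io.OutputStream

    class LazyWriter(open: () => OutputStream) {
      private var stream: Option[OutputStream] = None

      def write(bytes: Array[Byte]): Unit = {
        // Open the underlying stream only when the first record arrives.
        val os = stream.getOrElse(open())
        stream = Some(os)
        os.write(bytes)
      }

      // If nothing was ever written, there is nothing to close and no file exists.
      def close(): Unit = stream.foreach(_.close())
    }

The same shape is applied to the CSV and JSON writers in the patches that follow.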
From 0a774ef9e4de987c9f3073b90396215b9f04ca16 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 21:20:27 +0100
Subject: [PATCH 03/10] Test for csv

---
 .../spark/sql/execution/datasources/csv/CSVSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2efe1dda475c5..62deeb67a4aec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1859,4 +1859,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(df, Row(null, csv))
     }
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.csv(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("csv")))
+    }
+  }
 }

From 47b71b7a235ffcdfa79753307f1afcb377a17977 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 21:21:06 +0100
Subject: [PATCH 04/10] Don't produce empty CSV files

---
 .../datasources/csv/CSVFileFormat.scala | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 964b56e706a0b..b90a42a7fe348 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -174,13 +174,18 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val charset = Charset.forName(params.charset)
+  private var univocityGenerator: Option[UnivocityGenerator] = None
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-
-  private val gen = new UnivocityGenerator(dataSchema, writer, params)
+  override def write(row: InternalRow): Unit = {
+    val gen = univocityGenerator.getOrElse {
+      val charset = Charset.forName(params.charset)
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+      new UnivocityGenerator(dataSchema, os, params)
+    }
+    univocityGenerator = Some(gen)
 
-  override def write(row: InternalRow): Unit = gen.write(row)
+    gen.write(row)
+  }
 
-  override def close(): Unit = gen.close()
+  override def close(): Unit = univocityGenerator.map(_.close())
 }
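The CSV test above is character for character the text test from PATCH 01 with the extension swapped, and a JSON twin follows in PATCH 05. If the suites shared a trait, the repetition could be factored into a helper; a hypothetical sketch (checkNoEmptyFiles is not an existing Spark test utility; withTempPath, spark and the implicit String encoder come from the shared test harness):

    def checkNoEmptyFiles(ext: String)(write: String => Unit): Unit = {
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        write(path)
        val files = new java.io.File(path).listFiles()
        // Only metadata files such as _SUCCESS may remain, never a data file.
        assert(!files.exists(_.getName.endsWith(ext)))
      }
    }

    // e.g. checkNoEmptyFiles("csv")(spark.emptyDataset[String].write.csv(_))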
From 040c71f8ea49ca10160cfa242095d6ebd2d76a8d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 21:22:23 +0100
Subject: [PATCH 05/10] Test for JSON

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 06032ded42a53..12e99cb937da9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2550,4 +2550,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     emptyString(StringType, "")
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.json(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("json")))
+    }
+  }
 }

From 6f3cb18d5a863f6aded763bdeb5395f6622876ff Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 15 Nov 2018 21:32:32 +0100
Subject: [PATCH 06/10] Do not produce empty JSON files

---
 .../datasources/json/JsonFileFormat.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 1f7c9d73f19fe..51bebcc35ba7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -180,19 +180,19 @@ private[json] class JsonOutputWriter(
       " which can be read back by Spark only if multiLine is enabled.")
   }
 
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context, new Path(path), encoding)
-
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
+  private var jacksonGenerator: Option[JacksonGenerator] = None
 
   override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      new JacksonGenerator(dataSchema, os, options)
+    }
+    jacksonGenerator = Some(gen)
+
     gen.write(row)
     gen.writeLineEnding()
   }
 
-  override def close(): Unit = {
-    gen.close()
-    writer.close()
-  }
+  override def close(): Unit = jacksonGenerator.map(_.close())
 }

From 586ab316ed2b9bce07a879dc89766dc854807c21 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 28 Nov 2018 13:33:22 +0100
Subject: [PATCH 07/10] Fix json tests

---
 .../spark/sql/execution/datasources/json/JsonSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a524e05308f74..8807b131ceaf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1898,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+      assert(jsonDF.count() === corruptRecordCount)
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1911,7 +1911,7 @@
         F.count($"dummy").as("valid"),
         F.count($"_corrupt_record").as("corrupt"),
         F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6))
     }
   }
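The two assertion changes in PATCH 07 follow directly from the new writer behavior. Before this series, an empty partition still produced a zero-byte part file, and the multiLine JSON read surfaced each such file as one extra row (the "null row for empty file" the old comments pointed at). With empty files no longer written, each affected count drops by exactly one: corruptRecordCount + 1 becomes corruptRecordCount, and Row(1, 5, 7) becomes Row(1, 4, 6), where the valid-record count is untouched and the corrupt and total counts each lose the row the empty file used to contribute. A comment-only sketch of the retired behavior (paths and setup are illustrative):

    // spark.emptyDataset[String].write.text(dir)      // old: left a 0-byte part file
    // spark.read.option("multiLine", true).json(dir)  // old: one extra row per empty file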
From ed93caaa4a881d47c0c978d9ef825abe84ab2441 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 28 Nov 2018 23:03:38 +0100
Subject: [PATCH 08/10] Assigning new uniVocity generator only once

---
 .../spark/sql/execution/datasources/csv/CSVFileFormat.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 8a9b9dc271398..4c5a1d327023c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -175,9 +175,10 @@ private[csv] class CsvOutputWriter(
     val gen = univocityGenerator.getOrElse {
       val charset = Charset.forName(params.charset)
       val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-      new UnivocityGenerator(dataSchema, os, params)
+      val newGen = new UnivocityGenerator(dataSchema, os, params)
+      univocityGenerator = Some(newGen)
+      newGen
     }
-    univocityGenerator = Some(gen)
 
     gen.write(row)
   }

From 129b3935e09d3ac2aea220419dfa19409c2a8f2d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 28 Nov 2018 23:06:19 +0100
Subject: [PATCH 09/10] Assigning new Jackson generator only once

---
 .../sql/execution/datasources/json/JsonFileFormat.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 525cc7e1e11ac..3042133ee43aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -181,9 +181,10 @@ private[json] class JsonOutputWriter(
     val gen = jacksonGenerator.getOrElse {
       val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
       // create the Generator without separator inserted between 2 records
-      new JacksonGenerator(dataSchema, os, options)
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
     }
-    jacksonGenerator = Some(gen)
 
     gen.write(row)
     gen.writeLineEnding()
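PATCH 08 and PATCH 09 share one refinement: moving the cache assignment inside the getOrElse block. In the earlier form the assignment ran on every write(), re-wrapping the same generator in a fresh Some each time; in the new form the block, and therefore the assignment, runs only while the cache is still empty. Side by side, with create() standing in for the generator and stream constructors (illustrative names, not Spark API):

    var cached: Option[java.io.Writer] = None
    def create(): java.io.Writer = new java.io.StringWriter

    // before (patches 02/04/06): correct, but allocates a new Some per row
    def writeBefore(): Unit = {
      val w = cached.getOrElse(create())
      cached = Some(w)
      w.write("row\n")
    }

    // after (patches 08-10): the assignment happens at most once
    def writeAfter(): Unit = {
      val w = cached.getOrElse {
        val created = create()
        cached = Some(created)
        created
      }
      w.write("row\n")
    }

PATCH 10 below applies the same rewrite to the text writer and renames writer to outputStream for clarity.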
From 083d411ec1822986dbb82fbe1896a6c0d846c7d8 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 28 Nov 2018 23:11:31 +0100
Subject: [PATCH 10/10] Assigning new output stream only once

---
 .../execution/datasources/text/TextFileFormat.scala | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 40e20c5387a83..01948ab25d63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -150,11 +150,14 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private var writer: Option[OutputStream] = None
+  private var outputStream: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
-    val os = writer.getOrElse(CodecStreams.createOutputStream(context, new Path(path)))
-    writer = Some(os)
+    val os = outputStream.getOrElse {
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
 
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
@@ -164,6 +167,6 @@ class TextOutputWriter(
   }
 
   override def close(): Unit = {
-    writer.map(_.close())
+    outputStream.map(_.close())
   }
 }
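One possible follow-up polish, not part of this series: all three writers now spell the conditional close as option.map(_.close()). Option.map runs the side effect but also allocates an Option[Unit] that is immediately discarded; Option.foreach expresses the intent directly, e.g. for the text writer:

    // equivalent behavior, but states the side effect explicitly
    override def close(): Unit = outputStream.foreach(_.close())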