From 9cffa0fccc33552e8fce3580a9a665b022f5bf22 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 21 Mar 2018 21:03:11 +0100 Subject: [PATCH 01/20] Adding tests for select only requested columns --- .../execution/datasources/csv/CSVSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 461abdd96d3f3..a20e5d6fa90ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1322,4 +1322,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val sampled = spark.read.option("inferSchema", true).option("samplingRatio", 1.0).csv(ds) assert(sampled.count() == ds.count()) } + + test(s"Select a little of many columns") { + withTempPath { path => + import collection.JavaConverters._ + val schema = new StructType() + .add("f1", IntegerType).add("f2", IntegerType).add("f3", IntegerType) + .add("f4", IntegerType).add("f5", IntegerType).add("f6", IntegerType) + .add("f7", IntegerType).add("f8", IntegerType).add("f9", IntegerType) + .add("f10", IntegerType).add("f11", IntegerType).add("f12", IntegerType) + .add("f13", IntegerType).add("f14", IntegerType).add("f15", IntegerType) + + val odf = spark.createDataFrame(List( + Row(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15), + Row(-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15) + ).asJava, schema) + odf.write.csv(path.getCanonicalPath) + val idf = spark.read + .schema(schema) + .csv(path.getCanonicalPath) + + checkAnswer( + idf.select('f5, 'f10, 'f15), + List(Row(5, 10, 15), Row(-5, -10, -15)) + ) + } + } } From fdbcbe3536aee04e6a84b72ac319726614416bc3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 21 Mar 2018 21:42:08 +0100 Subject: [PATCH 02/20] Select indexes of required columns only --- .../datasources/csv/UnivocityParser.scala | 25 ++++++++++++++----- .../execution/datasources/csv/CSVSuite.scala | 7 +++--- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 99557a1ceb0c8..4775d702ee8b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -73,11 +73,25 @@ class UnivocityParser( // Each input token is placed in each output row's position by mapping these. 
In this case, // // output row - ["A", 2] - private val valueConverters: Array[ValueConverter] = - schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + private val valueConverters: Array[ValueConverter] = { + requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + } + + private val tokenIndexArr: Seq[java.lang.Integer] = { + requiredSchema.map(f => new java.lang.Integer(schema.indexOf(f))) + } + + private val tokenizer = { + val parserSetting = options.asParserSettings + parserSetting.selectIndexes(tokenIndexArr: _*) + new CsvParser(parserSetting) + } - private val tokenIndexArr: Array[Int] = { - requiredSchema.map(f => schema.indexOf(f)).toArray + private val row = new GenericInternalRow(requiredSchema.length) + + // Retrieve the raw record string. + private def getCurrentInput: UTF8String = { + UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) } /** @@ -211,8 +225,7 @@ class UnivocityParser( try { var i = 0 while (i < requiredSchema.length) { - val from = tokenIndexArr(i) - row(i) = valueConverters(from).apply(tokens(from)) + row(i) = valueConverters(i).apply(tokens(i)) i += 1 } row diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a20e5d6fa90ee..c712a6fc4ff45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1323,7 +1323,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(sampled.count() == ds.count()) } - test(s"Select a little of many columns") { + test("Select a little of many columns") { withTempPath { path => import collection.JavaConverters._ val schema = new StructType() @@ -1341,10 +1341,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val idf = spark.read .schema(schema) .csv(path.getCanonicalPath) + .select('f15, 'f10, 'f5) checkAnswer( - idf.select('f5, 'f10, 'f15), - List(Row(5, 10, 15), Row(-5, -10, -15)) + idf, + List(Row(15, 10, 5), Row(-15, -10, -5)) ) } } From 578f47b0f32a76caf6c9ede8763c9cf85a1c83e9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Mar 2018 11:41:29 +0100 Subject: [PATCH 03/20] Fix the case when number of parsed fields are not matched to required schema --- .../sql/execution/datasources/csv/UnivocityParser.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 4775d702ee8b5..161208089a868 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -199,14 +199,14 @@ class UnivocityParser( def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) private def convert(tokens: Array[String]): InternalRow = { - if (tokens.length != schema.length) { + if (tokens.length != requiredSchema.length) { // If the number of tokens doesn't match the schema, we should treat it as a malformed record. 
// However, we still have chance to parse some of the tokens, by adding extra null tokens in // the tail if the number is smaller, or by dropping extra tokens if the number is larger. - val checkedTokens = if (schema.length > tokens.length) { - tokens ++ new Array[String](schema.length - tokens.length) + val checkedTokens = if (requiredSchema.length > tokens.length) { + tokens ++ new Array[String](requiredSchema.length - tokens.length) } else { - tokens.take(schema.length) + tokens.take(requiredSchema.length) } def getPartialResult(): Option[InternalRow] = { try { From 0f942c308dca173dad8f421e893066b8c03d35a3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Mar 2018 12:07:55 +0100 Subject: [PATCH 04/20] Using selectIndexes if required number of columns are less than its total number. --- .../execution/datasources/csv/UnivocityParser.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 161208089a868..b323b972470a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -77,13 +77,14 @@ class UnivocityParser( requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray } - private val tokenIndexArr: Seq[java.lang.Integer] = { - requiredSchema.map(f => new java.lang.Integer(schema.indexOf(f))) - } - private val tokenizer = { val parserSetting = options.asParserSettings - parserSetting.selectIndexes(tokenIndexArr: _*) + if (requiredSchema.length < schema.length) { + val tokenIndexArr: Seq[java.lang.Integer] = { + requiredSchema.map(f => new java.lang.Integer(schema.indexOf(f))) + } + parserSetting.selectIndexes(tokenIndexArr: _*) + } new CsvParser(parserSetting) } From c4b11601e9c264729e141fff3dc653d868a7ad69 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 24 Mar 2018 12:48:43 +0100 Subject: [PATCH 05/20] Fix the test: force to read all columns --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c712a6fc4ff45..32ee9f003c42d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -267,7 +267,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .options(Map("header" -> "true", "mode" -> "dropmalformed")) .load(testFile(carsFile)) - assert(cars.select("year").collect().size === 2) + assert(cars.collect().size === 2) } } From 8cf6eab952d79628cb8ee2ff7b92dadae60ec686 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 6 Apr 2018 22:55:35 +0200 Subject: [PATCH 06/20] Fix merging conflicts --- .../sql/execution/datasources/csv/UnivocityParser.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index b323b972470a4..abd63d66576ad 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -45,15 +45,6 @@ class UnivocityParser( // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any - private val tokenizer = new CsvParser(options.asParserSettings) - - private val row = new GenericInternalRow(requiredSchema.length) - - // Retrieve the raw record string. - private def getCurrentInput: UTF8String = { - UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) - } - // This parser first picks some tokens from the input tokens, according to the required schema, // then parse these tokens and put the values in a row, with the order specified by the required // schema. From 5b2f0b9d7346f927842bc1a2089a7299876f1894 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 29 Apr 2018 13:52:08 +0200 Subject: [PATCH 07/20] Benchmarks for many columns --- .../datasources/csv/CSVBenchmarks.scala | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala new file mode 100644 index 0000000000000..0a41fb506b422 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.csv + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure CSV read/write performance. 
+ * To run this: + * spark-submit --class --jars + */ +object CSVBenchmarks { + val conf = new SparkConf() + + val spark = SparkSession.builder + .master("local[1]") + .appName("benchmark-csv-datasource") + .config(conf) + .getOrCreate() + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { + val path = Utils.createTempDir() + path.delete() + try f(path) finally Utils.deleteRecursively(path) + } + + + def perlineBenchmark(rowsNum: Int): Unit = { + val benchmark = new Benchmark("CSV parsing in the per-line mode", rowsNum) + + withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) + .map(_ => "a") + .toDF("colA") + .write.csv(path.getAbsolutePath) + + val schema = new StructType().add("colA", StringType) + val ds = spark.read.schema(schema).csv(path.getAbsolutePath) + + benchmark.addCase("Read CSV file with one column", 3) { _ => + ds.count() + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + CSV parsing in the per-line mode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------ + Read CSV file with one column 23352 / 23495 4.3 233.5 1.0X + */ + benchmark.run() + } + } + + def multiColumnsBenchmark(rowsNum: Int): Unit = { + val colsNum = 1000 + val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum) + + withTempPath { path => + // scalastyle:off println + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on println + + val fields = for (i <- 0 until colsNum) yield StructField(s"col$i", DoubleType) + val schema = StructType(fields) + val values = (0 until colsNum).map(i => s"$i.$i").mkString(",") + val columnNames = schema.fieldNames + + val rdd = spark.sparkContext.range(0, rowsNum, 1) + .map(_ => Row.fromSeq((0 until colsNum).map(_ + 0.1))) + val df = spark.createDataFrame(rdd, schema) + df.write.option("header", true).csv(path.getAbsolutePath) + + val ds = spark.read.schema(schema).csv(path.getAbsolutePath) + + benchmark.addCase(s"Select all columns", 3) { _ => + ds.select("*").count() + } + benchmark.addCase(s"Select 10 columns", 3) { _ => + ds.select($"col0", $"col10", $"col20", $"col30", $"col40", + $"col50", $"col60", $"col70", $"col80", $"col90").count() + } + benchmark.addCase(s"Select one column", 3) { _ => + ds.select($"col0").count() + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + --------------------------------------------------------------------------------------------- + Select all columns 53338 / 53477 0.0 53337.6 1.0X + Select 10 columns 52724 / 53132 0.0 52723.6 1.0X + Select one column 52277 / 52494 0.0 52276.7 1.0X + */ + benchmark.run() + } + } + + def main(args: Array[String]): Unit = { + perlineBenchmark(100 * 1000 * 1000) + multiColumnsBenchmark(rowsNum = 1000 * 1000) + } +} From 6d1e902c0011e88dbafb65c4ad6e7431370ed12d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 29 Apr 2018 14:59:58 +0200 Subject: [PATCH 08/20] Make size of requiredSchema equals to amount of selected columns --- .../datasources/csv/CSVBenchmarks.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 
0a41fb506b422..2dc8afa93fe99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -98,24 +98,25 @@ object CSVBenchmarks { val ds = spark.read.schema(schema).csv(path.getAbsolutePath) benchmark.addCase(s"Select all columns", 3) { _ => - ds.select("*").count() + ds.select("*").filter((row: Row) => true).count() } benchmark.addCase(s"Select 10 columns", 3) { _ => ds.select($"col0", $"col10", $"col20", $"col30", $"col40", - $"col50", $"col60", $"col70", $"col80", $"col90").count() + $"col50", $"col60", $"col70", $"col80", $"col90") + .filter((row: Row) => true).count() } benchmark.addCase(s"Select one column", 3) { _ => - ds.select($"col0").count() + ds.select($"col0").filter((row: Row) => true).count() } /* Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz - Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - --------------------------------------------------------------------------------------------- - Select all columns 53338 / 53477 0.0 53337.6 1.0X - Select 10 columns 52724 / 53132 0.0 52723.6 1.0X - Select one column 52277 / 52494 0.0 52276.7 1.0X + Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + -------------------------------------------------------------------------------------------- + Select all columns 135378 / 206881 0.0 135378.2 1.0X + Select 10 columns 57290 / 57581 0.0 57289.7 2.4X + Select one column 55718 / 56358 0.0 55718.3 2.4X */ benchmark.run() } From 4525795f7337cbd081f569cd79d7f90cb58edbee Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 29 Apr 2018 15:36:54 +0200 Subject: [PATCH 09/20] Removing selection of all columns --- .../datasources/csv/CSVBenchmarks.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 2dc8afa93fe99..dba9f9362745a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.File import org.apache.spark.SparkConf -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} +import org.apache.spark.sql.{Column, Row, SparkSession} +import org.apache.spark.sql.types._ import org.apache.spark.util.{Benchmark, Utils} /** @@ -85,28 +85,24 @@ object CSVBenchmarks { benchmark.out.println("Preparing data for benchmarking ...") // scalastyle:on println - val fields = for (i <- 0 until colsNum) yield StructField(s"col$i", DoubleType) + val fields = for (i <- 0 until colsNum) yield StructField(s"col$i", IntegerType) val schema = StructType(fields) - val values = (0 until colsNum).map(i => s"$i.$i").mkString(",") + val values = (0 until colsNum).map(i => i.toString).mkString(",") val columnNames = schema.fieldNames val rdd = spark.sparkContext.range(0, rowsNum, 1) - .map(_ => Row.fromSeq((0 until colsNum).map(_ + 0.1))) + .map(_ => Row.fromSeq((0 until colsNum))) val df = spark.createDataFrame(rdd, schema) df.write.option("header", true).csv(path.getAbsolutePath) val ds = spark.read.schema(schema).csv(path.getAbsolutePath) - 
benchmark.addCase(s"Select all columns", 3) { _ => - ds.select("*").filter((row: Row) => true).count() - } - benchmark.addCase(s"Select 10 columns", 3) { _ => - ds.select($"col0", $"col10", $"col20", $"col30", $"col40", - $"col50", $"col60", $"col70", $"col80", $"col90") - .filter((row: Row) => true).count() + val cols100 = columnNames.take(100).map(Column(_)) + benchmark.addCase(s"Select 100 columns", 3) { _ => + ds.select(cols100: _*).filter((row: Row) => true).count() } benchmark.addCase(s"Select one column", 3) { _ => - ds.select($"col0").filter((row: Row) => true).count() + ds.select($"col1").filter((row: Row) => true).count() } /* @@ -114,9 +110,8 @@ object CSVBenchmarks { Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------- - Select all columns 135378 / 206881 0.0 135378.2 1.0X - Select 10 columns 57290 / 57581 0.0 57289.7 2.4X - Select one column 55718 / 56358 0.0 55718.3 2.4X + Select 100 columns 46345 / 58202 0.0 46345.3 1.0X + Select one column 44344 / 44650 0.0 44344.0 1.0X */ benchmark.run() } From 8809cecf93d8e7a97eca827d9e8637a7eb5b2449 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 29 Apr 2018 15:50:44 +0200 Subject: [PATCH 10/20] Updating benchmarks for select indexes --- .../spark/sql/execution/datasources/csv/CSVBenchmarks.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index dba9f9362745a..89121962691ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -110,8 +110,8 @@ object CSVBenchmarks { Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------- - Select 100 columns 46345 / 58202 0.0 46345.3 1.0X - Select one column 44344 / 44650 0.0 44344.0 1.0X + Select 100 columns 33388 / 33661 0.0 33388.3 1.0X + Select one column 22825 / 23111 0.0 22825.5 1.5X */ benchmark.run() } From dc97ceb96185ed2eaa05fbe1aee8ecfe8ccb7e7d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 5 May 2018 21:19:17 +0200 Subject: [PATCH 11/20] Addressing Herman's review comments --- .../datasources/csv/UnivocityParser.scala | 4 +--- .../datasources/csv/CSVBenchmarks.scala | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index abd63d66576ad..e0c7c39224f77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -71,9 +71,7 @@ class UnivocityParser( private val tokenizer = { val parserSetting = options.asParserSettings if (requiredSchema.length < schema.length) { - val tokenIndexArr: Seq[java.lang.Integer] = { - requiredSchema.map(f => new java.lang.Integer(schema.indexOf(f))) - } + val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f))) parserSetting.selectIndexes(tokenIndexArr: _*) } new 
CsvParser(parserSetting) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 89121962691ea..92d0d13d0e152 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -20,6 +20,7 @@ import java.io.File import org.apache.spark.SparkConf import org.apache.spark.sql.{Column, Row, SparkSession} +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ import org.apache.spark.util.{Benchmark, Utils} @@ -81,22 +82,21 @@ object CSVBenchmarks { val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum) withTempPath { path => - // scalastyle:off println - benchmark.out.println("Preparing data for benchmarking ...") - // scalastyle:on println - - val fields = for (i <- 0 until colsNum) yield StructField(s"col$i", IntegerType) + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) val schema = StructType(fields) val values = (0 until colsNum).map(i => i.toString).mkString(",") val columnNames = schema.fieldNames - val rdd = spark.sparkContext.range(0, rowsNum, 1) - .map(_ => Row.fromSeq((0 until colsNum))) - val df = spark.createDataFrame(rdd, schema) - df.write.option("header", true).csv(path.getAbsolutePath) + spark.range(rowsNum) + .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*) + .write.option("header", true) + .csv(path.getAbsolutePath) val ds = spark.read.schema(schema).csv(path.getAbsolutePath) + benchmark.addCase(s"Select $colsNum columns", 3) { _ => + ds.select("*").filter((row: Row) => true).count() + } val cols100 = columnNames.take(100).map(Column(_)) benchmark.addCase(s"Select 100 columns", 3) { _ => ds.select(cols100: _*).filter((row: Row) => true).count() @@ -110,8 +110,9 @@ object CSVBenchmarks { Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------- - Select 100 columns 33388 / 33661 0.0 33388.3 1.0X - Select one column 22825 / 23111 0.0 22825.5 1.5X + Select 1000 columns 87971 / 106054 0.0 87970.7 1.0X + Select 100 columns 26335 / 28599 0.0 26334.8 3.3X + Select one column 18345 / 18569 0.1 18345.3 4.8X */ benchmark.run() } From 51b31483263e13cd85b19b3efea65188945eda99 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 May 2018 20:39:38 +0200 Subject: [PATCH 12/20] Updated benchmark result for recent changes --- .../spark/sql/execution/datasources/csv/CSVBenchmarks.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 92d0d13d0e152..a217b162eee78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -110,9 +110,9 @@ object CSVBenchmarks { Wide rows with 1000 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------- - Select 1000 columns 87971 / 106054 0.0 87970.7 1.0X - Select 100 columns 26335 / 28599 0.0 26334.8 3.3X - Select one column 18345 / 18569 0.1 
18345.3 4.8X + Select 1000 columns 76910 / 78065 0.0 76909.8 1.0X + Select 100 columns 28625 / 32884 0.0 28625.1 2.7X + Select one column 22498 / 22669 0.0 22497.8 3.4X */ benchmark.run() } From e3958b1468b490b548574b53512f0d83850e6f6f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 May 2018 20:46:17 +0200 Subject: [PATCH 13/20] Add ticket number to test title --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 32ee9f003c42d..e6569d66fdebf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1323,7 +1323,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(sampled.count() == ds.count()) } - test("Select a little of many columns") { + test("SPARK-24244: Select a little of many columns") { withTempPath { path => import collection.JavaConverters._ val schema = new StructType() From a4a0a549156a15011c33c7877a35f244d75b7a4f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 10 May 2018 21:02:24 +0200 Subject: [PATCH 14/20] Removing unnecessary benchmark --- .../datasources/csv/CSVBenchmarks.scala | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index a217b162eee78..5fb1efd95d28b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -45,38 +45,6 @@ object CSVBenchmarks { try f(path) finally Utils.deleteRecursively(path) } - - def perlineBenchmark(rowsNum: Int): Unit = { - val benchmark = new Benchmark("CSV parsing in the per-line mode", rowsNum) - - withTempPath { path => - // scalastyle:off - benchmark.out.println("Preparing data for benchmarking ...") - // scalastyle:on - - spark.sparkContext.range(0, rowsNum, 1) - .map(_ => "a") - .toDF("colA") - .write.csv(path.getAbsolutePath) - - val schema = new StructType().add("colA", StringType) - val ds = spark.read.schema(schema).csv(path.getAbsolutePath) - - benchmark.addCase("Read CSV file with one column", 3) { _ => - ds.count() - } - - /* - Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz - - CSV parsing in the per-line mode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------ - Read CSV file with one column 23352 / 23495 4.3 233.5 1.0X - */ - benchmark.run() - } - } - def multiColumnsBenchmark(rowsNum: Int): Unit = { val colsNum = 1000 val benchmark = new Benchmark(s"Wide rows with $colsNum columns", rowsNum) @@ -119,7 +87,6 @@ object CSVBenchmarks { } def main(args: Array[String]): Unit = { - perlineBenchmark(100 * 1000 * 1000) multiColumnsBenchmark(rowsNum = 1000 * 1000) } } From fa860157c982846524bd8f151daf8a2154117b34 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 13 May 2018 20:49:49 +0200 Subject: [PATCH 15/20] Updating the migration guide --- docs/sql-programming-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sql-programming-guide.md 
b/docs/sql-programming-guide.md index 3f79ed6422205..b5b72a8bcb492 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1814,6 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround. - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. + - Since Spark 2.4, handling of malformed rows in CSV files was changed. Previously, all column values of every row are parsed independently of its future usage. A row was considered as malformed if the CSV parser wasn't able to handle any column value in the row even if the value wasn't requested. Starting from version 2.4, only requested column values are parsed, and other values can be ignored. In such way, correct column values that were considered as malformed in previous Spark version only because of other malformed values become correct in Spark version 2.4. ## Upgrading From Spark SQL 2.2 to 2.3 From 15528d20a74904c14c58bf3ad54c9a552c519430 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 13 May 2018 20:55:06 +0200 Subject: [PATCH 16/20] Moving some values back as it was. 
--- .../datasources/csv/UnivocityParser.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index e0c7c39224f77..6034d431c0c4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -45,6 +45,22 @@ class UnivocityParser( // A `ValueConverter` is responsible for converting the given value to a desired type. private type ValueConverter = String => Any + private val tokenizer = { + val parserSetting = options.asParserSettings + if (requiredSchema.length < schema.length) { + val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f))) + parserSetting.selectIndexes(tokenIndexArr: _*) + } + new CsvParser(parserSetting) + } + + private val row = new GenericInternalRow(requiredSchema.length) + + // Retrieve the raw record string. + private def getCurrentInput: UTF8String = { + UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) + } + // This parser first picks some tokens from the input tokens, according to the required schema, // then parse these tokens and put the values in a row, with the order specified by the required // schema. @@ -68,22 +84,6 @@ class UnivocityParser( requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray } - private val tokenizer = { - val parserSetting = options.asParserSettings - if (requiredSchema.length < schema.length) { - val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f))) - parserSetting.selectIndexes(tokenIndexArr: _*) - } - new CsvParser(parserSetting) - } - - private val row = new GenericInternalRow(requiredSchema.length) - - // Retrieve the raw record string. - private def getCurrentInput: UTF8String = { - UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd) - } - /** * Create a converter which converts the string value to a value according to a desired type. * Currently, we do not support complex types (`ArrayType`, `MapType`, `StructType`). 
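For reference, the univocity-parsers behavior that the `selectIndexes` change in the patches above relies on can be exercised outside Spark. Below is a minimal standalone sketch (the object name and the sample input line "a,b,c,d" are illustrative, not taken from the patch series): with column reordering left at its univocity default, the parser returns only the selected columns, in selection order, which is why the converters in `UnivocityParser` can be indexed positionally against the required schema.

    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    object SelectIndexesSketch {
      def main(args: Array[String]): Unit = {
        val settings = new CsvParserSettings()
        // Ask the parser to materialize only columns 0 and 2 of each input line.
        settings.selectIndexes(Integer.valueOf(0), Integer.valueOf(2))
        val parser = new CsvParser(settings)

        val tokens = parser.parseLine("a,b,c,d")
        // With univocity's default column reordering, only the selected values come back:
        // prints "a,c", and tokens.length == 2 matches the size of the required schema.
        println(tokens.mkString(","))
      }
    }

This is the same mechanism the tokenizer construction in UnivocityParser.scala uses when `requiredSchema.length < schema.length`.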
From f90daa7ea33d119be978c27de10978c2d6281e25 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 13 May 2018 20:58:20 +0200 Subject: [PATCH 17/20] Renaming the test title --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e6569d66fdebf..4d1f52e44536a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1323,7 +1323,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(sampled.count() == ds.count()) } - test("SPARK-24244: Select a little of many columns") { + test("SPARK-24244: Select a subset of all columns") { withTempPath { path => import collection.JavaConverters._ val schema = new StructType() From 4d9873d39277b9cbaee892957c06bfc2cb9a52f1 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 17 May 2018 22:02:47 +0200 Subject: [PATCH 18/20] Improving of the migration guide --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b5b72a8bcb492..968a72b720132 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1814,7 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround. - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. 
As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - - Since Spark 2.4, handling of malformed rows in CSV files was changed. Previously, all column values of every row are parsed independently of its future usage. A row was considered as malformed if the CSV parser wasn't able to handle any column value in the row even if the value wasn't requested. Starting from version 2.4, only requested column values are parsed, and other values can be ignored. In such way, correct column values that were considered as malformed in previous Spark version only because of other malformed values become correct in Spark version 2.4. + - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234,". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`. ## Upgrading From Spark SQL 2.2 to 2.3 From f89eeb7f7ba86888ad3f7994577a4d4ebbf09197 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 17 May 2018 22:39:10 +0200 Subject: [PATCH 19/20] Fix example --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 968a72b720132..887d2dc2c9da7 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1814,7 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround. - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. 
This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234,". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`. + - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`. 
## Upgrading From Spark SQL 2.2 to 2.3 From 6ff6d4fda9f7e8ee43d7aa04818204de4c49440b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 18 May 2018 17:25:50 +0200 Subject: [PATCH 20/20] Adding spark.sql.csv.parser.columnPruning.enabled --- docs/sql-programming-guide.md | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 7 ++++++ .../datasources/csv/CSVOptions.scala | 3 +++ .../datasources/csv/UnivocityParser.scala | 23 ++++++++++--------- .../execution/datasources/csv/CSVSuite.scala | 16 +++++++------ 5 files changed, 32 insertions(+), 19 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 887d2dc2c9da7..937eb5b6095c7 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1814,7 +1814,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see - In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these 2 functions can return unexpected results. In version 2.4 and later, this problem has been fixed. `to_utc_timestamp` and `from_utc_timestamp` will return null if the input timestamp string contains timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` will return `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, will return `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. For people who don't care about this problem and want to retain the previous behaivor to keep their query unchanged, you can set `spark.sql.function.rejectTimezoneInString` to false. This option will be removed in Spark 3.0 and should only be used as a temporary workaround. - In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files. - Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior. - - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". 
In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, all column values must be requested. This example demonstrates how to achieve that with filter in Scala `spark.read.option("header", true).option("mode", "dropmalformed").csv("a.csv").filter(_ => true).select("id")`. + - In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`. ## Upgrading From Spark SQL 2.2 to 2.3 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0b1965c438e27..068e8ae8a73f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1266,6 +1266,13 @@ object SQLConf { object Replaced { val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces" } + + val CSV_PARSER_COLUMN_PRUNING = buildConf("spark.sql.csv.parser.columnPruning.enabled") + .internal() + .doc("If it is set to true, column names of the requested schema are passed to CSV parser. 
" + + "Other column values can be ignored during parsing even if they are malformed.") + .booleanConf + .createWithDefault(true) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 1066d156acd74..dd41aee0f2ebc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -25,6 +25,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf class CSVOptions( @transient val parameters: CaseInsensitiveMap[String], @@ -80,6 +81,8 @@ class CSVOptions( } } + private[csv] val columnPruning = SQLConf.get.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING) + val delimiter = CSVUtils.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode: ParseMode = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 6034d431c0c4e..4f00cc5eb3f39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -34,10 +34,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String class UnivocityParser( - schema: StructType, + dataSchema: StructType, requiredSchema: StructType, val options: CSVOptions) extends Logging { - require(requiredSchema.toSet.subsetOf(schema.toSet), + require(requiredSchema.toSet.subsetOf(dataSchema.toSet), "requiredSchema should be the subset of schema.") def this(schema: StructType, options: CSVOptions) = this(schema, schema, options) @@ -47,14 +47,15 @@ class UnivocityParser( private val tokenizer = { val parserSetting = options.asParserSettings - if (requiredSchema.length < schema.length) { - val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(schema.indexOf(f))) + if (options.columnPruning && requiredSchema.length < dataSchema.length) { + val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f))) parserSetting.selectIndexes(tokenIndexArr: _*) } new CsvParser(parserSetting) } + private val schema = if (options.columnPruning) requiredSchema else dataSchema - private val row = new GenericInternalRow(requiredSchema.length) + private val row = new GenericInternalRow(schema.length) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { @@ -81,7 +82,7 @@ class UnivocityParser( // // output row - ["A", 2] private val valueConverters: Array[ValueConverter] = { - requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray + schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray } /** @@ -189,14 +190,14 @@ class UnivocityParser( def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) private def convert(tokens: Array[String]): InternalRow = { - if (tokens.length != requiredSchema.length) { + if (tokens.length != schema.length) { // If the number of tokens doesn't match the schema, we should treat it as a malformed record. 
// However, we still have chance to parse some of the tokens, by adding extra null tokens in // the tail if the number is smaller, or by dropping extra tokens if the number is larger. - val checkedTokens = if (requiredSchema.length > tokens.length) { - tokens ++ new Array[String](requiredSchema.length - tokens.length) + val checkedTokens = if (schema.length > tokens.length) { + tokens ++ new Array[String](schema.length - tokens.length) } else { - tokens.take(requiredSchema.length) + tokens.take(schema.length) } def getPartialResult(): Option[InternalRow] = { try { @@ -214,7 +215,7 @@ class UnivocityParser( } else { try { var i = 0 - while (i < requiredSchema.length) { + while (i < schema.length) { row(i) = valueConverters(i).apply(tokens(i)) i += 1 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index ce8ff6b0fa78e..5f9f799a6c466 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -260,14 +260,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te } test("test for DROPMALFORMED parsing mode") { - Seq(false, true).foreach { multiLine => - val cars = spark.read - .format("csv") - .option("multiLine", multiLine) - .options(Map("header" -> "true", "mode" -> "dropmalformed")) - .load(testFile(carsFile)) + withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false") { + Seq(false, true).foreach { multiLine => + val cars = spark.read + .format("csv") + .option("multiLine", multiLine) + .options(Map("header" -> "true", "mode" -> "dropmalformed")) + .load(testFile(carsFile)) - assert(cars.collect().size === 2) + assert(cars.select("year").collect().size === 2) + } } }
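The end-to-end effect of the new flag can be illustrated with a small driver program. The following is a hypothetical session, not part of the patch series: the object name and the file path are assumptions, and the file contents (a header "id,name" followed by the single short row "1234") mirror the example in the migration guide. It shows how toggling `spark.sql.csv.parser.columnPruning.enabled` switches between the Spark 2.4 and pre-2.4 DROPMALFORMED behavior described above.

    import org.apache.spark.sql.SparkSession

    object CsvColumnPruningDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("csv-column-pruning-demo")
          .getOrCreate()

        // Assumed input: /tmp/a.csv containing the header "id,name" and one short row "1234".
        val path = "/tmp/a.csv"

        // Spark 2.4 default: only the requested "id" column is parsed, so the short row is kept.
        spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", true)
        spark.read.option("header", true).option("mode", "dropmalformed")
          .csv(path).select("id").show()   // expected: one row with id = 1234

        // Pre-2.4 behavior: all columns are parsed, the row is judged malformed and dropped.
        spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
        spark.read.option("header", true).option("mode", "dropmalformed")
          .csv(path).select("id").show()   // expected: empty result

        spark.stop()
      }
    }

Setting the option through `spark.conf.set` works here because `CSVOptions` reads it from `SQLConf.get` at read time, as shown in the final patch.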