Removing columnPruning from CSVOptions
MaxGekk committed May 23, 2018
Commit 6fdd435 (1 parent: a40ffc6)
Showing 5 changed files with 18 additions and 13 deletions.
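
In short: the columnPruning flag is no longer a field of CSVOptions (where it was resolved via SQLConf.get whenever the options object was constructed). Instead, the call sites read SQLConf.CSV_PARSER_COLUMN_PRUNING from the session's conf themselves and pass the resulting Boolean into UnivocityParser, which gains a columnPruning constructor parameter. A minimal sketch of the new call pattern, assuming a SparkSession (sparkSession), the schemas (dataSchema, requiredSchema) and an already-parsed CSVOptions (parsedOptions) are in scope, with names borrowed from the diff below:

    import org.apache.spark.sql.internal.SQLConf

    // Resolve the flag once from the active session's conf (on the driver) ...
    val columnPruning = sparkSession.sessionState.conf.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
    // ... and hand it to the parser explicitly instead of having CSVOptions look it up.
    val rawParser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions, columnPruning)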

DataFrameReader.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -500,9 +501,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
       filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
     }.getOrElse(filteredLines.rdd)
-
+    val columnPruning = sparkSession.sessionState.conf.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
     val parsed = linesWithoutHeader.mapPartitions { iter =>
-      val rawParser = new UnivocityParser(actualSchema, parsedOptions)
+      val rawParser = new UnivocityParser(actualSchema, parsedOptions, columnPruning)
       val parser = new FailureSafeParser[String](
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,

CSVFileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -122,13 +123,15 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
           "df.filter($\"_corrupt_record\".isNotNull).count()."
       )
     }
+    val columnPruning = sparkSession.sessionState.conf.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
       val parser = new UnivocityParser(
         StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
         StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
-        parsedOptions)
+        parsedOptions,
+        columnPruning)
       CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
     }
   }
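
Note that buildReader resolves the conf value before returning the (file: PartitionedFile) => { ... } function, so the already-evaluated Boolean is captured in that closure when it is created on the driver and shipped with it; the per-file code no longer consults SQLConf at all, which is presumably the motivation for dropping the SQLConf.get lookup from CSVOptions below.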

CSVOptions.scala
@@ -25,7 +25,6 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 
 class CSVOptions(
     @transient val parameters: CaseInsensitiveMap[String],
@@ -81,8 +80,6 @@ class CSVOptions(
     }
   }
 
-  private[csv] val columnPruning = SQLConf.get.getConf(SQLConf.CSV_PARSER_COLUMN_PRUNING)
-
   val delimiter = CSVUtils.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
   val parseMode: ParseMode =

UnivocityParser.scala
@@ -36,24 +36,27 @@ import org.apache.spark.unsafe.types.UTF8String
 class UnivocityParser(
     dataSchema: StructType,
     requiredSchema: StructType,
-    val options: CSVOptions) extends Logging {
+    val options: CSVOptions,
+    columnPruning: Boolean) extends Logging {
   require(requiredSchema.toSet.subsetOf(dataSchema.toSet),
     "requiredSchema should be the subset of schema.")
 
-  def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
+  def this(schema: StructType, options: CSVOptions, columnPruning: Boolean) = {
+    this(schema, schema, options, columnPruning)
+  }
 
   // A `ValueConverter` is responsible for converting the given value to a desired type.
   private type ValueConverter = String => Any
 
   private val tokenizer = {
     val parserSetting = options.asParserSettings
-    if (options.columnPruning && requiredSchema.length < dataSchema.length) {
+    if (columnPruning && requiredSchema.length < dataSchema.length) {
       val tokenIndexArr = requiredSchema.map(f => java.lang.Integer.valueOf(dataSchema.indexOf(f)))
       parserSetting.selectIndexes(tokenIndexArr: _*)
     }
     new CsvParser(parserSetting)
   }
-  private val schema = if (options.columnPruning) requiredSchema else dataSchema
+  private val schema = if (columnPruning) requiredSchema else dataSchema
 
   private val row = new GenericInternalRow(schema.length)
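
To make the effect of the new parameter concrete, here is a small hand-written sketch (not part of the commit) in the style of UnivocityParserSuite, which lives in the same org.apache.spark.sql.execution.datasources.csv package; the three-column schema and field names are invented for illustration:

    import org.apache.spark.sql.types._

    val dataSchema = StructType(Seq(
      StructField("a", StringType),
      StructField("b", StringType),
      StructField("c", StringType)))
    // Only "a" and "c" are needed by the query.
    val requiredSchema = StructType(Seq(
      StructField("a", StringType),
      StructField("c", StringType)))
    val options = new CSVOptions(Map.empty[String, String], "GMT")

    // columnPruning = true: selectIndexes(0, 2) is applied to the underlying uniVocity
    // parser settings and the parser's internal schema is the two-column requiredSchema.
    val pruningParser = new UnivocityParser(dataSchema, requiredSchema, options, true)

    // columnPruning = false: every token is parsed against the full three-column dataSchema.
    val plainParser = new UnivocityParser(dataSchema, requiredSchema, options, false)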

UnivocityParserSuite.scala
@@ -18,16 +18,17 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.math.BigDecimal
-import java.util.Locale
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite {
-  private val parser =
-    new UnivocityParser(StructType(Seq.empty), new CSVOptions(Map.empty[String, String], "GMT"))
+  private val parser = new UnivocityParser(
+    StructType(Seq.empty),
+    new CSVOptions(Map.empty[String, String], "GMT"),
+    true)
 
   private def assertNull(v: Any) = assert(v == null)
