[SPARK-29163][SQL] Simplify Hadoop Configuration access in DataSourcev2 #26005

Closed
BroadcastedHadoopConf.java (new file):
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2;


import org.apache.hadoop.conf.Configuration;

import org.apache.spark.SparkContext;
import org.apache.spark.annotation.DeveloperApi;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.SerializableConfiguration;

import java.util.Map;

import scala.reflect.ClassTag;
/**
* A helper interface to serialize and broadcast the Hadoop configuration for readers.
*/
@DeveloperApi
public interface BroadcastedHadoopConf {
  SparkSession sparkSession();
  CaseInsensitiveStringMap options();
Contributor:

I don't think that the data source should be responsible for providing a SparkSession. Spark has a session and configures the data source.

I think this interface should allow Spark to pass a broadcasted Hadoop Configuration to the source instead.

Contributor Author:

I don't think I understand what you mean here; could you try rephrasing it? Sorry.

Contributor Author:

To clarify: do you mean you want us to use reflection for something implementing this interface inside of the DSv2 code path? Or do you mean something else?

Contributor:

Data sources aren't passed a Spark session, so it doesn't make sense that this interface requires the source to provide one. If we think sources need access to a Spark session, then we should add an interface that sets the session directly. (I know the built-in sources have one, but that's because those are v1 sources that have been slightly modified to fit into v2.)

The benefit of having an interface like this is to avoid needing the Spark session. That's currently done by accessing the Spark session from the environment (e.g. SparkSession.builder.getOrCreate()) and we want to avoid making sources do that.

That's why I think the way this works should be to set a broadcasted configuration on a Table implementation that extends this interface using a method like setBroadcastedConf.

I'm on ASF slack if you'd like to talk about this more directly.
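For illustration, a minimal sketch of the push-style alternative described in this comment; the trait name ReceivesBroadcastedHadoopConf and the setBroadcastedConf signature are hypothetical and not part of this PR. The idea is that Spark, which already owns the session, builds and broadcasts the configuration and hands it to the table, so the source never has to supply or look up a SparkSession:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Hypothetical mix-in: Spark calls setBroadcastedConf after building the Hadoop
// conf from its own session and the scan options, so the source only consumes it.
trait ReceivesBroadcastedHadoopConf {
  private var conf: Option[Broadcast[SerializableConfiguration]] = None

  def setBroadcastedConf(broadcasted: Broadcast[SerializableConfiguration]): Unit = {
    conf = Some(broadcasted)
  }

  // Called on the driver when building partition reader factories.
  protected def broadcastedConf: Broadcast[SerializableConfiguration] =
    conf.getOrElse(throw new IllegalStateException("Broadcasted Hadoop conf was not set"))
}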

Contributor Author:

OK, we had a quick chat. It's possible that this and one other small use case are the only reasons the sources currently have a SparkSession inside of them, and the design goal of DSv2 was to get rid of that. So I'll see if it can be avoided, but if it takes more than broadcasting the Hadoop conf and the case-sensitive options, then I'll switch the API to just pass in a SparkSession and we can stick with this interface. Does that sound like a reasonable summary @rdblue?


  /**
   * Override this to rewrite the Hadoop configuration (for example, to inject
   * format-specific read options) before it is broadcast.
   */
  default Configuration withHadoopConfRewrite(Configuration hadoopConf) {
    return hadoopConf;
  }

  /**
   * Override this to construct the Hadoop configuration differently.
   */
  default Configuration hadoopConf() {
    // Hadoop Configurations are case sensitive.
    Map<String, String> caseSensitiveMap = options().asCaseSensitiveMap();
    return withHadoopConfRewrite(
      sparkSession().sessionState().newHadoopConfWithOptions(caseSensitiveMap));
  }

  default Broadcast<SerializableConfiguration> broadcastedConf() {
    SparkContext sc = sparkSession().sparkContext();
    SerializableConfiguration config = new SerializableConfiguration(hadoopConf());
    ClassTag<SerializableConfiguration> tag = ClassTag.apply(SerializableConfiguration.class);
    return sc.broadcast(config, tag);
  }
}
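For context on how this helper is meant to be used, here is a minimal sketch of a v2 file scan mixing it in; MyFormatScan, MyFormatPartitionReaderFactory, and the configuration key set below are hypothetical stand-ins, and the real migrations follow in the diffs below:

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, FileScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class MyFormatScan(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    readDataSchema: StructType,
    readPartitionSchema: StructType,
    options: CaseInsensitiveStringMap)
  extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema)
  with BroadcastedHadoopConf {

  // Optional hook: tweak the conf before it is serialized and broadcast.
  override def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
    hadoopConf.setBoolean("myformat.read.vectorized", true)  // illustrative key only
    hadoopConf
  }

  override def createReaderFactory(): PartitionReaderFactory = {
    // broadcastedConf() builds the conf from the session and the case-sensitive
    // options, applies the rewrite above, and broadcasts it for the executors.
    MyFormatPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf(),
      readDataSchema, readPartitionSchema)
  }
}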
CSVScan.scala:
@@ -26,10 +26,9 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, TextBasedFileScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

case class CSVScan(
sparkSession: SparkSession,
@@ -38,7 +37,8 @@ case class CSVScan(
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap)
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options)
with BroadcastedHadoopConf {

private lazy val parsedOptions: CSVOptions = new CSVOptions(
options.asScala.toMap,
@@ -77,14 +77,9 @@ case class CSVScan(
)
}

val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
// The partition values are already truncated in `FileScan.partitions`.
// We should use `readPartitionSchema` as the partition schema here.
CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf(),
dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
}
}
JsonScan.scala:
@@ -27,10 +27,9 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.json.JsonDataSource
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, TextBasedFileScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

case class JsonScan(
sparkSession: SparkSession,
@@ -39,7 +38,8 @@ case class JsonScan(
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap)
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options)
with BroadcastedHadoopConf {

private val parsedOptions = new JSONOptionsInRead(
CaseInsensitiveMap(options.asScala.toMap),
@@ -76,14 +76,9 @@ case class JsonScan(
"df.filter($\"_corrupt_record\".isNotNull).count()."
)
}
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
// The partition values are already truncated in `FileScan.partitions`.
// We should use `readPartitionSchema` as the partition schema here.
JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf(),
dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
}
}
OrcScan.scala:
@@ -16,33 +16,43 @@
*/
package org.apache.spark.sql.execution.datasources.v2.orc

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.mapreduce.OrcInputFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.orc.OrcFilters
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, FileScan}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

case class OrcScan(
sparkSession: SparkSession,
hadoopConf: Configuration,
fileIndex: PartitioningAwareFileIndex,
schema: StructType,
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter])
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema)
with BroadcastedHadoopConf {

override def isSplitable(path: Path): Boolean = true

override def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
OrcFilters.createFilter(schema, pushedFilters).foreach { f =>
OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
}
hadoopConf
}

override def createReaderFactory(): PartitionReaderFactory = {
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
// The partition values are already truncated in `FileScan.partitions`.
// We should use `readPartitionSchema` as the partition schema here.
OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
OrcScanBuilder.scala:
@@ -37,26 +37,16 @@ case class OrcScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters {
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema,
OrcScan(sparkSession, fileIndex, schema, dataSchema,
readDataSchema(), readPartitionSchema(), options, pushedFilters())
}

private var _pushedFilters: Array[Filter] = Array.empty

override def pushFilters(filters: Array[Filter]): Array[Filter] = {
if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(schema, filters).foreach { f =>
// The pushed filters will be set in `hadoopConf`. After that, we can simply use the
// changed `hadoopConf` in executors.
OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
}
val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
_pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
}
ParquetScan.scala:
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.parquet

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat
@@ -24,28 +26,30 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.parquet.{ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, FileScan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

case class ParquetScan(
sparkSession: SparkSession,
hadoopConf: Configuration,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
pushedFilters: Array[Filter],
options: CaseInsensitiveStringMap)
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema)
with BroadcastedHadoopConf {

override def isSplitable(path: Path): Boolean = true

override def createReaderFactory(): PartitionReaderFactory = {
override def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
val readDataSchemaAsJson = readDataSchema.json
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
hadoopConf.set(
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
readDataSchemaAsJson)
@@ -71,9 +75,10 @@ case class ParquetScan(
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
hadoopConf
}

val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
override def createReaderFactory(): PartitionReaderFactory = {
ParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
}
ParquetScanBuilder.scala:
@@ -35,11 +35,6 @@ case class ParquetScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters {
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

lazy val pushedParquetFilters = {
val sqlConf = sparkSession.sessionState.conf
@@ -69,7 +64,7 @@ case class ParquetScanBuilder(
override def pushedFilters(): Array[Filter] = pushedParquetFilters

override def build(): Scan = {
ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, readDataSchema(),
ParquetScan(sparkSession, fileIndex, dataSchema, readDataSchema(),
readPartitionSchema(), pushedParquetFilters, options)
}
}
TextScan.scala:
@@ -24,18 +24,18 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.text.TextOptions
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.execution.datasources.v2.{BroadcastedHadoopConf, TextBasedFileScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

case class TextScan(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap)
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options)
with BroadcastedHadoopConf {

private val optionsAsScala = options.asScala.toMap
private lazy val textOptions: TextOptions = new TextOptions(optionsAsScala)
@@ -57,14 +57,7 @@ case class TextScan(
assert(
readDataSchema.length <= 1,
"Text data source only produces a single data column named \"value\".")
val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
TextPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readDataSchema,
TextPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf(), readDataSchema,
readPartitionSchema, textOptions)
}
}
SessionState.scala:
@@ -18,6 +18,9 @@
package org.apache.spark.sql.internal

import java.io.File
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -94,6 +97,10 @@ private[sql] class SessionState(
sharedState.sparkContext.hadoopConfiguration,
conf)

def newHadoopConfWithOptions(options: JMap[String, String]): Configuration = {
newHadoopConfWithOptions(options.asScala.toMap)
}

def newHadoopConfWithOptions(options: Map[String, String]): Configuration = {
val hadoopConf = newHadoopConf()
options.foreach { case (k, v) =>