[SPARK-22387][SQL] Propagate session configs to data source read/write options

## What changes were proposed in this pull request?

Introduce a new interface `SessionConfigSupport` for `DataSourceV2`. It propagates session configs with the specified key-prefix to all data source operations in this session.
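As a sketch of the intended usage (the source name `my.pkg.MyDataSourceV2` and the prefix `myDataSource` are made-up names, and `spark` is an existing `SparkSession`), a session config set through `spark.conf` would reach the data source as a plain option:

```scala
// Hypothetical example: assume my.pkg.MyDataSourceV2 implements DataSourceV2 with
// SessionConfigSupport and its keyPrefix() returns "myDataSource".
spark.conf.set("spark.datasource.myDataSource.user", "alice")

// While loading, Spark strips the "spark.datasource.myDataSource." prefix and passes
// "user" -> "alice" to the source through DataSourceV2Options.
val df = spark.read.format("my.pkg.MyDataSourceV2").load()
```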

## How was this patch tested?

Add a new test suite `DataSourceV2UtilsSuite`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #19861 from jiangxb1987/datasource-configs.
jiangxb1987 authored and cloud-fan committed Dec 21, 2017
1 parent fb0562f commit 9c289a5
Showing 5 changed files with 164 additions and 8 deletions.
39 changes: 39 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;

import java.util.List;
import java.util.Map;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* propagate session configs with the specified key-prefix to all data source operations in this
* session.
*/
@InterfaceStability.Evolving
public interface SessionConfigSupport {

  /**
   * Key prefix of the session configs to propagate. Spark will extract all session configs that
   * start with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -> yyy`
   * into `xxx -> yyy`, and propagate them to all data source operations in this session.
   */
  String keyPrefix();
}
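As an illustration only (not part of this commit), a Scala source could opt in like the sketch below; `MyDataSource` and the `myDataSource` prefix are made-up names, and read/write support is omitted for brevity:

```scala
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

// Sketch of a data source that opts in to session-config propagation. A real
// source would also mix in ReadSupport and/or WriteSupport.
class MyDataSource extends DataSourceV2 with SessionConfigSupport {

  // Session configs named "spark.datasource.myDataSource.*" are handed to this
  // source as options with the prefix stripped.
  override def keyPrefix: String = "myDataSource"
}
```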
11 changes: 8 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -184,9 +185,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val options = new DataSourceV2Options(extraOptions.asJava)
+      val ds = cls.newInstance()
+      val options = new DataSourceV2Options((extraOptions ++
+        DataSourceV2Utils.extractSessionConfigs(
+          ds = ds.asInstanceOf[DataSourceV2],
+          conf = sparkSession.sessionState.conf)).asJava)
 
-      val reader = (cls.newInstance(), userSpecifiedSchema) match {
+      val reader = (ds, userSpecifiedSchema) match {
         case (ds: ReadSupportWithSchema, Some(schema)) =>
           ds.createReader(schema, options)
15 changes: 10 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, WriteSupport}
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.StructType

/**
@@ -236,14 +237,18 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
     val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      cls.newInstance() match {
-        case ds: WriteSupport =>
-          val options = new DataSourceV2Options(extraOptions.asJava)
+      val ds = cls.newInstance()
+      ds match {
+        case ws: WriteSupport =>
+          val options = new DataSourceV2Options((extraOptions ++
+            DataSourceV2Utils.extractSessionConfigs(
+              ds = ds.asInstanceOf[DataSourceV2],
+              conf = df.sparkSession.sessionState.conf)).asJava)
           // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
           // enough as there won't be tons of writing jobs created at the same second.
           val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
             .format(new Date()) + "-" + UUID.randomUUID()
-          val writer = ds.createWriter(jobId, df.logicalPlan.schema, mode, options)
+          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
           if (writer.isPresent) {
             runCommand(df.sparkSession, "save") {
               WriteToDataSourceV2(writer.get(), df.logicalPlan)
58 changes: 58 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.regex.Pattern

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

private[sql] object DataSourceV2Utils extends Logging {

  /**
   * Helper method that extracts and transforms session configs into k/v pairs, which are then
   * used to create data source options.
   * Configs are only extracted when `ds` implements [[SessionConfigSupport]]; in that case we
   * fetch the specified key-prefix from `ds` and extract session configs whose keys start with
   * `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will
   * be transformed into `xxx -> yyy`.
   *
   * @param ds a [[DataSourceV2]] object
   * @param conf the session conf
   * @return an immutable map that contains all the extracted and transformed k/v pairs.
   */
  def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match {
    case cs: SessionConfigSupport =>
      val keyPrefix = cs.keyPrefix()
      require(keyPrefix != null, "The data source config key prefix can't be null.")

      val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")

      conf.getAllConfs.flatMap { case (key, value) =>
        val m = pattern.matcher(key)
        if (m.matches() && m.groupCount() > 0) {
          Seq((m.group(1), value))
        } else {
          Seq.empty
        }
      }

    case _ => Map.empty
  }
}
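For reference, a minimal standalone sketch of the key transformation performed above (the `myDataSource` prefix and the config keys are made up):

```scala
import java.util.regex.Pattern

object KeyPrefixExample extends App {
  // Same pattern shape as extractSessionConfigs, assuming keyPrefix = "myDataSource".
  val pattern = Pattern.compile("^spark\\.datasource\\.myDataSource\\.(.+)")

  val m1 = pattern.matcher("spark.datasource.myDataSource.path")
  if (m1.matches()) println(m1.group(1))   // prints "path"

  val m2 = pattern.matcher("spark.sql.shuffle.partitions")
  println(m2.matches())                    // prints "false" -- not propagated
}
```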
49 changes: 49 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2UtilsSuite.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.SQLConf

class DataSourceV2UtilsSuite extends SparkFunSuite {

  private val keyPrefix = new DataSourceV2WithSessionConfig().keyPrefix

  test("method withSessionConfig() should propagate session configs correctly") {
    // Only match configs whose keys start with "spark.datasource.${keyPrefix}".
    val conf = new SQLConf
    conf.setConfString(s"spark.datasource.$keyPrefix.foo.bar", "false")
    conf.setConfString(s"spark.datasource.$keyPrefix.whateverConfigName", "123")
    conf.setConfString(s"spark.sql.$keyPrefix.config.name", "false")
    conf.setConfString("spark.datasource.another.config.name", "123")
    conf.setConfString(s"spark.datasource.$keyPrefix.", "123")
    val cs = classOf[DataSourceV2WithSessionConfig].newInstance()
    val confs = DataSourceV2Utils.extractSessionConfigs(cs.asInstanceOf[DataSourceV2], conf)
    assert(confs.size == 2)
    assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
    assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
    assert(confs.keySet.contains("foo.bar"))
    assert(confs.keySet.contains("whateverConfigName"))
  }
}

class DataSourceV2WithSessionConfig extends SimpleDataSourceV2 with SessionConfigSupport {

  override def keyPrefix: String = "userDefinedDataSource"
}
