[SPARK-23341][SQL] define some standard options for data source v2 #20535

Status: Closed · 4 commits
@@ -17,16 +17,61 @@

package org.apache.spark.sql.sources.v2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.annotation.InterfaceStability;

/**
 * An immutable string-to-string map in which keys are case-insensitive. This is used to
 * represent data source options.
 *
 * Each data source implementation can define its own options and teach its users how to set
 * them. Spark doesn't restrict which options a data source should or should not have; instead,
 * it defines some standard options that data sources can optionally adopt. Some options may be
 * so common that many data sources use them, but without a standard, different data sources
 * may define the same option (key and meaning) differently, which is quite confusing to end
 * users.
 *
 * The standard options defined by Spark:
 * <table summary="standard data source options">
 *   <tr>
 *     <th><b>Option key</b></th>
 *     <th><b>Option value</b></th>
 *   </tr>
 *   <tr>
 *     <td>path</td>
 *     <td>A string specifying the path of the data files/directories, like
 *       <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
 *       be either relative or absolute, can point to either a file or a directory, and can
 *       contain wildcards. This option is commonly used by file-based data sources.</td>
 *   </tr>
 *   <tr>
 *     <td>paths</td>
 *     <td>A JSON-array-style string listing the paths of the data files/directories, like
 *       <code>["path1", "/absolute/file2"]</code>. Each path has the same format as the
 *       <code>path</code> option, and must additionally follow the JSON string-literal
 *       format, e.g. quotes must be escaped: <code>pa\"th</code> means pa"th.
 *     </td>
 *   </tr>
 *   <tr>
 *     <td>table</td>
 *     <td>A string representing the table name directly, without any interpretation. For
 *       example, <code>db.tbl</code> means a table named db.tbl, not a table named tbl
 *       inside database db; <code>`t*b.l`</code> means a table named `t*b.l`, backticks
 *       included, not t*b.l.</td>
 *   </tr>
 *   <tr>
 *     <td>database</td>
 *     <td>A string representing the database name directly, without any interpretation,
 *       analogous to the <code>table</code> option.</td>
 *   </tr>
 * </table>
 */
@InterfaceStability.Evolving
public class DataSourceOptions {
@@ -97,4 +142,59 @@
  public double getDouble(String key, double defaultValue) {
    return keyLowerCasedMap.containsKey(lcaseKey) ?
      Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
  }

  /**
   * The option key for the singular path.
   */
  public static final String PATH_KEY = "path";

  /**
   * The option key for multiple paths.
   */
  public static final String PATHS_KEY = "paths";

  /**
   * The option key for the table name.
   */
  public static final String TABLE_KEY = "table";

  /**
   * The option key for the database name.
   */
  public static final String DATABASE_KEY = "database";

  /**
   * Returns all the paths specified by both the singular path option and the multiple paths
   * option, with the singular path (if present) first.
   */
  public String[] paths() {
    // The singular path as a one-element array, or an empty array if it's absent.
    String[] singularPath =
      get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
    Optional<String> pathsStr = get(PATHS_KEY);
    if (pathsStr.isPresent()) {
      ObjectMapper objectMapper = new ObjectMapper();
      try {
        // The multiple-paths option is a JSON array of strings.
        String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
        return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
      } catch (IOException e) {
        // If the option is not a valid JSON array, fall back to the singular path.
        return singularPath;
      }
    } else {
      return singularPath;
    }
  }

  /**
   * Returns the value of the table name option.
   */
  public Optional<String> tableName() {
    return get(TABLE_KEY);
  }

  /**
   * Returns the value of the database name option.
   */
  public Optional<String> databaseName() {
    return get(DATABASE_KEY);
  }
}
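
For illustration, here is a minimal Scala sketch of how these accessors behave. The option values are made up; the keys and methods are the ones defined above:

import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

val options = new DataSourceOptions(Map(
  "PATH" -> "dir/file1",  // keys are case-insensitive, so "PATH" works like "path"
  DataSourceOptions.PATHS_KEY -> """["/absolute/file2"]""",
  DataSourceOptions.TABLE_KEY -> "db.tbl").asJava)

options.paths().toSeq   // Seq("dir/file1", "/absolute/file2"): the singular path comes first
options.tableName()     // Optional[db.tbl]: taken literally, no database/table splitting
options.databaseName()  // Optional.empty: this option was never set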
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (10 additions & 4 deletions)
@@ -21,6 +21,8 @@
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.api.java.JavaRDD
@@ -34,7 +36,7 @@
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -171,7 +173,8 @@
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * @since 1.4.0
   */
  def load(path: String): DataFrame = {
-   option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
+   // force invocation of `load(...varargs...)`
+   option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
  }

/**
@@ -193,10 +196,13 @@
    if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        ds = ds, conf = sparkSession.sessionState.conf)
      val pathsOption = {
        val objectMapper = new ObjectMapper()
        // Encode all given paths as one JSON-array string under the standard `paths` key.
        DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
      }
      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-       ds, extraOptions.toMap ++ sessionOptions,
+       ds, extraOptions.toMap ++ sessionOptions + pathsOption,
        userSpecifiedSchema = userSpecifiedSchema))
    } else {
      loadV1Source(paths: _*)
    }

Review thread on the merged-options line above:

Member: issue an exception when extraOptions("path") is not empty?

Contributor (author): Basically we may have duplicated entries in session configs and DataFrameReader/Writer options, not only path. The rule is that DataFrameReader/Writer options should overwrite session configs. cc @jiangxb1987, can you submit a PR to explicitly document it in SessionConfigSupport?
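
To make the writer/reader contract concrete, here is a minimal sketch (the path values are made up) showing that the JSON string produced on the DataFrameReader side is exactly what DataSourceOptions.paths() parses back, escaped quotes included:

import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Encode, as load(paths: _*) does above: one JSON-array string under the `paths` key.
val paths = Seq("path1", """d"e""")  // note the embedded quote
val encoded = new ObjectMapper().writeValueAsString(paths.toArray)
// encoded == ["path1","d\"e"]

// Decode, as a v2 source would: DataSourceOptions.paths() parses the array back.
val options = new DataSourceOptions(Map(DataSourceOptions.PATHS_KEY -> encoded).asJava)
assert(options.paths().toSeq == paths)

As background for the review thread: in Scala, m1 ++ m2 keeps the entries of m2 on duplicate keys, and the trailing + pathsOption overrides both operands, so the paths option written here always wins in the merged option map.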
@@ -79,4 +79,29 @@
class DataSourceOptionsSuite extends SparkFunSuite {
      options.getDouble("foo", 0.1d)
    }
  }

  test("standard options") {
    val options = new DataSourceOptions(Map(
      DataSourceOptions.PATH_KEY -> "abc",
      DataSourceOptions.TABLE_KEY -> "tbl").asJava)

    assert(options.paths().toSeq == Seq("abc"))
    assert(options.tableName().get() == "tbl")
    assert(!options.databaseName().isPresent)
  }

  test("standard options with both singular path and multi-paths") {
    val options = new DataSourceOptions(Map(
      DataSourceOptions.PATH_KEY -> "abc",
      DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)

    assert(options.paths().toSeq == Seq("abc", "c", "d"))
  }

  test("standard options with only multi-paths") {
    val options = new DataSourceOptions(Map(
      DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)

    assert(options.paths().toSeq == Seq("c", "d\"e"))
  }
}
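
One behavior the suite above does not pin down: per the IOException catch in DataSourceOptions.paths(), a malformed paths option is silently ignored and only the singular path survives. A hypothetical test sketch (not part of this PR) for that fallback:

  test("standard options with malformed multi-paths") {
    // Hypothetical: invalid JSON in `paths` should fall back to the singular path.
    val options = new DataSourceOptions(Map(
      DataSourceOptions.PATH_KEY -> "abc",
      DataSourceOptions.PATHS_KEY -> "not-a-json-array").asJava)

    assert(options.paths().toSeq == Seq("abc"))
  }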
}