[SPARK-23341][SQL] define some standard options for data source v2
## What changes were proposed in this pull request?

Each data source implementation can define its own options and teach its users how to set them. Spark doesn't place any restrictions on what options a data source should or should not have. Some options are very common and used by many data sources, but different data sources may define these common options (key and meaning) differently, which is confusing to end users.

This PR defines some standard options that data sources can optionally adopt: `path` (and its JSON-array variant `paths`), `table` and `database`.
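
For illustration only, here is a minimal sketch of how an end user could pass the standardized keys to a v2 source; the format name `com.example.MySource` and all option values are made-up placeholders:

```scala
// Hypothetical usage: "database" and "table" are the standard keys introduced here
// (DataSourceOptions.DATABASE_KEY and DataSourceOptions.TABLE_KEY); `spark` is a SparkSession.
val df = spark.read
  .format("com.example.MySource")   // any DataSourceV2 implementation
  .option("database", "sales")
  .option("table", "orders")
  .load()
```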

## How was this patch tested?

A new test case in `DataSourceOptionsSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20535 from cloud-fan/options.
cloud-fan committed Apr 18, 2018
1 parent 1e3b876 commit 310a8cd
Showing 3 changed files with 135 additions and 4 deletions.
@@ -17,16 +17,61 @@

package org.apache.spark.sql.sources.v2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.annotation.InterfaceStability;

/**
* An immutable string-to-string map in which keys are case-insensitive. This is used to represent
* data source options.
*
* Each data source implementation can define its own options and teach its users how to set them.
* Spark doesn't place any restrictions on what options a data source should or should not have.
* Some options are very common and used by many data sources, but different data sources may
* define the same common options (key and meaning) differently, which is confusing to end users.
* To avoid this, Spark defines a set of standard options that data sources can optionally adopt.
*
* The standard options defined by Spark:
* <table summary="standard data source options">
* <tr>
* <th><b>Option key</b></th>
* <th><b>Option value</b></th>
* </tr>
* <tr>
* <td>path</td>
* <td>A path string for the data files/directories, like
* <code>path1</code>, <code>/absolute/file2</code>, <code>path3/*</code>. The path can
* either be relative or absolute, point to either a file or a directory, and can contain
* wildcards. This option is commonly used by file-based data sources.</td>
* </tr>
* <tr>
* <td>paths</td>
* <td>A JSON-array-style string listing paths to the data files/directories, like
* <code>["path1", "/absolute/file2"]</code>. The format of each path is the same as for the
* <code>path</code> option, except that each path must also be a valid JSON string literal,
* e.g. quotes must be escaped: <code>pa\"th</code> means pa"th.
* </td>
* </tr>
* <tr>
* <td>table</td>
* <td>A table name string representing the table name directly, without any interpretation.
* For example, <code>db.tbl</code> means a table called db.tbl, not a table called tbl
* inside database db, and <code>`t*b.l`</code> means a table called `t*b.l` (backticks
* included), not a table called t*b.l.</td>
* </tr>
* <tr>
* <td>database</td>
* <td>A database name string representing the database name directly, without any
* interpretation, analogous to the <code>table</code> option.</td>
* </tr>
* </table>
*/
@InterfaceStability.Evolving
public class DataSourceOptions {
@@ -97,4 +142,59 @@ public double getDouble(String key, double defaultValue) {
return keyLowerCasedMap.containsKey(lcaseKey) ?
Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
}

/**
* The option key for singular path.
*/
public static final String PATH_KEY = "path";

/**
* The option key for multiple paths.
*/
public static final String PATHS_KEY = "paths";

/**
* The option key for table name.
*/
public static final String TABLE_KEY = "table";

/**
* The option key for database name.
*/
public static final String DATABASE_KEY = "database";

/**
* Returns all the paths specified by both the singular path option and the multiple
* paths option.
*/
public String[] paths() {
String[] singularPath =
get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]);
Optional<String> pathsStr = get(PATHS_KEY);
if (pathsStr.isPresent()) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String[] paths = objectMapper.readValue(pathsStr.get(), String[].class);
return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new);
} catch (IOException e) {
return singularPath;
}
} else {
return singularPath;
}
}

/**
* Returns the value of the table name option.
*/
public Optional<String> tableName() {
return get(TABLE_KEY);
}

/**
* Returns the value of the database name option.
*/
public Optional<String> databaseName() {
return get(DATABASE_KEY);
}
}
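
As a quick illustration of the new accessors, here is a sketch in the spirit of the test suite further down; it builds a `DataSourceOptions` directly from a Scala map, and all option values are arbitrary examples:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

// Example values only: paths() merges the singular "path" option with the JSON-array "paths" option.
val options = new DataSourceOptions(Map(
  DataSourceOptions.PATH_KEY -> "/data/part-0",
  DataSourceOptions.PATHS_KEY -> """["/data/part-1", "/data/part-2"]""",
  DataSourceOptions.TABLE_KEY -> "tbl").asJava)

options.paths().toSeq    // Seq("/data/part-0", "/data/part-1", "/data/part-2")
options.tableName()      // Optional[tbl]
options.databaseName()   // Optional.empty, since the "database" key was not set
```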
14 changes: 10 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,6 +21,8 @@ import java.util.{Locale, Properties}

import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.api.java.JavaRDD
@@ -34,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.{DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -171,7 +173,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def load(path: String): DataFrame = {
option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
// force invocation of `load(...varargs...)`
option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*)
}

/**
@@ -193,10 +196,13 @@
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = ds, conf = sparkSession.sessionState.conf)
val pathsOption = {
val objectMapper = new ObjectMapper()
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
}
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
ds, extraOptions.toMap ++ sessionOptions,
ds, extraOptions.toMap ++ sessionOptions + pathsOption,
userSpecifiedSchema = userSpecifiedSchema))

} else {
loadV1Source(paths: _*)
}
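
For clarity, a small round-trip sketch (with placeholder paths) of what the reader now does for multi-path loads: it encodes the caller's paths as a JSON array string under the standard `paths` key, which `DataSourceOptions.paths()` then decodes on the data source side:

```scala
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.sources.v2.DataSourceOptions

// DataFrameReader side: serialize the user-supplied paths into a JSON array string.
val objectMapper = new ObjectMapper()
val pathsJson = objectMapper.writeValueAsString(Array("p1", "dir/p2"))  // """["p1","dir/p2"]"""

// Data source side: DataSourceOptions.paths() parses the JSON array back into the original paths.
val options = new DataSourceOptions(Map(DataSourceOptions.PATHS_KEY -> pathsJson).asJava)
options.paths().toSeq  // Seq("p1", "dir/p2")
```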
@@ -79,4 +79,29 @@ class DataSourceOptionsSuite extends SparkFunSuite {
options.getDouble("foo", 0.1d)
}
}

test("standard options") {
val options = new DataSourceOptions(Map(
DataSourceOptions.PATH_KEY -> "abc",
DataSourceOptions.TABLE_KEY -> "tbl").asJava)

assert(options.paths().toSeq == Seq("abc"))
assert(options.tableName().get() == "tbl")
assert(!options.databaseName().isPresent)
}

test("standard options with both singular path and multi-paths") {
val options = new DataSourceOptions(Map(
DataSourceOptions.PATH_KEY -> "abc",
DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava)

assert(options.paths().toSeq == Seq("abc", "c", "d"))
}

test("standard options with only multi-paths") {
val options = new DataSourceOptions(Map(
DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava)

assert(options.paths().toSeq == Seq("c", "d\"e"))
}
}
