
[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the same shorten name #17916

Closed
wants to merge 7 commits into master from HyukjinKwon:datasource-detect

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented May 9, 2017

What changes were proposed in this pull request?

One of the common usability problems around reading data in Spark (particularly CSV) is that there can often be a conflict between different readers on the classpath.

As an example, if someone launches a 2.x Spark shell with the spark-csv package on the classpath, Spark currently fails in an extremely unfriendly way (see databricks/spark-csv#367):

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> val df = spark.read.csv("/foo/bar.csv")
java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name.
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:574)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:85)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:295)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)
  ... 48 elided
```

This PR proposes a simple way of fixing this error: when multiple datasources are found for the same short name and exactly one of them is Spark's internal one (the datasource whose class name starts with the "org.apache.spark" prefix), pick the internal datasource and log a warning.

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).

scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```
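The behavior boils down to the following resolution logic inside `DataSource.lookupDataSource` (a simplified sketch based on the fragments quoted in the review comments below; `provider` and `sources` stand in for the real variables, and the real patch uses `logWarning` rather than `println`):

```scala
// Simplified sketch (not the exact patch): prefer the single Spark-internal datasource
// when a short name is claimed by several DataSourceRegister implementations.
def resolveAmbiguous(provider: String, sources: Seq[AnyRef]): Class[_] = {
  val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
  if (internalSources.size == 1) {
    // The real patch logs a warning here and falls back to the internal implementation.
    println(s"Multiple sources found for $provider " +
      s"(${sources.map(_.getClass.getName).mkString(", ")}), defaulting to the internal " +
      s"datasource (${internalSources.head.getClass.getName}).")
    internalSources.head.getClass
  } else {
    // Zero or several internal matches: keep failing as before.
    sys.error(s"Multiple sources found for $provider " +
      s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
      "please specify the fully qualified class name.")
  }
}
```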

How was this patch tested?

Manually tested as below:

```bash
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
```

```scala
spark.sparkContext.setLogLevel("WARN")
```

positive cases:

```scala
scala> spark.range(1).write.format("csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:44 WARN DataSource: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).

scala> spark.range(1).write.format("Csv").mode("overwrite").save("/tmp/abc")
17/05/10 09:47:52 WARN DataSource: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,
com.databricks.spark.csv.DefaultSource15), defaulting to the internal datasource (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat).
```

(newlines were inserted for readability).

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv").mode("overwrite").save("/tmp/abc")
scala> spark.range(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").mode("overwrite").save("/tmp/abc")
```

negative cases:

```scala
scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelation").save("/tmp/abc")
java.lang.InstantiationException: com.databricks.spark.csv.CsvRelation
...

scala> spark.range(1).write.format("com.databricks.spark.csv.CsvRelatio").save("/tmp/abc")
java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark.csv.CsvRelatio. Please find packages at http://spark.apache.org/third-party-projects.html
...
```

@HyukjinKwon
Member Author

cc @sameeragarwal and @cloud-fan, I just came up with another approach and opened this PR to show the idea. What do you think?

@SparkQA

SparkQA commented May 9, 2017

Test build #76658 has finished for PR 17916 at commit 03dc0f6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.



```scala
// please note that the META-INF/services had to be modified for the test directory for this to work
class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {

  test("data sources with the same name") {
    intercept[RuntimeException] {
      spark.read.format("Fluet da Bomb").load()
```
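For context on the META-INF/services note above: data sources are discovered through Java's `ServiceLoader`, which is why the test resources must list the fake providers in a `DataSourceRegister` services file. Roughly how the candidates for a short name are gathered (a sketch under that assumption, not a quote of the lookup code):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Every class listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// is instantiated; the ones whose shortName matches the requested format become candidates.
val loader = Thread.currentThread().getContextClassLoader
val candidates = ServiceLoader.load(classOf[DataSourceRegister], loader)
  .asScala
  .filter(_.shortName().equalsIgnoreCase("csv"))
  .toList
```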
Contributor
we still need a test case to cover the conflicting data source case.
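Such a test might look roughly like this (a sketch only; the fake provider class name follows the classes listed in the test builds below, while the short name and assertions are assumptions):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

// Sketch: an external-looking provider that claims the same short name as another one.
// Both providers would be listed in the test's META-INF/services file mentioned above.
class FakeExternalSourceOne extends RelationProvider with DataSourceRegister {
  override def shortName(): String = "Fake external source"
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    throw new UnsupportedOperationException("not needed for this test")
}

// Inside DDLSourceLoadSuite: with two such providers registered, lookup must still fail.
test("data sources with the same name - conflicting external data sources") {
  val error = intercept[RuntimeException] {
    spark.read.format("Fake external source").load()
  }
  assert(error.getMessage.contains("Multiple sources found for Fake external source"))
}
```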

@SparkQA

SparkQA commented May 9, 2017

Test build #76669 has finished for PR 17916 at commit 2dce84c.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeSourceTwo extends RelationProvider with DataSourceRegister

@SparkQA

SparkQA commented May 9, 2017

Test build #76681 has finished for PR 17916 at commit 741c913.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeSourceFour extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceOne extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceThree extends RelationProvider with DataSourceRegister

@SparkQA

SparkQA commented May 9, 2017

Test build #76682 has finished for PR 17916 at commit 8c40eab.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeExternalSourceOne extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceThree extends RelationProvider with DataSourceRegister

@HyukjinKwon
Member Author

retest this please

@SparkQA

SparkQA commented May 9, 2017

Test build #76696 has finished for PR 17916 at commit 8c40eab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeExternalSourceOne extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister
  • class FakeExternalSourceThree extends RelationProvider with DataSourceRegister

@sameeragarwal
Member

Thanks @HyukjinKwon, I like this approach better!

One limitation of this patch however is that if there are ever two internal datasources in Spark with the same shortName, we might've introduced some inadvertent randomness here (by picking the first datasource from the sequence). Thoughts?

@HyukjinKwon
Member Author

Yeah, probably. I think it should check that there is exactly one internal source, with another test as well; checking this would not hurt.

```scala
    assert(e.getMessage.contains("Multiple sources found for Fluet da Bomb"))
  }

  test("data sources with the same name - internal data source/external data source") {
```
Member Author
So, we will only allow this case.

```scala
val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
if (internalSources.size == 1) {
  logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
    "please specify the fully qualified class name. " +
```
Member
nit: this isn't really actionable, so we can consider deleting it from here and saying something like "defaulting to the internal ..."

@sameeragarwal
Member

LGTM

@HyukjinKwon
Member Author

Thanks for approving this approach. I will handle the comment soon.

s"Using the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
sys.error(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
Contributor
nit: let's throw an AnalysisException

Member Author
Sure.
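The change being asked for amounts to replacing the `sys.error` call above with an `AnalysisException`, roughly (a sketch of the intended change at this spot in DataSource.scala, not the exact diff):

```scala
// Sketch: fail with Spark's AnalysisException instead of a bare RuntimeException.
throw new AnalysisException(
  s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
    "please specify the fully qualified class name.")
```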

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented May 10, 2017

Test build #76709 has finished for PR 17916 at commit 4450da7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM, pending jenkins

@viirya
Member

viirya commented May 10, 2017

@HyukjinKwon Shall we also update the PR description?

@HyukjinKwon
Member Author

Sure.

@viirya
Member

viirya commented May 10, 2017

LGTM

@SparkQA

SparkQA commented May 10, 2017

Test build #76714 has finished for PR 17916 at commit 7a464ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 10, 2017

Test build #76715 has finished for PR 17916 at commit 96cf1a9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • s\"($

asfgit pushed a commit that referenced this pull request May 10, 2017

[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the same shorten name
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17916 from HyukjinKwon/datasource-detect.

(cherry picked from commit 3d2131a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.2!

@asfgit asfgit closed this in 3d2131a May 10, 2017
@HyukjinKwon
Member Author

Thanks everyone.

@chrishfish

Awesome @HyukjinKwon glad this issue has been resolved permanently 👍

liyichao pushed a commit to liyichao/spark that referenced this pull request May 24, 2017

[SPARK-20590][SQL] Use Spark internal datasource if multiples are found for the same shorten name
Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#17916 from HyukjinKwon/datasource-detect.
@HyukjinKwon HyukjinKwon deleted the datasource-detect branch January 2, 2018 03:42
@xy1024xiangyu

xy1024xiangyu commented Mar 17, 2021

@HyukjinKwon @cloud-fan, according to the discussion, it seemed that the "Multiple sources found for csv" issue had been solved. However, when I run my Java jar, an error occurs.
The Java code is as follows:

```java
DataFrameReader read = spark.read();
JavaRDD<String> stringJavaRDD = read.textFile(inputPath).javaRDD();
```

When I run the Java code in the IDE, the program works well. However, when using spark-submit, I get the following error:

```
org.apache.spark.sql.AnalysisException: Multiple sources found for text (org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2, org.apache.spark.sql.execution.datasources.text.TextFileFormat), please specify the fully qualified class name.;
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:707)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:733)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
  at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:843)
  at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:880)
  at org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:852)
  at com.three2three.bigfoot.vola.NormalizeSnapshotSigmaAxisImpliedVola.main(NormalizeSnapshotSigmaAxisImpliedVola.java:306)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

Even changing my code to

```java
DataFrameReader read = spark.read();
JavaRDD<String> stringJavaRDD = read.format("org.apache.spark.sql.execution.datasources.text.TextFileFormat").textFile(inputPath).javaRDD();
```

does not help with this problem.

Detailed description here: https://stackoverflow.com/questions/66664181/spark-multiple-sources-found-for-text

Any idea how to solve this problem?

Would upgrading the installed Spark version to the latest release help solve the problem?

@cloud-fan
Contributor

Did you closely follow the doc to run spark-submit? https://spark.apache.org/docs/latest/submitting-applications.html Especially this part: "When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime."
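For reference, marking Spark as a provided dependency in an sbt build looks roughly like this (a sketch; the artifact versions are placeholders, and Maven's equivalent is the `provided` scope):

```scala
// build.sbt (sketch): Spark is supplied by the cluster at runtime, so it should not
// be bundled into the assembly jar; bundling it can put a second copy of the built-in
// sources on the classpath and trigger "Multiple sources found for ..." errors.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.1.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "3.1.1" % "provided"
)
```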

@xy1024xiangyu

@cloud-fan, yes, I have followed the instructions. When I run spark-submit on my standalone Windows machine, this error happens. However, if I put my Java jar on a Linux server and run spark-submit the same way, it works well.

Why does this happen? Is it because some path is on the Windows system path and spark-submit finds two datasources? Is it a bug then? I have seen many posts about this "Multiple sources found for ..." error, e.g. for csv/json. My case is text. I have no idea why this error happens.

@cloud-fan
Contributor

If it only fails on Windows, it's probably a bug, but I have no idea what is happening...
