
[SPARK-29163][SQL] Simplify Hadoop Configuration access in DataSourcev2 #26005

Conversation

@holdenk (Contributor) commented Oct 2, 2019

What changes were proposed in this pull request?

This adds a trait that data source v2 users can mix in to have the Hadoop configuration serialized and broadcast.
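
For illustration, here is a minimal sketch of what such a mix-in could look like (the names broadcastedConf, cachedBroadcastedConf, and withHadoopConfRewrite follow the review discussion below; treat the exact shape as an assumption rather than the final API):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

trait BroadcastedHadoopConf {
  def sparkSession: SparkSession

  // Hook for sources that need to adjust the Hadoop conf before it is broadcast.
  protected def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = hadoopConf

  protected var cachedBroadcastedConf: Broadcast[SerializableConfiguration] = null

  // Serialize and broadcast the session's Hadoop conf once, then reuse the cached handle.
  protected def broadcastedConf: Broadcast[SerializableConfiguration] = {
    if (cachedBroadcastedConf == null) {
      val conf = withHadoopConfRewrite(sparkSession.sessionState.newHadoopConf())
      cachedBroadcastedConf =
        sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
    }
    cachedBroadcastedConf
  }
}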

Why are the changes needed?

All of our Hadoop-based Data Source V2 implementations do a variation of this internally. We should reduce the copy-pasted code inside Spark and simplify this for external data source implementations as well.

Does this PR introduce any user-facing change?

Not for people building Spark applications, but for people building DataSource V2 implementations this standardizes how to access the Hadoop conf.

How was this patch tested?

Spark's existing V2 data sources now use this feature, meaning the Hadoop configuration serialization and broadcasting is exercised through their existing tests. (Validated by the fact that I screwed it up at first and those tests failed.)

@holdenk (Contributor Author) commented Oct 2, 2019

cc @rdblue since we talked a bit about this before the last DSv2 sync

@SparkQA commented Oct 3, 2019

Test build #111704 has finished for PR 26005 at commit a04be5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun changed the title from [SPARK-29163] Simplify Hadoop Configuration access in DataSourcev2 to [SPARK-29163][SQL] Simplify Hadoop Configuration access in DataSourcev2 on Oct 3, 2019
@holdenk (Contributor Author) commented Oct 3, 2019

cc @dbtsai since we were just talking about DSv2

val options: CaseInsensitiveStringMap

var cachedHadoopConf: Configuration = null
var cachedBroadcastedConf: Broadcast[SerializableConfiguration] = null
Member commented:

private?

Member commented:

Since we don't plan to modify / invalidate the cache, why not just use a lazy val here?

@holdenk (Contributor Author) replied:

Lazy vals behave a bit strangely with inheritance and overridden non-abstract methods. I started this off with a lazy val and had some issues with it not calling the correct hadoopConf generator. I can double-check my understanding, though, if you think that's incorrect.

@holdenk (Contributor Author) replied:

Also, on scope: since we want to be able to access cachedBroadcastedConf, it should probably be protected rather than private, yeah?

// changed `hadoopConf` in executors.
OrcFilters.createFilter(schema, pushedFilters).foreach { f =>
OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
}
Member commented:

Maybe we can add a new API in BroadcastedHadoopConf like:

def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
  hadoopConf
}

Implementations just override it and put their custom rewriting code there. The code will be simpler.

@holdenk (Contributor Author) replied:

I did that initially; I'm happy to switch back to it.

@holdenk force-pushed the SPARK-29163-simplify-hadoop-conf-access-in-DSv2 branch from a04be5f to fd847ff on November 1, 2019
@holdenk (Contributor Author) commented Nov 1, 2019

Hey @dbtsai, I think I've addressed the comments, if you have the cycles to take a second look :)

* A helper trait to serialize and broadcast the Hadoop configuration for readers.
*/
@DeveloperApi
trait BroadcastedHadoopConf {
Contributor commented:

If this is intended to be extended by DSv2 implementations, it should be written in Java. The v2 API is entirely Java to avoid problems between Scala and Java (and the rest of the JVM ecosystem).

/**
* Override this if you need to rewrite your Hadoop configuration differently
*/
protected def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
Contributor commented:

What is the use case for this?

@holdenk (Contributor Author) replied:

You can see the two use cases inside the Spark code base (Parquet and ORC), where we take some user-specified settings and modify Spark's base Hadoop configuration based on them prior to broadcasting.
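
For example, a hypothetical override in the spirit of what the Parquet and ORC sources do (MyOrcLikeScan, the "caseSensitive" option key, and the conf key are illustrative rather than the actual names; it assumes the options: CaseInsensitiveStringMap member shown earlier):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class MyOrcLikeScan(
    val sparkSession: SparkSession,
    val options: CaseInsensitiveStringMap) extends BroadcastedHadoopConf {

  // Copy the base conf and fold source-specific read options into it before it is broadcast.
  override protected def withHadoopConfRewrite(hadoopConf: Configuration): Configuration = {
    val conf = new Configuration(hadoopConf)
    // Hypothetical example: propagate a case-sensitivity option from the read options map.
    Option(options.get("caseSensitive")).foreach { value =>
      conf.set("spark.sql.caseSensitive", value)
    }
    conf
  }
}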

*/
@DeveloperApi
trait BroadcastedHadoopConf {
val sparkSession: SparkSession
Contributor commented:

SparkSession isn't passed to v2 implementations, so it probably shouldn't be used like this.

@holdenk (Contributor Author) replied:

So I think the v2 classes do have a definition of the SparkSession inside of them, based on how this compiles and passes tests. Am I misunderstanding something here?

Contributor replied:

The session isn't passed in. The active session is accessed here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala#L42

I believe the intent is to avoid relying on SparkSession itself, which is why I think it is so helpful to add the Hadoop configuration as a broadcast variable.

In general, be cautious when looking at the file sources as examples.

@holdenk (Contributor Author) replied:

Ok. But we do need the SparkSession for this, so I think having a requirement that the SparkSession is present is probably the best way to do it? I could do a default implementation with getActive, though, if you think that would be better?

@rdblue (Contributor) commented Nov 1, 2019

I think this is going to need to change quite a bit, because the v2 API is written in Java instead of Scala and doesn't get the SparkSession passed in. In Java, a helper trait should be an interface, so that using it doesn't require changing an implementation's parent class.

That means that creating the broadcast variable should be done by Spark and then set on the implementation. I think it would look like this:

public interface RequiresBroadcastConf {
  Configuration prepareConf(Configuration original);
  void setBroadcastConf(Broadcast<Configuration> bconf);
}
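
For concreteness, a sketch of how a source implementation might consume such an interface (Scala; MyTable is hypothetical, and in practice the Configuration would likely be wrapped in something like SerializableConfiguration before broadcasting, since Configuration itself is not Java-serializable):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast

// Sketch: the implementation stores the broadcast handed to it by Spark and uses it when
// building reader factories, which can then call bconf.value on the executors.
class MyTable extends RequiresBroadcastConf {
  private var bconf: Broadcast[Configuration] = _

  override def prepareConf(original: Configuration): Configuration = original

  override def setBroadcastConf(conf: Broadcast[Configuration]): Unit = {
    bconf = conf
  }
}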

@holdenk (Contributor Author) commented Nov 1, 2019

Ok I'm happy to rewrite this into a Java interface with default implementations.

@holdenk (Contributor Author) commented Nov 1, 2019

Ok, so rewriting this in Java is annoying. I'll still do it, but for other V2 features would it be OK if there are Java test cases that show the APIs are callable? Or is this a hard design decision? Also, it doesn't seem like all of the V2 code is actually in Java; is the intention to do a rewrite, or am I misunderstanding what constitutes the V2 code base?

@SparkQA commented Nov 1, 2019

Test build #113096 has finished for PR 26005 at commit fd847ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor Author) commented Nov 1, 2019

So if we want to rewrite this as a Java interface, we need to drop the caching because of the field restrictions, which I don't really like doing. If that's the only path forward, ok, but I'd rather keep this in Scala and verify it can be called from Java, if that's ok @rdblue?

@rdblue (Contributor) commented Nov 1, 2019

Not all of v2 is in Java, but the parts that users extend are Java interfaces. Writing these in Scala is problematic because Scala might convert a trait into an interface or might make it an abstract class. Plus, Scala visibility is only enforced in Scala.

You can see how this would be a problem for this PR. If the trait gets turned into a Java abstract class because it needs instance fields to store its broadcastedHadoopConf, then that forces Java callers to choose between extending a base class (like FileDataSourceV2) and using the broadcasted configuration feature of the API.

This problem still exists in Java, which is why it's hard to write this. Since you need to use an interface and not an abstract class, you will probably need to build the broadcast variable and then configure the implementation with it at runtime.

@holdenk (Contributor Author) commented Nov 1, 2019

I mean, I think we probably expect people to extend FileScan, which is implemented in Scala. I'm happy to verify it's extensible from Java with a test case, which seems like it would address your concern?

@rdblue (Contributor) commented Nov 1, 2019

No, the API used to plug in sources is written in Java, so this should be written as a Java interface.

Even if people do choose to extend FileScan, there are many other use cases for the DSv2 API that won't do that.

@holdenk (Contributor Author) commented Nov 1, 2019

Ok, internally most of our use cases where we're using the Hadoop conf are with FileScan. I'll finish the rewrite to Java; we'll lose the caching logic, but we didn't have that before.

@holdenk (Contributor Author) commented Nov 1, 2019

So, I finished the rewrite to Java. I think the remaining active comment is around the SparkSession. What are your thoughts on that, @rdblue?

@SparkQA commented Nov 2, 2019

Test build #113118 has finished for PR 26005 at commit aadd185.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk (Contributor Author) commented Nov 6, 2019

Ping @rdblue ?

@DeveloperApi
public interface BroadcastedHadoopConf {
SparkSession sparkSession();
CaseInsensitiveStringMap options();
Contributor commented:

I don't think that the data source should be responsible for providing a SparkSession. Spark has a session and configures the data source.

I think this interface should allow Spark to pass a broadcasted Hadoop Configuration to the source instead.

@holdenk (Contributor Author) replied:

I don't think I understand what you mean here, could you try and rephrase it? Sorry.

@holdenk (Contributor Author) replied:

To clarify: do you mean you want us to use reflection for something implementing this interface inside of the DSv2 code path? Or do you mean something else?

Contributor replied:

Data sources aren't passed a Spark session, so it doesn't make sense that this interface requires the source to provide one. If we think sources need access to a Spark session, then we should add an interface that sets the session directly. (I know the built-in sources have one, but that's because those are v1 sources that have been slightly modified to fit into v2.)

The benefit of having an interface like this is to avoid needing the Spark session. That's currently done by accessing the Spark session from the environment (e.g. SparkSession.builder.getOrCreate()) and we want to avoid making sources do that.

That's why I think the way this works should be to set a broadcasted configuration on a Table implementation that extends this interface using a method like setBroadcastedConf.

I'm on ASF slack if you'd like to talk about this more directly.
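
A rough Scala sketch of the wiring being described here (the trait and object names are hypothetical stand-ins for the RequiresBroadcastConf proposal above, carrying a SerializableConfiguration since a raw Configuration is not Java-serializable):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

// Hypothetical opt-in hook for tables that want a broadcast Hadoop conf.
trait ReceivesBroadcastHadoopConf {
  def prepareConf(original: Configuration): Configuration = original
  def setBroadcastConf(conf: Broadcast[SerializableConfiguration]): Unit
}

// Sketch of the Spark-side step while planning a scan: if the table opts in,
// build and broadcast the Hadoop conf, then hand it to the implementation.
object BroadcastConfSupport {
  def configure(session: SparkSession, table: AnyRef): Unit = table match {
    case t: ReceivesBroadcastHadoopConf =>
      val conf = t.prepareConf(session.sessionState.newHadoopConf())
      t.setBroadcastConf(session.sparkContext.broadcast(new SerializableConfiguration(conf)))
    case _ => // this table does not use a broadcast Hadoop conf
  }
}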

@holdenk (Contributor Author) replied:

Ok, so we had a quick chat. It's possible that this and one other small use case are the only reasons the sources currently have a SparkSession inside of them, and the design goal of DSv2 was to get rid of that. So I'll see if this can be avoided, but if it's more than broadcasting the Hadoop conf and case-sensitivity options, then I'll switch the API to just pass in a SparkSession and we can stick with this API. Does that sound like a reasonable summary, @rdblue?

@github-actions bot commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions bot added the Stale label on Feb 23, 2020
@github-actions bot closed this on Feb 24, 2020