[FEA] Make XGBoost4j-Spark to support PySpark #7578

Closed
wbo4958 opened this issue Jan 19, 2022 · 39 comments

@wbo4958
Contributor

wbo4958 commented Jan 19, 2022

Currently, neither XGBoost4j-Spark nor XGBoost4j-Spark-GPU supports running XGBoost in a PySpark environment, which makes XGBoost4j hard to use for users who may know nothing about the Scala language.

There were two PRs about how to make XGBoost support PySpark: #4656 and #5658. I personally prefer #4656, which bundles the XGBoost4j-Spark Python wrapper into the XGBoost4j-Spark jar, so users do not need to add extra jars when submitting XGBoost applications.

@thesuperzapper Could you continue to finish your previous PR?

@candalfigomoro

I personally prefer #4656, which bundles the XGBoost4j-Spark Python wrapper into the XGBoost4j-Spark jar, so users do not need to add extra jars when submitting XGBoost applications.

What about including it within the xgboost python package? Obviously it should gracefully fail when the xgboost4j-spark JAR is unavailable.

@thesuperzapper Could you continue to finish your previous PR?

From the PR code:

## EXCLUDED: trackerConf=None,

Could you add a way to set the Scala tracker instead of the Python one?

In Java/Scala I set:

xgbClassifier.set("trackerConf", new TrackerConf(0, "scala"));

@trivialfis
Member

My personal preference is that we get a design doc first. I spent the last 2 days learning about Spark and was able to build a Python interface using the new Arrow serialization format in PySpark (df.mapInPandas), along with support for normal RDDs (much slower). So there are 2 possible paths forward: we can build it on top of the existing JVM package, or we can start with the Python package. I help maintain the Python package, so I'm biased. More input is welcome.

@candalfigomoro

candalfigomoro commented Jan 31, 2022

My personal preference is that we get a design doc first. I spent the last 2 days learning about Spark and was able to build a Python interface using the new Arrow serialization format in PySpark (df.mapInPandas), along with support for normal RDDs (much slower). So there are 2 possible paths forward: we can build it on top of the existing JVM package, or we can start with the Python package. I help maintain the Python package, so I'm biased. More input is welcome.

While you can train python xgboost on a single node and perform parallel inference with .mapInPandas (in pyspark >= 3.0) or a @pandas_udf (in pyspark >= 2.3), the problem is that the training step is not distributed.
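To make that parallel-inference pattern concrete, here is a minimal sketch using mapInPandas with a model trained on a single node (the toy data and column names are assumptions for illustration, not from this thread):

import pandas as pd
import xgboost as xgb
from typing import Iterator
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Single-node training on a small/sampled dataset.
train_pdf = pd.DataFrame({"f0": [0.1, 0.2, 0.9, 0.8],
                          "f1": [1.0, 0.9, 0.1, 0.2],
                          "label": [0, 0, 1, 1]})
model = xgb.XGBClassifier(n_estimators=10).fit(train_pdf[["f0", "f1"]], train_pdf["label"])

def predict_batches(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Each batch is a chunk of one partition; inference runs in parallel on the executors.
    for batch in batches:
        batch["prediction"] = model.predict(batch[["f0", "f1"]]).astype("int64")
        yield batch

sdf = spark.createDataFrame(train_pdf)  # stands in for a large scoring DataFrame
scored = sdf.mapInPandas(predict_batches,
                         schema="f0 double, f1 double, label long, prediction long")
scored.show()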

@trivialfis
Member

the problem is that the training step is not distributed.

Thank you for the reply, could you please be more specific about why it can't be distributed?

@candalfigomoro

the problem is that the training step is not distributed.

Thank you for the reply, could you please be more specific about why it can't be distributed?

.mapInPandas (or @pandas_udf) applies a function to a batch/group of Spark DataFrame rows. This can easily be used to parallelize inference with python xgboost. I don't have a deep understanding of xgboost's inner workings, but if you want to distribute training you have to design a distributed training strategy. This is what xgboost4j-spark apparently does (using Spark). I'm not saying you can't do that in python/pyspark; I'm pretty sure you can, but you would basically have to reimplement the xgboost4j-spark Spark-distributed training logic in python/pyspark. It would be nice, but I think the effort would be much higher than just wrapping xgboost4j-spark.

@trivialfis
Member

trivialfis commented Jan 31, 2022

but I think the effort would be much higher than just wrapping xgboost4j-spark.

Indeed. I don't want to reinvent the wheel either. On the other hand, I have concerns over the complexity and the lack of flexibility of building on top of the JVM package. Think about building and distributing the package through pip and conda, debugging any issues, model compatibility between languages, and lastly the difficulty of adding new features.

As for the internal workings of xgboost, I think the Spark package just repartitions the data to the number of workers and then trains on each partition. In contrast, the dask package can access partitions on each worker directly, so no repartitioning is required. I can't be entirely sure about the ray package at the moment.
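For illustration only, a rough sketch of that repartition-then-train idea in PySpark; this is not the xgboost4j-spark implementation, and the tracker step that would make training truly distributed is only indicated by a comment:

import pandas as pd
import xgboost as xgb
from typing import Iterator
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
num_workers = 4

df = spark.table("training_data")            # assumed input table with a "label" column
repartitioned = df.repartition(num_workers)  # one partition per XGBoost worker

def train_partition(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    data = pd.concat(list(batches))          # one worker's shard of the data
    dtrain = xgb.DMatrix(data.drop(columns=["label"]), label=data["label"])
    # A real implementation would have every task join a shared Rabit/collective
    # tracker here, so that the boosters on all partitions stay synchronized.
    booster = xgb.train({"objective": "binary:logistic"}, dtrain, num_boost_round=10)
    yield pd.DataFrame({"model": [booster.save_raw().hex()]})

models = repartitioned.mapInPandas(train_partition, schema="model string").collect()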

@thesuperzapper
Contributor

thesuperzapper commented Feb 1, 2022

@trivialfis @wbo4958 Hey everyone, I do find it funny that people are still using my code from PR #4656 after all these years!

The tracking issue from 2018 still has the remaining steps to finalize PySpark support:

  1. Decide on the Python package name
  2. Develop PySpark wrapper code
  3. Decide how to package the code
  4. Create a testing framework
  5. Write API Documentation
    • Since we are adding a whole new API to XGBoost, we will need to write some docs for the "JVM Package" section of the website
    • The source for these pages on the website is found at ./doc/jvm/

@thesuperzapper
Contributor

but I think the effort would be much higher than just wrapping xgboost4j-spark.

Indeed. I don't want to reinvent the wheel either. On the other hand, I have concerns over the complexity and the lack of flexibility of building on top of the JVM package. Think about building and distributing the package through pip and conda, debugging any issues, model compatibility between languages, and lastly the difficulty of adding new features.

@trivialfis funnily enough the whole of core pyspark.ml is just a wrapper around the Java Spark API using Py4J.
So if anything, a wrapper is the expected way to build an ML library for PySpark.

For reference, here are some of the pyspark.ml modules that are thin Py4J wrappers around the corresponding Scala classes: pyspark.ml.classification, pyspark.ml.regression, pyspark.ml.feature, and pyspark.ml.clustering.
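As a minimal sketch of that wrapper pattern, assuming the same JavaEstimator/Py4J machinery pyspark.ml uses (the Python classes below are hypothetical; the Scala class name is the existing xgboost4j-spark estimator, and the jar must be on the Spark classpath):

from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol

class XGBoostClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol):
    """Py4J wrapper around the Scala estimator, mirroring how pyspark.ml wraps Spark MLlib."""

    def __init__(self):
        super().__init__()
        # Instantiate the JVM-side estimator through the Py4J gateway.
        self._java_obj = self._new_java_obj(
            "ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier", self.uid)

    def _create_model(self, java_model):
        # Called by JavaEstimator.fit() with the fitted JVM model.
        return XGBoostClassificationModel(java_model)

class XGBoostClassificationModel(JavaModel):
    pass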

@candalfigomoro

The main advantage of starting from python xgboost instead of the JVM package is that the JVM version is always lagging behind the python version in terms of features.

@thesuperzapper
Contributor

The main advantage of starting from python xgboost instead of the JVM package is that the JVM version is always lagging behind the python version in terms of features.

@candalfigomoro implementing a Spark package in Java/Scala is MUCH easier than in Python, and since we can get Python effectively for free with Py4J (by wrapping the Java API), going through all that effort would be for nothing.

@candalfigomoro

@thesuperzapper
Sure, but what would be the effort to implement in the JVM package all the features that it lacks compared to python xgboost?

@thesuperzapper
Contributor

@candalfigomoro that question seems disconnected. Running the Python XGBoost library in a Spark worker is not the same as running distributed XGBoost across a Spark cluster (which is what the XGBoost4J-Spark library does).

@trivialfis
Member

trivialfis commented Feb 4, 2022

Thank you for the discussion! To me, at this early stage, a better question to ask is what users want from a PySpark XGBoost integration. For example, what motivates you to use PySpark instead of the Scala version? How can the PySpark-XGBoost interface be part of the workflow you fell in love with? Are users required to know anything about Java/Scala? How to implement the interface is a different question. Hence I argue that a design doc is needed.

For what users want from the PySpark package, I have to assume it's the ecosystem of Python along with the features provided by Spark. The ecosystem consists of both toolchains and libraries. From this perspective, I think it's a good idea to ship the Python wrapper as a proper Python package instead of some files inside a jar. So that's the interface for installation. Adding to this, I think it will be a Python-oriented package, so users should not be asked to understand what a jar is, in the same way that xgboost4j-spark users are not required to know about the libxgboost.so produced by the C++ compiler. (Fun to think what happens if I ship the .class files inside a .so ;-) )

Being Python-oriented also implies that the pyspark-xgboost package should have some basic interoperability with the rest of the Python world and with the XGBoost Python package (instead of the JVM package). For the former, that includes documentation generation with Sphinx, type hints, serialization (pickle), the type hierarchy, running in a Python shell in local mode, etc. The PySpark package itself has done a great job on this front and we should follow. For the latter, the current JVM Spark package produces models that are not compatible with the other language bindings, which is disconcerting and something I really want to have addressed; at the very least, that should not happen to the PySpark XGBoost package. Another example is callback functions and custom metrics/objectives in Python. Also, with this interoperability we can use other libraries in the Python ecosystem, like SHAP to assist analysis or treelite/FIL for faster inference. These might also be what users want.

Whether it's implemented using the existing JVM package or built from scratch is an important decision for both feature development and deployment. But meeting the conventions and interoperability of the Python ecosystem is a more important aspect to me, to the degree that it might be the only value of having such an integration. If the PySpark integration doesn't have this interoperability, we might as well just use the Scala version.

In summary, I think it should be a higher-level, standard Python package to make the integration most useful; whether it's built on top of xgboost4j or python xgboost should be hidden as an implementation detail.

@wbo4958
Contributor Author

wbo4958 commented Feb 5, 2022

Sorry for the late response, I was on vacation.

@trivialfis, I didn't test PySpark with the XGBoost Python package, and I guess it can work well for the CPU pipeline. But for the GPU pipeline, it may not fully leverage the parallelism of Spark to accelerate the whole pipeline.

e.g., I'd like to train an XGBoost model using PySpark with python XGBoost on a Spark cluster (48 CPU cores, 4 GPUs, 4 workers).
You can only set Spark configurations that allow 4 Spark tasks to run at the same time, because the XGBoost lib requires 1 XGBoost training task to occupy 1 GPU (which makes the ETL phase pretty slow). Or we need users to repartition their dataset, which is just reinventing the wheel. But this is handled by PySpark on XGBoost-JVM, where we can run 48 Spark tasks for ETL. Trust me, 48 Spark tasks are much faster than 4 Spark tasks.
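To make the constraint concrete, this is roughly the Spark configuration that scenario implies, using the standard Spark 3 resource-scheduling settings (cluster sizes as assumed above):

from pyspark.sql import SparkSession

# 4 workers x 12 cores = 48 CPU cores, 1 GPU per worker.
spark = (SparkSession.builder
         .config("spark.executor.instances", "4")
         .config("spark.executor.cores", "12")
         .config("spark.executor.resource.gpu.amount", "1")
         # Requiring a whole GPU per task caps concurrency at 1 task per executor,
         # i.e. 4 tasks cluster-wide, even for the CPU-only ETL stages.
         .config("spark.task.resource.gpu.amount", "1")
         .getOrCreate())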

On the other hand, 1.6.0-SNAPSHOT has introduced XGBoost4j-Spark-GPU, which can typically accelerate the ETL phase and makes XGBoost more competitive.

@trivialfis
Member

trivialfis commented Feb 6, 2022

@wbo4958 Hey, I'm not against using the JVM package, please see #7578 (comment). PySpark MLlib is built on top of the existing Scala implementation and that's perfectly fine.

@wbo4958
Contributor Author

wbo4958 commented Feb 8, 2022

Hi @trivialfis,

As you mentioned in #7578 (comment), you have successfully run the XGBoost Python package in a PySpark env. It looks like we don't need to do anything to support the XGBoost-Python package in PySpark; that's cool, right?

So, back to this FEA request: we can think of it as an extension of XGBoost-JVM, because it is just a wrapper around the XGBoost JVM API; any of its logic is routed to XGBoost JVM.

So, here we provide Python users with two ways to use XGBoost:

  • The first way: users use the XGBoost-python package in PySpark. This requires users to do some preparation before training/inference, and the ETL phase will not be accelerated by GPU.
  • The second way: users use the XGBoost-JVM PySpark wrapper. With this way, users don't need to do the preparation, since XGBoost-JVM has done it for them, and ETL can be fully accelerated by GPU.

@candalfigomoro

@candalfigomoro that question seems disconnected. Running the Python XGBoost library in a Spark worker is not the same as running distributed XGBoost across a Spark cluster (which is what the XGBoost4J-Spark library does).

That's what I said in #7578 (comment)

My usage scenario would be: train xgboost on larger-than-memory datasets by leveraging Spark-distributed computation and integrate xgboost training within a python/pyspark code base.

Potential solutions are:

  1. create a python xgboost4j-spark wrapper. Pros: it's easier to do. Cons: xgboost4j lags in terms of features behind python xgboost.
  2. add (Py)Spark-distributed training to python xgboost. Pros: python xgboost is more up-to-date in terms of features. Cons: it's harder to do because you have to reimplement the spark-distributed training logic within python xgboost

@trivialfis
Member

trivialfis commented Feb 18, 2022

xgboost4j lags in terms of features behind python xgboost.

That's mostly caused by me not being familiar with the JVM ecosystem. Feel free to contribute. ;-)

it's harder to do because you have to reimplement the spark-distributed training logic within python xgboost

We are maintaining a dask module for distributed training and inference.

@trivialfis
Member

Quick update: after long arguments with @wbo4958, we met somewhere in the middle and might follow https://nlp.johnsnowlabs.com/docs/en/install#python as an example for package distribution. In essence, the Python package and the JVM package will be managed separately, so Python users still have their toolchains working while Spark manages the JVM dependencies. As a feature request from me, we might try to change the xgboost4j-spark package so that the model is more portable and can be used by other Python tools. Aside from these, we will try to add some small polish to the original PR from @thesuperzapper before getting it ready for another attempt.

@mengxr

mengxr commented Apr 27, 2022

Databricks implemented a PySpark XGBoost integration on top of Python XGBoost instead of xgboost4j-spark. You can find the API docs here. We are happy to contribute it to the XGBoost official repo if the stakeholders agree it is better for PySpark users. To assist the discussion, we open-sourced our implementation at https://github.com/mengxr/pyspark-xgboost under the Apache License. Please take a look.

Note that it doesn't contain the code that efficiently handles sparse data, because we use a private method internally. Hopefully that is not a blocker for design discussions.

@trivialfis
Member

trivialfis commented May 17, 2022

Hi @mengxr. Could you please share some insight into why you chose the Python approach instead of the JVM approach? I understand that the Python approach can lead to a better user experience for Python users. But for the longer term, we settled on the JVM approach due to memory consumption and the potential performance penalty from data serialization (arrow & pickle). Also, it would be easier for us to integrate GPUs into the user pipeline. But we would like to learn more about the rationale behind your decision. I was informed that you think the JVM approach doesn't meet Databricks' requirements; could you please elaborate on this part? I'm open to suggestions and not biased toward any approach (as suggested in previous comments in this thread).

@mengxr

mengxr commented May 18, 2022

We ran some benchmarks and we haven't observed a performance penalty from data serialization yet. We only need the data conversion once before training, so the memory consumption during training should be the same for both implementations. Both call native XGBoost for distributed training. Do you have examples that demonstrate memory/performance issues? Our implementation also supports GPUs. We can run benchmarks to see if there are performance gaps. Again, the only difference is the initial data conversion/transfer.

Most of the benefits of going Python-native were discussed in this thread. I think they outweigh minor performance gaps, if any. I cherry-picked a few items from our requirements doc:

  • Support most params from XGBoost Python and automatically support new params from new XGBoost releases. This simplifies the migration path from single-node XGBoost Python to distributed with PySpark. Using a Python wrapper, we can pass most keyword args to XGBoost Python blindly (see the sketch after this list). If we take the Scala wrapper approach, we would have to keep track of new features from single-node Python. It is certainly feasible, but it is a community-maintained project and I'm not sure how to guarantee feature parity between Scala and Python.

  • Obtain the XGBoost Python model object from the fitted model easily. So I can use it for single-node inference without Spark.

  • I saw the driver crash issue was fixed via barrier mode here: https://github.com/dmlc/xgboost/pull/7836/files. This was a huge pain point we saw from our customers.

This is not an issue on Databricks but it would be an issue for open-source PySpark/XGBoost users.

  • Use pip to manage environments. If we go with the Scala wrapper, we either take on the complexity of packaging jars into the XGBoost python package (like pyspark does) or we need to ask users to install the jars or use --packages with spark-submit.
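A tiny sketch of the "pass keyword args blindly" idea from the first bullet; the class and parameter names are hypothetical and do not come from the open-sourced implementation:

import xgboost as xgb

class SparkXGBClassifier:
    # Params consumed by the Spark layer; everything else is forwarded to xgboost untouched.
    _SPARK_ONLY = {"num_workers", "features_col", "label_col", "use_gpu"}

    def __init__(self, **kwargs):
        self.spark_params = {k: v for k, v in kwargs.items() if k in self._SPARK_ONLY}
        self.xgb_params = {k: v for k, v in kwargs.items() if k not in self._SPARK_ONLY}

    def _train_booster(self, dtrain: xgb.DMatrix) -> xgb.Booster:
        # New parameters added in future xgboost releases need no wrapper changes:
        # they simply flow through xgb_params into xgboost.train.
        num_round = self.xgb_params.pop("n_estimators", 100)
        return xgb.train(self.xgb_params, dtrain, num_boost_round=num_round)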

@wbo4958
Contributor Author

wbo4958 commented May 19, 2022

Thx @mengxr,

Here are some comments about the requirements,

Support most params from XGBoost Python and automatically support new params from new XGBoost releases.

The JVM side passes all the parameters to xgboost, including those not defined as a Param; see https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala#L199 . So the JVM side should also auto-support new parameters without any change.

Obtain the XGBoost Python model object from the fitted model easily. So I can use it for single-node inference without Spark.

Yeah, the native model trained on the JVM side should be xgboost-compatible, so it's not hard to convert it to an xgboost-python model.

I saw the driver crash issue was fixed via barrier mode here.

Yeah, the issue has been fixed, and right now the JVM package is quite stable.

Use pip to manage environments. If we go with the Scala wrapper,

Yeah, that's painful for the JVM. But if users are trying XGBoost on Spark, they can be expected to have some knowledge of the spark-submit parameters.

I also like your solution, which is really good for users. But in our testing, the JVM implementation can train on a dataset about 2x larger than your solution before crashing. And there is also no ranking support in your solution. What's more, it really feels like reinventing the wheel. If the xgboost community accepts your solution, there will be two different ways to run XGBoost on Spark, and maintaining two different ways is really hard from the XGBoost community's point of view.

@mengxr

mengxr commented May 19, 2022

Yeah, the native model trained on the JVM side should be xgboost-compatible, so it's not hard to convert it to an xgboost-python model.

Do you plan to provide an API for PySpark users to get the Python model object directly?

Yeah, that's painful for the JVM. But if users are trying XGBoost on Spark, they can be expected to have some knowledge of the spark-submit parameters.

I think that underestimates the usability issues. Managing Python dependencies on PySpark is not a trivial task; see https://databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html . Tracking jars on the side adds more complexity. For example, this is a common workflow (sketched in code after the list):

  • Train a model on a single node using XGBoost (Python) over a sampled dataset for fast development.
  • Train a model on a cluster using PySpark+XGBoost over the full dataset.
  • Convert the trained model to XGBoost (Python) model object.
  • Define a Pandas UDF to wrap the model object to make predictions.
  • Embed the UDF in a batch inference pipeline.
  • Or serve the Python model object.
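A hedged sketch of this workflow with plain xgboost plus a pandas UDF for the inference steps; table and column names are assumptions, and the distributed-training step is elided since that is exactly what is being designed here:

import pandas as pd
import xgboost as xgb
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, struct

spark = SparkSession.builder.getOrCreate()

# 1. Fast development: single-node training on a sampled dataset.
sample = spark.table("training_data").sample(0.01).toPandas()
clf = xgb.XGBClassifier(n_estimators=100)
clf.fit(sample.drop(columns=["label"]), sample["label"])

# 2.-3. After full-scale distributed training one would convert the fitted Spark
#       model back to a plain Booster; here we just reuse the sampled model.
booster = clf.get_booster()

# 4. Wrap the model object in a pandas UDF (binary model assumed).
@pandas_udf("double")
def predict_udf(features: pd.DataFrame) -> pd.Series:
    return pd.Series(booster.inplace_predict(features.to_numpy()).ravel())

# 5. Embed the UDF in a batch inference pipeline.
scoring = spark.table("scoring_data")
feature_cols = [c for c in scoring.columns if c != "label"]
scored = scoring.withColumn("probability", predict_udf(struct(*feature_cols)))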

If we go with the Scala wrapper, a user might use the latest version of XGBoost (Python) but an older version of xgboost4j-spark, where behavior changes could happen.

There are issues with Scala version compatibility too. Spark has supported Scala 2.13 since the 3.2 release (7 months ago), but xgboost4j doesn't support it yet. See #6596.

I also like your solution, which is really good for users. But in our testing, the JVM implementation can train on a dataset about 2x larger than your solution before crashing.

Do you mind sharing the benchmark code? Both approaches just do data conversion and then call XGBoost for distributed training. Using PySpark doesn't require keeping a copy of the data. XGBoost (Python) might keep an extra copy; if that is the case, we just need streaming input for DMatrix.

And there is also no ranking support in your solution. What's more, it really feels like reinventing the wheel. If the xgboost community accepts your solution, there will be two different ways to run XGBoost on Spark, and maintaining two different ways is really hard from the XGBoost community's point of view.

Understood regarding the extra maintenance cost. In the end, I think it depends on how you weigh the PySpark user experience against the community maintenance cost. Note that there are also many more Python developers than Scala developers who can contribute to the project.

@mengxr

mengxr commented May 19, 2022

Forgot to ask: how do you plan to support callbacks with the Scala wrapper?

@trivialfis
Member

@mengxr Is there a clear path for the implementation of sparse data support?

@mengxr

mengxr commented May 20, 2022

apache/spark#36408

@trivialfis
Member

Excellent! Will get back to this on Monday.

@trivialfis
Member

trivialfis commented May 23, 2022

We made multiple comparisons between the Pythonic approach and the JVM approach. The result is inconclusive, but I would like to push forward instead of stalling on this much-anticipated feature. @wbo4958 would like to proceed with the JVM approach, but I would like to continue with the Pythonic approach (the one from @mengxr), even though I tried to stay neutral for a long while. My preference comes down to one simple reason: it should be a Python package targeting Python programmers. If we don't have that, there's little value in introducing yet another interface; the JVM package is always there.

Following is a list of pros and cons of the Pythonic approach I summarized from various conversations for future reference.

Pros

  • Dependency management

I think that underestimates the usability issues.

I strongly agree. I gave an example in an offline discussion: if I were to write a forecasting library based on a bunch of ML libraries like XGBoost, LightGBM, PyTorch, etc., and users are expected to use my library instead of interacting with the underlying implementations directly, do they need to match those package versions themselves? With the size of the Python environment, things can get out of hand really quickly.

  • Support for Python objective/metric/callback. There was a discussion on how to spawn a Python process inside JVM. That seems to be possible but I do not prefer this type of hacking. It always ends up with more problems than we set out to solve.

  • Easier integration with the rest of the Python library and Python ecosystem.

  • Lower maintenance cost. Yes, I think it's actually lower. Firstly, as mentioned above, there's no point in introducing this interface if it's not enjoyable for Python programmers; all the effort would just be a waste of time from my perspective. So even though we might write less code with the JVM approach, to me that new code has minimal value. In the longer term, with the Pythonic approach we don't need to go through c++ -> java -> scala -> python in order to make new features or modifications. Also, we don't need to match the versions between packages on Maven Central and packages on pypi/conda-forge. Lastly, we can reuse all the code that is already in the Python package and don't have to maintain the “launching Python from Java” workaround for objectives.

  • Easier to set up the CI. Echoing the above, the package synchronization is a burden on the CI as well.

  • Most of the issues in the Pythonic approach can be resolved by development in Spark and are not specific to XGBoost: overhead in data copying, IPC, GPU data handling, etc. For instance, the sparse data support is already resolved upstream, as @mengxr has pointed out. If an issue can be solved by Spark, we should not worry about it. The parallel tracker being replaced by barrier mode is a good example: our workarounds will eventually lag behind the ones from Spark and become a problem, as these workarounds are always out of the scope of the project.

Following are the concerns around the Pythonic approach that I have received:

  • Maintenance
    It's upsetting that we have to implement 2 interfaces for Spark, and that's not including the R package we currently have. Do we make yet another integration for R in the future? Looking into the Python package, it's about 1k lines of code without support for LTR, but we might be able to shrink it down a little with some code reuse. Most of it is boilerplate and should not be too difficult to understand. The expected size would be similar to what we currently have with the dask interface. I think that from the perspective of users, the overhead is worth it.

  • Memory usage
    The current implementation open-sourced by @mengxr uses significantly more memory than the JVM approach @wbo4958 was working on, which is disconcerting for sure. I suspect that's caused by the arrow- and pickle-based data serialization along with data concatenation; together they might make multiple copies of the data in memory. We can compensate for this memory consumption by using a quantile DMatrix for CPU (#7890; a small sketch follows this list) and possibly by consuming arrow data directly once that is supported by Spark. Also, I think there is room for improvement in that implementation.

  • GPU data serialization
    For GPU data (cuDF), Pyspark needs to copy data between host and device along with copies between different processes. I opened an issue on spark-rapids for potential optimization using CUDA IPC: [FEA] Use CUDA IPC for sharing data between Scala and Python processes. NVIDIA/spark-rapids#5561 . I think that will be much better than our own solution in the longer term.

  • Learning to Rank
    We have a dask implementation, so I'm not worried about the PySpark implementation.
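As a small sketch of the memory-saving idea behind #7890 (the CPU quantile DMatrix landed after this discussion, so treat the exact API as illustrative):

import numpy as np
import xgboost as xgb

X = np.random.rand(100_000, 32).astype(np.float32)
y = np.random.randint(0, 2, size=100_000)

# The quantised DMatrix stores pre-binned data, avoiding a second full copy of
# the features when training with tree_method="hist".
dtrain = xgb.QuantileDMatrix(X, label=y, max_bin=256)
booster = xgb.train({"objective": "binary:logistic", "tree_method": "hist"},
                    dtrain, num_boost_round=10)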

My conclusion is that neither approach is perfect. But I would like to choose one of them and push forward, and the Python approach makes more sense to me, as it has fewer problems for us to solve and is more promising (I have high hopes that the Spark developers can get them right). Most importantly, it actually looks like a Python package.

@trivialfis
Member

If there are other concerns about user experience please feel free to raise them. Otherwise, let's move forward and get a working prototype for merge.

@trivialfis
Member

@mengxr Please let us know if there's anything we can help.

@mengxr

mengxr commented May 31, 2022

@trivialfis I want to confirm the final decision. Are we doing both (@wbo4958 on the xgboost4j wrapper and others on the native python wrapper), or are we going with the native Python wrapper only? It isn't very clear to me, and I'm not familiar with the sign-off process here.

cc: @WeichenXu123 @lu-wang-dl

@trivialfis
Member

We are going with the Python wrapper approach as there's no new concern on the user experience side.

We won't support 2 implementations, so don't worry about that. A related question: since the Python wrapper in its current state is not quite complete, will you continue the development and maintenance after the initial merge?

@mengxr

mengxr commented Jun 2, 2022

On top of the code we open sourced, we will add back sparse data support. I guess we can add LTR as well, which should be straightforward. And yes, we would like to help maintenance after the initial merge.

@wbo4958
Contributor Author

wbo4958 commented Jun 4, 2022

@mengxr, yeah, there is only the pythonic way. Please file the first PR. Thx

@candalfigomoro

@mengxr
Does the python-based implementation support num_parallel_tree to enable RandomForest-mode and boosted RFs? This is currently unsupported by the xgboost4j implementation (see #7535).

@WeichenXu123
Contributor

@mengxr Does the python-based implementation support num_parallel_tree to enable RandomForest-mode and boosted RFs? This is currently unsupported by the xgboost4j implementation (see #7535).

The Python-based implementation internally calls the xgboost.train API: https://xgboost.readthedocs.io/en/stable/python/python_api.html#xgboost.train

So it can support it.
The Python-based implementation allows setting all params supported by the xgboost.train API.
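For example, a random-forest-style configuration goes straight through the train params; the numbers below are purely illustrative:

import numpy as np
import xgboost as xgb

X = np.random.rand(1_000, 10)
y = np.random.randint(0, 2, size=1_000)
dtrain = xgb.DMatrix(X, label=y)

params = {
    "objective": "binary:logistic",
    "num_parallel_tree": 100,   # 100 trees grown per boosting round
    "subsample": 0.8,
    "colsample_bynode": 0.8,
    "learning_rate": 1.0,
}
# One boosting round with num_parallel_tree > 1 is a random forest;
# more rounds give boosted random forests.
booster = xgb.train(params, dtrain, num_boost_round=1)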

@WeichenXu123
Contributor

WeichenXu123 commented Jun 17, 2022

@trivialfis I created a draft PR #8020

@wbo4958
Contributor Author

wbo4958 commented Jun 22, 2022

Closing this issue, since @WeichenXu123 has opened the PR based on the xgboost python package.

@wbo4958 wbo4958 closed this as completed Jun 22, 2022