
[jvm-packages] initial pyspark api (WIP) #4656

Closed
wants to merge 1 commit into from

Conversation

thesuperzapper
Contributor

@thesuperzapper thesuperzapper commented Jul 11, 2019

This is a cleaned-up version of the PySpark API previously proposed in PR #3376.

There are numerous improvements:

  • You no longer need a special Pipeline object; the default Spark ones work.
  • Parameter getters/setters are dynamically generated from the parameters present in the Scala object (see the sketch after this list).
  • I have moved the code under xgboost4j-spark/src/main/resources, which means the Python code ends up inside the .jar file.
    • This means users can use the jar as if it were a Python package zip.
    • This means that --packages should make the Python library available automatically.
  • Internal code for saving/loading is more sensible.
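
To illustrate the dynamic getters/setters bullet, here is a rough sketch of the idea (a hypothetical helper, not the PR's exact code; the real version reads the parameter list from the wrapped Scala object):

# Sketch: attach getFoo()/setFoo() methods to a wrapper class from a list
# of param names, delegating to pyspark's standard Params machinery.
def _add_param_getters_setters(cls, param_names):
    for name in param_names:
        cap = name[0].upper() + name[1:]

        def getter(self, _name=name):
            # pyspark's getOrDefault resolves a param given its string name
            return self.getOrDefault(_name)

        def setter(self, value, _name=name):
            return self._set(**{_name: value})

        setattr(cls, "get" + cap, getter)
        setattr(cls, "set" + cap, setter)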

Here is an example (don't forget to pass the xgboost4j.jar and xgboost4j-spark.jar to spark-submit):

# Initialise a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Add the jar file (as if it were a Python zip)
spark.sparkContext.addPyFile("/PATH/TO/JAR/xgboost4j-spark-0.90.jar")

# Import Objects 
from sparkxgb import XGBoostClassifier, XGBoostClassificationModel

# Read some data
dataPath = "/PATH/TO/SPARK_HOME/data/mllib/sample_binary_classification_data.txt"
data = spark.read.format("libsvm").option("vectorType", "dense").load(dataPath)
dataSplit = data.randomSplit([0.8, 0.2], seed = 1000)
dataTrain = dataSplit[0]
dataTest = dataSplit[1]

# Define the model
paramMap = {
    "eta": 0.1,
    "maxDepth": 2,
    "objective": "binary:logistic",
    "numRound": 5,
    "numWorkers": 2
}
xgbClassifier = XGBoostClassifier(**paramMap) \
    .setFeaturesCol("features") \
    .setLabelCol("label")

# Write/Read object
xgbClassifier.write().overwrite().save("xgboost_python")
xgbClassifier_new = XGBoostClassifier().load("xgboost_python")

# Train the model
xgboostModel = xgbClassifier_new.fit(dataTrain)

# Display model parameters
for param, value in xgboostModel.extractParamMap().items():
    print((param.name, value))

# Write/Read model
xgboostModel.write().overwrite().save("xgboost_python.model")
xgboostModel_new = XGBoostClassificationModel().load("xgboost_python.model")

# Transform some data
xgboostModel_new.transform(dataTest).show()
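
As an aside, the --packages route mentioned above can also be expressed directly in the session builder. A sketch, assuming the 0.90 artifacts are resolvable under the ml.dmlc Maven group (check the published coordinates for your version):

# Sketch: resolve the jars via Maven coordinates instead of local paths.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages",
            "ml.dmlc:xgboost4j:0.90,ml.dmlc:xgboost4j-spark:0.90") \
    .getOrCreate()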

Still to do:

  • Set up PyPI distribution (for those who want to pip install rather than use the .jar).
    • There is a setup.py, but it will need to be finalised.
  • Write integration tests and hook them into test_jvm_cross.sh#L38.
  • Write documentation.

@alibeyram

alibeyram commented Jul 11, 2019

@thesuperzapper Where is sparkxgb located? I can see one that was committed for version 0.80, but is there a newer one for 0.90?

@thesuperzapper
Contributor Author

@alibeyram
I am a little confused by your question, but here is a zip of just the PySpark code: pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip

You can use it in the same way as the above example, with the following change:

spark.sparkContext.addPyFile("/PATH/TO/pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip")

I would love it if people could test this and suggest changes we can make for usability.
(A general tip for testing: make sure you don't retrain the same model within 60 seconds; as #4628 shows, this will hang, even in Scala.)

@alibeyram

@thesuperzapper Thank you, I was looking for that zip file. The one I had was for XGBoost version 0.80, at sparkxgb 0.80.
We will test it and let you know if we have any suggestions.

@debinqiu

@thesuperzapper Thank you for the awesome package. I encountered the following error when loading the booster back using XGBoostClassificationModel.load().

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/ml/util.py in load(cls, path)
309 def load(cls, path):
310 """Reads an ML instance from the input path, a shortcut of read().load(path)."""
--> 311 return cls.read().load(path)
312
313

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/ml/util.py in load(self, path)
251 raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"
252 % self._clazz)
--> 253 return self._clazz._from_java(java_obj)
254
255 def context(self, sqlContext):

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/ml/wrapper.py in _from_java(java_stage)
218 stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark")
219 # Generate a default new instance from the stage_name class.
--> 220 py_type = __get_class(stage_name)
221 if issubclass(py_type, JavaParams):
222 # Load information from java_stage to the instance.

/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/pyspark/ml/wrapper.py in __get_class(clazz)
212 parts = clazz.split('.')
213 module = ".".join(parts[:-1])
--> 214 m = __import__(module)
215 for comp in parts[1:]:
216 m = getattr(m, comp)

ModuleNotFoundError: No module named 'ml'

but I can 'import pyspark.ml' successfully without any problem. Do you have any idea why this happened? Thanks.

@thesuperzapper
Contributor Author

@debinqiu are you 100% sure you have added either the compiled jar from this repo or the above .zip file using the following:

spark.sparkContext.addPyFile("/PATH/TO/pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip")

Or with:

spark-submit ... --py-files "/PATH/TO/pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip"

@debinqiu

@thesuperzapper Thanks for the quick response. I actually used the 0.82 version instead of 0.90 because I only have Spark v2.3 in my environment. 0.90 requires 2.4+, right?

I added an '__init__.py' to make it a Python module so that I can call it directly. I also tested using the Scala XGBoostClassificationModel.load() to load the booster back in my environment, which also worked for me.

@thesuperzapper
Contributor Author

@debinqiu it's quite likely that the .zip file above will work with 0.82. (And the old .zip will not work properly with saving/loading.)

Also, what do you mean you added an __init__.py?

@debinqiu

@thesuperzapper Sorry, __init__.py is already in sparkxgb. So basically I just unzipped your file and added it to my PYTHONPATH, which makes it a normal Python module.
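
In other words, roughly this (path assumed):

import sys
sys.path.insert(0, "/PATH/TO/unzipped")  # the directory containing sparkxgb/
from sparkxgb import XGBoostClassifier, XGBoostClassificationModel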

Also, for regression problems, XGBoostRegressionModel.load() works for me to load the booster back.

@cjkini

cjkini commented Jul 19, 2019

Hello @thesuperzapper .
Thanks for this xgboost pyspark API. Really appreciate it.

I ran the following code, as written by you, in a Jupyter notebook:


# Initialise OS
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.90.jar,xgboost4j-0.90.jar pyspark-shell'

# Initialise a SparkSession
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Add the jar file (as if it were a Python zip)
spark.sparkContext.addPyFile("C:\\spark-2.4.3\\xgboost4j-spark-0.90.jar")
spark.sparkContext.addPyFile("C:\\spark-2.4.3\\pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip")

# Import Objects 
from sparkxgb import XGBoostClassifier, XGBoostClassificationModel

# Read some data
dataPath = "C:\spark-2.4.3\data\mllib\sample_binary_classification_data.txt"
data = spark.read.format("libsvm").option("vectorType", "dense").load(dataPath)
dataSplit = data.randomSplit([0.8, 0.2], seed = 1000)
dataTrain = dataSplit[0]
dataTest = dataSplit[1]

# Define the model
paramMap = {
    "eta": 0.1,
    "maxDepth": 2,
    "objective": "binary:logistic",
    "numRound": 5,
    "numWorkers": 2
}
xgbClassifier = XGBoostClassifier(**paramMap) \
    .setFeaturesCol("features") \
    .setLabelCol("label")

# Write/Read object
xgbClassifier.write().overwrite().save("xgboost_python")
xgbClassifier_new = XGBoostClassifier().load("xgboost_python")

# Train the model
xgboostModel = xgbClassifier_new.fit(dataTrain)

# Display model parameters
for param, value in xgboostModel.extractParamMap().items():
    print((param.name, value))

# Write/Read model
xgboostModel.write().overwrite().save("xgboost_python.model")
xgboostModel_new = XGBoostClassificationModel().load("xgboost_python.model")

# Transform some data
xgboostModel_new.transform(dataTest).show()


However, I got the following error:


Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-4259f65e291d> in <module>
      2     featuresCol="features",
      3     labelCol="Survival",
----> 4     predictionCol="prediction"
      5 )

C:\spark-2.4.3\python\pyspark\__init__.py in wrapper(self, *args, **kwargs)
    108             raise TypeError("Method %s forces keyword arguments." % func.__name__)
    109         self._input_kwargs = kwargs
--> 110         return func(self, **kwargs)
    111     return wrapper
    112 

~\AppData\Local\Temp\spark-9199e390-a84e-4ff6-acfc-7e4f8e324d20\userFiles-1225a528-2091-4d6a-bf04-42b6d7532968\sparkxgb.zip\sparkxgb\xgboost.py in __init__(self, alpha, baseMarginCol, baseScore, checkpointInterval, checkpointPath, colsampleBylevel, colsampleBytree, contribPredictionCol, customEval, customObj, eta, evalMetric, featuresCol, gamm, growPolicy, labelCol, reg_lambda, lambdaBias, leafPredictionCol, maxBin, maxDeltaStep, maxDepth, minChildWeight, missing, normalizeType, nthread, numClass, numEarlyStoppingRounds, numRound, numWorkers, objective, predictionCol, probabilityCol, rateDrop, rawPredictionCol, sampleType, scalePosWeight, seed, silent, sketchEps, skipDrop, subsample, thresholds, timeoutRequestWorkers, trainTestRatio, treeLimit, treeMethod, useExternalMemory, weightCol)
     64 
     65         super(XGBoostClassifier, self).__init__()
---> 66         self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier", self.uid)
     67         self._create_params_from_java()
     68         self._setDefault()  # We get our defaults from the embedded Scala object, so no need to specify them here.

C:\spark-2.4.3\python\pyspark\ml\wrapper.py in _new_java_obj(java_class, *args)
     65             java_obj = getattr(java_obj, name)
     66         java_args = [_py2java(sc, arg) for arg in args]
---> 67         return java_obj(*java_args)
     68 
     69     @staticmethod

C:\spark-2.4.3\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1523         answer = self._gateway_client.send_command(command)
   1524         return_value = get_return_value(
-> 1525             answer, self._gateway_client, None, self._fqn)
   1526 
   1527         for temp_arg in temp_args:

C:\spark-2.4.3\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\spark-2.4.3\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.
: java.lang.AbstractMethodError: ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.ml$dmlc$xgboost4j$scala$spark$params$GeneralParams$_setter_$round_$eq(Lorg/apache/spark/ml/param/IntParam;)V
	at ml.dmlc.xgboost4j.scala.spark.params.GeneralParams$class.$init$(GeneralParams.scala:28)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.<init>(XGBoostClassifier.scala:51)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.<init>(XGBoostClassifier.scala:56)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


Can you help me solve this problem?
I am using Spark 2.4.3 and XGBoost 0.90.

Thanks in advance

@wei8171023

@cjkini Have you solved your problem yet? I have the same problem!

@thesuperzapper
Contributor Author

@wei8171023 @cjkini just to check, are you using Windows or Linux?

If you're using Windows, can you try changing the save/load paths to be fully qualified, like so:

# OLD: xgbClassifier.write().overwrite().save("xgboost_python")
xgbClassifier.write().overwrite().save("file:///C:/XXXXXX/xgboost_python")

# OLD: xgbClassifier_new = XGBoostClassifier().load("xgboost_python")
xgbClassifier_new = XGBoostClassifier().load("file:///C:/XXXXXX/xgboost_python")

If that's not the issue, can you please clarify which line of code is throwing the error? (Just progressively un-comment lines until you get the error, if you have to.)

@nicklavers

nicklavers commented Aug 2, 2019

@thesuperzapper The lambda_ parameter doesn't seem to work.

This works just fine:

xgbRegressor = XGBoostRegressor(
  eta = 0.4,
  numRound = 30,
  numWorkers = 120,
  alpha = 10.0,
)

xgboostModel = xgbRegressor.fit(data_train)

But if I try to set the 'lambda_' parameter:

xgbRegressor = XGBoostRegressor(
  eta = 0.4,
  numRound = 30,
  numWorkers = 120,
  alpha = 10.0,
  lambda_ = 5.0, # <-- added
)

xgboostModel = xgbRegressor.fit(data_train)

I get an error:

AttributeError: 'XGBoostRegressor' object has no attribute 'lambda_'

Note that this is not the same error that I get if I make up a fake parameter name:

xgbRegressor = XGBoostRegressor(
  eta = 0.4,
  numRound = 30,
  numWorkers = 120,
  alpha = 10.0,
  floomf = 5.0, # <-- made up
)

xgboostModel = xgbRegressor.fit(data_train)
TypeError: __init__() got an unexpected keyword argument 'floomf'

The XGBoostRegressor init signature lists a lambda_ parameter, but something seems to go wrong when I use it.

As long as I don't try to set lambda_, training works just fine.
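
(For context: lambda is a reserved word in Python, so the wrapper presumably has to expose it under an alias and translate it back to the Scala parameter name. A hypothetical sketch of the kind of mapping involved, not the actual wrapper code:)

# Hypothetical: Python cannot accept `lambda` as a keyword argument, so the
# wrapper needs an alias that maps back to the Scala param name.
_PARAM_ALIASES = {"lambda_": "lambda"}

def to_scala_param_name(name):
    return _PARAM_ALIASES.get(name, name)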

@ramams

ramams commented Aug 30, 2019

Hi,
This is very useful functionality to have in PySpark. Thanks for your effort. Many of us want to use Python during the modelling phase and Scala during scoring. Any update on when this pull request will get into master? I am hoping it will be available in the 1.0 release.

BTW, when we build this Python code with the latest xgboost-spark, we see errors at runtime. It also seems that this branch is using Scala 2.11.12 and Java 1.7, while master seems to be using Scala 2.12.x and Java 1.8. I am guessing the error we are seeing is due to these version mismatches. Below is the error message we observe when running our code:

Traceback (most recent call last):
  File "/data/work_disk/Spark/Python_Scala/modeling/SmsContentClassification.py", line 64, in <module>
    xgbClassifier = XGBoostClassifier(**paramMap)
  File "/Tools/spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/__init__.py", line 110, in wrapper
  File "/tmp/spark-e9685b4f-798e-4e3a-b576-a585809f8ab7/userFiles-68443fc4-4e68-4314-a24e-e1c5c3531ea0/xgboost4j-spark_2.12-1.0.0-SNAPSHOT.jar/sparkxgb/xgboost.py", line 85, in __init__
  File "/tmp/spark-e9685b4f-798e-4e3a-b576-a585809f8ab7/userFiles-68443fc4-4e68-4314-a24e-e1c5c3531ea0/xgboost4j-spark_2.12-1.0.0-SNAPSHOT.jar/sparkxgb/common.py", line 68, in __init__
  File "/Tools/spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 67, in _new_java_obj
  File "/Tools/spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/Tools/spark/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/Tools/spark/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.
: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcZI$sp
	at ml.dmlc.xgboost4j.scala.spark.params.GeneralParams.$init$(GeneralParams.scala:74)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.<init>(XGBoostClassifier.scala:47)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.<init>(XGBoostClassifier.scala:52)

@mrlsdvd

mrlsdvd commented Sep 28, 2019

@thesuperzapper Thank you so much for this PySpark API! Would it be possible to make the evalSetsMap / eval_sets parameter available through the API? I'd like to include early stopping evaluated on a particular DataFrame, but that doesn't seem to be possible currently, since the variable is protected in the underlying Scala implementation.

Thanks again!

@thesuperzapper
Contributor Author

Hi guys, I am going to pick this back up this week. I have been moving countries over the last month, so I have been a bit busy, but I want to get this ready for 1.0 RC1.

The main things left to do are:

  • Fix the small bugs raised in this thread (missing parameters, etc.).
  • Write at least one end-to-end integration test which can run in CI, mainly so saving/loading doesn't regress (see the sketch after this list).
  • Write docs.
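
Something along these lines (a minimal pytest-style sketch; the jar path, params, and toy data are placeholders, and the sparkxgb import names follow the example earlier in this PR):

# Sketch of an end-to-end save/load regression test.
# `tmp_path` is the standard pytest fixture; the jar path is a placeholder.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

def test_classifier_save_load_roundtrip(tmp_path):
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    spark.sparkContext.addPyFile("/PATH/TO/JAR/xgboost4j-spark.jar")
    from sparkxgb import XGBoostClassifier, XGBoostClassificationModel

    train = spark.createDataFrame(
        [(Vectors.dense(0.0, 1.0), 0.0), (Vectors.dense(1.0, 0.0), 1.0)] * 10,
        ["features", "label"])

    model = XGBoostClassifier(numRound=2, numWorkers=1).fit(train)

    path = str(tmp_path / "xgboost_model")
    model.write().overwrite().save(path)
    loaded = XGBoostClassificationModel().load(path)

    # The reloaded model should still be able to score data.
    assert loaded.transform(train).count() == train.count()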

Note:

  • For now, I won't set up publishing to PyPI, as users can pass the XGBoost Spark jar as a Python package, and PyPI introduces a dependency on the local Python environment of the Spark master, which is probably not ideal.
  • If someone wants to pick up PyPI, feel free to do so after this is merged. (I will leave a barebones setup.py in the PR.)

return r


class XGboostEstimator(JavaEstimator, XGBoostReadable, JavaMLWritable, ParamGettersSetters):
Contributor

We also need specific setXXXParam methods.
You could reference the code here (create a shared base class named _XgboostParams):
https://github.com/apache/spark/blob/8cf76f8d61b393bb3abd9780421b978e98db8cae/python/pyspark/ml/tree.py#L63

Contributor Author

@WeichenXu123 This code automatically generates the getters/setters from the parameter list. Is there something I am missing?

@chenqin
Contributor

chenqin commented Nov 6, 2019

@thesuperzapper please rebase onto the latest master and the lint error should be gone. Also, are there any updates on the ETA?

@chuanlihao

Hi @thesuperzapper,

Thanks for providing such a great library. Is there any update on this PR? I'd be glad to help with this library if you are busy these days.

Some updates on this library: It has been included in rapidsai/xgboost. I cherry-picked this commit, fixed one or two minor bugs, and added GPU support to it: https://github.com/rapidsai/xgboost/commits/rapids-spark/jvm-packages/xgboost4j-spark/src/main/resources. I have also added some examples to demonstrate this Python library: https://github.com/rapidsai/spark-examples/tree/master/examples/apps/python/ai/rapids/spark/examples.

@danielhstahl
Contributor

With 1.0 now out, are there any plans to start incorporating this into the main XGBoost codebase? Or any updates to this commit for 1.0 rather than 0.90?

@WeichenXu123
Contributor

@thesuperzapper Any updates? If you're busy, do you mind if I take over this PR? Thanks!

@WeichenXu123
Contributor

It seems the author is not responding, so I will take this over. @CodingCat, if you have any concerns or suggestions, let me know! :)

@thesuperzapper
Contributor Author

@WeichenXu123 Just saw this; it might be easier if we work together (or at least have a chat on Zoom first), as there are only a few things left that I just never found the time to do.

@d901971

d901971 commented Aug 18, 2020

Hi,
I am using

Spark v2.4.0
xgboost4j-spark-0.90.jar
xgboost4j-0.90.jar
pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip

When I include "maxBins" as one of the parameters:

paramMap = {
    "evalMetric": "auc",
    "numRound": 5,
    "eta": 0.1,
    "minChildWeight": 5.0,
    "maxDepth": 6,
    "subsample": 0.8,
    "numWorkers": nworkers,
    "scalePosWeight": scale_pos_weight,
    "missing": 0.0,
    "maxBins": 20
}

xgb = xgb_est(**paramMap) \
    .setFeaturesCol(feature_vector) \
    .setLabelCol(model_label)

I received an error saying "AttributeError: 'XGBoostClassifier' object has no attribute 'maxBins'".
Appreciate it if someone can help me.

Thanks
Dave

@amine000

@thesuperzapper I know this is very late, but can you let me know how you generated the .zip archive from the source code, i.e. pyspark-xgboost_0.90_261ab52e07bec461c711d209b70428ab481db470.zip? I am trying to generate a zip archive for my Spark job using your code, slightly modified, but it seems I am doing it incorrectly, since Spark complains that the 'ml' module is not found.
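
From the layout described earlier in the thread, I believe the sparkxgb package has to sit at the root of the archive, since Spark puts the zip root on sys.path. A sketch of producing such a zip (paths assumed):

# Sketch: archive the resources directory so sparkxgb/ is at the zip root.
import shutil
shutil.make_archive("pyspark-xgboost_0.90", "zip",
                    root_dir="xgboost4j-spark/src/main/resources")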

@trivialfis
Member

Thank you for the work and for joining the design discussion!
