[SPARK-29224][ML]Implement Factorization Machines as a ml-pipeline component #26124
Conversation
@srowen I don't know how to change the PR target branch, so I opened a new PR. I checked out a branch based on master (Spark 3.0.0) and implemented the FM Python API.
srowen left a comment:
Looks promising
    import org.apache.spark.storage.StorageLevel

    /**
     * Params for Factorization Machines
Is there any paper you can link to to explain the implementation? or, just a few paragraphs about what the implementation does?
> Is there any paper you can link to to explain the implementation? or, just a few paragraphs about what the implementation does?
FM paper: S. Rendle, “Factorization machines,” in Proceedings of IEEE International Conference on Data Mining (ICDM), pp. 995–1000, 2010.
https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf
Yes, I mean, this should be in the docs along with a brief explanation of what it does ... enough that an informed reader understands how the parameters you expose map to the algorithm in the paper (which is probably straightforward)
     */
    @Since("3.0.0")
    final val regParam: DoubleParam = new DoubleParam(this, "regParam",
      "regularization for L2")
This could be clarified a bit: strength of regularization? Does this correspond to some parameter in a paper, some 'alpha'? etc. You can add a validator to ensure it's not negative, too. See some other DoubleParam for an example.
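For illustration, a minimal sketch of what that might look like (the doc string here is an assumption, not the PR's wording), using the nonnegativity validator other DoubleParams use:

```scala
// Sketch: say what is regularized and reject negative values up front.
final val regParam: DoubleParam = new DoubleParam(this, "regParam",
  "Strength of the L2 regularization applied to the model coefficients (>= 0)",
  ParamValidators.gtEq(0))
```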
     * @group param
     */
    @Since("3.0.0")
    final val miniBatchFraction: DoubleParam = new DoubleParam(this, "miniBatchFraction",
Likewise, a mini-batch of what? And can it be validated to be nonnegative/positive?
        (if ($(fitLinear)) Array.fill(numFeatures)(0.0) else Array.empty[Double]) ++
        (if ($(fitBias)) Array.fill(1)(0.0) else Array.empty[Double]))

    val data = instances.map{ case OldLabeledPoint(label, features) => (label, features) }
Nit: space before brace
    }
    (0 until numFactors).foreach { f =>
      var sumSquare = 0.0
      var squareSum = 0.0
Nit: this looks like a 'sum' not squared sum.
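For context, these per-factor accumulators presumably track the two quantities in the O(kn) reformulation of the pairwise term from Rendle (2010), which contrasts the square of a sum with the sum of squares for each factor f:

$$
\sum_{i=1}^{n}\sum_{j=i+1}^{n} v_{i,f}\, v_{j,f}\, x_i x_j
  = \frac{1}{2}\left[\left(\sum_{i=1}^{n} v_{i,f}\, x_i\right)^{2} - \sum_{i=1}^{n} v_{i,f}^{2}\, x_i^{2}\right]
$$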
    }

    private[ml] class AdamWUpdater(weightSize: Int) extends Updater with Logging {
      var beta1: Double = 0.9
Can these be val?
| @Since("3.0.0") | ||
| final override val loss: Param[String] = new Param[String](this, "loss", "The loss function to" + | ||
| s" be optimized. Supported options: ${supportedLosses.mkString(", ")}. (Default logisticLoss)", | ||
| ParamValidators.inArray[String](supportedLosses)) |
for a log-loss, should we put it into ml.classification?
> for a log-loss, should we put it into ml.classification?
I will add an FM Classifier later.
I guess the question is then, do you want to add log-loss later? it won't make sense for a regression
@srowen When loss is logloss, labels must be in {0, 1}. So I am planning to add FMClassifier for logloss, and the original FactorizationMachines becomes FMRegressor with MSE; the loss parameter is then removed. What do you think?
I would implement it all in one pull request then. My concern is that we're coming up on Spark 3 and we would not want to release part of this only to change the API after.
OK, I will implement the Classifier and Regressor in this PR. Thank you for your suggestion.
I already resolved the requested changes. Next I will add the FM classifier (logloss in FM), which will take a few days.
    import FactorizationMachines._

    /**
     * Param for dimensionality of the factors (>= 0)
Nit: Must be greater than 0 right? the check below is correct
     */
    @Since("3.0.0")
    final val numFactors: IntParam = new IntParam(this, "numFactors",
      "dimensionality of the factor vectors, " +
Nit: the doc strings should start with a Capital
     */
    @Since("3.0.0")
    final val regParam: DoubleParam = new DoubleParam(this, "regParam",
      "the parameter of l2-regularization term, " +
Nit: l2 -> L2
Nit: can we say "strength" rather than "parameter" to clarify a little?
Rather than describe what L2 regularization is, what is being regularized? I'm just looking for a tiny bit more specificity.
     */
    @Since("3.0.0")
    def setLoss(value: String): this.type = set(loss, value)
    setDefault(loss -> LogisticLoss)
Maybe I'm just not familiar here, but if this is a regressor, can you even use logistic loss?
      (if ($(fitBias)) 1 else 0)
    val initialCoefficients =
      Vectors.dense(Array.fill($(numFactors) * numFeatures)(Random.nextGaussian() * $(initStd)) ++
        (if ($(fitLinear)) Array.fill(numFeatures)(0.0) else Array.empty[Double]) ++
Nit: where you call Array.fill(...)(0.0), just use new Array[Double](...). It doesn't matter much, but there's no need to set the array to 0 as this is already its value. I don't feel strongly about it.
another nit, Array.emptyDoubleArray
> another nit, Array.emptyDoubleArray
The difference between Array.emptyDoubleArray and Array.empty[Double] is?
> another nit, Array.emptyDoubleArray
Array.emptyDoubleArray is a val, Array.empty[Double] will create a new object. So Array.emptyDoubleArray is more efficient, right?
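For reference, a quick REPL-style illustration of the standard-library behavior being discussed (this is not code from the PR):

```scala
// Array.emptyDoubleArray is a cached empty array on the Array companion object,
// so every call returns the same instance.
Array.emptyDoubleArray eq Array.emptyDoubleArray  // true

// Array.empty[Double] goes through the generic ClassTag-based factory and
// allocates a fresh zero-length array on each call.
Array.empty[Double] eq Array.empty[Double]        // false
```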
zhengruifeng left a comment:
In general, I think it may be better to move the impl to ml.regression as a private class, and add wrappers in both ml.regression & ml.classification.
And I think we need more test suites (such as withBias/withoutBias, withLinear/withoutLinear), since it will be an important implementation.
      train(dataset, handlePersistence)
    }

    protected[spark] def train(
I guess we do not need to define this function. Other impls have this function so it can be called from the .mllib side.
> I guess we do not need to define this function. Other impls have this function so it can be called from the .mllib side.
Yes, ml's Estimator exposes the public .fit() function (not .train()). But in the .fit() source code, it does some setup and then calls a train function, so I need to implement it. The train function is not public; users can only use the fit function.
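Roughly, the ml.Predictor pattern looks like this (a paraphrased sketch of the Spark source, not an exact copy; M stands for the concrete model type):

```scala
// fit() is the public entry point: it validates the schema and then delegates to
// the protected train() method that each estimator implements.
override def fit(dataset: Dataset[_]): M = {
  transformSchema(dataset.schema, logging = true)
  copyValues(train(dataset).setParent(this))
}

// Subclasses (e.g. an FM estimator) only override this:
protected def train(dataset: Dataset[_]): M
```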
      (if ($(fitBias)) 1 else 0)
    val initialCoefficients =
      Vectors.dense(Array.fill($(numFactors) * numFeatures)(Random.nextGaussian() * $(initStd)) ++
        (if ($(fitLinear)) Array.fill(numFeatures)(0.0) else Array.empty[Double]) ++
another nit, Array.emptyDoubleArray
      case AdamW => new AdamWUpdater(coefficientsSize)
    }

    val optimizer = new GradientDescent(gradient, updater)
BTW, If both FM and MLP still need mini-batch solver, we may move it to the .ml side in the future, to avoid vector conversion.
> BTW, If both FM and MLP still need mini-batch solver, we may move it to the .ml side in the future, to avoid vector conversion.
I agree, and I think it would be better if GradientDescent supported an aggregateDepth parameter, a seed parameter, and a weightCol parameter, and logged (logInfo) the loss value per epoch.
| @Since("3.0.0") | ||
| class FactorizationMachinesModel ( | ||
| @Since("3.0.0") override val uid: String, | ||
| @Since("3.0.0") val coefficients: OldVector, |
I'd prefer an ml.Vector here; just call the .asML method.
    protected def getLoss(rawPrediction: Double, label: Double): Double

    private val sumVX = Array.fill(numFactors)(0.0)
will this cause a potential thread safety issue?
> will this cause a potential thread safety issue?

I think it is thread safe.
GD calculates the gradient here (spark/mllib/optimization/GradientDescent.scala:239). GD uses treeAggregate to calculate the gradient:

    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      })

treeAggregate code (spark/rdd/RDD.scala:1201): treeAggregate calls mapPartitions to calculate the local gradient per partition, and within a partition it runs it.aggregate:

    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))

Scala aggregate code (scala/collection/TraversableOnce.scala:214): aggregate only uses seqop, which I think means the cumulative gradient is calculated sequentially within a partition:

    def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
In this PR, the method getRawPrediction is not only used in training, but also in prediction. What if users call the method predict concurrently?
> In this PR, the method getRawPrediction is not only used in training, but also in prediction. What if users call the method predict concurrently?
When users call the method predict, val sumVX is not used (sumVX is only used when calculating the gradient).
But I will change it to avoid more worry.
    }

    private[ml] class AdamWUpdater(weightSize: Int) extends Updater with Logging {
      val beta1: Double = 0.9
these params are not exposed to end users?
> these params are not exposed to end users?

I also considered this, but I was worried that if someone adds a better solver, there would be too many solver parameters. And in AdamW, these parameters are almost never tuned, so I am not exposing them for now.
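For context, a sketch of the AdamW step these constants parameterize (the function name, signature, and default values here are assumptions for illustration, not the PR's exact updater code):

```scala
// A single AdamW update for one coordinate, returning the new (weight, m, v).
// beta1/beta2: moment decay rates, epsilon: numerical stability,
// lambda: decoupled weight-decay strength, lr: learning rate, t: step count.
def adamWStep(w: Double, g: Double, m: Double, v: Double, t: Int,
    lr: Double = 0.001, beta1: Double = 0.9, beta2: Double = 0.999,
    epsilon: Double = 1e-8, lambda: Double = 0.01): (Double, Double, Double) = {
  val mNew = beta1 * m + (1 - beta1) * g         // first-moment (mean) estimate
  val vNew = beta2 * v + (1 - beta2) * g * g     // second-moment estimate
  val mHat = mNew / (1 - math.pow(beta1, t))     // bias correction
  val vHat = vNew / (1 - math.pow(beta2, t))
  val wNew = w - lr * (mHat / (math.sqrt(vHat) + epsilon) + lambda * w)
  (wNew, mNew, vNew)
}
```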
In practice I am using FM/FFM, and IMHO SSP or ASYNC solvers (like Difacto/PS-lite) seem more efficient than mini-batch solvers.
I already resolved the requested changes, and I split FM into FMClassifier and FMRegressor.
      extends ProbabilisticClassifier[Vector, FMClassifier, FMClassifierModel]
      with FMClassifierParams with DefaultParamsWritable with Logging {

      import org.apache.spark.ml.regression.BaseFactorizationMachinesGradient.{LogisticLoss, parseLoss}
Can we put those imports above?
      train(dataset, handlePersistence)
    }

    protected[spark] def train(
I still think that this method is not needed; just create a single def train(dataset: Dataset[_]): FMClassifierModel?
> I still think that this method is not needed; just create a single def train(dataset: Dataset[_]): FMClassifierModel?
@zhengruifeng I referred to the implementation of LR. It handles persistence: if the dataset is not persisted, it persists the dataset and releases the cache after training finishes.
Or should I cache the dataset regardless of whether it is already cached?
LogisticRegression.scala:481:

    override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
      val handlePersistence = dataset.storageLevel == StorageLevel.NONE
      train(dataset, handlePersistence)
    }

    protected[spark] def train(
        dataset: Dataset[_],
        handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
      val instances = extractInstances(dataset)
      if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
      // train model code
      if (handlePersistence) instances.unpersist()
Yes, but this method def train(dataset: Dataset[_], handlePersistence: Boolean) was added for the old mllib.LogisticRegression.
As a new algorithm, FM does not need to have this method.

> Or should I cache the dataset regardless of whether it is already cached?

You should cache the dataset; refer to LinearSVC.
    override protected[spark] def train(dataset: Dataset[_]): FMClassifierModel = {
      val handlePersistence = dataset.storageLevel == StorageLevel.NONE
      val data: RDD[(Double, OldVector)] = ...
      if (handlePersistence) data.persist(StorageLevel.MEMORY_AND_DISK)
      ...
    }

      .setRegParam($(regParam))
      .setMiniBatchFraction($(miniBatchFraction))
      .setConvergenceTol($(tol))
    val coefficients = optimizer.optimize(data, initialCoefficients)
If the real training dataset is data not instances, why not cache data instead?
      extends ProbabilisticClassificationModel[Vector, FMClassifierModel]
      with FMClassifierParams with MLWritable {

      import org.apache.spark.ml.regression.BaseFactorizationMachinesGradient.LogisticLoss
I guess we can put this import above
     * [i * numFactors + f] denotes i-th feature and f-th factor
     * Following indices are 1-way coefficients and global bias.
     */
    private val oldCoefficients: OldVector = coefficients
I guess we can mark it lazy and transient?
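i.e., something like the following (a sketch of the suggestion, which also assumes coefficients becomes an ml.Vector per the earlier comment, so the old-style copy is derived on first use):

```scala
// OldVectors is assumed to alias org.apache.spark.mllib.linalg.Vectors.
// @transient keeps the derived copy out of the serialized model; lazy defers
// the conversion until a prediction actually needs it.
@transient private lazy val oldCoefficients: OldVector = OldVectors.fromML(coefficients)
```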
    final def getInitStd: Double = $(initStd)

    /** String name for "gd". */
    private[ml] val GD = "gd"
I think GD, AdamW, supportedSolvers should be defined in an object
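For example, roughly like this (a sketch; the object name and placement are assumptions):

```scala
// Collect the solver name constants in one place instead of on the params trait.
private[ml] object FactorizationMachines {
  /** String name for the minibatch gradient descent solver. */
  val GD = "gd"

  /** String name for the AdamW solver. */
  val AdamW = "adamW"

  /** Set of solvers that FactorizationMachines supports. */
  val supportedSolvers = Array(GD, AdamW)
}
```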
      with FMRegressorParams with DefaultParamsWritable with Logging {

      import org.apache.spark.ml.regression.BaseFactorizationMachinesGradient.{SquaredError, parseLoss}
      import org.apache.spark.ml.regression.FMRegressor.initCoefficients
ditto
     */
    @Since("3.0.0")
    def setMiniBatchFraction(value: Double): this.type = {
      require(value > 0 && value <= 1.0,
ParamValidators.inRange(0, 1, false, true) already checks input value
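i.e., the require could presumably be dropped if the param itself carries the validator, roughly like this (a sketch, not the PR's exact wording):

```scala
// lowerInclusive = false, upperInclusive = true => valid values are in (0, 1].
final val miniBatchFraction: DoubleParam = new DoubleParam(this, "miniBatchFraction",
  "Fraction of the input data used in each mini-batch of gradient descent, in range (0, 1]",
  ParamValidators.inRange(0, 1, lowerInclusive = false, upperInclusive = true))

def setMiniBatchFraction(value: Double): this.type = set(miniBatchFraction, value)
```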
      .setRegParam($(regParam))
      .setMiniBatchFraction($(miniBatchFraction))
      .setConvergenceTol($(tol))
    val coefficients = optimizer.optimize(data, initialCoefficients)
ditto
Looking reasonable to me. @zhengruifeng?
LGTM. @mob-ai You can open another ticket for docs & examples after this PR gets merged.
ok, "another ticket" means "another PR"? |
You can make another PR for the same JIRA, or really, just add it here.
add examples and docs
#26124 (comment)
I added a new section for FM to describe the background of FM. The FM examples are placed in the classification section and the regression section.
srowen left a comment:
Looking pretty good
docs/ml-classification-regression.md
> Factorization machines are able to estimate interactions between features even in problems with huge sparsity.
> For more background and more details about the implementation of factorization machines,
> refer to [Factorization Machines section](ml-classification-regression.html#factorization-machines).
to -> to the
docs/ml-classification-regression.md
> The following examples load a dataset in LibSVM format, split it into training and test sets,
> train on the first dataset, and then evaluate on the held-out test set.
> We scale features between 0 and 1 to prevent the exploding gradient problem.
between -> to be between
> # Factorization Machines
>
> [Factorization Machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf) are able to estimate interactions
> between features even in problems with huge sparsity (like advertising and recommendation system).
You maybe don't have to repeat this sentence 3 times. You can omit it above?
docs/ml-classification-regression.md
> \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j
> $$
>
> First two terms denote intercept and linear term (as same as linear regression),
First -> The first
as same as -> same as in
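For reference, the full FM model equation from Rendle (2010) that this doc paragraph describes:

$$
\hat{y}(x) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle v_i, v_j \rangle\, x_i x_j
$$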
docs/ml-classification-regression.md
> $$
>
> First two terms denote intercept and linear term (as same as linear regression),
> and last term denotes pairwise interactions term. $$v_i$$ describes the i-th variable
last -> the last
docs/ml-classification-regression.md
> and last term denotes pairwise interactions term. $$v_i$$ describes the i-th variable
> with k factors.
>
> FM can be used directly as the regression and optimization criterion is mean square error. FM also can be used as
"used directly for regression because the optimization criterion is mean squared error"?
docs/ml-classification-regression.md
> with k factors.
>
> FM can be used directly as the regression and optimization criterion is mean square error. FM also can be used as
> the binary classification through wrap sigmoid function, the optimization criterion is logit loss.
the binary -> for binary
What is a wrap sigmoid function?
, the optimization -> . The optimization criterion
Is it "logistic loss" BTW?
docs/ml-classification-regression.md
> This equation has only linear complexity in both k and n - i.e. its computation is in $$O(kn)$$.
>
> In general, in order to prevent the exploding gradient problem, it is best to scale continuous features between 0 and 1,
> or bin the continuous features and one-hot.
one-hot encode them
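For the scaling recommendation discussed above, a minimal sketch using the existing MinMaxScaler (the column names and the data DataFrame are placeholders, not part of the PR):

```scala
import org.apache.spark.ml.feature.MinMaxScaler

// MinMaxScaler rescales each feature to [0, 1] by default, which is the
// preprocessing the doc recommends before training an FM model.
val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

val scaledData = scaler.fit(data).transform(data)
```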
update fm docs
Merged to master
Ack, wait, this didn't pass tests after the last commit. Let me monitor
All the PR builders are broken now, with PySpark ML test failures. I'm reverting it to unblock other PRs. Please re-open it later, thanks!
Thanks, all!
Ah OK, that's the reason for the PySpark failure. Nice finding and thanks for the quick action!
@srowen I used the following command to run the PySpark tests, and I fixed the FM Python doc error. I'm very sorry about the mistake. I added a new commit, but it doesn't display in this PR. What should I do?
@mob-ai thanks!
@cloud-fan yep, this was entirely my mistake - I looked at the wrong PR tab open in my browser to see if it had passed, and merged it when it had not. My bad, I meant to go check and revert if needed this morning, but you beat me to it.
[SPARK-29224][ML]Implement Factorization Machines as a ml-pipeline component

### What changes were proposed in this pull request?
Implement Factorization Machines as a ml-pipeline component:
1. loss function supports: logloss, mse
2. optimizer: GD, adamW

### Why are the changes needed?
Factorization Machines are widely used in advertising and recommendation systems to estimate CTR (click-through rate). These systems usually have a lot of data, so we need Spark to estimate the CTR, and Factorization Machines are a common ML model for estimating CTR.

References:
1. S. Rendle, "Factorization machines," in Proceedings of IEEE International Conference on Data Mining (ICDM), pp. 995–1000, 2010.
https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Run unit tests.

Closes apache#26124 from mob-ai/ml/fm.

Authored-by: zhanjf <zhanjf@mob.com>
Signed-off-by: Sean Owen <srowen@gmail.com>