Skip to content

Commit

Permalink
[SPARK-25047][ML] Can't assign SerializedLambda to scala.Function1 in deserialization of BucketedRandomProjectionLSHModel
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

Convert two function fields in ML classes to simple functions to avoid an odd SerializedLambda deserialization problem

## How was this patch tested?

Existing tests.

Closes #22032 from srowen/SPARK-25047.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
srowen committed Aug 9, 2018
1 parent b2950ce commit 1a7e747
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 29 deletions.
Expand Up @@ -82,14 +82,12 @@ class BucketedRandomProjectionLSHModel private[ml](
override def setOutputCol(value: String): this.type = super.set(outputCol, value)

@Since("2.1.0")
override protected[ml] val hashFunction: Vector => Array[Vector] = {
key: Vector => {
val hashValues: Array[Double] = randUnitVectors.map({
randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength))
})
// TODO: Output vectors of dimension numHashFunctions in SPARK-18450
hashValues.map(Vectors.dense(_))
}
override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {
  // Random-projection hash: project the input onto each random unit vector,
  // then bucket the projection by the configured bucket length.
  val bucketWidth = $(bucketLength)
  val bucketIds = randUnitVectors.map { unitVector =>
    Math.floor(BLAS.dot(elems, unitVector) / bucketWidth)
  }
  // TODO: Output vectors of dimension numHashFunctions in SPARK-18450
  bucketIds.map(id => Vectors.dense(id))
}

@Since("2.1.0")
Expand Down
4 changes: 2 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala
Expand Up @@ -75,7 +75,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]
* The hash function of LSH, mapping an input feature vector to multiple hash vectors.
* @return The mapping of LSH function.
*/
protected[ml] val hashFunction: Vector => Array[Vector]
protected[ml] def hashFunction(elems: Vector): Array[Vector]

/**
* Calculate the distance between two different keys using the distance metric corresponding
Expand All @@ -97,7 +97,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]]

override def transform(dataset: Dataset[_]): DataFrame = {
  transformSchema(dataset.schema, logging = true)
  // Wrap the model's hash function in a UDF that emits an array of vectors,
  // then append it as the configured output column.
  val hashUDF = udf(hashFunction(_: Vector), DataTypes.createArrayType(new VectorUDT))
  dataset.withColumn($(outputCol), hashUDF(dataset($(inputCol))))
}

Expand Down
20 changes: 9 additions & 11 deletions mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala
Expand Up @@ -60,18 +60,16 @@ class MinHashLSHModel private[ml](
override def setOutputCol(value: String): this.type = super.set(outputCol, value)

@Since("2.1.0")
override protected[ml] val hashFunction: Vector => Array[Vector] = {
elems: Vector => {
require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.")
val elemsList = elems.toSparse.indices.toList
val hashValues = randCoefficients.map { case (a, b) =>
elemsList.map { elem: Int =>
((1L + elem) * a + b) % MinHashLSH.HASH_PRIME
}.min.toDouble
}
// TODO: Output vectors of dimension numHashFunctions in SPARK-18450
hashValues.map(Vectors.dense(_))
override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {
  require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.")
  // MinHash operates on the set of non-zero indices of the input vector.
  val nonZeroIndices = elems.toSparse.indices
  // For each (a, b) coefficient pair, take the minimum permuted-index hash.
  val hashValues = randCoefficients.map { case (a, b) =>
    nonZeroIndices
      .map(idx => ((1L + idx) * a + b) % MinHashLSH.HASH_PRIME)
      .min
      .toDouble
  }
  // TODO: Output vectors of dimension numHashFunctions in SPARK-18450
  hashValues.map(Vectors.dense(_))
}

@Since("2.1.0")
Expand Down
Expand Up @@ -515,14 +515,13 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine
* The reweight function used to update working labels and weights
* at each iteration of [[IterativelyReweightedLeastSquares]].
*/
val reweightFunc: (OffsetInstance, WeightedLeastSquaresModel) => (Double, Double) = {
(instance: OffsetInstance, model: WeightedLeastSquaresModel) => {
val eta = model.predict(instance.features) + instance.offset
val mu = fitted(eta)
val newLabel = eta - instance.offset + (instance.label - mu) * link.deriv(mu)
val newWeight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu))
(newLabel, newWeight)
}
def reweightFunc(
    instance: OffsetInstance, model: WeightedLeastSquaresModel): (Double, Double) = {
  // Linear predictor including the instance's offset term.
  val linearPred = model.predict(instance.features) + instance.offset
  // Fitted mean under the link function, and the link derivative at that mean.
  val mean = fitted(linearPred)
  val derivAtMean = this.link.deriv(mean)
  // Working label/weight for the next IRLS iteration.
  val workingLabel = linearPred - instance.offset + (instance.label - mean) * derivAtMean
  val workingWeight = instance.weight / (math.pow(derivAtMean, 2.0) * family.variance(mean))
  (workingLabel, workingWeight)
}
}

Expand Down

0 comments on commit 1a7e747

Please sign in to comment.