Skip to content

Commit

Permalink
[SPARK-18408][ML] API Improvements for LSH
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

(1) Change output schema to `Array of Vector` instead of `Vectors`
(2) Use `numHashTables` as the dimension of Array
(3) Rename `RandomProjection` to `BucketedRandomProjectionLSH`, `MinHash` to `MinHashLSH`
(4) Make `randUnitVectors/randCoefficients` private
(5) Make Multi-Probe NN Search and `hashDistance` private for future discussion

Saved for future PRs:
(1) AND-amplification and `numHashFunctions` as the dimension of Vector are saved for a future PR.
(2) `hashDistance` and Multi-Probe NN Search need more discussion. The current implementation is just a backward-compatible one.

## How was this patch tested?
Related unit tests are modified to make sure the performance of LSH is preserved, and the outputs of the APIs meet expectations.

Author: Yun Ni <yunn@uber.com>
Author: Yunni <Euler57721@gmail.com>

Closes #15874 from Yunni/SPARK-18408-yunn-api-improvements.

(cherry picked from commit 05f7c6f)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
  • Loading branch information
Yunni authored and jkbradley committed Nov 28, 2016
1 parent 75d73d1 commit cdf315b
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
*
* Params for [[RandomProjection]].
* Params for [[BucketedRandomProjectionLSH]].
*/
private[ml] trait RandomProjectionParams extends Params {
private[ml] trait BucketedRandomProjectionLSHParams extends Params {

/**
* The length of each hash bucket, a larger bucket lowers the false negative rate. The number of
Expand All @@ -58,8 +58,8 @@ private[ml] trait RandomProjectionParams extends Params {
/**
* :: Experimental ::
*
* Model produced by [[RandomProjection]], where multiple random vectors are stored. The vectors
* are normalized to be unit vectors and each vector is used in a hash function:
* Model produced by [[BucketedRandomProjectionLSH]], where multiple random vectors are stored. The
* vectors are normalized to be unit vectors and each vector is used in a hash function:
* `h_i(x) = floor(r_i.dot(x) / bucketLength)`
* where `r_i` is the i-th random unit vector. The number of buckets will be `(max L2 norm of input
* vectors) / bucketLength`.
Expand All @@ -68,18 +68,19 @@ private[ml] trait RandomProjectionParams extends Params {
*/
@Experimental
@Since("2.1.0")
class RandomProjectionModel private[ml] (
class BucketedRandomProjectionLSHModel private[ml](
override val uid: String,
@Since("2.1.0") val randUnitVectors: Array[Vector])
extends LSHModel[RandomProjectionModel] with RandomProjectionParams {
private[ml] val randUnitVectors: Array[Vector])
extends LSHModel[BucketedRandomProjectionLSHModel] with BucketedRandomProjectionLSHParams {

@Since("2.1.0")
override protected[ml] val hashFunction: (Vector) => Vector = {
override protected[ml] val hashFunction: Vector => Array[Vector] = {
key: Vector => {
val hashValues: Array[Double] = randUnitVectors.map({
randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength))
})
Vectors.dense(hashValues)
// TODO: Output vectors of dimension numHashFunctions in SPARK-18450
hashValues.map(Vectors.dense(_))
}
}

Expand All @@ -89,27 +90,29 @@ class RandomProjectionModel private[ml] (
}

@Since("2.1.0")
override protected[ml] def hashDistance(x: Vector, y: Vector): Double = {
override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {
// Since it's generated by hashing, it will be a pair of dense vectors.
x.toDense.values.zip(y.toDense.values).map(pair => math.abs(pair._1 - pair._2)).min
x.zip(y).map(vectorPair => Vectors.sqdist(vectorPair._1, vectorPair._2)).min
}

@Since("2.1.0")
override def copy(extra: ParamMap): this.type = defaultCopy(extra)

@Since("2.1.0")
override def write: MLWriter = new RandomProjectionModel.RandomProjectionModelWriter(this)
override def write: MLWriter = {
new BucketedRandomProjectionLSHModel.BucketedRandomProjectionLSHModelWriter(this)
}
}

/**
* :: Experimental ::
*
* This [[RandomProjection]] implements Locality Sensitive Hashing functions for Euclidean
* distance metrics.
* This [[BucketedRandomProjectionLSH]] implements Locality Sensitive Hashing functions for
* Euclidean distance metrics.
*
* The input is dense or sparse vectors, each of which represents a point in the Euclidean
* distance space. The output will be vectors of configurable dimension. Hash value in the same
* dimension is calculated by the same hash function.
* distance space. The output will be vectors of configurable dimension. Hash values in the
* same dimension are calculated by the same hash function.
*
* References:
*
Expand All @@ -121,8 +124,9 @@ class RandomProjectionModel private[ml] (
*/
@Experimental
@Since("2.1.0")
class RandomProjection(override val uid: String) extends LSH[RandomProjectionModel]
with RandomProjectionParams with HasSeed {
class BucketedRandomProjectionLSH(override val uid: String)
extends LSH[BucketedRandomProjectionLSHModel]
with BucketedRandomProjectionLSHParams with HasSeed {

@Since("2.1.0")
override def setInputCol(value: String): this.type = super.setInputCol(value)
Expand All @@ -131,11 +135,11 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
override def setOutputCol(value: String): this.type = super.setOutputCol(value)

@Since("2.1.0")
override def setOutputDim(value: Int): this.type = super.setOutputDim(value)
override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value)

@Since("2.1.0")
def this() = {
this(Identifiable.randomUID("random projection"))
this(Identifiable.randomUID("brp-lsh"))
}

/** @group setParam */
Expand All @@ -147,15 +151,16 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.1.0")
override protected[this] def createRawLSHModel(inputDim: Int): RandomProjectionModel = {
override protected[this] def createRawLSHModel(
inputDim: Int): BucketedRandomProjectionLSHModel = {
val rand = new Random($(seed))
val randUnitVectors: Array[Vector] = {
Array.fill($(outputDim)) {
Array.fill($(numHashTables)) {
val randArray = Array.fill(inputDim)(rand.nextGaussian())
Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray)))
}
}
new RandomProjectionModel(uid, randUnitVectors)
new BucketedRandomProjectionLSHModel(uid, randUnitVectors)
}

@Since("2.1.0")
Expand All @@ -169,23 +174,25 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
}

@Since("2.1.0")
object RandomProjection extends DefaultParamsReadable[RandomProjection] {
object BucketedRandomProjectionLSH extends DefaultParamsReadable[BucketedRandomProjectionLSH] {

@Since("2.1.0")
override def load(path: String): RandomProjection = super.load(path)
override def load(path: String): BucketedRandomProjectionLSH = super.load(path)
}

@Since("2.1.0")
object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProjectionLSHModel] {

@Since("2.1.0")
override def read: MLReader[RandomProjectionModel] = new RandomProjectionModelReader
override def read: MLReader[BucketedRandomProjectionLSHModel] = {
new BucketedRandomProjectionLSHModelReader
}

@Since("2.1.0")
override def load(path: String): RandomProjectionModel = super.load(path)
override def load(path: String): BucketedRandomProjectionLSHModel = super.load(path)

private[RandomProjectionModel] class RandomProjectionModelWriter(instance: RandomProjectionModel)
extends MLWriter {
private[BucketedRandomProjectionLSHModel] class BucketedRandomProjectionLSHModelWriter(
instance: BucketedRandomProjectionLSHModel) extends MLWriter {

// TODO: Save using the existing format of Array[Vector] once SPARK-12878 is resolved.
private case class Data(randUnitVectors: Matrix)
Expand All @@ -203,20 +210,22 @@ object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
}
}

private class RandomProjectionModelReader extends MLReader[RandomProjectionModel] {
private class BucketedRandomProjectionLSHModelReader
extends MLReader[BucketedRandomProjectionLSHModel] {

/** Checked against metadata when loading model */
private val className = classOf[RandomProjectionModel].getName
private val className = classOf[BucketedRandomProjectionLSHModel].getName

override def load(path: String): RandomProjectionModel = {
override def load(path: String): BucketedRandomProjectionLSHModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)

val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
val Row(randUnitVectors: Matrix) = MLUtils.convertMatrixColumnsToML(data, "randUnitVectors")
.select("randUnitVectors")
.head()
val model = new RandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray)
val model = new BucketedRandomProjectionLSHModel(metadata.uid,
randUnitVectors.rowIter.toArray)

DefaultParamsReader.getAndSetParams(model, metadata)
model
Expand Down

0 comments on commit cdf315b

Please sign in to comment.