From e6f9f9541f0b00c14b7c5a201b22aeb400eb9f19 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Thu, 16 Feb 2017 12:54:22 -0800 Subject: [PATCH 1/2] Scala API Change for AND-amplification --- .../feature/BucketedRandomProjectionLSH.scala | 8 +++++--- .../org/apache/spark/ml/feature/LSH.scala | 19 ++++++++++++++++++- .../apache/spark/ml/feature/MinHashLSH.scala | 8 +++++--- .../BucketedRandomProjectionLSHSuite.scala | 9 ++++++++- .../spark/ml/feature/MinHashLSHSuite.scala | 15 +++++++++------ 5 files changed, 45 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index cbac16345a292..a0141bef935b3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -79,8 +79,7 @@ class BucketedRandomProjectionLSHModel private[ml]( val hashValues: Array[Double] = randUnitVectors.map({ randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength)) }) - // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 - hashValues.map(Vectors.dense(_)) + hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray } } @@ -137,6 +136,9 @@ class BucketedRandomProjectionLSH(override val uid: String) @Since("2.1.0") override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) + @Since("2.2.0") + override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) + @Since("2.1.0") def this() = { this(Identifiable.randomUID("brp-lsh")) @@ -155,7 +157,7 @@ class BucketedRandomProjectionLSH(override val uid: String) inputDim: Int): BucketedRandomProjectionLSHModel = { val rand = new Random($(seed)) val randUnitVectors: Array[Vector] = { - Array.fill($(numHashTables)) { + Array.fill($(numHashTables) * $(numHashFunctions)) { val randArray = 
Array.fill(inputDim)(rand.nextGaussian()) Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray))) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 1c9f47a0b201d..3a632b3a5baca 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -43,10 +43,24 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { "tables, where increasing number of hash tables lowers the false negative rate, and " + "decreasing it improves the running performance", ParamValidators.gt(0)) + /** + * Param for the number of hash functions used in LSH AND-amplification. + * + * LSH AND-amplification can be used to reduce the false positive rate. Higher values for this + * param lead to a reduced false positive rate, at the cost of an increased false negative rate. + * @group param + */ + final val numHashFunctions: IntParam = new IntParam(this, "numHashFunctions", "number of hash " + "functions, where increasing number of hash functions lowers the false positive rate, and " + "decreasing it improves the false negative rate", ParamValidators.gt(0)) + /** @group getParam */ final def getNumHashTables: Int = $(numHashTables) - setDefault(numHashTables -> 1) + /** @group getParam */ + final def getNumHashFunctions: Int = $(numHashFunctions) + + setDefault(numHashTables -> 1, numHashFunctions -> 1) /** * Transform the Schema for LSH @@ -308,6 +322,9 @@ private[ml] abstract class LSH[T <: LSHModel[T]] /** @group setParam */ def setNumHashTables(value: Int): this.type = set(numHashTables, value) + /** @group setParam */ + def setNumHashFunctions(value: Int): this.type = set(numHashFunctions, value) + /** * Validate and create a new instance of concrete LSHModel.
Because different LSHModel may have * different initial setting, developer needs to define how their LSHModel is created instead of diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 620e1fbb09ff7..4c8ab330d13ed 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -61,8 +61,7 @@ class MinHashLSHModel private[ml]( ((1 + elem) * a + b) % MinHashLSH.HASH_PRIME }.min.toDouble } - // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 - hashValues.map(Vectors.dense(_)) + hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray } } @@ -119,6 +118,9 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashLSHModel] with Has @Since("2.1.0") override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) + @Since("2.2.0") + override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) + @Since("2.1.0") def this() = { this(Identifiable.randomUID("mh-lsh")) @@ -133,7 +135,7 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashLSHModel] with Has require(inputDim <= MinHashLSH.HASH_PRIME, s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.HASH_PRIME}.") val rand = new Random($(seed)) - val randCoefs: Array[(Int, Int)] = Array.fill($(numHashTables)) { + val randCoefs: Array[(Int, Int)] = Array.fill($(numHashTables) * $(numHashFunctions)) { (1 + rand.nextInt(MinHashLSH.HASH_PRIME - 1), rand.nextInt(MinHashLSH.HASH_PRIME - 1)) } new MinHashLSHModel(uid, randCoefs) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index ab937685a555c..9c5929fce156e 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -52,6 +52,7 @@ class BucketedRandomProjectionLSHSuite test("BucketedRandomProjectionLSH: default params") { val brp = new BucketedRandomProjectionLSH assert(brp.getNumHashTables === 1.0) + assert(brp.getNumHashFunctions === 1.0) } test("read/write") { @@ -85,6 +86,7 @@ class BucketedRandomProjectionLSHSuite test("BucketedRandomProjectionLSH: randUnitVectors") { val brp = new BucketedRandomProjectionLSH() .setNumHashTables(20) + .setNumHashFunctions(10) .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) @@ -119,6 +121,7 @@ class BucketedRandomProjectionLSHSuite // Project from 100 dimensional Euclidean Space to 10 dimensions val brp = new BucketedRandomProjectionLSH() .setNumHashTables(10) + .setNumHashFunctions(5) .setInputCol("keys") .setOutputCol("values") .setBucketLength(2.5) @@ -133,7 +136,8 @@ class BucketedRandomProjectionLSHSuite val key = Vectors.dense(1.2, 3.4) val brp = new BucketedRandomProjectionLSH() - .setNumHashTables(2) + .setNumHashTables(8) + .setNumHashFunctions(2) .setInputCol("keys") .setOutputCol("values") .setBucketLength(4.0) @@ -150,6 +154,7 @@ class BucketedRandomProjectionLSHSuite val brp = new BucketedRandomProjectionLSH() .setNumHashTables(20) + .setNumHashFunctions(10) .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) @@ -182,6 +187,7 @@ class BucketedRandomProjectionLSHSuite val dataset2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(8) .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") @@ -200,6 +206,7 @@ class BucketedRandomProjectionLSHSuite val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(8) .setNumHashTables(2) .setInputCol("keys") 
.setOutputCol("values") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index 3461cdf82460f..8bac0b628136a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -44,8 +44,9 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("MinHashLSH: default params") { - val rp = new MinHashLSH - assert(rp.getNumHashTables === 1.0) + val mh = new MinHashLSH + assert(mh.getNumHashTables === 1.0) + assert(mh.getNumHashFunctions === 1.0) } test("read/write") { @@ -109,7 +110,8 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa test("approxNearestNeighbors for min hash") { val mh = new MinHashLSH() - .setNumHashTables(20) + .setNumHashTables(64) + .setNumHashFunctions(2) .setInputCol("keys") .setOutputCol("values") .setSeed(12345) @@ -119,8 +121,8 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(mh, dataset, key, 20, singleProbe = true) - assert(precision >= 0.7) - assert(recall >= 0.7) + assert(precision >= 0.6) + assert(recall >= 0.6) } test("approxNearestNeighbors for numNeighbors <= 0") { @@ -149,7 +151,8 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val df2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") val mh = new MinHashLSH() - .setNumHashTables(20) + .setNumHashTables(64) + .setNumHashFunctions(2) .setInputCol("keys") .setOutputCol("values") .setSeed(12345) From 83a155699df4b15f1ab1fc427730613b63f7d1d6 Mon Sep 17 00:00:00 2001 From: Yunni Date: Sun, 26 Feb 2017 23:04:37 -0500 Subject: [PATCH 2/2] Fix typos in unit tests --- .../spark/ml/feature/BucketedRandomProjectionLSHSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 
deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 9c5929fce156e..2497e8f4f6c62 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -187,7 +187,7 @@ class BucketedRandomProjectionLSHSuite val dataset2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") val brp = new BucketedRandomProjectionLSH() - .setNumHashTables(8) + .setNumHashFunctions(4) .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") @@ -206,14 +206,14 @@ class BucketedRandomProjectionLSHSuite val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") val brp = new BucketedRandomProjectionLSH() - .setNumHashTables(8) + .setNumHashFunctions(4) .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") .setBucketLength(4.0) .setSeed(12345) - val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(brp, df, df, 3.0) + val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(brp, df, df, 2.0) assert(precision == 1.0) assert(recall >= 0.7) }